既然 Tcp 是面向字节流的所以我们可以使用 write 和 read 系统调用来进行读写。那么 write 和 read 对在使用 Tcp 方式通信时会有 bug问题write 是吧数据写给了网络吗答不是Tcp 协议在 OS 内核中这个协议里面有两个缓冲区发送缓冲区和接收缓冲区write 是往发送缓冲区里面写数据结论write 本质不是把数据往网络里面发write 的本质是一个拷贝函数所以 sockfd 对应两个缓冲区那么 write 之后什么时候发送这些数据发送了多少出错了怎么办完全由 TCP 自主控制所以 TCP 协议称为传输控制协议结论TCP 网络发送数据。本质是把数据从发送缓冲区通过网络拷贝到对端的接收缓冲区TCP 支持全双工既能读又能写本质是有一对接收和发送缓冲区结论在用户往发送缓冲区里面写数据然后 OS 往发送缓冲区里面拿数据发送到网络这个过程就是一个典型的生产者和消费者模型是用户和OS之间进行生产和消费既然TCP是面向字节流的所以一个缓冲区里面有10个字节的数据但是对端的接收缓冲区最多只能收5个字节所以往对端缓冲区里面发送了5个字节的数据最终导致数据不一致问题数据粘报问题。所以数据有没有读完整tcp 并不关心而是由用户自己控制维护如果我们要使用结构体来发送信息例如strucut request { int x; int yl; int z; };struct request req {1,2,3}我们要把 req 发给对端的接收缓冲区行吗答不行三个问题内存对齐、大小端问题、跨语言问题显然在应用层不能这么干但是 OS 内部是这么干的因为协议本身就是结构体为什么能这么干答OS 内核会自动考虑内存对齐问题、大小端问题OS 都是用 C语言写的。解决方案想办法把结构体里面的数据弄成字符串以空格为分隔符例如struct req { 1, 2 , 3}弄成字符串”1 2 3“把这个字符串发送到对端然后对端再把这个字符串的内容填写到结构体里面这个过程就是序列和反序列化为什么要进行序列化答1.方便网络发送2.方便协议用户定义的结构体的可扩展性和维护性为什么要进行反序列化答方便上层处理问题为什么 OSI 的七层模型落地的时候变成了四或五层答OSI 的七层模型的应用层、表示层、会话层合并成一层应用层因为这三层由用户来写根据不同的场景定义问题为什么把这三层放到内核里面答应用场景太多、需求不同解决数据粘报问题1.定义用户层协议2.对数据进行解析和分析例如代码示例对一个把数据放到结构体strucut request { int x; int yl; int oper; };进行加减法运算struct request wq { 1,2,}想办法弄成字符串1 2 不过要防止数据报粘报问题我们还要把这个字符串的长度放到字符串的前面51 2 这样我们就能解析这个数据报时就能看看是否是完整的数据报#pragma once #include iostream #include string #include jsoncpp/json/json.h //应用层协议 class Request { public: Request() { _x _y _oper 0; } //序列化对象 bool Serialize(std::string* out) { Json::Value root; root[x] _x; root[y] _y; root[oper] _oper; Json::StyledWriter writer; *out writer.write(root); if(out-empty()) return false; return true; } //反序列化对象 bool Deserialize(std::string in) { Json::Value root; Json::Reader reader; bool ret reader.parse(in,root); if(!ret) return false; _x root[x].asInt(); _y root[y].asInt(); _oper root[oper].asInt(); return true; } ~Request() { } int X() { return _x; } int Y() { return _y; } char Oper() { return _oper; } // private: public: //约定 x oper y int _x; int _y; char _oper; }; class Response { public: Response():_result(0),_code(0) { } //序列化对象 bool Serialize(std::string* out) { Json::Value root; root[result] _result; root[code] _code; Json::StyledWriter writer; *out writer.write(root); if(out-empty()) return false; return true; } //反序列化对象 bool Deserialize(std::string in) { Json::Value root; Json::Reader reader; bool ret reader.parse(in,root); if(!ret) return false; _result root[result].asInt(); _code root[code].asInt(); return true; } void SetResult(int r) { _result r; } void SetCode(int c) { _code c; } void Print() { std::cout _result [ _code ] std::endl; } ~Response() { } private: int _result; int _code;//可信度 }; //解决数据粘报问题 //有效载荷长度数据的长度int \r\n //jsonstr\r\n static const std::string sep \r\n; class Protocol { public: static std::string Package(const std::string jsonstr) { // jsonstr - len\r\njsonstr\r\n if(jsonstr.empty()) return std::string(); std::string json_length std::to_string(jsonstr.size()); return json_length sep jsonstr sep; } static bool DigitSafeCheck(const std::string str) { for(int i 0; i str.size();i) { if(!(str[i] 0 str[i] 9)) { return false; } } return true; } //len\r\njsonstr\r\n //len\r\njsonstr\r\nlen\r\njsonstr\r\n //len //str:从网络读取字符串输入 //package输出参数如果有完整的 json 报文就返回 static int Unpack(std::string origin_str,std::string* package) { if(!package) return 0; auto pos origin_str.find(sep); if(pos std::string::npos) { return false; } //收到一个报文的长度 std::string len_str origin_str.substr(0,pos); if(!DigitSafeCheck(len_str)) return -1; int digit_len std::stoi(len_str); //根据这个 digit_len 得到一个报文的长度 int target_len len_str.size() digit_len 2 * sep.size(); if(origin_str.size() target_len) { return 0; } //至少一个完整的报文 *package origin_str.substr(pos sep.size(),digit_len); origin_str.erase(0,target_len); return package-size(); } };完整代码Protocol.hpp#pragma once #include iostream #include string #include jsoncpp/json/json.h //应用层协议 class Request { public: Request() { _x _y _oper 0; } //序列化对象 bool Serialize(std::string* out) { Json::Value root; root[x] _x; root[y] _y; root[oper] _oper; Json::StyledWriter writer; *out writer.write(root); if(out-empty()) return false; return true; } //反序列化对象 bool Deserialize(std::string in) { Json::Value root; Json::Reader reader; bool ret reader.parse(in,root); if(!ret) return false; _x root[x].asInt(); _y root[y].asInt(); _oper root[oper].asInt(); return true; } ~Request() { } int X() { return _x; } int Y() { return _y; } char Oper() { return _oper; } // private: public: //约定 x oper y int _x; int _y; char _oper; }; class Response { public: Response():_result(0),_code(0) { } //序列化对象 bool Serialize(std::string* out) { Json::Value root; root[result] _result; root[code] _code; Json::StyledWriter writer; *out writer.write(root); if(out-empty()) return false; return true; } //反序列化对象 bool Deserialize(std::string in) { Json::Value root; Json::Reader reader; bool ret reader.parse(in,root); if(!ret) return false; _result root[result].asInt(); _code root[code].asInt(); return true; } void SetResult(int r) { _result r; } void SetCode(int c) { _code c; } void Print() { std::cout _result [ _code ] std::endl; } ~Response() { } private: int _result; int _code;//可信度 }; //解决数据粘报问题 //有效载荷长度数据的长度int \r\n //jsonstr\r\n static const std::string sep \r\n; class Protocol { public: static std::string Package(const std::string jsonstr) { // jsonstr - len\r\njsonstr\r\n if(jsonstr.empty()) return std::string(); std::string json_length std::to_string(jsonstr.size()); return json_length sep jsonstr sep; } static bool DigitSafeCheck(const std::string str) { for(int i 0; i str.size();i) { if(!(str[i] 0 str[i] 9)) { return false; } } return true; } //len\r\njsonstr\r\n //len\r\njsonstr\r\nlen\r\njsonstr\r\n //len //str:从网络读取字符串输入 //package输出参数如果有完整的 json 报文就返回 static int Unpack(std::string origin_str,std::string* package) { if(!package) return 0; auto pos origin_str.find(sep); if(pos std::string::npos) { return false; } //收到一个报文的长度 std::string len_str origin_str.substr(0,pos); if(!DigitSafeCheck(len_str)) return -1; int digit_len std::stoi(len_str); //根据这个 digit_len 得到一个报文的长度 int target_len len_str.size() digit_len 2 * sep.size(); if(origin_str.size() target_len) { return 0; } //至少一个完整的报文 *package origin_str.substr(pos sep.size(),digit_len); origin_str.erase(0,target_len); return package-size(); } };TcpServer.hpp#pragma once #include Socket.hpp #include InetAddr.hpp #include memory #include functional #include iostream #include signal.h #include unistd.h using callback_t std::functionstd::string(std::string); class TcpServer { public: TcpServer(int port,callback_t cb) :_port(port) ,_cb(cb) ,_listensocket(std::make_uniqueTcpSock()) { _listensocket-BuildListenSocketMethod(_port); } void HandlerRequeset(std::shared_ptrSocket sockfd,InetAddr addr) { //长服务 std::string inbuffer;//字节流式的队列 while(true) { ssize_t n sockfd-Recv(inbuffer); if(n 0) { LOG(LogLevel::DEBUG) addr.ToString() # inbuffer; std::string send_str _cb(inbuffer); if(send_str.empty()) continue; sockfd-Send(send_str); } else if(n 0) { LOG(LogLevel::DEBUG) addr.ToString() quit,me too; break; } else { LOG(LogLevel::DEBUG) addr.ToString() read error,me too; break; } } sockfd-Close(); } void Run() { signal(SIGCHLD,SIG_IGN); while (true) { InetAddr addr; auto sockfd _listensocket-Accept(addr); if(sockfd nullptr) continue; LOG(LogLevel::DEBUG) 获取一个新连接 addr.ToString() , sockfd: sockfd-SockFd(); if(fork() 0) { _listensocket-Close(); HandlerRequeset(sockfd,addr); exit(0); } sockfd-Close(); } } ~TcpServer() {} private: int _port; std::unique_ptrSocket _listensocket; callback_t _cb; };Socket.hpp#pragma once #include iostream #include string #include unistd.h #include memory #include cstdlib #include sys/types.h #include sys/socket.h #include netinet/in.h #include arpa/inet.h #include InetAddr.hpp #include Logger.hpp using namespace NS_LOG_MODULE; enum { OK, CREATE_ERR, BIND_ERR, LISTEN_ERR }; static int gbacklog 16; class Socket { public: virtual ~Socket(){} virtual void CreateSocketOrDie() 0; virtual void BindSocketOrDie(int port) 0; virtual void ListenSocketOrDie(int backlog) 0; virtual std::shared_ptrSocket Accept(InetAddr* clientaddr) 0; virtual void Close() 0; virtual int SockFd() 0; virtual ssize_t Recv(std::string* out) 0; virtual ssize_t Send(const std::string in) 0; virtual bool Connect(InetAddr peer) 0; public: void BuildListenSocketMethod(int port) { CreateSocketOrDie(); BindSocketOrDie(port); ListenSocketOrDie(gbacklog); } void BuildClientSocketMethod() { CreateSocketOrDie(); } // void BuildUdpSocketMethod() // { // CreateSocketOrDie(); // BindSocketOrDie(); // } }; static const int gsockfd -1; class TcpSock:public Socket { public: TcpSock():_sockfd(gsockfd) { } TcpSock(int sockfd):_sockfd(sockfd) { } void CreateSocketOrDie() override { _sockfd socket(AF_INET,SOCK_STREAM,0); if(_sockfd 0) { LOG(LogLevel::FATAL) 创建套接字失败; exit(CREATE_ERR); } LOG(LogLevel::INFO) 创建套接字成功; } void BindSocketOrDie(int port) override { InetAddr local(port); if(bind(_sockfd,local.Addr(),local.length()) ! 0) { LOG(LogLevel::FATAL) 绑定套接字失败; exit(BIND_ERR); } LOG(LogLevel::INFO) 绑定套接字成功; } void ListenSocketOrDie(int backlog) override { if(listen(_sockfd,backlog) ! 0) { LOG(LogLevel::FATAL) bind socket error; exit(LISTEN_ERR); } LOG(LogLevel::INFO) Listen 套接字成功; } std::shared_ptrSocket Accept(InetAddr* clientaddr) override { struct sockaddr_in peer; socklen_t len sizeof(peer); int fd accept(_sockfd,(struct sockaddr*)peer,len); if(fd 0) { LOG(LogLevel::WARNING) accept socket error; return nullptr; } LOG(LogLevel::INFO) Accept 套接字成功; clientaddr-Init(peer); return std::make_sharedTcpSock(fd); } int SockFd() override { return _sockfd; } void Close() override { if(_sockfd 0) close(_sockfd); } ssize_t Recv(std::string* out) override { char buffer[1024]; ssize_t n recv(_sockfd,buffer,sizeof(buffer) - 1,0);//读取数据 if(n 0) { buffer[n] 0; *out buffer; } return n; } ssize_t Send(const std::string in) override { //写入缓冲区 return send(_sockfd,in.c_str(),in.size(),0); } bool Connect(InetAddr peer) override { int n connect(_sockfd,peer.Addr(),peer.length()); if(n 0) return true; else return false; } ~TcpSock() { } private: int _sockfd; }; class UdpSocket:public Socket { };Parser.hpp#pragma once #include Protocol.hpp #include iostream #include Logger.hpp #include string #include functional using handler_t std::functionResponse(Request); class Parser { public: Parser(handler_t handler):_handler(handler) { } std::string Parse(std::string inbuffer) { LOG(LogLevel::DEBUG) inbuffer: \r\n inbuffer; std::string send_str; while (true) { std::string jsonstr; // 解析报文 int n Protocol::Unpack(inbuffer, jsonstr); if (n 0) { exit(0); } else if (n 0) break;// inbuffer 里面所有的报文处理完毕 else { LOG(LogLevel::DEBUG) jsonstr: \r\n jsonstr; Request req; // 反序列化 if (!req.Deserialize(jsonstr)) { return std::string(); } Response resp _handler(req); // 对 resp 进行序列化 std::string resp_json; if (!resp.Serialize(resp_json)) { return std::string(); } // 打包 send_str Protocol::Package(resp_json); } } return send_str; } ~Parser() {} private: handler_t _handler; };Mutex.hpp#pragma once #include iostream #include pthread.h class Mutex { public: Mutex() { pthread_mutex_init(_lock,nullptr); } void Lock()//加锁 { pthread_mutex_lock(_lock); } void Unlock()//解锁 { pthread_mutex_unlock(_lock); } pthread_mutex_t* Origin() { return _lock; } ~Mutex() { pthread_mutex_destroy(_lock);//销毁锁 } private: pthread_mutex_t _lock; }; class LockGuard//RAII风格 { public: LockGuard(Mutex lock):_lockref(lock) { _lockref.Lock(); } ~LockGuard() { _lockref.Unlock(); } private: Mutex _lockref; };Main.cc#include TcpServer.hpp #include Protocol.hpp #include Calculator.hpp #include Parser.hpp//解析报文序列化反序列化 #include memory void Usage(std::string proc) { std::cerr Usage : proc localport std::endl; } int main(int argc,char* argv[]) { if (argc ! 2) { Usage(argv[0]); exit(1); } uint16_t serverport std::stoi(argv[1]); ENABLE_CONSOLE_LOG_STRATEGY(); //计算机对象 std::unique_ptrCallculator cal std::make_uniqueCallculator(); //协议解析模块 std::unique_ptrParser parser std::make_uniqueParser([cal](Request req)-Response{ return cal-Exec(req); }); std::unique_ptrTcpServer tsock std::make_uniqueTcpServer(serverport, [parser](std::string inbuffer)-std::string{ return parser-Parse(inbuffer); }); tsock-Run(); return 0; }Logger.hpp#pragma once #include cstdio #include unistd.h #include memory #include sstream #include iostream #include fstream #include ctime #include string #include sys/time.h #include filesystem #include Mutex.hpp namespace NS_LOG_MODULE { enum class LogLevel //日志等级 { INFO,//常规 WARNING,//警告 ERROR,//错误 FATAL,//致命 DEBUG }; std::string LogLevel2Message(LogLevel level) { switch(level) { case LogLevel::INFO: return Info; case LogLevel::WARNING: return Warning; case LogLevel::ERROR: return Error; case LogLevel::FATAL: return Fatal; case LogLevel::DEBUG: return Debug; default: return Unknown; } } //获取时间戳 、日期 时间 std::string GetCurrentTime() { //获取时间戳 struct timeval current_time; int n gettimeofday(current_time,nullptr); (void)n; //获取时间 struct tm struct_time; localtime_r((current_time.tv_sec),struct_time);// r 可重入函数保证线程安全 char timestr[128]; snprintf(timestr,sizeof(timestr),%04d-%02d-%02d %02d:%02d:%02d.%ld, struct_time.tm_year 1900, struct_time.tm_mon 1, struct_time.tm_mday, struct_time.tm_hour, struct_time.tm_min, struct_time.tm_sec, current_time.tv_usec ); return timestr; } //从输出角度看 ———— 刷新策略 //1.显示器打印 //2.文件写入 //日志的生成 //1.构建日志字符串 //2.根据不同的策略进行刷新 //策略模式 策略接口 class LogStrategy { public: virtual ~LogStrategy() default; virtual void SyncLog(const std::string message) 0; }; //控制台日志刷新策略日志在显示器打印 class ConsoleStrategy : public LogStrategy { public: void SyncLog(const std::string message) override { LockGuard lockguard(_mutex); std::cerr message std::endl; } ~ConsoleStrategy() {} private: Mutex _mutex; }; const std::string defaultpath ./log; const std::string defaultfilename log.txt; //文件策略 class FileLogStrategy : public LogStrategy { public: FileLogStrategy(const std::string path defaultpath,const std::string name defaultfilename) :_logpath(path) ,_logfilename(defaultfilename) { LockGuard lockguard(_mutex); if(std::filesystem::exists(_logpath))//判断文件是否存在 { return; } try { std::filesystem::create_directories(_logpath); } catch(const std::filesystem::filesystem_error e) { std::cerr e.what() \n; } } virtual void SyncLog(const std::string message) override { { LockGuard lockguard(_mutex); if (!_logpath.empty() _logpath.back() ! /) { _logpath /; } std::string targetlog _logpath _logfilename;// ./log/log.txt std::ofstream out(targetlog,std::ios::app);//以追加的方式打开文件 if(!out.is_open())//打开文件失败 return; out message \n;//把字符串写入文件 out.close();//关闭文件 } } private: std::string _logpath; std::string _logfilename; Mutex _mutex; }; //日志的生成 //1.构建日志字符串 //2.根据不同的策略进行刷新 //日志类 //1。日志的生成 //2.根据不同的策略进行刷新 class Logger { public: Logger(){ UseConsoleStrategy(); } void UseConsoleStrategy() { _strategy std::make_uniqueConsoleStrategy(); } void UseFileStrategy() { _strategy std::make_uniqueFileLogStrategy(); } // void Debug(const std::string message) // { // if(_strategy ! nullptr) // { // _strategy-SyncLog(message); // } // } //用一个内部类来标识一条完整的日志信息 //LogMessage RAII 风格的方式进行刷新 class LogMessage { public: LogMessage(LogLevel level,std::string filename,int line,Logger logger) :_level(level) ,_curr_time(GetCurrentTime()) ,_pid(getpid()) ,_filename(filename) ,_line(line) ,_logger(logger) { //构建日志信息字符串固定的左半部分 std::stringstream ss; ss [ _curr_time ] [ LogLevel2Message(_level) ] [ _pid ] [ _filename ] [ _line ] - ; _loginfo ss.str(); } templateclass T LogMessage operator (const T info) { std::stringstream ss; ss info; _loginfo ss.str(); return *this;//方便下次 } ~LogMessage() { if(_logger._strategy) { _logger._strategy-SyncLog(_loginfo); } } private: LogLevel _level;//等级 std::string _curr_time;//当前时间 pid_t _pid;//进程的 pid std::string _filename; int _line; std::string _loginfo;//一条完整的日志信息 Logger _logger;// 方便后续进行策略式刷新 }; //返回值以拷贝 LogMessage 的方式来拷贝 LogMessage operator()(LogLevel level,std::string filename,int line) { return LogMessage(level,filename,line,*this); } ~Logger(){} private: std::unique_ptrLogStrategy _strategy; //刷新策略 }; //日志对象全局使用 Logger logger; #define ENABLE_CONSOLE_LOG_STRATEGY() logger.UseConsoleStrategy(); #define ENABLE_FILE_LOG_STRATEGY() logger.UseFileStrategy(); #define LOG(level) logger(level,__FILE__,__LINE__) }InetAddr.hpp#pragma once //描述 client socket 的类 #include sys/types.h #include sys/socket.h #include cstring #include arpa/inet.h #include netinet/in.h #include iostream #include string #define Conv(addr) ((struct sockaddr*)addr) class InetAddr { public: InetAddr(const struct sockaddr_in addr):_addr(addr) { Net2Host(); } InetAddr() {} void Init(const struct sockaddr_in addr) { _addr addr; Net2Host(); } std::string Ip() { return _ip; } uint16_t Port() { return _port; } struct sockaddr* Addr() { return Conv(_addr); } std::string ToString() { return _ip - std::to_string(_port); } bool operator(const InetAddr addr) { return (_ip addr._ip _port addr._port); } InetAddr(uint16_t port,const std::string ip 0.0.0.0) :_ip(ip) ,_port(port) { Host2Net(); } socklen_t length() { return sizeof(_addr); } ~InetAddr(){} private: void Host2Net() { memset(_addr,0,sizeof(_addr)); _addr.sin_family AF_INET; _addr.sin_port htons(_port); // _addr.sin_addr.s_addr inet_addr(_ip.c_str()); // inet_addr 函数不安全存在覆盖数据的风险 //最佳实现 inet_pton(AF_INET,_ip.c_str(),(_addr.sin_addr.s_addr)); } void Net2Host() { _port ntohs(_addr.sin_port);//把网络序列弄成主机序列 // _ip inet_ntoa(_addr.sin_addr);//转成主机序列, inet_ntoa 存在数据不一致问题 //建议使用 char ipbuffer[64]; inet_ntop(AF_INET,(_addr.sin_addr.s_addr),ipbuffer,sizeof(ipbuffer)); _ip ipbuffer; } private: struct sockaddr_in _addr;// 网络风格的地址 // 主机风格的地址 std::string _ip; uint16_t _port; };Client.cc#include iostream #include string #include Socket.hpp #include Protocol.hpp #include InetAddr.hpp #include memory void Usage(std::string proc) { std::cerr Usage : proc localip localport std::endl; } int main(int argc,char* argv[]) { if (argc ! 3) { Usage(argv[0]); exit(1); } std::string serverip argv[1]; uint16_t serverport std::stoi(argv[2]); std::unique_ptrSocket sockptr std::make_uniqueTcpSock(); sockptr-BuildClientSocketMethod(); InetAddr server(serverport,serverip); if(sockptr-Connect(server)) { std::string buffer; while(true) { //1.构建请求 Request req; std::cout Please Enter X: ; std::cin req._x; std::cout Please Enter Y: ; std::cin req._y; std::cout Please Enter oper: ; std::cin req._oper; //2.序列化 std::string jsonstr; req.Serialize(jsonstr); //3.打包 std::string sender Protocol::Package(jsonstr); //4.发送 sockptr-Send(sender); //5.接收 sockptr-Recv(buffer); //6.报文解析 std::string package; int n Protocol::Unpack(buffer,package); if(n 0) { Response resp; //7.反序列 bool r resp.Deserialize(package); if(r) { resp.Print(); } } } } return 0; }Calculator.hpp#pragma once #include iostream #include Protocol.hpp #include string class Callculator { public: Callculator() {} Response Exec(Request req) { Response resp; switch (req.Oper()) { case : resp.SetResult(req.X() req.Y()); break; case -: resp.SetResult(req.X() - req.Y()); break; case *: resp.SetResult(req.X() * req.Y()); break; case /: { if (req.Y() 0) { resp.SetCode(1); // 1 : dev 0 } else { resp.SetResult(req.X() / req.Y()); } break; } default: resp.SetCode(3); // 3 : 非法操作 break; } return resp; } ~Callculator() {} };