C++ 服务端进阶(三)—— Reactor + 协程:现代异步模型(附完整项目结构与代码)

张开发
2026/4/16 14:11:21 15 分钟阅读

分享文章

C++ 服务端进阶(三)—— Reactor + 协程:现代异步模型(附完整项目结构与代码)
引言先校准一下预期这一篇如果真的要做“C20 原生协程 epoll Reactor”工业级会非常复杂因为 C 原生协程只给你co_awaitco_returnsuspend / resume 机制它不给你 runtime也就是说你得自己把epoll、事件注册、协程恢复、任务生命周期 全部串起来。所以这一篇做的是最小可理解、可运行、能体现本质的版本目标不是上来就对标 Boost.Asio / libuv而是真正打通epoll事件↓Reactor调度↓Coroutine挂起 / 恢复正文一、为什么要从 Reactor 再走到协程在前两篇里我们已经完成了第一篇Connection抽象把连接对象化第二篇多 Reactor 线程模型把架构做出来这时候你其实已经能写出一个“像样的服务端”了。但是还有一个经典问题代码写起来还是不够优雅比如在 Reactor 模型里常见写法会变成onRead() ↓ 解析状态 ↓ 决定下次等读还是等写 ↓ 回调继续一旦逻辑稍微复杂一点就会出现状态变量越来越多读写流程被拆碎回调 / handler 里到处是分支同步思维变成“事件状态机思维”代码不直观所以才需要协程。一旦逻辑稍微复杂一点就会出现状态变量越来越多读写流程被拆碎回调 / handler 里到处是分支同步思维变成“事件状态机思维”代码不直观所以才需要协程。二、协程到底解决什么问题协程解决的不是有没有事件这个是epoll干的。协程解决的是代码执行到一半怎么暂停事件到了之后怎么从原地继续执行换句话说Reactor 负责什么时候可以读什么时候可以写协程负责现在先停一下等可读/可写了再继续往下跑所以这三层关系一定要记住epoll谁 ready 了Reactor把 ready 事件分发出去协程让处理逻辑可以挂起再恢复三、这一篇要做什么这一篇我们做一个最小可运行版本目标不是工业级而是彻底打通模型。我们要实现1单线程 Reactorepoll_wait()统一监听事件2C20 协程co_await reactor.readable(fd)co_await reactor.writable(fd)3最小 echo serveraccept 新连接协程里读数据协程里写回数据连接关闭时回收也就是说最终代码形态会像这样co_await reactor.readable(fd); ssize_t n read(fd, buffer, sizeof(buffer)); co_await reactor.writable(fd); write(fd, buffer, n);这就是同步写法 异步执行四、最终项目结构这一篇我建议拆成下面这些文件. ├── Task.h ├── Reactor.h ├── Reactor.cpp ├── SocketUtil.h ├── SocketUtil.cpp ├── Server.h ├── Server.cpp └── main.cpp各文件职责Task.h定义最小协程返回类型DetachedTaskReactor.h / Reactor.cpp定义 Reactor 和 awaiterreadable(fd)writable(fd)waitReadable()waitWritable()loop()SocketUtil.h / SocketUtil.cpp负责setNonBlockingcreateListenFdServer.h / Server.cpp负责acceptLoophandleConnectionmain.cpp负责初始化和启动五、代码总览之前先讲清最核心的设计1为什么要有DetachedTaskC20 协程本身只是语言原语它不会自动帮你调度。我们这里实现一个最小的协程返回类型DetachedTask含义就是协程创建后立即开始执行执行到co_await时挂起Reactor 事件到来后恢复执行结束后自动销毁它非常适合这种顶层连接处理协程2为什么 Reactor 里要保存 coroutine handle因为当你写co_await reactor.readable(fd);本质就是当前协程先挂起Reactor 把这个协程句柄记住等fd可读时再把这个协程恢复所以 Reactor 这一层不仅要管 epoll还要管哪个 fd 对应哪个挂起的协程3为什么这是 Reactor 协程而不是纯协程因为协程自己不会知道fd 什么时候 ready这件事只有内核 epoll 知道。所以必须是协程挂起↓Reactor 把 handle 挂到 fd 上↓epoll_wait 返回 ready fd↓Reactor 恢复对应协程这就是现代异步系统的核心链路。六、完整代码1Task.h#ifndef TASK_H #define TASK_H #include coroutine #include exception #include utility struct DetachedTask { struct promise_type { DetachedTask get_return_object() noexcept { return {}; } std::suspend_never initial_suspend() noexcept { return {}; } struct FinalAwaiter { bool await_ready() noexcept { return false; } template typename Promise void await_suspend(std::coroutine_handlePromise h) noexcept { h.destroy(); } void await_resume() noexcept {} }; FinalAwaiter final_suspend() noexcept { return {}; } void return_void() noexcept {} void unhandled_exception() { std::terminate(); } }; }; #endif2Reactor.h#ifndef REACTOR_H #define REACTOR_H #include coroutine #include cstdint #include unordered_map class Reactor { public: Reactor(); ~Reactor(); Reactor(const Reactor) delete; Reactor operator(const Reactor) delete; void waitReadable(int fd, std::coroutine_handle handle); void waitWritable(int fd, std::coroutine_handle handle); void removeFd(int fd); void loop(); class ReadableAwaiter { public: ReadableAwaiter(Reactor reactor, int fd) : reactor_(reactor), fd_(fd) {} bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) { reactor_.waitReadable(fd_, handle); } void await_resume() const noexcept {} private: Reactor reactor_; int fd_; }; class WritableAwaiter { public: WritableAwaiter(Reactor reactor, int fd) : reactor_(reactor), fd_(fd) {} bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) { reactor_.waitWritable(fd_, handle); } void await_resume() const noexcept {} private: Reactor reactor_; int fd_; }; ReadableAwaiter readable(int fd) { return ReadableAwaiter(*this, fd); } WritableAwaiter writable(int fd) { return WritableAwaiter(*this, fd); } private: struct Entry { std::coroutine_handle readHandle{}; std::coroutine_handle writeHandle{}; uint32_t interests{0}; bool registered{false}; }; void updateInterest(int fd); private: int epollFd_; std::unordered_mapint, Entry entries_; }; #endif3Reactor.cpp#include Reactor.h #include sys/epoll.h #include unistd.h #include iostream #include vector #include cstdlib Reactor::Reactor() { epollFd_ epoll_create1(0); if (epollFd_ -1) { std::cerr epoll_create1 failed std::endl; std::exit(1); } } Reactor::~Reactor() { if (epollFd_ 0) { close(epollFd_); epollFd_ -1; } } void Reactor::waitReadable(int fd, std::coroutine_handle handle) { Entry entry entries_[fd]; entry.readHandle handle; entry.interests | EPOLLIN; updateInterest(fd); } void Reactor::waitWritable(int fd, std::coroutine_handle handle) { Entry entry entries_[fd]; entry.writeHandle handle; entry.interests | EPOLLOUT; updateInterest(fd); } void Reactor::removeFd(int fd) { auto it entries_.find(fd); if (it entries_.end()) { return; } if (it-second.registered) { epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr); } entries_.erase(it); } void Reactor::updateInterest(int fd) { auto it entries_.find(fd); if (it entries_.end()) { return; } Entry entry it-second; if (entry.interests 0) { if (entry.registered) { epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr); entry.registered false; } return; } epoll_event ev{}; ev.events entry.interests | EPOLLERR | EPOLLHUP | EPOLLRDHUP; ev.data.fd fd; if (!entry.registered) { if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, ev) 0) { entry.registered true; } } else { epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, ev); } } void Reactor::loop() { constexpr int MAX_EVENTS 64; std::vectorepoll_event events(MAX_EVENTS); while (true) { int n epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1); if (n -1) { continue; } for (int i 0; i n; i) { int fd events[i].data.fd; uint32_t ev events[i].events; auto it entries_.find(fd); if (it entries_.end()) { continue; } Entry entry it-second; std::coroutine_handle readHandle{}; std::coroutine_handle writeHandle{}; if (ev (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { readHandle entry.readHandle; entry.readHandle {}; entry.interests ~EPOLLIN; } if (ev (EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { writeHandle entry.writeHandle; entry.writeHandle {}; entry.interests ~EPOLLOUT; } updateInterest(fd); if (readHandle) { readHandle.resume(); } if (writeHandle (!readHandle || writeHandle.address() ! readHandle.address())) { writeHandle.resume(); } } } }4SocketUtil.h#ifndef SOCKET_UTIL_H #define SOCKET_UTIL_H int setNonBlocking(int fd); int createListenFd(int port); #endif5SocketUtil.cpp#include SocketUtil.h #include arpa/inet.h #include fcntl.h #include sys/socket.h #include unistd.h int setNonBlocking(int fd) { int flags fcntl(fd, F_GETFL, 0); if (flags -1) return -1; return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } int createListenFd(int port) { int serverFd socket(AF_INET, SOCK_STREAM, 0); if (serverFd -1) { return -1; } int opt 1; setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt)); if (setNonBlocking(serverFd) -1) { close(serverFd); return -1; } sockaddr_in addr{}; addr.sin_family AF_INET; addr.sin_port htons(port); addr.sin_addr.s_addr INADDR_ANY; if (bind(serverFd, reinterpret_castsockaddr*(addr), sizeof(addr)) -1) { close(serverFd); return -1; } if (listen(serverFd, 128) -1) { close(serverFd); return -1; } return serverFd; }6Server.h#ifndef SERVER_H #define SERVER_H #include Task.h class Reactor; DetachedTask acceptLoop(int listenFd, Reactor reactor); DetachedTask handleConnection(int clientFd, Reactor reactor); #endif7Server.cpp#include Server.h #include Reactor.h #include SocketUtil.h #include sys/socket.h #include unistd.h #include errno.h #include iostream #include string DetachedTask handleConnection(int clientFd, Reactor reactor) { std::string writeBuffer; char buffer[1024]; while (true) { co_await reactor.readable(clientFd); while (true) { ssize_t n ::read(clientFd, buffer, sizeof(buffer)); if (n 0) { std::string msg(buffer, n); std::cout [recv] fd clientFd msg msg std::endl; // demo: echo writeBuffer.append(buffer, n); } else if (n 0) { std::cout [close] peer closed fd clientFd std::endl; reactor.removeFd(clientFd); ::close(clientFd); co_return; } else { if (errno EAGAIN || errno EWOULDBLOCK) { break; } std::cerr [error] read failed fd clientFd std::endl; reactor.removeFd(clientFd); ::close(clientFd); co_return; } } while (!writeBuffer.empty()) { co_await reactor.writable(clientFd); ssize_t wn ::write(clientFd, writeBuffer.data(), writeBuffer.size()); if (wn 0) { writeBuffer.erase(0, wn); } else { if (errno EAGAIN || errno EWOULDBLOCK) { continue; } std::cerr [error] write failed fd clientFd std::endl; reactor.removeFd(clientFd); ::close(clientFd); co_return; } } } } DetachedTask acceptLoop(int listenFd, Reactor reactor) { while (true) { co_await reactor.readable(listenFd); while (true) { int clientFd ::accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK); if (clientFd 0) { std::cout [accept] new client fd clientFd std::endl; handleConnection(clientFd, reactor); } else { if (errno EAGAIN || errno EWOULDBLOCK) { break; } if (errno EINTR) { continue; } std::cerr [error] accept4 failed std::endl; break; } } } }8main.cpp#include Reactor.h #include SocketUtil.h #include Server.h #include iostream #include unistd.h int main() { int listenFd createListenFd(8080); if (listenFd -1) { std::cerr createListenFd failed std::endl; return 1; } Reactor reactor; // 启动 accept 协程 acceptLoop(listenFd, reactor); std::cout server running on :8080 std::endl; reactor.loop(); close(listenFd); return 0; }七、怎么编译运行因为用了 C20 协程所以编译命令建议这样g -stdc20 -O2 Reactor.cpp SocketUtil.cpp Server.cpp main.cpp -o server运行./server测试nc 127.0.0.1 8080输入hello你会看到服务端输出日志并且客户端收到回显。八、这套代码的核心控制流一定要看懂这套代码真正的运行方式是main() ↓ acceptLoop(listenFd, reactor) // 启动 accept 协程 ↓ 协程第一次执行到 co_await reactor.readable(listenFd) ↓ 挂起 ↓ reactor.loop() ↓ epoll_wait() ↓ listenFd ready ↓ Reactor 恢复 acceptLoop 协程 ↓ accept 新连接 ↓ 为每个 clientFd 启动 handleConnection 协程 ↓ handleConnection 执行到 co_await reactor.readable(clientFd) ↓ 挂起 ↓ epoll_wait() ↓ clientFd ready ↓ Reactor 恢复 handleConnection 协程 ↓ read / write / close这就是事件驱动 协程恢复九、这一版到底比纯 Reactor handler 好在哪如果你不用协程逻辑通常会散在onRead()onWrite()状态机切换回调链而现在你可以写成co_await reactor.readable(fd); read(...); co_await reactor.writable(fd); write(...);也就是说表面上像同步代码底层实际上还是异步调度这就是协程最值钱的地方。十、这一篇你最该记住的 5 句话epoll 负责“谁 ready 了”Reactor 负责“恢复哪个协程”协程负责“从挂起点继续执行”协程不是替代 Reactor而是运行在 Reactor 之上Reactor 协程 现代异步系统的核心组合十一、这一版还不是工业级差在哪必须实话实说这是一版最小打通模型的代码还不是工业级。还缺Connection对象化的协程版本写缓冲区更细致的管理半包 / 粘包处理timer / timeout多 Reactor 协程任务派发到业务线程池智能指针管理协程相关对象生命周期更安全的 close / remove 时机控制但是这一版已经把最关键的一层打通了epoll / Reactor / 协程 到底怎么协同工作这比单纯会背概念强太多了。十二、本篇一句话总结前面的 Reactor 模型解决的是“事件来了怎么分发”而这一篇进一步解决的是“分发之后代码怎么优雅地继续执行”。也就是说epoll 负责发现事件Reactor 负责调度事件协程负责优雅执行事件。从这一篇开始我们已经不再只是写 socket 代码而是在逐步搭建一个现代异步服务端运行时。十三、完整项目结构最终版. ├── Task.h ├── Reactor.h ├── Reactor.cpp ├── SocketUtil.h ├── SocketUtil.cpp ├── Server.h ├── Server.cpp └── main.cpp十四、下一步最顺的方向核心升级到这一篇为止我们已经分别完成了三件事情第一篇Connection结构抽象第二篇多 Reactor并发模型第三篇单 Reactor 协程执行模型也就是说到目前为止这三篇分别解决了结构 ✔并发 ✔执行 ✔但这里要特别强调一点第三篇的协程模型仍然是单 Reactor 版本。它的价值在于先把最关键的执行流问题讲清楚协程怎么挂起Reactor 怎么恢复协程epoll 怎么驱动协程继续执行这一篇解决的是“Reactor 协程”这套执行模型到底是怎么工作的而不是最终的高并发架构形态。下一步主线核心把并发模型和执行模型融合起来真正的高并发服务端不会停在单 Reactor 协程因为这仍然只能使用一个线程、一个事件循环。所以本系列真正的主线下一步就是《C 服务端进阶四—— 多 Reactor 协程真正的高并发模型》第四篇要解决什么第四篇要做的事情非常明确把第二篇和第三篇真正合起来多 Reactor第二篇协程第三篇也就是把第二篇解决的“多线程并发架构”第三篇解决的“协程执行模型”融合成一套完整运行方式。最终架构形态第四篇的目标架构会变成Main Reactoraccept ↓ Sub Reactor线程1 协程 Sub Reactor线程2 协程 Sub Reactor线程3 协程 ...也就是说主 Reactor 负责 accept子 Reactor 负责事件循环每个子 Reactor 内部通过协程处理连接逻辑这才是真正接近现代高并发服务端的核心模型

更多文章