实战Linux I/O多路复用:借助epoll,单线程高效管理10,000+并发连接
引言
在应对高并发连接的传统策略中,普遍采取为每个连接配置单独线程或进程的直接方式,管理其I/O操作。此法虽直观易行,但随业务规模扩张,线程资源需求急剧上升。相反,Linux下的I/O多路复用技术,尤其是epoll
,展示了一种高效路径:单一线程即可监控成千上万的文件描述符,极大提升了资源使用效率。
I/O 多路复用的场景有很多,也比较实用。通常用法epoll
线程 + 线程/协程池处理并发场景,这里做一个简单的实例使用,以便后续查阅。
概述
select
与poll
同样能够满足多路复用的需求,在特定场景下各有千秋。不过,当面对需监控大量文件句柄的场景时,epoll
凭借其高效的设计和更高的性能表现,成为更为优选的解决方案。其不仅在资源管理和事件处理上展现出明显优势,而且编程接口的灵活性也更为优雅。本文主要聚焦于epoll
的实践应用,实例学习其高效而精炼的使用方法。
epoll常用接口
epoll
的描述man
手册已经记录比较详细了,这里列举一下常用的接口:
- epoll_create / epoll_create1
- 原型:
int epoll_create(int size)
/int epoll_create1(int flags)
- 功能: 创建一个新的epoll实例,返回一个文件描述符,该描述符代表epoll对象。
- 参数:
- size: 接受一个参数 size,在Linux 2.6.8以后这个参数被忽略,但仍要求传递一个大于0的值;
- flags: 接收一个标志。为0作用与
epoll_create
相同;为EPOLL_CLOEXEC
时,会在execve()
调用后自动关闭epoll
文件描述符,避免子进程继承。
- 返回值
-1
:发生错误,设置errno
;> 0
:epoll文件描述符。
- epoll_ctl
- 原型:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
- 功能: 用于控制已经创建好的epoll实例中的文件描述符事件集合。
- 参数:
- epfd:
epoll_create()
返回的文件描述符。 - op:操作类型,可以是
EPOLL_CTL_ADD
(添加)、EPOLL_CTL_MOD
(修改)、EPOLL_CTL_DEL
(删除)。 - fd:要操作的文件描述符。
- event:一个指向
struct epoll_event
的指针,定义了关注的事件类型(如 EPOLLIN, EPOLLOUT)及其它数据。
- 返回值
-1
:发生错误,设置errno
;0
:成功。
- epoll_wait
- 原型:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
- 功能: 阻塞等待直到epoll实例中的一个或多个文件描述符变为就绪状态(可读、可写或出现错误)。
- 参数:
- epfd:epoll实例的文件描述符。
- events:指向struct epoll_event结构体数组的指针,用于存储就绪事件。
- maxevents:
events
数组的最大容量。 - timeout:等待超时时间,单位为毫秒,-1表示无限等待,0 表示立即返回,正值为等待的最长时间。
- 返回值:
-1
:发生错误,设置errno
;0
:超时;>0
: 准备好的文件描述符数量。
应用场景
在高并发TCP服务场景中,服务端通过部署epoll
+ 线程/协程池
机制,构建高效服务框架。epoll
作为核心监听器,统一管理并快速响应来自不同客户端的连接请求,其事件驱动特性确保了对socket
就绪状态的即时检测。与此同时,这些请求被异步地分发至线程/协程池中,利用任务队列和工作线程(或轻量级协程)并发执行,提升数据处理能力。
类图
EpollEventHandler类图
- EpollEventHandler (Epoll事件调度器类)
该类负责注册并管理监听句柄,实时监控Epoll事件,确保对每个就绪连接的快速响应与处理。 - IEpollEvent (监听接口类)
此类定义了句柄注册与事件处理的标准操作,使EpollEventHandler
能统一管理不同类型的监听对象,实现接口的标准化与句柄处理的灵活性。 - PSocket (可被监听的Socket实现类)
继承自IEpollEvent
的实现类,封装标准的Socket
操作,同时定义针对Epoll
事件的响应逻辑,实现Socket
交互的统一管理和定制化处理。 - PUart (可被监听的Uart实现类)
继承自IEpollEvent
的实现类,封装了标准Uart
操作,同时定义针对Epoll
事件的响应逻辑,实现Uart
交互的统一管理和定制化处理。 - 其他可被监听的实现类
还可以实现其他可被epoll监听的类型类,通过继承IEpollEvent
实现可被EpollEventHandler
统一注册,再通过内部EpollEvent
实现差异化响应处理。
源码实现
编程环境
① 编译环境: Linux环境
② 语言: C++语言
接口定义
- EpollEventHandler
class EpollEventHandler { public: virtual ~EpollEventHandler(); static EpollEventHandler* GetInstance(); void AddPoll(IEpollEvent* p); void DelPoll(IEpollEvent* p); void EpollLoop(bool bRun); private: EpollEventHandler(int size = 0); private: int mHandle; bool mRun; std::map<int, IEpollEvent*> mEpollMap; // fd, type, IEpollEvent };
EpollEventHandler
主要封装了epoll
接口,集中管理并监听所有IEpollEvent
实例。在EpollLoop
循环中,阻塞等待并处理各类句柄事件,一旦事件触发,即通过多态调用IEpollEvent
的虚函数来EpollEvent
执行特定的事件处理逻辑,从而实现差异化的处理需求。
void EpollEventHandler::EpollLoop(bool bRun) { struct epoll_event ep[32]; mRun = bRun; do { if (!mRun) { break; } // 无事件时, epoll_wait阻塞, 等待 int count = epoll_wait(mHandle, ep, sizeof(ep)/sizeof(ep[0]), -1); if (count <= 0) { continue; } for (int i = 0; i < count; i++) { IEpollEvent* p = (IEpollEvent*)ep[i].data.ptr; if (p == nullptr) { continue; } // TODO: 丢到线程/协程池响应 p->EpollEvent(p->GetEpollFd(), p->GetEpollType(), p->GetArgs()); } } while(mRun); SPR_LOGD("EpollLoop exit\n"); }
- IEpollEvent
class IEpollEvent { public: IEpollEvent(int fd, EpollType eType = EPOLL_TYPE_BEGIN, void* arg = nullptr) : mEpollFd(fd), mEpollType(eType), mArgs(arg) {}; virtual ~IEpollEvent() = default; virtual ssize_t Write(int fd, const std::string& bytes); virtual ssize_t Read(int fd, std::string& bytes); virtual void* EpollEvent(int fd, EpollType eType, void* arg) = 0; int GetEpollFd() { return mEpollFd; } EpollType GetEpollType() { return mEpollType; } void* GetArgs() { return mArgs; } protected: int mEpollFd; EpollType mEpollType; void* mArgs; };
IEpollEvent
主要统一句柄注册与事件处理的标准操作,方便EpollEventHandler
统一监听,通过EpollEvent
实现差异化响应。
- PSocket
class PSocket : public IEpollEvent { public: PSocket(int domain, int type, int protocol, std::function<void(int, void*)> cb, void* arg = nullptr); PSocket(int sock, std::function<void(int, void*)> cb, void* arg = nullptr); virtual ~PSocket(); void Close(); int AsTcpServer(short bindPort, int backlog); int AsTcpClient(bool con = false, const std::string& srvAddr = "", short srvPort = 0, int rcvLen = 512 * 1024, int sndLen = 512 * 1024); int AsUdpServer(short bindPort, int rcvLen = 512 * 1024); int AsUdpClient(const std::string& srvAddr, short srvPort, int sndLen = 512 * 1024); int AsUnixStreamServer(const std::string& serverName, int backlog); int AsUnixStreamClient(bool con = false, const std::string& serverName = "", const std::string& clientName = ""); int AsUnixDgramServer(const std::string& serverName); int AsUnixDgramClient(const std::string& serverName); virtual void* EpollEvent(int fd, EpollType eType, void* arg) override; private: bool mEnable; PSocketType mSockType; std::function<void(int, void*)> mCb; };
- PUart
class PUart : public IEpollEvent { public: PUart(const std::string& devPath, std::function<void(int, char *, long, void*)> cb, void* arg = nullptr, speed_t rate = B115200, int parity = 0, int stopbit = 1 ); virtual ~PUart(); void* EpollEvent(int fd, EpollType eType, void* arg) override; bool SetupPort(speed_t rate, int parity, int stopbit); void Close(); private: std::function<void(int, char *, long, void*)> mCb; std::string mDevFile; };
测试效果
- 测试代码这里实现一个TCP server的功能,响应多个客户端请求。
int main(int argc, const char *argv[]) { std::mutex epFdMutex; EpollEventHandler *pEpoll = EpollEventHandler::GetInstance(); auto tcpClient = make_shared<PSocket>(AF_INET, SOCK_STREAM, 0, [&](int sock, void *arg) { PSocket* pCliObj = (PSocket*)arg; if (pCliObj == nullptr) { SPR_LOGE("PSocket is nullptr\n"); return; } std::string rBuf; int rc = pCliObj->Read(sock, rBuf); if (rc > 0) { SPR_LOGD("# RECV [%d]> %s\n", sock, rBuf.c_str()); } else { pEpoll->DelPoll(pCliObj); SPR_LOGD("## CLOSE [%d]\n", sock); std::lock_guard<std::mutex> lock(epFdMutex); pCliObj->Close(); } }); tcpClient->AsTcpClient(true, "127.0.0.1", 8080); pEpoll->AddPoll(tcpClient.get()); std::thread wThread([&]{ while(true) { std::lock_guard<std::mutex> lock(epFdMutex); tcpClient->Write(tcpClient->GetEpollFd(), "Hello World"); sleep(1); } }); pEpoll->EpollLoop(true); wThread.join(); return 0; }
- 测试结果
$ ./sample_tcpserver 81 EpollEvent D: Add epoll fd 4 81 EpollEvent D: Add epoll fd 5 81 EpollEvent D: Add epoll fd 6 54 TcpServer D: # RECV [6]> I'm Client A 58 TcpServer D: # SEND [6]> ACK 54 TcpServer D: # RECV [5]> I'm Client B 58 TcpServer D: # SEND [5]> ACK 54 TcpServer D: # RECV [6]> I'm Client A 58 TcpServer D: # SEND [6]> ACK 54 TcpServer D: # RECV [5]> I'm Client B 58 TcpServer D: # SEND [5]> ACK
测试结果上看,sample_tcpserver能够实现一个线程同时监听两个客户端的请求和应答。
总结
- 本篇主要操练一下
epoll
的常规使用,简单做一下封装能够实现epoll
监听各个类型的句柄事件。其实epoll
还可以监听消息队列、串口等其他文件句柄,深入挖掘一下,能够实现很多优雅的操作。 - 本实践深受先前一位导师兼朋友所分享代码的启发,其创新性地提出了采用epoll结合协程机制来替代传统多线程架构的方法,让我受益匪浅。
epoll
的妙用远不止于此,后续的代码会不断挖掘,并集成到个人的开源项目中。