对底层socket进行封装
Socket 类的核心作用是封装和管理套接字文件描述符 sockfd_,为服务端开发提供简洁、安全、可读性强的接口,隐藏底层系统调用细节。
const int sockfd_;//socket 的文件描述符,用于网络通信
成员函数
int fd()const {return sockfd_;}返回socket文件描述符 explicit Socket(int sockfd) :sockfd_(sockfd) {} Socket::~Socket() { close(sockfd_);//关闭套接字 } //法,用于将 socket 文件描述符 sockfd_ 绑定到一个本地地址(localaddr),(IP + 端口) void Socket::bindaddress(const InetAddress&localaddr) { //::bind(...) 是系统调用 bind,用于将一个 socket 绑定到一个本地地址。 if(0!=::bind(sockfd_,(sockaddr*)localaddr.getSockAddr(),sizeof(sockaddr_in))) { LOG_FATAL("bind sockfd:%d fail\n",sockfd_); } } void Socket::listen()//中封装 listen() 系统调用的部分,用于让 socket 进入 监听状态(Listen 状态),准备接收客户端连接 { if(0!=::listen(sockfd_,1024))//内核允许排队等待 accept 的最大连接数是 1024。 { LOG_FATAL("listen sockfd:%d fail\n",sockfd_); } } //接受一个客户端连接,返回新的连接套接字描述符,并且设置该套接字为非阻塞模式,同时把客户端地址保存到传入的 InetAddress 对象中 int Socket::accept(InetAddress *peeraddr) { sockaddr_in addr; // 定义一个 sockaddr_in 结构体 addr,用来存储客户端的 IP 和端口信息。 socklen_t len = sizeof(addr); // 地址长度 bzero(&addr, sizeof(addr)); // 清零,避免脏数据 int connfd = ::accept(sockfd_, (sockaddr*)&addr, &len);//客户端地址信息会写入 addr,长度存入 len,返回值 connfd 是新连接的 socket fd if (connfd < 0) { perror("accept error"); // accept 调用失败,打印错误信息 return -1; } // 连接成功,保存客户端地址 peeraddr->setSockAddr(addr); // 设置connfd为非阻塞 int flags = fcntl(connfd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL error"); // 这里仍然返回connfd,但说明非阻塞设置失败 return connfd; } if (fcntl(connfd, F_SETFL, flags | O_NONBLOCK) == -1) { perror("fcntl F_SETFL O_NONBLOCK error"); // 设置非阻塞失败,返回connfd return connfd; } printf("accept new connection: fd=%d, set non-blocking OK\n", connfd); return connfd; } void Socket::showdownWrite()//关闭套接字的写端。 { if(::shutdown(sockfd_,SHUT_WR)<0)//SHUT_WR 表示关闭写操作,即告诉对端“我不会再写数据了”,但仍然可以继续读取数据。 { LOG_ERROR("shutdownWrite error"); } } void Socket::setTcpNoDelay(bool on)//作用:启用或禁用 TCP 的 Nagle 算法。 { int optval=on?1:0; ::setsockopt(sockfd_,IPPROTO_TCP,TCP_NODELAY,&optval,sizeof optval); } void Socket::setReuseAddr(bool on)//允许重用本地地址。 { int optval=on?1:0; ::setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof optval); } void Socket::setReusePort(bool on)//允许多个套接字绑定同一个端口号 { int optval=on?1:0; ::setsockopt(sockfd_,SOL_SOCKET,SO_REUSEPORT,&optval,sizeof optval); } void Socket::setKeepAlive(bool on)//,启用 TCP 的保活机制 { int optval=on?1:0; ::setsockopt(sockfd_,SOL_SOCKET,SO_KEEPALIVE,&optval,sizeof optval); }
Acceptor组件
监听 + 接收连接 + 分发连接
封装并管理服务器监听 socket 的接收行为,专门负责接收客户端的新连接,并将新连接交给上层(如 TcpServer)进一步处理。
Acceptor 与其他模块的关系
- 与
EventLoop(baseLoop):
- Acceptor 在主线程的 EventLoop 上工作,监听 listenfd。
- 把 listenfd 的读事件(通过acceptchannel)交给 poller(epoll)管理。
- 与
TcpServer:
- TcpServer 创建 Acceptor,并设置新连接回调
Acceptor::setNewConnectionCallback()。 - 当有新连接时,Acceptor 调用 TcpServer 提供的回调函数。
成员变量
using NewconnectionCallback=std::function<void(int sockfd,const InetAddress&)>;// 表示新连接建立时的回调函数类型,可由用户传入处理逻辑 EventLoop *loop_;//Acceptor用的就是用户定义的那个baseLoop,也称作mainLoop; Socket acceptSocket_;//封装了 监听 socket 的 fd(包括 bind、listen、accept 操作)。 Channel acceptChannel_;//负责将监听 socket 注册到 loop_ 的 poller 中,并设置可读事件回调(新连接到来时触发 handleRead())。 NewconnectionCallback newconnectionCallback_;// 用户设置的回调函数,用于处理已建立的新连接。 bool listenning_;//表示当前是否已经调用了 listen() 并正在监听连接。
成员函数
void setNewconnectionCallback(const NewconnectionCallback&cb) { newconnectionCallback_=cb; } //createNonblocking() 是为事件驱动网络编程专门定制的安全 socket 创建函数,生成一个非阻塞、自动关闭的 TCP socket fd,用于监听或连接 static int createNonblocking() { int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if(sockfd<0) { LOG_FATAL("%s:%s:%d listen socket create err%d \n:",__FILE__,__FUNCTION__,__LINE__,errno); } return sockfd; } //在主线程(baseLoop)中创建监听 socket,并注册读事件回调,用于接受新连接。 Acceptor::Acceptor(EventLoop *loop,const InetAddress &listenAddr,bool reuseport) :loop_(loop),//指向主线程的 EventLoop(也就是 baseLoop) acceptSocket_(createNonblocking()),// 调用 createNonblocking() 创建的监听 socket 封装类 acceptChannel_(loop,acceptSocket_.fd()),// 用于监听 acceptSocket_ 上是否有新连接(readable)的 Channel listenning_(false)//标记是否处于监听状态 { acceptSocket_.setReuseAddr(true);//允许重用本地地址。 acceptSocket_.setReusePort(true);//支持端口重用 acceptSocket_.bindaddress(listenAddr);//将监听 socket 绑定到指定地址和端口 acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this)); //设置监听 socket 上的 读事件回调,当有新连接到来时,EventLoop 会回调 handleRead()。 } Acceptor::~Acceptor() { acceptChannel_.disableAll();//取消该 Channel 所关注的所有事件(读、写等),然后调用 update() 更新在 epoll 中的注册状态。 acceptChannel_.remove();//从 Poller(epoll)中注销一个 Channel 对象 } //让监听 socket(acceptSocket_)开始监听,并将其对应的 Channel(acceptChannel_)注册到 Poller 中,等待 读事件(新连接)。 void Acceptor::listen() { listenning_=true;//标记当前 Acceptor 正在监听 acceptSocket_.listen(); acceptChannel_.enableReading();//acceptchannel=>poller(注册事件)监听是否有读事件发生,如果有调用readcallback_回调=》Acceptor::handleRead() } 在这里调用 enableReading() 后,它会通过 EventLoop::updateChannel() 将其注册到 Poller(epoll)中。 一旦 listenfd 上有新连接(即变成 可读 状态),Poller 就会触发该 Channel 的 handleEvent()。 而 handleEvent() 又会调用绑定的回调函数:acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); 最终触发的是 Acceptor::handleRead() 来调用 accept()。 // 这是 acceptChannel_ 在有“可读事件”时的回调函数,说明有新客户端连接到来,负责 accept 并把这个连接交给服务器处理(分发到 subloop)。 void Acceptor::handleRead() { InetAddress peerAddr;//创建一个 InetAddress 类型对象,用来接收客户端的 IP 地址和端口信息 int connfd=acceptSocket_.accept(&peerAddr);//调用封装好的 Socket::accept(),尝试从 listenfd 中接收一个新的连接。,如果连接成功,会返回一个新的 connfd(客户端 socket)。并把客户端地址写入 peerAddr。 if(connfd>=0)//表示成功拿到了一个有效连接 socket。 { if(newconnectionCallback_)//判断是否设置了新连接回调函数(通常由 TcpServer 设置:用于处理新连接。) { newconnectionCallback_(connfd,peerAddr);//传入新连接的 fd 和客户端地址,交给上层服务器,进行分发给subloop } else { ::close(connfd); //如果没有设置回调,就直接关闭这个连接,避免资源泄露。 } } else { LOG_ERROR("%s:%s:%d accept err:%d \n",__FILE__,__FUNCTION__,__LINE__,errno); if(errno==EMFILE)//如果是 EMFILE 错误,说明当前进程打开的文件描述符数已经达到上限 { LOG_ERROR("%s:%ssockfd reached limit! err:%d \n",__FILE__,__FUNCTION__,__LINE__); } } } Accept组件流程:因为底层socket的bind,listen,accept已经封装好,在这里accept持有一个loop,创建一个监听套接字acceptsocket,从而可以构造出事件通道acceptchannel.acceptchannel将读事件通过enablereading函数逐步注册到poller上面,然后设置回调函数handleRead; accceptsocket绑定,监听,是否有新连接发生,如果一旦有新连接发生,poller会立即返回,acceptchannel->handleread()被调用,这里面acceptsocket的accept被调用,得到connfd,和客户端地址信息,通过新连接回调交给上层服务器进行分发处理;