基于Reactor模式的高性能服务器之Acceptor组件(处理连接)

简介: 本节介绍了对底层 Socket 进行封装的设计与实现,通过 `Socket` 类隐藏系统调用细节,提供简洁、安全、可读性强的接口。重点包括 `Socket` 类的核心作用(管理 `sockfd_`)、成员函数的功能(如绑定地址、监听、接受连接等),以及 `Acceptor` 组件的职责:监听连接、接收新客户端连接并分发给上层处理。同时说明了 `Acceptor` 与 `EventLoop` 和 `TcpServer` 的协作关系,并展示了其成员变量和关键函数的工作机制。

对底层socket进行封装

Socket 类的核心作用是封装和管理套接字文件描述符 sockfd_,为服务端开发提供简洁、安全、可读性强的接口,隐藏底层系统调用细节。  

const   int  sockfd_;//socket 的文件描述符,用于网络通信

成员函数

    int  fd()const  {return   sockfd_;}返回socket文件描述符

  explicit  Socket(int sockfd)
        :sockfd_(sockfd)
        {}


      Socket::~Socket()
        {
            close(sockfd_);//关闭套接字
        } 


//法,用于将 socket 文件描述符 sockfd_ 绑定到一个本地地址(localaddr),(IP + 端口)
 void    Socket::bindaddress(const  InetAddress&localaddr)
        {
        //::bind(...) 是系统调用 bind,用于将一个 socket 绑定到一个本地地址。
            if(0!=::bind(sockfd_,(sockaddr*)localaddr.getSockAddr(),sizeof(sockaddr_in)))
              {
                    LOG_FATAL("bind  sockfd:%d fail\n",sockfd_);
              }
        }


          void   Socket::listen()//中封装 listen() 系统调用的部分,用于让 socket 进入 监听状态(Listen 状态),准备接收客户端连接
        {
            if(0!=::listen(sockfd_,1024))//内核允许排队等待 accept 的最大连接数是 1024。
            {
                LOG_FATAL("listen  sockfd:%d fail\n",sockfd_);

            }
        }



//接受一个客户端连接,返回新的连接套接字描述符,并且设置该套接字为非阻塞模式,同时把客户端地址保存到传入的 InetAddress 对象中
  int Socket::accept(InetAddress *peeraddr)
{
    sockaddr_in addr;          // 定义一个 sockaddr_in 结构体 addr,用来存储客户端的 IP 和端口信息。
    socklen_t len = sizeof(addr); // 地址长度
    bzero(&addr, sizeof(addr));   // 清零,避免脏数据

    int connfd = ::accept(sockfd_, (sockaddr*)&addr, &len);//客户端地址信息会写入 addr,长度存入 len,返回值 connfd 是新连接的 socket fd
    if (connfd < 0)
    {
        perror("accept error");  // accept 调用失败,打印错误信息
        return -1;
    }

    // 连接成功,保存客户端地址
    peeraddr->setSockAddr(addr);

    // 设置connfd为非阻塞
    int flags = fcntl(connfd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl F_GETFL error");
        // 这里仍然返回connfd,但说明非阻塞设置失败
        return connfd;
    }
    if (fcntl(connfd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        perror("fcntl F_SETFL O_NONBLOCK error");
        // 设置非阻塞失败,返回connfd
        return connfd;
    }
    printf("accept new connection: fd=%d, set non-blocking OK\n", connfd);
    return connfd;
}




 void   Socket::showdownWrite()//关闭套接字的写端。


        {
            if(::shutdown(sockfd_,SHUT_WR)<0)//SHUT_WR 表示关闭写操作,即告诉对端“我不会再写数据了”,但仍然可以继续读取数据。
            {
                LOG_ERROR("shutdownWrite  error");
            }
        }



   void   Socket::setTcpNoDelay(bool on)//作用:启用或禁用 TCP 的 Nagle 算法。
        {
            int  optval=on?1:0;
            ::setsockopt(sockfd_,IPPROTO_TCP,TCP_NODELAY,&optval,sizeof  optval);
        }



 void   Socket::setReuseAddr(bool  on)//允许重用本地地址。
        {
              int  optval=on?1:0;
            ::setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof  optval);
        }

        

   void   Socket::setReusePort(bool  on)//允许多个套接字绑定同一个端口号
        {
             int  optval=on?1:0;
            ::setsockopt(sockfd_,SOL_SOCKET,SO_REUSEPORT,&optval,sizeof  optval);
        }




   void   Socket::setKeepAlive(bool on)//,启用 TCP 的保活机制
        {

             int  optval=on?1:0;
            ::setsockopt(sockfd_,SOL_SOCKET,SO_KEEPALIVE,&optval,sizeof  optval);
        }

Acceptor组件

监听 + 接收连接 + 分发连接  

封装并管理服务器监听 socket 的接收行为,专门负责接收客户端的新连接,并将新连接交给上层(如 TcpServer)进一步处理。  

Acceptor 与其他模块的关系

  • EventLoop(baseLoop):
  • Acceptor 在主线程的 EventLoop 上工作,监听 listenfd。
  • 把 listenfd 的读事件(通过acceptchannel)交给 poller(epoll)管理。
  • TcpServer
  • TcpServer 创建 Acceptor,并设置新连接回调 Acceptor::setNewConnectionCallback()
  • 当有新连接时,Acceptor 调用 TcpServer 提供的回调函数。

成员变量

using  NewconnectionCallback=std::function<void(int sockfd,const InetAddress&)>;//  表示新连接建立时的回调函数类型,可由用户传入处理逻辑
 EventLoop *loop_;//Acceptor用的就是用户定义的那个baseLoop,也称作mainLoop;
Socket   acceptSocket_;//封装了 监听 socket 的 fd(包括 bind、listen、accept 操作)。
Channel  acceptChannel_;//负责将监听 socket 注册到 loop_ 的 poller 中,并设置可读事件回调(新连接到来时触发 handleRead())。
NewconnectionCallback  newconnectionCallback_;//    用户设置的回调函数,用于处理已建立的新连接。
bool  listenning_;//表示当前是否已经调用了 listen() 并正在监听连接。

成员函数

void setNewconnectionCallback(const   NewconnectionCallback&cb)
    {
        newconnectionCallback_=cb;
    }


//createNonblocking() 是为事件驱动网络编程专门定制的安全 socket 创建函数,生成一个非阻塞、自动关闭的 TCP socket fd,用于监听或连接
  static  int   createNonblocking()
{
   int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if(sockfd<0)
    {
        LOG_FATAL("%s:%s:%d  listen socket  create err%d  \n:",__FILE__,__FUNCTION__,__LINE__,errno);
    }
    return   sockfd;
}



//在主线程(baseLoop)中创建监听 socket,并注册读事件回调,用于接受新连接。

 Acceptor::Acceptor(EventLoop  *loop,const  InetAddress &listenAddr,bool  reuseport)
    :loop_(loop),//指向主线程的 EventLoop(也就是 baseLoop)
    acceptSocket_(createNonblocking()),//   调用 createNonblocking() 创建的监听 socket 封装类
    acceptChannel_(loop,acceptSocket_.fd()),//  用于监听 acceptSocket_ 上是否有新连接(readable)的 Channel
    listenning_(false)//标记是否处于监听状态
    {
        acceptSocket_.setReuseAddr(true);//允许重用本地地址。
         acceptSocket_.setReusePort(true);//支持端口重用
        acceptSocket_.bindaddress(listenAddr);//将监听 socket 绑定到指定地址和端口
        acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this));
        //设置监听 socket 上的 读事件回调,当有新连接到来时,EventLoop 会回调 handleRead()。
    }





Acceptor::~Acceptor()
      {
        acceptChannel_.disableAll();//取消该 Channel 所关注的所有事件(读、写等),然后调用 update() 更新在 epoll 中的注册状态。
        acceptChannel_.remove();//从 Poller(epoll)中注销一个 Channel 对象
      }



//让监听 socket(acceptSocket_)开始监听,并将其对应的 Channel(acceptChannel_)注册到 Poller 中,等待 读事件(新连接)。
   void    Acceptor::listen()
    {
        listenning_=true;//标记当前 Acceptor 正在监听
        acceptSocket_.listen();
        acceptChannel_.enableReading();//acceptchannel=>poller(注册事件)监听是否有读事件发生,如果有调用readcallback_回调=》Acceptor::handleRead()
    }
    在这里调用 enableReading() 后,它会通过 EventLoop::updateChannel() 将其注册到 Poller(epoll)中。
一旦 listenfd 上有新连接(即变成 可读 状态),Poller 就会触发该 Channel  handleEvent()
 handleEvent() 又会调用绑定的回调函数:acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
最终触发的是 Acceptor::handleRead() 来调用 accept()




 // 这是 acceptChannel_ 在有“可读事件”时的回调函数,说明有新客户端连接到来,负责 accept 并把这个连接交给服务器处理(分发到 subloop)。
    void    Acceptor::handleRead()
    {
            InetAddress   peerAddr;//创建一个 InetAddress 类型对象,用来接收客户端的 IP 地址和端口信息
            int  connfd=acceptSocket_.accept(&peerAddr);//调用封装好的 Socket::accept(),尝试从 listenfd 中接收一个新的连接。,如果连接成功,会返回一个新的 connfd(客户端 socket)。并把客户端地址写入 peerAddr。
            if(connfd>=0)//表示成功拿到了一个有效连接 socket。
            {
                if(newconnectionCallback_)//判断是否设置了新连接回调函数(通常由 TcpServer 设置:用于处理新连接。)
                {
                    newconnectionCallback_(connfd,peerAddr);//传入新连接的 fd 和客户端地址,交给上层服务器,进行分发给subloop
                }
                else
                 {
                ::close(connfd);            //如果没有设置回调,就直接关闭这个连接,避免资源泄露。

                }
            }
            else
            {
                LOG_ERROR("%s:%s:%d accept  err:%d \n",__FILE__,__FUNCTION__,__LINE__,errno);
                if(errno==EMFILE)//如果是 EMFILE 错误,说明当前进程打开的文件描述符数已经达到上限
                {
                    LOG_ERROR("%s:%ssockfd  reached   limit!  err:%d \n",__FILE__,__FUNCTION__,__LINE__);
                }
            }
            
    }
Accept组件流程:因为底层socket的bind,listen,accept已经封装好,在这里accept持有一个loop,创建一个监听套接字acceptsocket,从而可以构造出事件通道acceptchannel.acceptchannel将读事件通过enablereading函数逐步注册到poller上面,然后设置回调函数handleRead;

accceptsocket绑定,监听,是否有新连接发生,如果一旦有新连接发生,poller会立即返回,acceptchannel->handleread()被调用,这里面acceptsocket的accept被调用,得到connfd,和客户端地址信息,通过新连接回调交给上层服务器进行分发处理;
相关文章
|
3月前
|
弹性计算 定位技术 数据中心
阿里云服务器选择方法:配置、地域及付费模式全解析
2025阿里云服务器选购指南:就近选择地域以降低延迟,企业用户优选2核4G5M带宽u1实例,仅199元/年;个人用户可选2核2G3M带宽ECS,99元/年起。长期稳定业务选包年包月,短期或波动场景用按量付费,轻松搭建网站首选高性价比配置。
|
4月前
|
弹性计算 运维 分布式计算
阿里云服务器付费模式选择:节省计划、预留实例券、抢占式实例详解
在我们选购阿里云服务器的时候,有多种计费方式可选,其中包年包月和按量付费是用户最熟悉也是选择最多的计费方式,除了这两种方式之外,阿里云还提供了抢占式实例、节省计划、预留实例券三种更为灵活且经济的付费模式,旨在满足不同业务场景下的长周期低成本与短周期高弹性需求,有的新手用户朋友不清楚这三种计费方式是什么,本文将为大家解析这三种付费模式的内在机制、优势对比及适用场景,以供您选择参考。
|
4月前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle客户端与服务器端连接建立的过程
Oracle数据库采用客户端-服务器架构,客户端通过其网络环境与服务器通信,实现数据库访问。监听程序负责建立连接,通过命令lsnrctl可启动、停止及查看监听状态。本文介绍了监听器的作用及相关基础管理操作。
216 0
|
5月前
|
弹性计算 关系型数据库 Nacos
低配阿里云 ECS 如何 docker 环境部署 NACOS : 单机版模式
NACOS 单机版 Docker 安装指南。使用指定端口和 custom.env 配置文件启动 Nacos 服务,适用于 2.X 版本,包含 gRPC 支持及 MySQL 数据源配置。 -e MODE=standalone \
439 5
|
8月前
|
存储 弹性计算 资源调度
阿里云服务器收费模式对比:包年包月与按量付费的适用场景与选择参考
在我们购买阿里云服务器的时候,云服务器的收费模式主要有多种收费模式,其中包年包月和按量付费两种主流模式。对于准备在阿里云上部署应用的用户来说,选择合适的收费模式至关重要,因为它直接关系到成本控制和资源使用的灵活性。本文将对这两种收费模式做一个对比,以供参考和选择。
1131 14
|
9月前
|
人工智能 安全 大数据
【限时特惠】阿里云服务器7折抢购!高性能+高性价比,开发者与企业必备攻略
阿里云服务器限时7折特惠,高性能、高性价比,为开发者和企业量身打造!新老用户均可参与,灵活配置满足多种需求,全球节点低延迟覆盖。自研神龙架构保障稳定性,安全防护全面,操作便捷,生态丰富。适用于个人开发、企业部署、跨境业务及AI计算等场景。点击专属链接立即抢购,活动名额有限,速来享受云端算力带来的高效体验!
245 0
|
2月前
|
弹性计算 运维 安全
阿里云轻量应用服务器与云服务器ECS啥区别?新手帮助教程
阿里云轻量应用服务器适合个人开发者搭建博客、测试环境等低流量场景,操作简单、成本低;ECS适用于企业级高负载业务,功能强大、灵活可扩展。二者在性能、网络、镜像及运维管理上差异显著,用户应根据实际需求选择。
242 10
|
2月前
|
运维 安全 Ubuntu
阿里云渠道商:服务器操作系统怎么选?
阿里云提供丰富操作系统镜像,涵盖Windows与主流Linux发行版。选型需综合技术兼容性、运维成本、安全稳定等因素。推荐Alibaba Cloud Linux、Ubuntu等用于Web与容器场景,Windows Server支撑.NET应用。建议优先选用LTS版本并进行测试验证,通过标准化镜像管理提升部署效率与一致性。
|
2月前
|
弹性计算 ice
阿里云4核8g服务器多少钱一年?1个月和1小时价格,省钱购买方法分享
阿里云4核8G服务器价格因实例类型而异,经济型e实例约159元/月,计算型c9i约371元/月,按小时计费最低0.45元。实际购买享折扣,1年最高可省至1578元,附主流ECS实例及CPU型号参考。
385 8
|
2月前
|
存储 监控 安全
阿里云渠道商:云服务器价格有什么变动?
阿里云带宽与存储费用呈基础资源降价、增值服务差异化趋势。企业应结合业务特点,通过阶梯计价、智能分层、弹性带宽等策略优化成本,借助云监控与预算预警机制,实现高效、可控的云资源管理。

热门文章

最新文章