高性能网络设计秘笈:深入剖析Linux网络IO与epoll

简介: 本文介绍了网络IO模型,引入了epoll作为Linux系统中高性能网络编程的核心工具。通过分析epoll的特点与优势,并给出使用epoll的注意事项和实践技巧,该文章为读者提供了宝贵的指导。通过掌握这些知识,读者能够构建高效、可扩展和稳定的网络应用,提供出色的用户体验。

一、epoll简介

epoll是Linux内核中一种可扩展的IO事件处理机制,可替代select和poll的系统调用。处理百万级并发访问性能更佳。

二、select的局限性

(1) 文件描述符越多,性能越差。 单个进程中能够监视的文件描述符存在最大的数量,默认是1024(在linux内核头文件中定义有 #define _FD_SETSIZE 1024),当然也可以修改,但是文件描述符数量越多,性能越差。
(2)开销巨大 ,select需要复制大量的句柄数据结构,产生了巨大的开销(内核/用户空间内存拷贝问题)。
(3)select需要遍历整个句柄数组才能知道哪些句柄有事件。
(4)如果没有完成对一个已经就绪的文件描述符的IO操作,那么每次调用select还是会将这些文件描述符通知进程,即水平触发。
(5)poll使用链表保存监视的文件描述符,虽然没有了监视文件数量的限制,但是其他缺点依旧存在。
由于以上缺点,基于select模型的服务器程序,要达到十万以上的并发访问,是很难完成的。因此,epoll出场了。

三、epoll的优点

(1)不需要轮询所有的文件描述符
(2)每次取就绪集合,都在固定位置
(3)事件的就绪和IO触发可以异步解耦

四、epoll函数原型

4.1、epoll_create(int size)

#include <sys/epoll.h>
int epoll_create(int size);

功能:创建epoll的文件描述符。
参数说明:size表示内核需要监控的最大数量,但是这个参数内核已经不会用到,只要传入一个大于0的值即可。 当size<=0时,会直接返回不可用,这是历史原因保留下来的,最早的epoll_create是需要定义一次性就绪的最大数量;后来使用了链表以便便维护和扩展,就不再需要使用传入的参数。
返回:返回该对象的描述符,注意要使用 close 关闭该描述符。

4.2、epoll_ctl

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// epoll_ctl对应系统调用sys_epoll_ctl

功能:操作epoll的文件描述符,主要是对epoll的红黑树节点进行操作,比如节点的增删改查。
参数说明:

参数 含义
epfd 通过 epoll_create 创建的文件描述符
op 对红黑树的操作,比如节点的增加、修改、删除,分别对应EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL
fd 需要添加监听的文件描述符
event 事件信息

4.2.1、event参数说明

struct epoll_event结构体原型

typedef union epoll_data{
   
   
    void* ptr;
    int fd;
    uint32_t u32;
    uint64_t u64
};

struct epoll_event{
   
   
    uint32_t events;
    epoll_data_t data;
}

events成员代表要监听的epoll事件类型
events成员:

成员变量 含义
EPOLLIN 监听fd的读事件
EPOLLOUT 监听fd的写事件
EPOLLRI 监听紧急数据可读事件(带外数据到来)
EPOLLRDHUP 监听套接字关闭或半关闭事件
EPOLLET 将EPOLL设为边缘触发(Edge Triggered)模式

data成员:
data 成员时一个联合体类型,可以在调用 epoll_ctl 给 fd 添加/修改描述符监听的事件时携带一些数据,方便后面的epoll_wait可以取出信息使用。

4.2.2、扩展说明:SYSCALL_DEFINE数字 的宏定义

跟着的数字代表函数需要的参数数量,比如SYSCALL_DEFINE1代表函数需要一个参数、SYSCALL_DEFINE4代表函数需要4个参数。

4.2.3、注意

epoll_ctl是非阻塞的,不会被挂起。

4.3、epoll_wait

函数原型

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

功能:阻塞一段时间,等待事件发生
返回:返回事件数量,事件集添加到events数组中。也就是遍历红黑树中的双向链表,把双向链表中的节点数据拷贝出来,拷贝完毕后把节点从双向链表中移除。

返回值 含义
大于0 事件个数
等于0 超时时间timeout到了
小于0 出错,可通过errno查看出错原因

参数说明:

参数 含义
epfd 通过 epoll_create 创建的文件描述符
events 存放就绪的事件集合,是输出参数
maxevents 最大可存放事件数量,events数组大小
timeout 阻塞等待的时间长短,单位是毫秒,-1表示一直阻塞等待

五、epoll使用步骤

step 1:创建epoll文件描述符

int epfd = epoll_create(1);

step 2:创建struct epoll_event结构体

struct epoll_event ev;
ev.data.fd=listenfd;//保存监听的fd,以便epoll_wait的后续操作
ev.events=EPOLLIN;//设置监听fd的可读事件

step 3:添加事件监听

epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

step 4:等待事件

struct epoll_event events[EVENTS_LENGTH];
char rbuffer[MAX_BUFF]={
   
    0 };
char wbuffer[MAX_BUFF]={
   
    0 };
while(1)
{
   
   
    int nready = epoll_wait(epfd,events,EVENTS_LENGTH,-1);//-1表示阻塞等待
    int i=0;
    for(i=0;i<nready;i++)
    {
   
   
        int clientfd=events[i].data.fd;
        if(clientfd==listenfd)
        {
   
   
            struct sockaddr_in client;
            int len=sizeof(client);
            int confd=accept(listenfd,(struct sockaddr*)&client,&len);
            //step 2:创建struct epoll_event结构体
            struct epoll_event evt;
            evt.data.fd=confd;//保存监听的fd,以便epoll_wait的后续操作
            evt.events=EPOLLIN;//设置监听fd的可读事件
            // step 3:添加事件监听
            epoll_ctl(epfd,EPOLL_CTL_ADD,confd,&evt);
        }
        else if(events[i].events &EPOLLIN)
        {
   
   
            int ret = recv(clientfd,rbuffer,MAX_BUFF,0);
            if(ret>0)
            {
   
   
                rbuffer[ret]='\0';//剔除干扰数据
                printf("recv: %s\n",rbuffer);
                memcpy(wbuffer,rbuffer,MAX_BUFF);//拷贝数据,做回传示例
                //step 2:创建struct epoll_event结构体
                struct epoll_event evt;
                evt.data.fd=clientfd;//保存监听的fd,以便epoll_wait的后续操作
                evt.events=EPOLLOUT;//设置监听fd的可写事件
                // step 3:修改事件监听
                epoll_ctl(epfd,EPOLL_CTL_MOD,clientfd,&evt);
            }
        }
        else if(events[i].events &EPOLLOUT)
        {
   
   
            int ret = send(clientfd,wbuffer,MAX_BUFF,0);
            printf("send: %s\n",wbuffer);
            //step 2:创建struct epoll_event结构体
            struct epoll_event evt;
            evt.data.fd=clientfd;//保存监听的fd,以便epoll_wait的后续操作
            evt.events=EPOLLIN;//设置监听fd的可读事件
            // step 3:修改事件监听
            epoll_ctl(epfd,EPOLL_CTL_MOD,clientfd,&evt);

        }
    }
}

六、完整示例代码

#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>

#include <unistd.h>

#include <pthread.h>

#include <sys/epoll.h>

#include <string.h>

#define BUFFER_LENGTH    128

#define EVENTS_LENGTH    128

char rbuff[BUFFER_LENGTH] = {
   
    0 };
char wbuff[BUFFER_LENGTH] = {
   
    0 };

int main() {
   
   

// block
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);  // 
    if (listenfd == -1) return -1;
// listenfd
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);

    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
   
   
        return -2;
    }

#if 0 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif

    listen(listenfd, 10);


    int epfd = epoll_create(1);
    struct epoll_event ev, events[EVENTS_LENGTH];
    ev.events = EPOLLIN;
    ev.data.fd = listenfd;

    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

    printf("epfd : %d\n", epfd);
    while (1)
    {
   
   
        int nready = epoll_wait(epfd, events, EVENTS_LENGTH, -1);
        printf("nready --> %d\n",nready);
        int i;
        for (i = 0; i < nready;i++)
        {
   
   
            int clientfd = events[i].data.fd;
            if (listenfd == clientfd)
            {
   
   
                // accept
                struct sockaddr_in client;
                int len = sizeof(client);
                int conffd = accept(clientfd, (struct sockaddr*)&client,&len);

                printf("conffd --> %d\n",conffd);
                ev.events = EPOLLIN;
                ev.data.fd = conffd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, conffd, &ev);
            }
            else if(events[i].events & EPOLLIN)//client
            {
   
   
                int ret=recv(clientfd, rbuff, BUFFER_LENGTH, 0);
                if (ret > 0)
                {
   
   
                    rbuff[ret] = '\0';
                    printf("recv buffer: %s\n", rbuff);
                    /*
                    int j;
                    for (j = 0; j < BUFFER_LENGTH;j++)
                    {
                        buff[j] = 'a' + (j % 26);
                    }
                    send(clientfd, buff, BUFFER_LENGTH, 0);
                    */
                    memcpy(wbuff, rbuff, BUFFER_LENGTH);
                    ev.events = EPOLLOUT;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
                }

            }
            else if (events[i].events & EPOLLOUT)
            {
   
   
                send(clientfd, wbuff, BUFFER_LENGTH, 0);
                printf("send --> %s\n",wbuff);
                ev.events = EPOLLIN;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }
        }
    }

    return 0;
}

七、epoll的缺点

读写使用相同的缓冲区。比如上述的示例中,wbuffer和rbuffer是使用同一个缓冲区的,所以需要rbuff[ret] = '\0';去除杂数据。

八、水平触发(LT)与边沿触发(ET)

8.1、两者差异

1、水平触发可以一次recv,边沿触发需要用循环来recv;
2、水平触发可以使用阻塞模式,边沿模式不能
3、两者性能差异非常小,一般小数据使用水平触发LT,大数据使用边沿触发ET
4、listen fd最好使用水平触发,尽量不要边沿触发
5、当当recv的buffer小于接受的数据时:
(1)水平触发是只要有数据就一直触发,直到数据读完;
(2)边沿触发是来一次连接触发一次,如果接受数据的buffer不够大,则数据会保留在缓冲区,下次触发继续从缓冲区读出来;
6、一般,水平触发只需要一个recv,边沿触发需要搭配while从缓冲区读完数据

8.2、设置触发模式

默认是水平触发模式,在事件中设置中 | EPOLLET 就可以设置边沿触发,不设置则默认是水平触发。
例如:

ev.events=EPOLL_IN | EPOLLET

九、常见疑惑问题

9.1、为什么提前先定义一个事件?

我们需要注册,内核才会有事件来的时候通知进程。比如生活中要退一个快递,那么我们需要注册一个快递公司的账户,然后发送一个退快递请求时快递公司才能找到你并取快递。

9.2、epoll events超出EVENTS_LENGTH?

epoll会循环拷贝红黑树结构体中的双向链表节点,读取节点数据,直到没有事件。

9.3、缓冲区有多大空间时才返回可读/可写?

只要缓冲区有空间就返回可读、可写,不管空间多少。比如缓冲区是1024,但是有1023有数据了,这种极端条件也会返回可读、可写。

9.4、recv和send放在一起时,有什么问题?

发送给客户端数据很大的时候(大于内核缓冲区),就可能出现send不全,客户端recv不全,最好用EPOLLOUT单独处理发送数据事件。

总结

本文介绍了网络IO模型,引入了epoll作为Linux系统中高性能网络编程的核心工具。通过分析epoll的特点与优势,并给出使用epoll的注意事项和实践技巧,该文章为读者提供了宝贵的指导。通过掌握这些知识,读者能够构建高效、可扩展和稳定的网络应用,提供出色的用户体验。

image.png

相关实践学习
CentOS 7迁移Anolis OS 7
龙蜥操作系统Anolis OS的体验。Anolis OS 7生态上和依赖管理上保持跟CentOS 7.x兼容,一键式迁移脚本centos2anolis.py。本文为您介绍如何通过AOMS迁移工具实现CentOS 7.x到Anolis OS 7的迁移。
目录
相关文章
|
1月前
|
监控 安全 Linux
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景,包括 ping(测试连通性)、traceroute(跟踪路由路径)、netstat(显示网络连接信息)、nmap(网络扫描)、ifconfig 和 ip(网络接口配置)。掌握这些命令有助于高效诊断和解决网络问题,保障网络稳定运行。
73 2
|
1月前
|
大数据 云计算
中国网络大会专题论坛 | 下一代超大规模高性能公共云网络
中国计算机学会ChinaNet上,阿里云洛神云网络将与知名学术届代表一起共话下一代超大规模高性能公共云网络的关键技术。
|
3天前
|
数据采集 监控 安全
公司网络监控软件:Zig 语言底层优化保障系统高性能运行
在数字化时代,Zig 语言凭借出色的底层控制能力和高性能特性,为公司网络监控软件的优化提供了有力支持。从数据采集、连接管理到数据分析,Zig 语言确保系统高效稳定运行,精准处理海量网络数据,保障企业信息安全与业务连续性。
19 4
|
1月前
|
域名解析 网络协议 安全
|
2月前
|
运维 监控 网络协议
|
1月前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
124 0
|
1月前
|
存储 Ubuntu Linux
2024全网最全面及最新且最为详细的网络安全技巧 (三) 之 linux提权各类技巧 上集
在本节实验中,我们学习了 Linux 系统登录认证的过程,文件的意义,并通过做实验的方式对 Linux 系统 passwd 文件提权方法有了深入的理解。祝你在接下来的技巧课程中学习愉快,学有所获~和文件是 Linux 系统登录认证的关键文件,如果系统运维人员对shadow或shadow文件的内容或权限配置有误,则可以被利用来进行系统提权。上一章中,我们已经学习了文件的提权方法, 在本章节中,我们将学习如何利用来完成系统提权。在本节实验中,我们学习了。
|
2月前
|
Ubuntu Linux 虚拟化
Linux虚拟机网络配置
【10月更文挑战第25天】在 Linux 虚拟机中,网络配置是实现虚拟机与外部网络通信的关键步骤。本文介绍了四种常见的网络配置方式:桥接模式、NAT 模式、仅主机模式和自定义网络模式,每种模式都详细说明了其原理和配置步骤。通过这些配置,用户可以根据实际需求选择合适的网络模式,确保虚拟机能够顺利地进行网络通信。
108 1
|
1月前
|
大数据 云计算
2024 CCF中国网络大会专题论坛丨下一代超大规模高性能公共云网络 精彩回顾
中国计算机学会ChinaNet上,阿里云洛神云网络将与知名学术届代表一起共话下一代超大规模高性能公共云网络的关键技术。