教你使用io_uring来写一个并发回声服务器

简介: 教你使用io_uring来写一个并发回声服务器

io_uring的使用

什么是io_uring🍻

是内核版本5.10之后的产物,也就是你的内核版本要在5.10之后才能使用,用户空间的环形队列

看见其名字就知道,带队列,能够起到异步解耦的作用,它可以与epoll的性能相提并论,但是却与epoll的工作原理完全不同,下面就让我们来学习它

安装一个liburing

git clone https://github.com/axboe/liburing.git
./configure
make
make install

工作原理🍻

符号说明🔣

  • sqsubmit队列
  • cqcomplete队列
  • sqesq上某一节点
  • cqecq上某一节点

两个队列❓

内核中有两个环形队列,其中一个是submit queue一个是complete queue,简称他们为sqcq

  • sq:用于用户发起操作请求的队列,例如用户发起一个accept(这个异步acceptliburing实现)之后会将这个ACCEPT请求封装为一个节点,放进sq的队列中,之后通过调用一个submit函数来将sq队列中的节点放入内核中去处理
  • cq:用于内核处理完成后放入节点的位置,内核在异步处理完操作后会将节点放入cq队列中

注意:在上述描述中我用到将节点放进这个说法其实是不对的,这样说就好像有拷贝的动作,但是这整个过程中其实都没有拷贝的动作,sqcq都维护的指针,指向的是对应节点,只是什么时候他们该指向哪个节点,例如内核处理完成后cq就会指向完成节点

共享内存

io_uring底层也有共享内存的部分,sqcq中没有拷贝的动作,他们指向的都是一个内核与用户态共享的一块内存块

异步

通过队列,io_uringaccep、recv、send封装成了异步io

  • 例如accept,假设io_uring给出的接口是accept_prepare,调用他后直接返回,io_uringaccept请求放入sq中,内核取出处理完毕后的放进cq中,通过cq->res获取原本系统调用的返回值,通过一些附加信息获取原始sockfd

使用io_uring🍻

使用io_uring实现一个可供多个客户端连接的回声服务器

大概流程🎏

  • 初始化sqcq队列
  • accept操作注册进sq队列中
  • submitsq队列中的操作到内核去处理
  • cq中获取操作完成的操作们到用户态
  • 循环遍历判断状态来获取相应的返回值以及进行相应处理

**注意:**当状态通知到时 操作就已经是完成了的,我们只需要直接读结果就行,而不是像reactor那样事件通知然后执行相应的操作

使用io_uring

首先写一个没有acceptTCP服务器 hh伪代码

int sockfd = socket(AF_INET, SOCK_STREAM, 0); // io
bind -> Ip:0.0.0.0 Port:9999
listen

获取原始sockfd

什么是原始sockfd

  • 例如posixAPIaccpet,第一个参数是一个listenfd,其返回值是clientfd,其listenfd就是原始sockfd
  • 例如posixAPIrecv,第一个参数是clientfd,其返回值是实际读取到的字节,其clientfd就是原始sockfd
  • 为什么要专门说获取原始sockfd,因为如果不做任何附加信息,cq中取出来的节点其上面只有原始函数的返回值,无法获取原始sockfd,因而有一个场景,如果想要连接多个客户端,在第一次accpet状态触发后需要重新注册accept操作进sq队列,此时进凭cqe->res是无法操作的,我们需要利用cqe->user_data

epoll中用epoll_event来获取原始fd和注册的对应事件,而io_uring要获取原始fd,和设置操作状态 的话也是需要这样一个结构体,我们可以自己实现

enum {
  EVENT_ACCEPT = 0,
  EVENT_READ,
  EVENT_WRITE
};
typedef struct _conninfo {
  int connfd;
  int event;
} conninfo;
  • io_uring_sqe结构体中有一个64ull的成员user_data,所以我们设计一个conninfo的结构体来存储不同的操作状态和原始sockfd它后续是这样使用的
struct io_uring_sqe *sqe = get_sqe_from_ring();
io_uring_prep_accept(sqe, sockfd, addr, addrlen, flag);
conninfo info_accept = {
    .connfd = sockfd,
    .event = EVENT_ACCEPT,
};
// 将对应状态附给sqe
memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
  • cqe(完成队列某节点)中我们就可以通过其user_data字段得到对应的描述符,根据其状态来决定下一步操作,比如如果状态为EVENT_ACCEPT则说明有客户端连接,首先将返回的clientfd获取,然后先将listenfd的**accept操作注册进sq中(保证多个客户端可连接)然后再将clientfdrecv操作**注册进sq中(使服务器能够接收数据)
    注意:最后的处理是将他们的操作注册进sq中,在代码上的形式就是调用了io_uring中异步的acceptrecv,他们的返回值都是void,真正的返回值通过cq队列中节点的res字段获取
初始化sqcq队列🐝 <io_uring_queue_init_params>
#define ENTRIES_LENGTH    1024
struct io_uring_params params;
memset(&params, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

该函数执行后,sqcq队列被初始化

  • io_uring结构体中维护着sqcq队列
  • ENTRIES_LENGTH指定队列的长度
  • params被初始化为0值,表示属性全部使用默认
注册accept操作到sq队列🐝 io_uring_prep_accept

io_uring提供的异步accept只比accept4多了一个参数,也就是sq队列的地址

封装了一个函数

void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr,
                   socklen_t *addrlen, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    conninfo info_accept = {
        .connfd = sockfd,
        .event = EVENT_ACCEPT,
    };
    memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
  • 此函数主要是执行了异步的accept,并附加了状态信息以及描述符信息
  • 获取sq位置,
  • 调用异步api
  • 加入附加信息

还有两个操作被封装成了函数,注册read操作与注册write操作到sq队列

void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_send(sqe, sockfd, buf, len, flags);
  conninfo info_send = {
    .connfd = sockfd,
    .event = EVENT_WRITE,
  };
  memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  conninfo info_recv = {
    .connfd = sockfd,
    .event = EVENT_READ,
  };
  memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}

accept相似,只是内部调用api的不同与附加操作状态的不同

mainloop阶段➿ <while(1)>

下面的操作都包含在一个while(1)里面

提交sq上的操作到内核🚀<io_uring_submit>
io_uring_submit(&ring);
主程序等待cq中有节点⬅️<io_uring_wait_cqe>
struct io_uring_cqe *cqe_;
io_uring_wait_cqe(&ring, &cqe_);
探测到cq中有节点后取出cq中指定个数的节点⬅️<io_uring_peek_batch_cqe>
struct io_uring_cqe *cqes[10];
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
  • cqecount <= 第三个参数(这里是10)
循环遍历每个节点,根据操作状态来决定下一步⬅️<for(int i = 0; i < cqecount; i++)>

取出对应操作的原始sockfd与操作状态

for外面定义struct io_uring_cqe *cqe;供获取每个操作完成的节点

cqe = cqes[i];
// 取出里面的原始sockfd与操作状态
conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
  • 状态:ci.event,原始sockfdci.connfd
根据状态延伸出不同的操作
  • ci.event == EVENT_ACCEPT
int connfd = cqe->res;
set_accept_event(&ring, ci.connfd, (struct sockaddr*)&clientaddr, &clilen, 0);
set_recv_event(&ring, connfd, buffer, 1024, 0);
  • 重新注册accept是为了多个客户端可连接
    注册read是让服务器可以接受客户端发送数据
  • ci.event == EVENT_READ
if (cqe->res == 0) {
    close(ci.connfd);
} else {
    printf("recv --> %s, %d\n", buffer, cqe->res);
    set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
}
  • 通过cqe->res获取异步read操作的返回值,这里就能看出与reactor的区别,reactorread事件触发了才开始执行read操作,这里当read状态通知时是read操作已经调用完成了,接着就直接注册send
  • ci.event == EVENT_WRITE
set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
  • 能够执行到这里就说明send成功了,此时只需要再次设置recv客户端即可进行多次发送
推进I/O事件完成队列的指针✴️<io_uring_cq_advance>

for循环完成之后调用

io_uring_cq_advance(&ring, cqecount);
  • 这样下次调用io_uring_peek_batch_cqe获取cqe就不会出错了

完整代码

uring_io/uring_server.c at main · luopanforever/uring_io · GitHub

相关文章
|
9天前
|
算法 Java
并发垃圾回收算法对于大规模服务器应用的优势
并发垃圾回收算法对于大规模服务器应用的优势
|
13天前
|
数据采集 缓存 算法
使用Python打造爬虫程序之Python中的并发与异步IO:解锁高效数据处理之道
【4月更文挑战第19天】本文探讨了Python中的并发与异步IO,区分了并发(同时处理任务)与并行(同时执行任务)的概念。Python的多线程受限于GIL,适合IO密集型任务,而多进程适用于CPU密集型任务。异步IO通过非阻塞和回调/协程实现高效IO,Python的asyncio库提供了支持。应用场景包括Web开发和网络爬虫等。实践指南包括理解任务类型、使用asyncio、避免阻塞操作、合理设置并发度和优化性能。理解并运用这些技术能提升Python程序的效率和性能。
|
13天前
|
数据采集 Java Python
python并发编程:Python异步IO实现并发爬虫
python并发编程:Python异步IO实现并发爬虫
24 1
|
13天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
86 2
|
13天前
|
弹性计算
阿里云3M带宽云服务器并发多大?阿里云3M带宽云服务器测评参考
在探讨云服务器3M带宽能支持多大并发这一问题时,我们首先要明白一个关键点:并发量并非仅由带宽决定,还与网站本身的大小密切相关。一般来说,一个优化良好的普通网站页面大小可能只有几K,为便于计算,我们可以暂且假定每个页面大小为50K。
882 1
|
13天前
|
存储 弹性计算 云计算
9M带宽的阿里云服务器支持多少用户并发访问?阿里云9M带宽服务器测评
随着云计算技术的飞速进步与日益完善,云服务器已经逐渐成为了众多企业与个人的首选服务器类型。它以其出色的弹性扩展、高可用性以及灵活的管理方式,赢得了广大用户的青睐。那么,对于一款拥有9M带宽的云服务器来说,到了2024年,它究竟能够支持多少用户进行并发访问呢?这无疑是许多准备使用云服务的用户非常关心的问题。
153 0
|
13天前
|
弹性计算 缓存 测试技术
2核4g服务器能支持多少人访问?阿里云2核4G服务器并发数测试
2核4g服务器能支持多少人访问?阿里云2核4G服务器并发数测试,2核4G服务器并发数性能测试,阿小云账号下的2核4G服务器支持20人同时在线访问,然而应用不同、类型不同、程序效率不同实际并发数也不同,2核4G服务器的在线访问人数取决于多个变量因素
|
13天前
|
弹性计算 缓存 测试技术
云服务器2核4G能支持多少人同时访问?2核4G5M并发量评测!
阿里云2核4g服务器能支持多少人访问?2核4G服务器并发数性能测试,阿小云账号下的2核4G服务器支持20人同时在线访问,然而应用不同、类型不同、程序效率不同实际并发数也不同,2核4G服务器的在线访问人数取决于多个变量
|
13天前
|
弹性计算 大数据 测试技术
阿里云8核16G云服务器并发承载量多少?2024年阿里云8核16G云服务器测评
阿里云8核16G云服务器采用了高性能的处理器和大容量内存,具备强大的计算能力和内存带宽,可以满足多个应用程序的同时运行和访问需求。阿里云8核16G云服务器的并发承载量同样受到多种因素的影响,如服务器配置、网络环境、应用程序的架构和优化等。选择云服务器时,除了考虑服务器的性能表现,还需要考虑其他因素,如云服务提供商的服务质量、技术支持、价格等。因此,建议在购买前进行充分的调研和测试,选择最适合自己需求的云服务器。
|
13天前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
54 2

热门文章

最新文章