教你使用io_uring来写一个并发回声服务器

简介: 教你使用io_uring来写一个并发回声服务器

io_uring的使用

什么是io_uring🍻

是内核版本5.10之后的产物,也就是你的内核版本要在5.10之后才能使用,用户空间的环形队列

看见其名字就知道,带队列,能够起到异步解耦的作用,它可以与epoll的性能相提并论,但是却与epoll的工作原理完全不同,下面就让我们来学习它

安装一个liburing

git clone https://github.com/axboe/liburing.git
./configure
make
make install

工作原理🍻

符号说明🔣

  • sqsubmit队列
  • cqcomplete队列
  • sqesq上某一节点
  • cqecq上某一节点

两个队列❓

内核中有两个环形队列,其中一个是submit queue一个是complete queue,简称他们为sqcq

  • sq:用于用户发起操作请求的队列,例如用户发起一个accept(这个异步acceptliburing实现)之后会将这个ACCEPT请求封装为一个节点,放进sq的队列中,之后通过调用一个submit函数来将sq队列中的节点放入内核中去处理
  • cq:用于内核处理完成后放入节点的位置,内核在异步处理完操作后会将节点放入cq队列中

注意:在上述描述中我用到将节点放进这个说法其实是不对的,这样说就好像有拷贝的动作,但是这整个过程中其实都没有拷贝的动作,sqcq都维护的指针,指向的是对应节点,只是什么时候他们该指向哪个节点,例如内核处理完成后cq就会指向完成节点

共享内存

io_uring底层也有共享内存的部分,sqcq中没有拷贝的动作,他们指向的都是一个内核与用户态共享的一块内存块

异步

通过队列,io_uringaccep、recv、send封装成了异步io

  • 例如accept,假设io_uring给出的接口是accept_prepare,调用他后直接返回,io_uringaccept请求放入sq中,内核取出处理完毕后的放进cq中,通过cq->res获取原本系统调用的返回值,通过一些附加信息获取原始sockfd

使用io_uring🍻

使用io_uring实现一个可供多个客户端连接的回声服务器

大概流程🎏

  • 初始化sqcq队列
  • accept操作注册进sq队列中
  • submitsq队列中的操作到内核去处理
  • cq中获取操作完成的操作们到用户态
  • 循环遍历判断状态来获取相应的返回值以及进行相应处理

**注意:**当状态通知到时 操作就已经是完成了的,我们只需要直接读结果就行,而不是像reactor那样事件通知然后执行相应的操作

使用io_uring

首先写一个没有acceptTCP服务器 hh伪代码

int sockfd = socket(AF_INET, SOCK_STREAM, 0); // io
bind -> Ip:0.0.0.0 Port:9999
listen

获取原始sockfd

什么是原始sockfd

  • 例如posixAPIaccpet,第一个参数是一个listenfd,其返回值是clientfd,其listenfd就是原始sockfd
  • 例如posixAPIrecv,第一个参数是clientfd,其返回值是实际读取到的字节,其clientfd就是原始sockfd
  • 为什么要专门说获取原始sockfd,因为如果不做任何附加信息,cq中取出来的节点其上面只有原始函数的返回值,无法获取原始sockfd,因而有一个场景,如果想要连接多个客户端,在第一次accpet状态触发后需要重新注册accept操作进sq队列,此时进凭cqe->res是无法操作的,我们需要利用cqe->user_data

epoll中用epoll_event来获取原始fd和注册的对应事件,而io_uring要获取原始fd,和设置操作状态 的话也是需要这样一个结构体,我们可以自己实现

enum {
  EVENT_ACCEPT = 0,
  EVENT_READ,
  EVENT_WRITE
};
typedef struct _conninfo {
  int connfd;
  int event;
} conninfo;
  • io_uring_sqe结构体中有一个64ull的成员user_data,所以我们设计一个conninfo的结构体来存储不同的操作状态和原始sockfd它后续是这样使用的
struct io_uring_sqe *sqe = get_sqe_from_ring();
io_uring_prep_accept(sqe, sockfd, addr, addrlen, flag);
conninfo info_accept = {
    .connfd = sockfd,
    .event = EVENT_ACCEPT,
};
// 将对应状态附给sqe
memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
  • cqe(完成队列某节点)中我们就可以通过其user_data字段得到对应的描述符,根据其状态来决定下一步操作,比如如果状态为EVENT_ACCEPT则说明有客户端连接,首先将返回的clientfd获取,然后先将listenfd的**accept操作注册进sq中(保证多个客户端可连接)然后再将clientfdrecv操作**注册进sq中(使服务器能够接收数据)
    注意:最后的处理是将他们的操作注册进sq中,在代码上的形式就是调用了io_uring中异步的acceptrecv,他们的返回值都是void,真正的返回值通过cq队列中节点的res字段获取
初始化sqcq队列🐝 <io_uring_queue_init_params>
#define ENTRIES_LENGTH    1024
struct io_uring_params params;
memset(&params, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

该函数执行后,sqcq队列被初始化

  • io_uring结构体中维护着sqcq队列
  • ENTRIES_LENGTH指定队列的长度
  • params被初始化为0值,表示属性全部使用默认
注册accept操作到sq队列🐝 io_uring_prep_accept

io_uring提供的异步accept只比accept4多了一个参数,也就是sq队列的地址

封装了一个函数

void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr,
                   socklen_t *addrlen, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    conninfo info_accept = {
        .connfd = sockfd,
        .event = EVENT_ACCEPT,
    };
    memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
  • 此函数主要是执行了异步的accept,并附加了状态信息以及描述符信息
  • 获取sq位置,
  • 调用异步api
  • 加入附加信息

还有两个操作被封装成了函数,注册read操作与注册write操作到sq队列

void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_send(sqe, sockfd, buf, len, flags);
  conninfo info_send = {
    .connfd = sockfd,
    .event = EVENT_WRITE,
  };
  memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  conninfo info_recv = {
    .connfd = sockfd,
    .event = EVENT_READ,
  };
  memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}

accept相似,只是内部调用api的不同与附加操作状态的不同

mainloop阶段➿ <while(1)>

下面的操作都包含在一个while(1)里面

提交sq上的操作到内核🚀<io_uring_submit>
io_uring_submit(&ring);
主程序等待cq中有节点⬅️<io_uring_wait_cqe>
struct io_uring_cqe *cqe_;
io_uring_wait_cqe(&ring, &cqe_);
探测到cq中有节点后取出cq中指定个数的节点⬅️<io_uring_peek_batch_cqe>
struct io_uring_cqe *cqes[10];
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
  • cqecount <= 第三个参数(这里是10)
循环遍历每个节点,根据操作状态来决定下一步⬅️<for(int i = 0; i < cqecount; i++)>

取出对应操作的原始sockfd与操作状态

for外面定义struct io_uring_cqe *cqe;供获取每个操作完成的节点

cqe = cqes[i];
// 取出里面的原始sockfd与操作状态
conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
  • 状态:ci.event,原始sockfdci.connfd
根据状态延伸出不同的操作
  • ci.event == EVENT_ACCEPT
int connfd = cqe->res;
set_accept_event(&ring, ci.connfd, (struct sockaddr*)&clientaddr, &clilen, 0);
set_recv_event(&ring, connfd, buffer, 1024, 0);
  • 重新注册accept是为了多个客户端可连接
    注册read是让服务器可以接受客户端发送数据
  • ci.event == EVENT_READ
if (cqe->res == 0) {
    close(ci.connfd);
} else {
    printf("recv --> %s, %d\n", buffer, cqe->res);
    set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
}
  • 通过cqe->res获取异步read操作的返回值,这里就能看出与reactor的区别,reactorread事件触发了才开始执行read操作,这里当read状态通知时是read操作已经调用完成了,接着就直接注册send
  • ci.event == EVENT_WRITE
set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
  • 能够执行到这里就说明send成功了,此时只需要再次设置recv客户端即可进行多次发送
推进I/O事件完成队列的指针✴️<io_uring_cq_advance>

for循环完成之后调用

io_uring_cq_advance(&ring, cqecount);
  • 这样下次调用io_uring_peek_batch_cqe获取cqe就不会出错了

完整代码

uring_io/uring_server.c at main · luopanforever/uring_io · GitHub

相关文章
|
3月前
|
数据采集 Java Python
python并发编程:Python异步IO实现并发爬虫
python并发编程:Python异步IO实现并发爬虫
53 1
|
5天前
|
开发框架 缓存 .NET
并发请求太多,服务器崩溃了?试试使用 ASP.NET Core Web API 操作筛选器对请求进行限流
并发请求太多,服务器崩溃了?试试使用 ASP.NET Core Web API 操作筛选器对请求进行限流
|
3月前
|
弹性计算
阿里云3M带宽云服务器并发多大?阿里云3M带宽云服务器测评参考
在探讨云服务器3M带宽能支持多大并发这一问题时,我们首先要明白一个关键点:并发量并非仅由带宽决定,还与网站本身的大小密切相关。一般来说,一个优化良好的普通网站页面大小可能只有几K,为便于计算,我们可以暂且假定每个页面大小为50K。
1108 1
|
2月前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
51 0
|
3月前
|
算法 Java
并发垃圾回收算法对于大规模服务器应用的优势
并发垃圾回收算法对于大规模服务器应用的优势
|
1月前
|
并行计算 监控 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
【7月更文挑战第16天】Python并发异步提升性能:使用`asyncio`处理IO密集型任务,如网络请求,借助事件循环实现非阻塞;`multiprocessing`模块用于CPU密集型任务,绕过GIL进行并行计算。通过任务类型识别、任务分割、避免共享状态、利用现代库和性能调优,实现高效编程。示例代码展示异步HTTP请求和多进程数据处理。
36 8
|
1月前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
【7月更文挑战第17天】在数据驱动时代,Python凭借其优雅语法和强大库支持成为并发处理大规模数据的首选。并发与异步编程是关键,包括多线程、多进程和异步IO。对于IO密集型任务,如网络请求,可使用`concurrent.futures`和`asyncio`;CPU密集型任务则推荐多进程,如`multiprocessing`;`asyncio`适用于混合任务,实现等待IO时执行CPU任务。通过这些工具,开发者能有效优化资源,提升系统性能。
52 4
|
1月前
|
分布式计算 并行计算 Java
Python并发风暴来袭!IO密集型与CPU密集型任务并发策略大比拼,你站哪队?
【7月更文挑战第17天】Python并发处理IO密集型(如网络请求)与CPU密集型(如数学计算)任务。IO密集型适合多线程和异步IO,如`ThreadPoolExecutor`进行网页下载;CPU密集型推荐多进程,如`multiprocessing`模块进行并行计算。选择取决于任务类型,理解任务特性是关键,以实现最佳效率。
34 4
|
1月前
|
开发框架 并行计算 .NET
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
【7月更文挑战第18天】在Python中,异步编程(如`asyncio`)适合处理IO密集型任务,通过非阻塞操作提高响应性,例如使用`aiohttp`进行异步HTTP请求。而对于CPU密集型任务,由于GIL的存在,多进程(`multiprocessing`)能实现并行计算,如使用进程池进行大量计算。明智选择并发模型是性能优化的关键,体现了对任务特性和编程哲学的深刻理解。
22 2
|
1月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
【7月更文挑战第18天】Python并发编程中,异步IO适合IO密集型任务,如异步HTTP请求,利用`asyncio`和`aiohttp`实现并发抓取,避免等待延迟。而对于CPU密集型任务,如并行计算斐波那契数列,多进程通过`multiprocessing`库能绕过GIL限制实现并行计算。选择正确的并发模型能显著提升性能。
46 2