io_uring作为2019年的后起之秀,为linux异步网络编程新增一把倚天大剑,让我们简单学习一下!
数据结构:
a. sq (submition queue):提交队列,一个存放待执行事件的环形队列
b. cq (completion queue): 完成队列,存放已经完成的事件的环形队列
注:这两个队列是用户态和内核态之间共享的内存,使用mmap实现
c. sqe :sq中的一项
struct io_uring_sqe { // sq中的一项
… __u64 user_data; // 8 btyes } d. cqe:cq中的一项 struct io_uring_cqe { // cq中的一项 … __u64 user_data; // 8 btyes res; // 任务函数的返回值 eg. recv返回收到数据的长度、accept返回clientfd }
io_uring基本工作流程如下:(参照上图)
1.初始化ring,其中包含了sq和cq
2.向提交队列sq里注册(提交)一个事件(任务)sqe
3.将sq中的若干事件(任务)提交给内核处理(submit)
4.内核将处理成功的事件cqe放到完成队列cq里
5.用户取出cq里的完成事件cqe
需要注意的是:
注 : sq里注册的任务sqe一旦交给内核处理,就从sq里被移除了,是一次性的,和epoll不同
注 : cq里的 completion 事件cqe,不主动清除会一直存在
注 : cq队列里都是执行成功的任务,没有执行失败的任务,不用判断recv()返回值 < 0的情况
接下来进行接口分析
io_uring 的三个系统调用在 liburing 中的封装:
- io_uring_setup (初始化ring)被封装为 io_uring_queue_init_params(entries, &ring, ¶ms);
- io_uring_register(从sqe向sq里注册一个任务sqe)被封装为 io_uring_prep_recv,io_uring_prep_accept等
- io_uring_enter(将sq里的任务提交给内核处理)被封装为 io_uring_submit(&ring)
初始化:
struct io_uring_params params; // 用来初始化 ring memset(¶ms, 0, sizeof(params)); // 初始化 params struct io_uring ring; // sq and cq 两个环形队列 // 下面使用了系统调用io_uring_setup : 初始化ring io_uring_queue_init_params(1024, &ring, ¶ms); // 初始化两个环形队列:submition queue 和 completion queue
注册(添加)任务(事件):
struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe struct user_data user_data = { // sqe->user_data .fd = sockfd, .event = EVENT_ACCEPT, }; io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); // 向sq里加入一个任务sqe memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性
提交事件给内核处理:
// 将sq里的任务提交给内核处理 io_uring_submit(&ring);
获取完成的事件:
struct io_uring_cqe *cqe; // 指向cqe(cq中的一项)的指针 // 等待sq里的任务完成,并返回一个结果 io_uring_wait_cqe(&ring, &cqe); // 这个函数会阻塞当前线程,直到至少一个 cqe 可用为止。 // 一旦有一个 cqe 可用,它就会将其存储在提供的指针cqe中,并返回 // 从完成队列cq获取若干内核处理完成的结果,到cqes数组里,不会阻塞 struct io_uring_cqe *cqes[128]; int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // 和 epoll_wait() 类似 /* io_uring_wait_cqe 和 io_uring_peek_batch_cqe 一个是阻塞获取一个结果,一个是非阻塞获取多个结果 */
移除cq中的完成事件:
// 处理完结果后,将完成的任务从cq中移除 io_uring_cq_advance(&ring, nready);
最后附上一个io_uring实现的tcpserver,注释很详细
#include <stdio.h> #include <liburing.h> #include <netinet/in.h> #include <string.h> #include <unistd.h> /* * io_uring 的三个系统调用在 liburing 中的封装: * * io_uring_setup (初始化ring) -->被封装为 io_uring_queue_init_params(entries, &ring, ¶ms); * * io_uring_register(从sqe向sq里注册一个任务sqe) -->被封装为 io_uring_prep_recv,io_uring_prep_send,io_uring_prep_accept * * io_uring_enter(将sq里的任务提交给内核处理)-->被封装为 io_uring_submit(&ring) * * * * 注 : sq里注册的任务sqe一旦交给内核处理,就从sq里被移除了,是一次性的,和epoll不同 * 注 : cq里的 completion 事件cqe,不主动清除会一直存在 * * 注 : ##cq队列里都是执行成功的任务##,没有执行失败的任务,不用判断recv()返回值 < 0的情况 * * 注 : sq和cq都是环形队列,是用户态和内核态的共享内存空间,用mmap实现 (submition queue & completion queue ) * * 注 : sqe(submition queue entry), cqe(completion queue entry) --> 这两个分别代表sq、cq中的一项 * */ // 自定义事件类型 event #define EVENT_ACCEPT 0 #define EVENT_READ 1 #define EVENT_WRITE 2 /* * struct io_uring_cqe { // cq中的一项 * ... * * __u64 user_data; // 8 btyes * * res; // 任务函数的返回值 eg. recv返回收到数据的长度、accept返回clientfd * * } * * struct io_uring_sqe { // sq中的一项 * ... * * __u64 user_data; // 8 btyes * } * */ // 自定义 user_data 保证 8 btyes struct user_data { // sqe & cqe --> user_data int fd; int event; }; int init_server(unsigned short port) { // create a listener sockfd int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10); return sockfd; } #define ENTRIES_LENGTH 1024 #define BUFFER_LENGTH 1024 int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) { // 向sqe中添加一个recv任务 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); //获取sq中下一个可用的位置sqe struct user_data user_data = { .fd = sockfd, .event = EVENT_READ, }; io_uring_prep_recv(sqe, sockfd, buf, len, flags); // 向sq里加入一个recv任务 memcpy(&sqe->use_data, &user_data, sizeof(struct user_data)); // 设置recv任务的属性 } int set_event_send(struct io_uring * ring, int sockfd, void *buf, size_t len, int flags) { // 向sq里添加一个send任务 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe struct user_data user_data = { .fd = sockfd, .event = EVENT_WRITE, }; io_uring_prep_send(sqe, sockfd, buf, len, flags); // 向sq里加入一个任务sqe memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性 } int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe struct user_data user_data = { // sqe->user_data .fd = sockfd, .event = EVENT_ACCEPT, }; io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); // 向sq里加入一个任务sqe memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性 } int main(int argc, char *argv[]) { // 服务端监听套接字 unsigned short port = 9999; int sockfd = init_server(port); // 客户端套接字地址 struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); struct io_uring_params params; // 用来初始化 ring memset(¶ms, 0, sizeof(params)); // 初始化 params struct io_uring ring; // sq and cq 两个环形队列 // 下面使用了系统调用io_uring_setup : 初始化ring io_uring_queue_init_params(1024, &ring, ¶ms); // 初始化两个环形队列:submition queue 和 completion queue // 向提交队列sq里添加一个accept任务 set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); char buffer[BUFFER_LENGTH] = {0}; while (1) { // 将sq里的任务提交给内核处理 io_uring_submit(&ring); // 存储内核处理完成的结果在 cq(completion queue)里 struct io_uring_cqe *cqe; // 指向cqe(cq中的一项)的指针 // 等待sq里的任务完成,并返回一个结果 io_uring_wait_cqe(&ring, &cqe); // 这个函数会阻塞当前线程,直到至少一个 cqe 可用为止。 // 一旦有一个 cqe 可用,它就会将其存储在提供的指针cqe中,并返回 // 从完成队列cq获取若干内核处理完成的结果,到cqes数组里,不会阻塞 struct io_uring_cqe *cqes[128]; int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // 和 epoll_wait() 类似 /* io_uring_wait_cqe 和 io_uring_peek_batch_cqe 一个是阻塞获取一个结果,一个是非阻塞获取多个结果 */ // 查看并处理结果 int i = 0; for (i = 0; i < nready; i++) { struct io_uring_cqe *entries = cqes[i]; // 已经完成的事件cqe struct user_data result; // 获取完成事件的 user_data memcpy(&result, &entries->user_data, sizeof(struct user_data)); if (result.event == EVENT_ACCEPT) { // accept任务完成 // 重新在sq注册一个accept任务 set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); int connfd = entries->res;// accept任务的执行结果:clientfd // 在sq注册一个任务:接收客户端数据 set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); } else if (result.event == EVENT_READ) { // read任务完成 int ret = entries->res; // read的返回值 if (ret == 0) { // 对方发送了fin包通知断开连接 close(result.fd); } else if (ret > 0) { // 在sq注册一个任务:向客户端send数据 set_event_send(&ring, result.fd, buffer, ret, 0); } else if (result.event == EVENT_WRITE) { // write任务完成 int ret = entries->res; // write的返回值 // 在sq注册一个任务:接收客户端数据 set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); } } } // 处理完结果后,将完成的任务从cq中移除 io_uring_cq_advance(&ring, nready); } }