Linux异步io机制 io_uring

简介: Linux异步io机制 io_uring

io_uring作为2019年的后起之秀,为linux异步网络编程新增一把倚天大剑,让我们简单学习一下!

数据结构:

a. sq (submition queue):提交队列,一个存放待执行事件的环形队列

b. cq (completion queue): 完成队列,存放已经完成的事件的环形队列

:这两个队列是用户态和内核态之间共享的内存,使用mmap实现

c. sqe :sq中的一项

struct io_uring_sqe { // sq中的一项

__u64 user_data; // 8 btyes
}
d. cqe:cq中的一项
struct io_uring_cqe { // cq中的一项
__u64 user_data; // 8 btyes
res; // 任务函数的返回值 eg. recv返回收到数据的长度、accept返回clientfd
}

io_uring基本工作流程如下:(参照上图)

1.初始化ring,其中包含了sq和cq

2.向提交队列sq里注册(提交)一个事件(任务)sqe

3.将sq中的若干事件(任务)提交给内核处理(submit)

4.内核将处理成功的事件cqe放到完成队列cq里

5.用户取出cq里的完成事件cqe

需要注意的是:

注 : sq里注册的任务sqe一旦交给内核处理,就从sq里被移除了,是一次性的,和epoll不同

注 : cq里的 completion 事件cqe,不主动清除会一直存在

注 : cq队列里都是执行成功的任务,没有执行失败的任务,不用判断recv()返回值 < 0的情况

接下来进行接口分析

io_uring 的三个系统调用liburing 中的封装

  • io_uring_setup (初始化ring)被封装为 io_uring_queue_init_params(entries, &ring, &params);
  • io_uring_register(从sqe向sq里注册一个任务sqe)被封装为 io_uring_prep_recv,io_uring_prep_accept等
  • io_uring_enter(将sq里的任务提交给内核处理)被封装为 io_uring_submit(&ring)

初始化:

struct io_uring_params params; // 用来初始化 ring
memset(&params, 0, sizeof(params)); // 初始化 params
struct io_uring ring; // sq and cq 两个环形队列
// 下面使用了系统调用io_uring_setup : 初始化ring
io_uring_queue_init_params(1024, &ring, &params); // 初始化两个环形队列:submition queue 和 completion queue

注册(添加)任务(事件):

struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe
  struct user_data user_data = {  // sqe->user_data
    .fd = sockfd,
    .event = EVENT_ACCEPT,
  };
  
  io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); // 向sq里加入一个任务sqe
  memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性

提交事件给内核处理:

// 将sq里的任务提交给内核处理
        io_uring_submit(&ring);

获取完成的事件:

struct io_uring_cqe *cqe; // 指向cqe(cq中的一项)的指针
        // 等待sq里的任务完成,并返回一个结果
        io_uring_wait_cqe(&ring, &cqe); // 这个函数会阻塞当前线程,直到至少一个 cqe 可用为止。
                                        // 一旦有一个 cqe 可用,它就会将其存储在提供的指针cqe中,并返回
        
        // 从完成队列cq获取若干内核处理完成的结果,到cqes数组里,不会阻塞
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);  // 和 epoll_wait() 类似
/*
  io_uring_wait_cqe 和 io_uring_peek_batch_cqe 一个是阻塞获取一个结果,一个是非阻塞获取多个结果
*/

移除cq中的完成事件:

// 处理完结果后,将完成的任务从cq中移除
        io_uring_cq_advance(&ring, nready);

最后附上一个io_uring实现的tcpserver,注释很详细

#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
/* 
 * io_uring 的三个系统调用在 liburing 中的封装:
 *
 * io_uring_setup (初始化ring) -->被封装为 io_uring_queue_init_params(entries, &ring, &params); 
 *
 * io_uring_register(从sqe向sq里注册一个任务sqe) -->被封装为 io_uring_prep_recv,io_uring_prep_send,io_uring_prep_accept
 * 
 * io_uring_enter(将sq里的任务提交给内核处理)-->被封装为 io_uring_submit(&ring)
 * 
 * 
 * 
 * 注 : sq里注册的任务sqe一旦交给内核处理,就从sq里被移除了,是一次性的,和epoll不同
 * 注 : cq里的 completion 事件cqe,不主动清除会一直存在
 * 
 * 注 : ##cq队列里都是执行成功的任务##,没有执行失败的任务,不用判断recv()返回值 < 0的情况
 * 
 * 注 : sq和cq都是环形队列,是用户态和内核态的共享内存空间,用mmap实现 (submition queue & completion queue )
 * 
 * 注 : sqe(submition queue entry), cqe(completion queue entry) --> 这两个分别代表sq、cq中的一项
 * 
 */
// 自定义事件类型 event
#define EVENT_ACCEPT    0
#define EVENT_READ    1
#define EVENT_WRITE   2
/*
 * struct io_uring_cqe { // cq中的一项
 *     ...
 *  
 *     __u64 user_data; // 8 btyes
 * 
 *     res;  // 任务函数的返回值   eg. recv返回收到数据的长度、accept返回clientfd
 *    
 * }
 * 
 * struct io_uring_sqe { // sq中的一项
 *     ...
 * 
 *     __u64 user_data; // 8 btyes
 * }
 * 
 */
// 自定义 user_data 保证 8 btyes
struct user_data {   // sqe & cqe --> user_data
  int fd;
  int event;
};
int init_server(unsigned short port) {    // create a listener sockfd
 
  int sockfd = socket(AF_INET, SOCK_STREAM, 0); 
  struct sockaddr_in serveraddr;  
  memset(&serveraddr, 0, sizeof(struct sockaddr_in)); 
  serveraddr.sin_family = AF_INET;  
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); 
  serveraddr.sin_port = htons(port);  
  if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {   
    perror("bind");   
    return -1;  
  } 
  listen(sockfd, 10);
  
  return sockfd;
}
#define ENTRIES_LENGTH    1024
#define BUFFER_LENGTH   1024
int set_event_recv(struct io_uring *ring, int sockfd,
                    void *buf, size_t len, int flags) { // 向sqe中添加一个recv任务
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring); //获取sq中下一个可用的位置sqe
    struct user_data user_data = {
        .fd = sockfd,
        .event = EVENT_READ,
    };
    io_uring_prep_recv(sqe, sockfd, buf, len, flags); // 向sq里加入一个recv任务
    memcpy(&sqe->use_data, &user_data, sizeof(struct user_data)); // 设置recv任务的属性
}
int set_event_send(struct io_uring * ring, int sockfd,
                    void *buf, size_t len, int flags) { // 向sq里添加一个send任务
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe
    struct user_data user_data = {
        .fd = sockfd,
        .event = EVENT_WRITE,
    };
    io_uring_prep_send(sqe, sockfd, buf, len, flags); // 向sq里加入一个任务sqe
    memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性
}   
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
          socklen_t *addrlen, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring); // 获取sq中下一个可用的位置sqe
  struct user_data user_data = {  // sqe->user_data
    .fd = sockfd,
    .event = EVENT_ACCEPT,
  };
  
  io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags); // 向sq里加入一个任务sqe
  memcpy(&sqe->user_data, &user_data, sizeof(struct user_data)); // 设置任务sqe的属性
}
int main(int argc, char *argv[]) {
    // 服务端监听套接字
    unsigned short port = 9999;
  int sockfd = init_server(port);
    // 客户端套接字地址
    struct sockaddr_in clientaddr;  
  socklen_t len = sizeof(clientaddr);
    struct io_uring_params params; // 用来初始化 ring
    memset(&params, 0, sizeof(params)); // 初始化 params
    struct io_uring ring; // sq and cq 两个环形队列
    // 下面使用了系统调用io_uring_setup : 初始化ring
    io_uring_queue_init_params(1024, &ring, &params); // 初始化两个环形队列:submition queue 和 completion queue
    
    // 向提交队列sq里添加一个accept任务
    set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
    char buffer[BUFFER_LENGTH] = {0};
    while (1) {
        // 将sq里的任务提交给内核处理
        io_uring_submit(&ring);
        // 存储内核处理完成的结果在 cq(completion queue)里
        struct io_uring_cqe *cqe; // 指向cqe(cq中的一项)的指针
        // 等待sq里的任务完成,并返回一个结果
        io_uring_wait_cqe(&ring, &cqe); // 这个函数会阻塞当前线程,直到至少一个 cqe 可用为止。
                                        // 一旦有一个 cqe 可用,它就会将其存储在提供的指针cqe中,并返回
        
        // 从完成队列cq获取若干内核处理完成的结果,到cqes数组里,不会阻塞
        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);  // 和 epoll_wait() 类似
/*
  io_uring_wait_cqe 和 io_uring_peek_batch_cqe 一个是阻塞获取一个结果,一个是非阻塞获取多个结果
*/
        // 查看并处理结果
        int i = 0;
        for (i = 0; i < nready; i++) {
            struct io_uring_cqe *entries = cqes[i]; // 已经完成的事件cqe
            struct user_data result;
            // 获取完成事件的 user_data
            memcpy(&result, &entries->user_data, sizeof(struct user_data));
            if (result.event == EVENT_ACCEPT) { // accept任务完成
                // 重新在sq注册一个accept任务
                set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
                int connfd = entries->res;// accept任务的执行结果:clientfd
                // 在sq注册一个任务:接收客户端数据
                set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
            } else if (result.event == EVENT_READ) { // read任务完成
                int ret = entries->res; // read的返回值
                if (ret == 0) { // 对方发送了fin包通知断开连接
                    close(result.fd);
                } else if (ret > 0) {
                    //  在sq注册一个任务:向客户端send数据
                    set_event_send(&ring, result.fd, buffer, ret, 0);
                } else if (result.event == EVENT_WRITE) { // write任务完成
                    int ret = entries->res; // write的返回值
                    // 在sq注册一个任务:接收客户端数据
                    set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
                }
            }
        }
        // 处理完结果后,将完成的任务从cq中移除
        io_uring_cq_advance(&ring, nready);
    }
}


目录
相关文章
|
2月前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
99 0
|
1月前
|
缓存 Linux 开发者
Linux内核中的并发控制机制
本文深入探讨了Linux操作系统中用于管理多线程和进程的并发控制的关键技术,包括原子操作、锁机制、自旋锁、互斥量以及信号量。通过详细分析这些技术的原理和应用,旨在为读者提供一个关于如何有效利用Linux内核提供的并发控制工具以优化系统性能和稳定性的综合视角。
|
2月前
|
存储 Linux C语言
Linux C/C++之IO多路复用(aio)
这篇文章介绍了Linux中IO多路复用技术epoll和异步IO技术aio的区别、执行过程、编程模型以及具体的编程实现方式。
115 1
Linux C/C++之IO多路复用(aio)
|
2月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
47 2
|
3月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
55 4
|
13天前
|
存储 编译器 Linux
动态链接的魔法:Linux下动态链接库机制探讨
本文将深入探讨Linux系统中的动态链接库机制,这其中包括但不限于全局符号介入、延迟绑定以及地址无关代码等内容。
187 19
|
4月前
|
缓存 安全 Linux
Linux 五种IO模型
Linux 五种IO模型
|
21天前
|
监控 算法 Linux
Linux内核锁机制深度剖析与实践优化####
本文作为一篇技术性文章,深入探讨了Linux操作系统内核中锁机制的工作原理、类型及其在并发控制中的应用,旨在为开发者提供关于如何有效利用这些工具来提升系统性能和稳定性的见解。不同于常规摘要的概述性质,本文将直接通过具体案例分析,展示在不同场景下选择合适的锁策略对于解决竞争条件、死锁问题的重要性,以及如何根据实际需求调整锁的粒度以达到最佳效果,为读者呈现一份实用性强的实践指南。 ####
|
25天前
|
消息中间件 安全 Linux
深入探索Linux操作系统的内核机制
本文旨在为读者提供一个关于Linux操作系统内核机制的全面解析。通过探讨Linux内核的设计哲学、核心组件、以及其如何高效地管理硬件资源和系统操作,本文揭示了Linux之所以成为众多开发者和组织首选操作系统的原因。不同于常规摘要,此处我们不涉及具体代码或技术细节,而是从宏观的角度审视Linux内核的架构和功能,为对Linux感兴趣的读者提供一个高层次的理解框架。
|
1月前
|
算法 Linux 开发者
Linux内核中的锁机制:保障并发控制的艺术####
本文深入探讨了Linux操作系统内核中实现的多种锁机制,包括自旋锁、互斥锁、读写锁等,旨在揭示这些同步原语如何高效地解决资源竞争问题,保证系统的稳定性和性能。通过分析不同锁机制的工作原理及应用场景,本文为开发者提供了在高并发环境下进行有效并发控制的实用指南。 ####