2.10 高性能异步IO机制:io_uring

简介: 2.10 高性能异步IO机制:io_uring

一、io_uring的引入

为了方便说明io_uring的作用,先举一个通俗点的例子


1、通过异步提高读写的效率

假设有一批数量很大的货,需要分批次运到厂里处理。这个时候就有两种方式:

1)同步方式:运送一批到厂里,等厂里反馈OK了,再回来送下一批;

2)异步方式:送到厂里之后,不用等厂里反馈,直接再送下一批货。


哪一种方式比较好呢?当然如果货的数量不多,同步方式比较可靠,效率差别也不大。但如果面对数量很大,比如读取一个很大的数据包,那异步方式效率就会显著提高。


2、减少系统开销

另外,正常送货到工厂,需要开门进入,卸完货之后再出来。这样频繁的进出消耗很大。因此,可以在工厂门口开辟一块共享仓库,货车卸货进仓库,工厂从仓库里取货。


对应的,IO操作需要进行系统调用。而在调用系统调用时,会从用户态切换到内核态,进行上下文切换。在高 IOPS(Input/Output Per Second)的情况下,进行上下文切换会消耗大量的CPU时间。

io_uring为了减少系统调用带来的上下文切换,采用了用户态和内核态共享内存的方式。用户态对共享内存进行读写操作是不需要使用系统调用的,所以不会发生上下文切换的情况。


二、io_uring

1、内核接口

linux内核(5.10版本之后)为io_uring提供了三个接口

1)io_uring_setup:该函数用于初始化和配置 io_uring环境。该函数将返回一个文件描述符,称为 io_uring文件描述符,用于后续的 I/O 操作。

int io_uring_setup(unsigned entries, struct io_uring_params *p);
  • entries:指定io_uring 的入口数目,即同时处理的 I/O 事件数目。
  • p:指向 struct io_uring_params 结构的指针,用于传递其他配置参数。


2)io_uring_enter:该函数用于提交 I/O 事件并等待其完成。该函数将返回已完成的 I/O 事件数量。

int io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags, sigset_t *sig);
  • fd:io_uring文件描述符。
  • to_submit:要提交的 I/O 事件数量。
  • min_complete:指定在返回之前至少完成的 I/O 事件数量。
  • flags:用于指定操作行为的标志。
  • sig:用于传递信号集,以便在等待期间阻塞特定的信号。


3)io_uring_register:该函数用于将文件描述符或内存区域与io_uring关联起来。该函数将返回注册的文件描述符或内存区域的索引,以便后续的 I/O 操作可以使用。

int io_uring_register(int fd, unsigned int opcode, const void *arg, unsigned int nr_args);
  • fd:io_uring文件描述符。
  • opcode:指定注册操作的类型,如文件描述符的注册或内存区域的注册。
  • arg:指向相关数据结构的指针,用于传递需要注册的文件描述符或内存区域的信息。
  • nr_args:指定相关参数的数量。


2、提交队列SQ和完成队列CQ

1)提交队列(Submission Queues,SQ):环形队列,存放即将执行I/O操作的数据;

2)完成队列(Completion Queue, CQ):环形队列,存放I/O操作完成返回后的结果;

对于提交队列,内核会将io_sq_ring结构映射到应用程序的内存空间,这样应用程序可以直接向io_sq_ring结构的环形队列提交I/O操作,内核可以通过io_sq_ring结构的环形队列读取I/O操作。这样不用通过系统调用,避免了上下文切换。

同样对于完成队列也是如此。


2.1 io_sq_ringio_cq_ring

io_sq_ring结构用于向内核发起 I/O 操作请求,即将需要执行的 I/O 操作请求提交到队列中。

io_sq_ring 中的array指向一个环形队列的SQE数组,该数组用于存储应用程序提交的I/O请求信息。每个SQE对应一个io_uring_sqe结构体,代表一个具体的 I/O 操作请求,其中包含了执行该操作所需的各种参数和信息。

同样对于io_cq_ring结构也是如此。需要特别注意的是结构体中res 的含义会有所不同:

对于读取操作(如 read()),res 表示成功读取的字节数。

对于写入操作(如 write()),res 表示成功写入的字节数。

对于 accept 操作(如 accept()),res 表示接受到的新连接的文件描述符(即,已连接套接字)。


3、io_uring的流程

io_uring的操作流程如下:

1)应用程序向 提交队列io_sq_ring提交I/O操作

2)SQ内核线程从 提交队列 中读取 I/O 操作。

3)SQ内核线程发起 I/O 请求。

4)I/O 请求完成后,SQ内核线程会将 I/O 请求的结果写入到 io_uring 的 完成队列io_cq_ring中。

5)应用程序可以从 完成队列io_cq_ring 中读取到 I/O 操作的结果。


三、liburing库

liburing是已经封装好io_uring的库


1、安装liburing


1)下载源码

sudo apt-get install git
git clone git://git.kernel.dk/liburing

2)进入liburing

cd liburing/

3)配置

./configure

4)编译和安装

make && sudo make install

5)配置pkg-config文件,以便其他程序能够使用已安装的liburing库

echo "/usr/local/lib/pkgconfig" | sudo tee /etc/ld.so.conf.d/usr-local-lib.conf >/dev/null
sudo ldconfig -v

6)编译应用程序

gcc -o uring uring.c -luring -static

2、接口

2.1 io_uring_queue_init_params()

执行io_uring_setup()系统调用来初始化io_uring队列。

int io_uring_queue_init_params(unsigned entries, struct io_uring *ring, const struct io_uring_params *p);

1)entries:指定 I/O uring 的入口数目,即同时处理的 I/O 事件数目。

2)ring:指向 struct io_uring 结构的指针,用于接收初始化后的 I/O uring 环境。

3)p:指向 struct io_uring_params 结构的指针,包含了自定义的初始化参数。


2.2 io_uring_get_sqe()

用于从 I/O uring 环境的 submission queue (SQ) 中获取一个 Submission Queue Entry (SQE)。通过获取 SQE,你可以将 I/O 操作请求添加到 I/O uring 中,并提交给内核进行处理。

struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);

1)ring:指向 struct io_uring 结构的指针,表示要操作的 I/O uring 环境。

该函数返回一个指向 struct io_uring_sqe 结构的指针,该结构表示一个 Submission Queue Entry (SQE)。SQE 包含了要执行的 I/O 操作的详细信息


2.3 io_uring_prep_accept()

用于准备执行 accept 操作的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_accept 函数,可以设置要接受连接的套接字描述符、新连接的文件描述符和连接地址等参数。

void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要接受连接的套接字描述符。

3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。

4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。

5)flags:一个整数,用于设置一些额外的标志。


2.4 io_uring_prep_recv()

用于准备执行接收数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_recv 函数,你可以设置要接收数据的套接字描述符以及数据缓冲区等参数。

void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要接受连接的套接字描述符。

3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。

4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。

5)flags:一个整数,用于设置一些额外的标志。


2.5 io_uring_prep_send()

用于准备执行发送数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_send 函数,你可以设置要发送数据的套接字描述符以及数据缓冲区等参数。

void io_uring_prep_send(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned int len, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要发送数据的套接字描述符。

3)buf:一个指向数据缓冲区的指针,包含要发送的数据。

4)len:一个无符号整数,表示数据缓冲区中要发送的数据长度。

5)flags:一个整数,用于设置一些额外的标志。


2.6 io_uring_submit()

用于提交 SQE (Submission Queue Entry) 到 I/O uring 环境中进行处理。

int io_uring_submit(struct io_uring *ring);

1)ring:一个指向 struct io_uring 结构的指针,表示要提交的 I/O uring 环境。

2)io_uring_submit 函数将等待队列中的 SQEs 进行提交,并触发 I/O uring 环境的处理。


函数返回已提交的 SQE 数量,或者在出现错误时返回负数


2.7 io_uring_wait_cqe()

用于等待完成队列项(Completion Queue Entry,CQE)的到来。

int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

1)ring:指向 struct io_uring 的指针,表示I/O uring环境。

2)cqe_ptr:指向CQE指针的指针,用于存储返回的已完成CQE。


io_uring_wait_cqe() 函数会阻塞当前线程,直到有已完成的CQE可用。一旦有CQE可用,函数将填充 cqe_ptr 指向的指针,并返回0。


2.8 io_uring_peek_batch_cqe()

用于批量获取已完成的CQE(Completion Queue Entry)而无需等待.

int io_uring_peek_batch_cqe(struct io_uring *ring, struct io_uring_cqe **cqes, unsigned int count);

1)ring:指向 struct io_uring 的指针,表示I/O uring环境。

2)cqes:一个指向指针数组的指针,用于存储返回的已完成CQE。

3)count:要获取的CQE数量。


io_uring_peek_batch_cqe() 函数会尝试从I/O uring环境中立即获取指定数量的已完成CQE,并将它们存储到 cqes 指向的指针数组中。如果成功获取了指定数量的CQE,函数将返回成功接收到并处理完毕的I/O事件数量,否则将返回负值。


2.9 io_uring_cq_advance()

用于标记完成队列(Completion Queue,CQ)上已经处理的CQE数量。当一个或多个CQE被处理后,需要调用io_uring_cq_advance()函数来更新下一次读取CQE时应该从哪个位置开始。

void io_uring_cq_advance(struct io_uring *ring, unsigned int steps);

1)ring:指向 struct io_uring 的指针,表示 I/O uring 环境。

2)steps:要向前推进的步数,即要消耗的 CQE 数量。


io_uring_cq_advance() 函数将指定数量的已完成的 CQE 标记为已消耗,从而使 I/O uring 可以继续接收新的 CQE。


2.10 io_uring_cq_advance()

用于推进完成队列(Completion Queue)中的消费指针,即更新消费指针的位置。以告知 I/O uring 已经成功处理了一定数量的完成事件,并且可以释放这些事件所占用的资源。

void io_uring_cq_advance(struct io_uring *ring, unsigned int count);

1)ring 是指向 struct io_uring 的指针,代表 I/O uring 环境。

2)count 是要推进的完成事件数量。


三、liburing的TCP服务器实现

基于liburing,实现多个TCP服务器的收发

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <liburing.h>
#define ENTRIES_LENGTH    1024
enum {
  EVENT_ACCEPT = 0,
  EVENT_READ,
  EVENT_WRITE   
};
typedef struct _conninfo{
    int connfd;
    int event;
}conninfo;
//设置accept事件所需的信息,并将其与相应的 SQE 关联起来
void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr, 
                        socklen_t *addrlen, int flags){
    //获取一个SQE
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    //准备执行accept操作的SQE
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    //创建了一个结构体变量 info_accept,其中存储了与连接相关的信息
    conninfo info_accept = {
        .connfd = sockfd,
        .event = EVENT_ACCEPT
    };
    memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
//设置recv事件所需的信息,并将其与相应的 SQE 关联起来
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    conninfo info_recv = {
        .connfd = sockfd,
        .event = EVENT_READ
    };
    memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}
//设置send事件所需的信息,并将其与相应的 SQE 关联起来
void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
    conninfo info_send = {
        .connfd = sockfd,
        .event = EVENT_WRITE
    };
    memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
int main(){
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(struct sockaddr))){
        printf("bind failed:%s\n",strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    //linuring
    struct io_uring_params params;
    memset(&params, 0 ,sizeof(struct io_uring_params));
    struct io_uring ring;
    //初始化io_uring队列
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
    //获取一个SQE
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct sockaddr_in clientaddr;
    socklen_t clilen = sizeof(struct sockaddr);
    //设置accept事件所需的信息,并将其与相应的 SQE 关联起来
    set_accept_event(&ring, sockfd, (struct sockaddr *)&clientaddr, &clilen, 0);
    char buffer[1024] = {0};
    while (1){
        //提交 SQE
        io_uring_submit(&ring);
        struct io_uring_cqe *cqe;
        //等待完成队列项cqe
        io_uring_wait_cqe(&ring, &cqe);
        struct io_uring_cqe *cqes[10];
        //批量获取已完成IO操作的CQE
        int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
        int i = 0;
        for (i = 0; i < cqecount; i++){
            cqe = cqes[i];
            conninfo ci;
            memcpy(&ci, &cqe->user_data, sizeof(ci));
            if (ci.event == EVENT_ACCEPT){ //此时已经完成accept
                if (cqe->res < 0) continue;
                int connfd = cqe->res;  
                //在已接受连接的connfd上设置recv事件处理器,以便处理已建立连接的数据接收操作。
                set_recv_event(&ring, connfd, buffer, 1024, 0); //此处的connfd表示已建立连接的fd,clientfd
                //重新设置接受连接的事件,以便处理新连接的接受操作。
                set_accept_event(&ring, ci.connfd, (struct sockaddr *)&clientaddr, &clilen, 0); //此处的connfd表示新连接的fd,listenfd
            }else if (ci.event == EVENT_READ){//此时已经完成read
                if (cqe->res < 0) continue;
                if (cqe->res == 0){
                    close(ci.connfd);
                }else{
                    printf("recv --> %s, %d\n", buffer, cqe->res);
                    //重新设置写的事件,以便回发
                    set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
                }
            } else if (ci.event == EVENT_WRITE){//此时已经完成write
                //重新设置读的事件
                set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
            }
        }
        io_uring_cq_advance(&ring, cqecount);
    }
    getchar();
}
目录
相关文章
|
1月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
在Python的并发编程世界中,没有万能的解决方案,只有最适合特定场景的方法。希望本文能够为你拨开迷雾,找到那条通往高效并发编程的光明大道。
42 2
|
2月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
46 4
|
2月前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
在数据驱动时代,高效处理大规模数据和高并发请求至关重要。Python凭借其优雅的语法和强大的库支持,成为开发者首选。本文将介绍Python中的并发与异步编程,涵盖并发与异步的基本概念、IO密集型任务的并发策略、CPU密集型任务的并发策略以及异步IO的应用。通过具体示例,展示如何使用`concurrent.futures`、`asyncio`和`multiprocessing`等库提升程序性能,帮助开发者构建高效、可扩展的应用程序。
113 0
|
4月前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
【7月更文挑战第17天】Python并发编程中,异步编程(如`asyncio`)在IO密集型任务中提高效率,利用等待时间执行其他任务。但对CPU密集型任务,由于GIL限制,多线程效率不高,此时应选用`multiprocessing`进行多进程并行计算以突破限制。选择合适的并发策略是关键:异步适合IO,多进程适合CPU。理解这些能帮助构建高效并发程序。
118 6
|
4月前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
【7月更文挑战第17天】在数据驱动时代,Python凭借其优雅语法和强大库支持成为并发处理大规模数据的首选。并发与异步编程是关键,包括多线程、多进程和异步IO。对于IO密集型任务,如网络请求,可使用`concurrent.futures`和`asyncio`;CPU密集型任务则推荐多进程,如`multiprocessing`;`asyncio`适用于混合任务,实现等待IO时执行CPU任务。通过这些工具,开发者能有效优化资源,提升系统性能。
93 4
|
4月前
|
开发框架 并行计算 .NET
从菜鸟到大神:Python并发编程深度剖析,IO与CPU的异步战争!
【7月更文挑战第18天】Python并发涉及多线程、多进程和异步IO(asyncio)。异步IO适合IO密集型任务,如并发HTTP请求,能避免等待提高效率。多进程在CPU密集型任务中更优,因可绕过GIL限制实现并行计算。通过正确选择并发策略,开发者能提升应用性能和响应速度。
106 3
|
4月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
【7月更文挑战第18天】Python并发编程中,异步IO适合IO密集型任务,如异步HTTP请求,利用`asyncio`和`aiohttp`实现并发抓取,避免等待延迟。而对于CPU密集型任务,如并行计算斐波那契数列,多进程通过`multiprocessing`库能绕过GIL限制实现并行计算。选择正确的并发模型能显著提升性能。
88 2
|
4月前
|
开发框架 数据挖掘 .NET
显微镜下的Python并发:细说IO与CPU密集型任务的异步差异,助你精准施策!
【7月更文挑战第16天】在Python并发编程中,理解和区分IO密集型与CPU密集型任务至关重要。IO密集型任务(如网络请求)适合使用异步编程(如`asyncio`),以利用等待时间执行其他任务,提高效率。CPU密集型任务(如计算)则推荐使用多进程(如`multiprocessing`),绕过GIL限制,利用多核CPU。正确选择并发策略能优化应用性能。
71 2
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在Flink算子内部使用异步IO可以通过什么办法实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
缓存 网络协议 算法
【Linux系统编程】深入剖析:四大IO模型机制与应用(阻塞、非阻塞、多路复用、信号驱动IO 全解读)
在Linux环境下,主要存在四种IO模型,它们分别是阻塞IO(Blocking IO)、非阻塞IO(Non-blocking IO)、IO多路复用(I/O Multiplexing)和异步IO(Asynchronous IO)。下面我将逐一介绍这些模型的定义:
219 2