2.10 高性能异步IO机制:io_uring

简介: 2.10 高性能异步IO机制:io_uring

一、io_uring的引入

为了方便说明io_uring的作用,先举一个通俗点的例子


1、通过异步提高读写的效率

假设有一批数量很大的货,需要分批次运到厂里处理。这个时候就有两种方式:

1)同步方式:运送一批到厂里,等厂里反馈OK了,再回来送下一批;

2)异步方式:送到厂里之后,不用等厂里反馈,直接再送下一批货。


哪一种方式比较好呢?当然如果货的数量不多,同步方式比较可靠,效率差别也不大。但如果面对数量很大,比如读取一个很大的数据包,那异步方式效率就会显著提高。


2、减少系统开销

另外,正常送货到工厂,需要开门进入,卸完货之后再出来。这样频繁的进出消耗很大。因此,可以在工厂门口开辟一块共享仓库,货车卸货进仓库,工厂从仓库里取货。


对应的,IO操作需要进行系统调用。而在调用系统调用时,会从用户态切换到内核态,进行上下文切换。在高 IOPS(Input/Output Per Second)的情况下,进行上下文切换会消耗大量的CPU时间。

io_uring为了减少系统调用带来的上下文切换,采用了用户态和内核态共享内存的方式。用户态对共享内存进行读写操作是不需要使用系统调用的,所以不会发生上下文切换的情况。


二、io_uring

1、内核接口

linux内核(5.10版本之后)为io_uring提供了三个接口

1)io_uring_setup:该函数用于初始化和配置 io_uring环境。该函数将返回一个文件描述符,称为 io_uring文件描述符,用于后续的 I/O 操作。

int io_uring_setup(unsigned entries, struct io_uring_params *p);
  • entries:指定io_uring 的入口数目,即同时处理的 I/O 事件数目。
  • p:指向 struct io_uring_params 结构的指针,用于传递其他配置参数。


2)io_uring_enter:该函数用于提交 I/O 事件并等待其完成。该函数将返回已完成的 I/O 事件数量。

int io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags, sigset_t *sig);
  • fd:io_uring文件描述符。
  • to_submit:要提交的 I/O 事件数量。
  • min_complete:指定在返回之前至少完成的 I/O 事件数量。
  • flags:用于指定操作行为的标志。
  • sig:用于传递信号集,以便在等待期间阻塞特定的信号。


3)io_uring_register:该函数用于将文件描述符或内存区域与io_uring关联起来。该函数将返回注册的文件描述符或内存区域的索引,以便后续的 I/O 操作可以使用。

int io_uring_register(int fd, unsigned int opcode, const void *arg, unsigned int nr_args);
  • fd:io_uring文件描述符。
  • opcode:指定注册操作的类型,如文件描述符的注册或内存区域的注册。
  • arg:指向相关数据结构的指针,用于传递需要注册的文件描述符或内存区域的信息。
  • nr_args:指定相关参数的数量。


2、提交队列SQ和完成队列CQ

1)提交队列(Submission Queues,SQ):环形队列,存放即将执行I/O操作的数据;

2)完成队列(Completion Queue, CQ):环形队列,存放I/O操作完成返回后的结果;

对于提交队列,内核会将io_sq_ring结构映射到应用程序的内存空间,这样应用程序可以直接向io_sq_ring结构的环形队列提交I/O操作,内核可以通过io_sq_ring结构的环形队列读取I/O操作。这样不用通过系统调用,避免了上下文切换。

同样对于完成队列也是如此。


2.1 io_sq_ringio_cq_ring

io_sq_ring结构用于向内核发起 I/O 操作请求,即将需要执行的 I/O 操作请求提交到队列中。

io_sq_ring 中的array指向一个环形队列的SQE数组,该数组用于存储应用程序提交的I/O请求信息。每个SQE对应一个io_uring_sqe结构体,代表一个具体的 I/O 操作请求,其中包含了执行该操作所需的各种参数和信息。

同样对于io_cq_ring结构也是如此。需要特别注意的是结构体中res 的含义会有所不同:

对于读取操作(如 read()),res 表示成功读取的字节数。

对于写入操作(如 write()),res 表示成功写入的字节数。

对于 accept 操作(如 accept()),res 表示接受到的新连接的文件描述符(即,已连接套接字)。


3、io_uring的流程

io_uring的操作流程如下:

1)应用程序向 提交队列io_sq_ring提交I/O操作

2)SQ内核线程从 提交队列 中读取 I/O 操作。

3)SQ内核线程发起 I/O 请求。

4)I/O 请求完成后,SQ内核线程会将 I/O 请求的结果写入到 io_uring 的 完成队列io_cq_ring中。

5)应用程序可以从 完成队列io_cq_ring 中读取到 I/O 操作的结果。


三、liburing库

liburing是已经封装好io_uring的库


1、安装liburing


1)下载源码

sudo apt-get install git
git clone git://git.kernel.dk/liburing

2)进入liburing

cd liburing/

3)配置

./configure

4)编译和安装

make && sudo make install

5)配置pkg-config文件,以便其他程序能够使用已安装的liburing库

echo "/usr/local/lib/pkgconfig" | sudo tee /etc/ld.so.conf.d/usr-local-lib.conf >/dev/null
sudo ldconfig -v

6)编译应用程序

gcc -o uring uring.c -luring -static

2、接口

2.1 io_uring_queue_init_params()

执行io_uring_setup()系统调用来初始化io_uring队列。

int io_uring_queue_init_params(unsigned entries, struct io_uring *ring, const struct io_uring_params *p);

1)entries:指定 I/O uring 的入口数目,即同时处理的 I/O 事件数目。

2)ring:指向 struct io_uring 结构的指针,用于接收初始化后的 I/O uring 环境。

3)p:指向 struct io_uring_params 结构的指针,包含了自定义的初始化参数。


2.2 io_uring_get_sqe()

用于从 I/O uring 环境的 submission queue (SQ) 中获取一个 Submission Queue Entry (SQE)。通过获取 SQE,你可以将 I/O 操作请求添加到 I/O uring 中,并提交给内核进行处理。

struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);

1)ring:指向 struct io_uring 结构的指针,表示要操作的 I/O uring 环境。

该函数返回一个指向 struct io_uring_sqe 结构的指针,该结构表示一个 Submission Queue Entry (SQE)。SQE 包含了要执行的 I/O 操作的详细信息


2.3 io_uring_prep_accept()

用于准备执行 accept 操作的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_accept 函数,可以设置要接受连接的套接字描述符、新连接的文件描述符和连接地址等参数。

void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要接受连接的套接字描述符。

3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。

4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。

5)flags:一个整数,用于设置一些额外的标志。


2.4 io_uring_prep_recv()

用于准备执行接收数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_recv 函数,你可以设置要接收数据的套接字描述符以及数据缓冲区等参数。

void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要接受连接的套接字描述符。

3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。

4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。

5)flags:一个整数,用于设置一些额外的标志。


2.5 io_uring_prep_send()

用于准备执行发送数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_send 函数,你可以设置要发送数据的套接字描述符以及数据缓冲区等参数。

void io_uring_prep_send(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned int len, int flags);

1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。

2)fd:要发送数据的套接字描述符。

3)buf:一个指向数据缓冲区的指针,包含要发送的数据。

4)len:一个无符号整数,表示数据缓冲区中要发送的数据长度。

5)flags:一个整数,用于设置一些额外的标志。


2.6 io_uring_submit()

用于提交 SQE (Submission Queue Entry) 到 I/O uring 环境中进行处理。

int io_uring_submit(struct io_uring *ring);

1)ring:一个指向 struct io_uring 结构的指针,表示要提交的 I/O uring 环境。

2)io_uring_submit 函数将等待队列中的 SQEs 进行提交,并触发 I/O uring 环境的处理。


函数返回已提交的 SQE 数量,或者在出现错误时返回负数


2.7 io_uring_wait_cqe()

用于等待完成队列项(Completion Queue Entry,CQE)的到来。

int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

1)ring:指向 struct io_uring 的指针,表示I/O uring环境。

2)cqe_ptr:指向CQE指针的指针,用于存储返回的已完成CQE。


io_uring_wait_cqe() 函数会阻塞当前线程,直到有已完成的CQE可用。一旦有CQE可用,函数将填充 cqe_ptr 指向的指针,并返回0。


2.8 io_uring_peek_batch_cqe()

用于批量获取已完成的CQE(Completion Queue Entry)而无需等待.

int io_uring_peek_batch_cqe(struct io_uring *ring, struct io_uring_cqe **cqes, unsigned int count);

1)ring:指向 struct io_uring 的指针,表示I/O uring环境。

2)cqes:一个指向指针数组的指针,用于存储返回的已完成CQE。

3)count:要获取的CQE数量。


io_uring_peek_batch_cqe() 函数会尝试从I/O uring环境中立即获取指定数量的已完成CQE,并将它们存储到 cqes 指向的指针数组中。如果成功获取了指定数量的CQE,函数将返回成功接收到并处理完毕的I/O事件数量,否则将返回负值。


2.9 io_uring_cq_advance()

用于标记完成队列(Completion Queue,CQ)上已经处理的CQE数量。当一个或多个CQE被处理后,需要调用io_uring_cq_advance()函数来更新下一次读取CQE时应该从哪个位置开始。

void io_uring_cq_advance(struct io_uring *ring, unsigned int steps);

1)ring:指向 struct io_uring 的指针,表示 I/O uring 环境。

2)steps:要向前推进的步数,即要消耗的 CQE 数量。


io_uring_cq_advance() 函数将指定数量的已完成的 CQE 标记为已消耗,从而使 I/O uring 可以继续接收新的 CQE。


2.10 io_uring_cq_advance()

用于推进完成队列(Completion Queue)中的消费指针,即更新消费指针的位置。以告知 I/O uring 已经成功处理了一定数量的完成事件,并且可以释放这些事件所占用的资源。

void io_uring_cq_advance(struct io_uring *ring, unsigned int count);

1)ring 是指向 struct io_uring 的指针,代表 I/O uring 环境。

2)count 是要推进的完成事件数量。


三、liburing的TCP服务器实现

基于liburing,实现多个TCP服务器的收发

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <liburing.h>
#define ENTRIES_LENGTH    1024
enum {
  EVENT_ACCEPT = 0,
  EVENT_READ,
  EVENT_WRITE   
};
typedef struct _conninfo{
    int connfd;
    int event;
}conninfo;
//设置accept事件所需的信息,并将其与相应的 SQE 关联起来
void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr, 
                        socklen_t *addrlen, int flags){
    //获取一个SQE
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    //准备执行accept操作的SQE
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    //创建了一个结构体变量 info_accept,其中存储了与连接相关的信息
    conninfo info_accept = {
        .connfd = sockfd,
        .event = EVENT_ACCEPT
    };
    memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
//设置recv事件所需的信息,并将其与相应的 SQE 关联起来
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    conninfo info_recv = {
        .connfd = sockfd,
        .event = EVENT_READ
    };
    memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}
//设置send事件所需的信息,并将其与相应的 SQE 关联起来
void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
    conninfo info_send = {
        .connfd = sockfd,
        .event = EVENT_WRITE
    };
    memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
int main(){
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(struct sockaddr))){
        printf("bind failed:%s\n",strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    //linuring
    struct io_uring_params params;
    memset(&params, 0 ,sizeof(struct io_uring_params));
    struct io_uring ring;
    //初始化io_uring队列
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
    //获取一个SQE
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct sockaddr_in clientaddr;
    socklen_t clilen = sizeof(struct sockaddr);
    //设置accept事件所需的信息,并将其与相应的 SQE 关联起来
    set_accept_event(&ring, sockfd, (struct sockaddr *)&clientaddr, &clilen, 0);
    char buffer[1024] = {0};
    while (1){
        //提交 SQE
        io_uring_submit(&ring);
        struct io_uring_cqe *cqe;
        //等待完成队列项cqe
        io_uring_wait_cqe(&ring, &cqe);
        struct io_uring_cqe *cqes[10];
        //批量获取已完成IO操作的CQE
        int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
        int i = 0;
        for (i = 0; i < cqecount; i++){
            cqe = cqes[i];
            conninfo ci;
            memcpy(&ci, &cqe->user_data, sizeof(ci));
            if (ci.event == EVENT_ACCEPT){ //此时已经完成accept
                if (cqe->res < 0) continue;
                int connfd = cqe->res;  
                //在已接受连接的connfd上设置recv事件处理器,以便处理已建立连接的数据接收操作。
                set_recv_event(&ring, connfd, buffer, 1024, 0); //此处的connfd表示已建立连接的fd,clientfd
                //重新设置接受连接的事件,以便处理新连接的接受操作。
                set_accept_event(&ring, ci.connfd, (struct sockaddr *)&clientaddr, &clilen, 0); //此处的connfd表示新连接的fd,listenfd
            }else if (ci.event == EVENT_READ){//此时已经完成read
                if (cqe->res < 0) continue;
                if (cqe->res == 0){
                    close(ci.connfd);
                }else{
                    printf("recv --> %s, %d\n", buffer, cqe->res);
                    //重新设置写的事件,以便回发
                    set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
                }
            } else if (ci.event == EVENT_WRITE){//此时已经完成write
                //重新设置读的事件
                set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
            }
        }
        io_uring_cq_advance(&ring, cqecount);
    }
    getchar();
}
目录
相关文章
|
27天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
2天前
|
人工智能 算法 调度
uvloop,一个强大的 Python 异步IO编程库!
uvloop,一个强大的 Python 异步IO编程库!
11 2
|
3天前
|
API 调度 开发者
Python中的并发编程:使用asyncio库实现异步IO
传统的Python编程模式中,使用多线程或多进程实现并发操作可能存在性能瓶颈和复杂性问题。而随着Python 3.5引入的asyncio库,开发者可以利用异步IO来更高效地处理并发任务。本文将介绍如何利用asyncio库实现异步IO,提升Python程序的并发性能。
|
1月前
|
NoSQL Java Linux
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
65 0
|
1月前
|
调度 数据库 Python
Python中的并发编程:使用asyncio库实现异步IO
传统的Python程序在面对IO密集型任务时,往往会遇到性能瓶颈。本文将介绍如何利用Python中的asyncio库,通过异步IO的方式来提升程序的效率和性能,让你的Python程序能够更好地处理并发任务。
|
2月前
|
程序员 调度 云计算
Python并发编程的未来趋势:协程、异步IO与多进程的融合
Python并发编程的未来趋势:协程、异步IO与多进程的融合
|
2月前
|
数据采集 Python
Python多线程与异步IO的对比:何时选择哪种并发模型
Python多线程与异步IO的对比:何时选择哪种并发模型
|
2月前
|
开发者 Python
Python中的并发编程与异步IO
在当今快节奏的互联网时代,如何提高程序的执行效率成为了开发者们关注的焦点。本文将探讨Python中的并发编程与异步IO技术,介绍其原理、应用场景以及优缺点,帮助读者更好地理解和运用这些技术来提升程序性能。
|
2月前
|
并行计算 开发者 Python
Python中的并发编程:异步IO与多线程比较
本文将探讨Python中的并发编程方法,着重比较异步IO和多线程两种不同的实现方式。通过对它们的特点、优缺点以及适用场景进行分析,帮助读者更好地理解并发编程在Python中的应用。
26 1