运作流程
io_uring的运作流程:首先tcp_server发起读请求,通过liburing库把该请求填入提交队列(SQ)中的一个条目sqe并提交;内核异步执行读操作,完成后把结果作为一个条目cqe放入完成队列(CQ);最后tcp_server从完成队列中取出cqe并处理结果。
基本方法
#include <liburing.h>
//初始化io_uring:在内核中创建提交队列和完成队列,并把它们映射到用户空间的ring结构中;params用于传入配置,同时传出内核实际采用的参数
int io_uring_queue_init_params(unsigned entries, struct io_uring *ring, struct io_uring_params *params);
//获取下一个可用的提交队列实例
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
//设置监听accept事件给sqe条目
void io_uring_prep_accept(struct io_uring_sqe *sqe, int sockfd,struct sockaddr *addr,socklen_t *addrlen,int flags);
//将属于ring的提交队列条目sqe提交
int io_uring_submit(struct io_uring *ring);
//等待ring中的完成IO事件
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);
//批量获取已经就绪的完成队列条目(不阻塞),最多取count个,返回实际取到的个数
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,struct io_uring_cqe **cqes, unsigned count);
//为提交队列条目sqe准备一个发送请求:把buf中len字节的数据通过文件描述符sockfd发送出去
void io_uring_prep_send(struct io_uring_sqe *sqe,int sockfd,const void *buf,size_t len,int flags);
//为提交队列条目sqe准备一个接收请求:从文件描述符sockfd接收数据(最多len字节)并存入buf
void io_uring_prep_recv(struct io_uring_sqe *sqe,int sockfd, void *buf,size_t len,int flags);
//标记ring中已消耗的IO完成队列的数量
void io_uring_cq_advance(struct io_uring *ring,unsigned nr);
代码
#include "socketwrap.h"
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <liburing.h>
/* Operation tags stored in conn_info.event to tell completions apart. */
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
/* Number of entries requested when the io_uring queues are created. */
#define ENTRIES_LENGTH 1024
/* Size in bytes of the receive/echo buffer. */
#define BUFFER_LENGTH 1024
/*
 * Per-request context attached to every submitted SQE.
 *
 * The struct is copied byte-for-byte into the SQE's user_data field and
 * copied back out of the matching CQE, so it must not grow beyond the size
 * of that field (user_data is a 64-bit value in io_uring).
 */
struct conn_info
{
int fd; // socket the request was issued on (the listening socket for accept requests)
int event; // one of EVENT_ACCEPT / EVENT_READ / EVENT_WRITE
};
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_READ,
};
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
}
int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_WRITE,
};
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
io_uring_prep_send(sqe, sockfd, buf, len, flags);
}
/*
 * Queue an asynchronous accept on the listening socket sockfd; the request
 * is not sent to the kernel until io_uring_submit() is called on the ring.
 *
 * ring:    io_uring instance to take the submission entry from
 * sockfd:  listening socket
 * addr:    receives the peer address, must stay valid until completion
 * addrlen: in/out size of *addr, must stay valid until completion
 * flags:   accept4(2) flags
 *
 * Returns 0 on success, -1 if the submission queue has no free entry.
 */
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL)
    {
        // Submission queue is full; nothing was queued.
        return -1;
    }

    struct conn_info accept_info = {
        .fd = sockfd,
        .event = EVENT_ACCEPT,
    };

    // Tag the entry only after the prep call: some liburing releases reset
    // user_data inside the prep helpers, which would wipe the tag.
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
    return 0;
}
/*
 * Create a TCP listening socket bound to INADDR_ANY on the given port.
 *
 * port: TCP port in host byte order
 *
 * Returns the listening socket descriptor obtained from Socket().
 * Socket/Bind/Listen are the wrappers declared in socketwrap.h.
 * NOTE(review): they presumably report/handle their own failures - confirm.
 */
int init_server(unsigned short port)
{
    // No range check on port: it is unsigned, so a "port < 0" test can never fire.
    int sfd = Socket(AF_INET, SOCK_STREAM, 0);

    // Allow the address/port to be reused; failure here is not fatal,
    // so report it and carry on.
    int opt = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
    {
        perror("setsockopt(SO_REUSEADDR)");
    }

    struct sockaddr_in soaddr;
    memset(&soaddr, 0, sizeof(soaddr));
    soaddr.sin_family = AF_INET;
    soaddr.sin_port = htons(port);
    soaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    Bind(sfd, (struct sockaddr *)&soaddr, sizeof(soaddr));

    // Start listening with a backlog of 128 pending connections.
    Listen(sfd, 128);
    return sfd;
}
/*
 * Single-threaded TCP echo server driven by io_uring.
 *
 * One accept request is kept outstanding on the listening socket. Each
 * accepted connection alternates recv -> send -> recv, echoing back what it
 * received. Every loop iteration submits the queued requests, waits for at
 * least one completion, then drains the ready completions in a batch.
 *
 * Returns 1 on a setup or fatal ring error; otherwise runs forever.
 */
int main(void)
{
    unsigned short port = 9999;
    int sfd = init_server(port);
    if (sfd < 0)
    {
        fprintf(stderr, "init_server failed\n");
        return 1;
    }

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    struct io_uring ring;
    int rc = io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
    if (rc < 0)
    {
        // liburing reports errors as negative errno values.
        fprintf(stderr, "io_uring_queue_init_params: %s\n", strerror(-rc));
        close(sfd);
        return 1;
    }

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, sfd, (struct sockaddr *)&clientaddr, &len, 0);

    // A single buffer is shared by every connection here. A better design
    // keeps a per-connection struct holding fd, rbuffer, wbuffer, rlen, wlen.
    char buffer[BUFFER_LENGTH] = {0};

    while (1)
    {
        rc = io_uring_submit(&ring);
        if (rc < 0)
        {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-rc));
            break;
        }

        // Block until at least one completion is available.
        struct io_uring_cqe *cqe;
        rc = io_uring_wait_cqe(&ring, &cqe);
        if (rc == -EINTR)
        {
            continue; // interrupted by a signal, just wait again
        }
        if (rc < 0)
        {
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-rc));
            break;
        }
        printf("--\n");

        // Drain everything that is ready (plays the role of epoll_wait).
        struct io_uring_cqe *cqes[128];
        unsigned nready = io_uring_peek_batch_cqe(&ring, cqes, sizeof(cqes) / sizeof(cqes[0]));
        for (unsigned i = 0; i < nready; i++)
        {
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
            memcpy(&result, &entries->user_data, sizeof(struct conn_info));

            if (result.event == EVENT_ACCEPT)
            {
                int connfd = entries->res;
                if (connfd >= 0)
                {
                    // Print the client's IP and port before clientaddr is reused.
                    char sIP[INET_ADDRSTRLEN] = {0};
                    const char *ip = inet_ntop(AF_INET, &clientaddr.sin_addr.s_addr, sIP, sizeof(sIP));
                    printf("client [%s:%d] connect\n", ip != NULL ? ip : "?", ntohs(clientaddr.sin_port));
                    set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
                }
                else
                {
                    fprintf(stderr, "accept: %s\n", strerror(-connfd));
                }

                // Re-arm accept; addrlen is value-result, so reset it first.
                len = sizeof(clientaddr);
                set_event_accept(&ring, sfd, (struct sockaddr *)&clientaddr, &len, 0);
            }
            else if (result.event == EVENT_READ)
            {
                int ret = entries->res;
                if (ret <= 0)
                {
                    // 0: peer closed; < 0: recv failed. Either way release the fd.
                    close(result.fd);
                }
                else
                {
                    // The buffer is not NUL-terminated: print exactly ret bytes.
                    printf("set_event_recv ret: %d, %.*s\n", ret, ret, buffer);
                    set_event_send(&ring, result.fd, buffer, ret, 0);
                }
            }
            else if (result.event == EVENT_WRITE)
            {
                if (entries->res < 0)
                {
                    // Send failed; drop the connection instead of re-arming recv.
                    close(result.fd);
                }
                else
                {
                    set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
                }
            }
        }
        // Mark the processed completions as consumed.
        io_uring_cq_advance(&ring, nready);
    }

    // Only reached after a fatal ring error.
    io_uring_queue_exit(&ring);
    close(sfd);
    return 1;
}