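1. Differences between io_uring and epoll
(1) Once an event has been registered with epoll, it stays registered until you modify or delete it, so you can keep waiting for IO events on it indefinitely. This is an event-driven (readiness notification) mechanism.
epoll keeps the registered file descriptors in a red-black tree.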
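The "register once, wait many times" behaviour described above can be sketched as follows. This is a minimal illustration, not code from the original article: the function name count_epoll_rounds and the use of a pipe as the event source are assumptions made for the example.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: registers the read end of a pipe with epoll ONCE,
 * then waits `rounds` times. Returns how many rounds reported the fd as
 * readable, or -1 if setup failed. */
int count_epoll_rounds(int rounds)
{
    assert(rounds >= 0);

    int pipefd[2];
    if (pipe(pipefd) < 0)
        return -1;

    int epfd = epoll_create1(0);
    if (epfd < 0) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = pipefd[0] };
    /* The only registration: nothing below touches the interest list. */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) < 0) {
        close(epfd);
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    int fired = 0;
    for (int i = 0; i < rounds; i++) {
        char byte = 'x';
        if (write(pipefd[1], &byte, 1) != 1)
            break;

        struct epoll_event out;
        int n = epoll_wait(epfd, &out, 1, 1000 /* ms */);
        if (n == 1 && out.data.fd == pipefd[0] && (out.events & EPOLLIN)) {
            fired++;
            /* Drain the byte so the next round starts from "not ready". */
            if (read(pipefd[0], &byte, 1) != 1)
                break;
        }
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return fired;
}
```

Calling count_epoll_rounds(3) should report three readable events even though epoll_ctl was called only once, which is the property the paragraph above describes.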
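(2) io_uring has two queues, SQ and CQ. After io_uring_submit, a request waits in the SQ; when it completes, the kernel posts the result to the CQ. The application fetches completions from the CQ with io_uring_peek_batch_cqe and then calls io_uring_cq_advance to mark them as consumed. A request is therefore one-shot: to keep waiting for the same event, you must add it to the SQ again after taking its completion from the CQ. This is an asynchronous (completion notification) mechanism.
io_uring manages events with two queues: the SQ (submission queue) and the CQ (completion queue).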
2. Performance comparison of io_uring and epoll
2.1 Installing the rust_echo_bench test tool
(1) Install cargo:
# Ubuntu
sudo apt-get install cargo
# CentOS
sudo yum install cargo
(2) Download rust_echo_bench:
git clone https://github.com/haraldh/rust_echo_bench.git
(3) Build rust_echo_bench:
cd rust_echo_bench
cargo run --release -- --help
Output:
Updating crates.io index
Downloaded unicode-width v0.1.9
Downloaded getopts v0.2.21
Downloaded 2 crates (35.2 KB) in 1.58s
Compiling unicode-width v0.1.9
Compiling getopts v0.2.21
Compiling echo_bench v0.2.0 (/home/user/rust_echo_bench)
Finished release [optimized] target(s) in 4m 05s
Running `target/release/echo_bench --help`
Echo benchmark.

Usage:
  target/release/echo_bench [ -a <address> ] [ -l <length> ] [ -c <number> ] [ -t <duration> ]
  target/release/echo_bench (-h | --help)
  target/release/echo_bench --version

Options:
  -h, --help                 Print this help.
  -a, --address <address>    Target echo server address. Default: 127.0.0.1:12345
  -l, --length <length>      Test message length. Default: 512
  -t, --duration <duration>  Test duration in seconds. Default: 60
  -c, --number <number>      Test connection number. Default: 50
(4) Run:
cargo run --release -- --address "192.168.7.233:9999" --number 1000 --duration 60 --length 512
Adjust the parameters to match your setup.
2.2 Test comparison
io_uring result:
Finished release [optimized] target(s) in 0.00s
Running `target/release/echo_bench --address '192.168.7.233:9999' --number 1000 --duration 60 --length 512`
Benchmarking: 192.168.7.235:9999
1000 clients, running 512 bytes, 60 sec.

Speed: 8836 request/sec, 8836 response/sec
Requests: 530176
Responses: 530176
Before running the epoll test, raise the open-file limit first: ulimit -n 1024567.
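ulimit -n only affects the current shell and the processes it starts. As a hedged alternative, a server can raise its own soft limit at startup with getrlimit/setrlimit. The helper name raise_nofile_limit is an assumption for this sketch, and it only raises the soft limit up to the existing hard limit (going beyond that still needs privileges or system configuration).

```c
#include <assert.h>
#include <stddef.h>
#include <sys/resource.h>

/* Hypothetical helper: raises the soft RLIMIT_NOFILE of the calling process
 * to its hard limit. On success stores the new soft limit in *new_soft and
 * returns 0; returns -1 on failure. */
int raise_nofile_limit(rlim_t *new_soft)
{
    assert(new_soft != NULL);

    struct rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) < 0)
        return -1;

    /* An unprivileged process may raise its soft limit up to the hard limit. */
    rl.rlim_cur = rl.rlim_max;
    if (setrlimit(RLIMIT_NOFILE, &rl) < 0)
        return -1;

    /* Read the value back so the caller sees what is actually in effect. */
    if (getrlimit(RLIMIT_NOFILE, &rl) < 0)
        return -1;

    *new_soft = rl.rlim_cur;
    return 0;
}
```

A server would typically call this once before it starts accepting connections and log the resulting limit.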
epoll result:
Finished release [optimized] target(s) in 0.01s
Running `target/release/echo_bench --address '192.168.7.233:8888' --number 1000 --duration 60 --length 512`
Benchmarking: 192.168.7.233:8888
1000 clients, running 512 bytes, 60 sec.

Speed: 7908 request/sec, 7908 response/sec
Requests: 474517
Responses: 474516
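Note: these numbers come from a single run against one particular setup. They are meant to show a way of testing your own code, not to prove whether io_uring is faster or slower than epoll/poll/select.
2.3 Summary
In this test io_uring was not much faster than the epoll-based reactor. io_uring will not necessarily replace epoll completely; the two are likely to coexist, and io_uring simply adds one more option for handling network IO events.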
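3. Wrapping io_uring in a user-space file read/write interface
io_uring exposes three system calls: io_uring_setup(), io_uring_enter() and io_uring_register(). Without liburing, you have to implement the user-space interface on top of them yourself. Besides network IO, io_uring can also handle disk IO.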
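3.1 System calls
The generic entry point for system calls is the syscall function. On 32-bit x86, a system call is traditionally made by raising the software interrupt 0x80 (x86-64 uses the dedicated syscall instruction instead). Every system call has a number, which the kernel uses as an index into the sys_call_table array of handlers.
Flow on 32-bit x86: syscall puts the system call number in the eax register and the arguments in ebx, ecx and so on, raises interrupt 0x80, and the kernel then runs system_call, which dispatches to the handler.
Function prototype: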
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <unistd.h>
#include <sys/syscall.h>    /* For SYS_xxx definitions */

long syscall(long number, ...);
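Usage example:
#define _GNU_SOURCE
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <signal.h>

int main(int argc, char *argv[])
{
    pid_t tid;

    /* gettid and tgkill are invoked by number through syscall() */
    tid = syscall(SYS_gettid);
    tid = syscall(SYS_tgkill, getpid(), tid, SIGHUP);
}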
3.2 Memory mapping with mmap
User space and the kernel can exchange data directly by mapping a shared block of memory.
Function prototype:
#include <sys/mman.h>

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
int munmap(void *addr, size_t length);
Usage example:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define handle_error(msg) \
    do { perror(msg); exit(EXIT_FAILURE); } while (0)

int main(int argc, char *argv[])
{
    char *addr;
    int fd;
    struct stat sb;
    off_t offset, pa_offset;
    size_t length;
    ssize_t s;

    if (argc < 3 || argc > 4) {
        fprintf(stderr, "%s file offset [length]\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    fd = open(argv[1], O_RDONLY);
    if (fd == -1)
        handle_error("open");

    if (fstat(fd, &sb) == -1)           /* To obtain file size */
        handle_error("fstat");

    offset = atoi(argv[2]);
    pa_offset = offset & ~(sysconf(_SC_PAGE_SIZE) - 1);
        /* offset for mmap() must be page aligned */

    if (offset >= sb.st_size) {
        fprintf(stderr, "offset is past end of file\n");
        exit(EXIT_FAILURE);
    }

    if (argc == 4) {
        length = atoi(argv[3]);
        if (offset + length > sb.st_size)
            length = sb.st_size - offset;
                /* Can't display bytes past end of file */
    } else {    /* No length arg ==> display to end of file */
        length = sb.st_size - offset;
    }

    addr = mmap(NULL, length + offset - pa_offset, PROT_READ,
                MAP_PRIVATE, fd, pa_offset);
    if (addr == MAP_FAILED)
        handle_error("mmap");

    s = write(STDOUT_FILENO, addr + offset - pa_offset, length);
    if (s != length) {
        if (s == -1)
            handle_error("write");

        fprintf(stderr, "partial write");
        exit(EXIT_FAILURE);
    }

    exit(EXIT_SUCCESS);
}
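The example above maps a file with MAP_PRIVATE, whereas the io_uring rings are mapped with MAP_SHARED so that writes by one side are visible to the other. The kernel side cannot be shown in a user-space program, so the sketch below uses a parent and a child process as stand-ins for the two parties. The function name shared_mapping_roundtrip is an assumption for this illustration.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: the child writes `value` into a MAP_SHARED mapping and
 * the parent reads it back. Returns the value seen by the parent, or -1 on
 * failure. */
int shared_mapping_roundtrip(int value)
{
    int *shared = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
                       MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (shared == MAP_FAILED)
        return -1;
    *shared = 0;

    pid_t pid = fork();
    if (pid < 0) {
        munmap(shared, sizeof(int));
        return -1;
    }
    if (pid == 0) {
        /* Child: with MAP_SHARED this store lands in the same physical page. */
        *shared = value;
        _exit(0);
    }

    /* Parent: wait for the child, then read what it wrote. */
    int status;
    if (waitpid(pid, &status, 0) < 0) {
        munmap(shared, sizeof(int));
        return -1;
    }

    int seen = *shared;
    munmap(shared, sizeof(int));
    return seen;
}
```

With MAP_PRIVATE in place of MAP_SHARED the parent would still read 0, because the child's write would go to its own private copy of the page.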
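3.3 Relationship between SQ_RING, CQ_RING and SQES
The SQ ring does not hold the requests themselves: its array stores indices into SQES, and the request data lives in the SQES array. The CQ ring holds the completion entries (CQEs) directly, and the user_data field of each CQE ties it back to the request that was submitted.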
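The head/tail/mask arithmetic and the index indirection can be illustrated with a toy model in ordinary memory. Everything named toy_* below is hypothetical and exists only for this sketch; the real structures are struct io_uring_sqe and the offsets returned by io_uring_setup, and the real consumer is the kernel.

```c
#include <assert.h>
#include <string.h>

#define TOY_ENTRIES 4u                 /* ring size, must be a power of two */

struct toy_sqe {
    int fd;
    unsigned long long user_data;
};

struct toy_sq {
    unsigned head;                     /* consumer position (kernel side in io_uring) */
    unsigned tail;                     /* producer position (application side) */
    unsigned ring_mask;                /* TOY_ENTRIES - 1 */
    unsigned array[TOY_ENTRIES];       /* ring slots: indices into sqes[] */
    struct toy_sqe sqes[TOY_ENTRIES];  /* where the request data actually lives */
};

void toy_sq_init(struct toy_sq *sq)
{
    assert((TOY_ENTRIES & (TOY_ENTRIES - 1)) == 0);
    memset(sq, 0, sizeof(*sq));
    sq->ring_mask = TOY_ENTRIES - 1;
}

/* Producer: fill an SQE, publish its index in the ring, advance the tail.
 * Returns 0 on success, -1 if the ring is full. */
int toy_sq_push(struct toy_sq *sq, int fd, unsigned long long user_data)
{
    if (sq->tail - sq->head == TOY_ENTRIES)
        return -1;
    unsigned index = sq->tail & sq->ring_mask;
    sq->sqes[index].fd = fd;
    sq->sqes[index].user_data = user_data;
    sq->array[index] = index;
    sq->tail++;
    return 0;
}

/* Consumer: follow the ring slot to the SQE it points at, advance the head.
 * Returns 0 on success, -1 if the ring is empty. */
int toy_sq_pop(struct toy_sq *sq, struct toy_sqe *out)
{
    if (sq->head == sq->tail)
        return -1;
    unsigned slot = sq->head & sq->ring_mask;
    *out = sq->sqes[sq->array[slot]];
    sq->head++;
    return 0;
}
```

head and tail are free-running counters; masking with ring_mask turns them into slot numbers, so a push after the ring has wrapped reuses slot 0.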
3.4 Code example
#define _GNU_SOURCE
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <linux/io_uring.h>

// Not defined in the original listing; the values below are assumptions.
#define URING_QUEUE_DEPTH 1
#define BLOCK_SZ          1024

// Ring views: pointers into the kernel memory that is mmap'ed into user space.
struct app_io_sq_ring {
    unsigned *head;
    unsigned *tail;
    unsigned *ring_mask;
    unsigned *ring_entries;
    unsigned *flags;
    unsigned *array;
};

struct app_io_cq_ring {
    unsigned *head;
    unsigned *tail;
    unsigned *ring_mask;
    unsigned *ring_entries;
    struct io_uring_cqe *cqes;
};

// Everything the application needs to submit requests and reap completions.
struct submitter {
    int ring_fd;
    struct app_io_sq_ring sq_ring;
    struct app_io_cq_ring cq_ring;
    struct io_uring_sqe *sqes;
};

// Per-file state, carried through the ring in sqe->user_data.
struct file_info {
    off_t file_sz;
    struct iovec iovecs[];
};

// Kernel side (io_uring.c):
// SYSCALL_DEFINE2(io_uring_setup, u32, entries, struct io_uring_params __user *, params)
int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
    return (int)syscall(__NR_io_uring_setup, entries, p);
}

int io_uring_enter(int ring_fd, unsigned int to_submit,
                   unsigned int min_complete, unsigned int flags)
{
    return (int)syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
                        flags, NULL, 0);
}

int app_setup_uring(struct submitter *s)
{
    struct io_uring_params p;
    memset(&p, 0, sizeof(p));

    // Ask the kernel to create the SQ and CQ.
    s->ring_fd = io_uring_setup(URING_QUEUE_DEPTH, &p);
    if (s->ring_fd < 0)
        return -1;

    size_t sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
    size_t cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);

    // With IORING_FEAT_SINGLE_MMAP the SQ and CQ rings share one mapping,
    // which must be large enough for the bigger of the two.
    if (p.features & IORING_FEAT_SINGLE_MMAP) {
        if (cring_sz > sring_sz)
            sring_sz = cring_sz;
        cring_sz = sring_sz;
    }

    // Map the SQ ring.
    char *sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_POPULATE, s->ring_fd, IORING_OFF_SQ_RING);
    if (sq_ptr == MAP_FAILED)
        return -1;

    // Map the CQ ring, unless it already lives in the SQ mapping.
    char *cq_ptr;
    if (p.features & IORING_FEAT_SINGLE_MMAP) {
        cq_ptr = sq_ptr;
    } else {
        cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_POPULATE, s->ring_fd, IORING_OFF_CQ_RING);
        if (cq_ptr == MAP_FAILED)
            return -1;
    }

    struct app_io_sq_ring *sring = &s->sq_ring;
    struct app_io_cq_ring *cring = &s->cq_ring;

    // Resolve the SQ ring fields from the offsets the kernel reported.
    sring->head = (unsigned *)(sq_ptr + p.sq_off.head);
    sring->tail = (unsigned *)(sq_ptr + p.sq_off.tail);
    sring->ring_mask = (unsigned *)(sq_ptr + p.sq_off.ring_mask);
    sring->ring_entries = (unsigned *)(sq_ptr + p.sq_off.ring_entries);
    sring->flags = (unsigned *)(sq_ptr + p.sq_off.flags);
    sring->array = (unsigned *)(sq_ptr + p.sq_off.array);

    // Map the SQE array into user space.
    s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
                   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                   s->ring_fd, IORING_OFF_SQES);
    if (s->sqes == MAP_FAILED)
        return -1;

    // Resolve the CQ ring fields; all of them are relative to cq_ptr.
    cring->head = (unsigned *)(cq_ptr + p.cq_off.head);
    cring->tail = (unsigned *)(cq_ptr + p.cq_off.tail);
    cring->ring_mask = (unsigned *)(cq_ptr + p.cq_off.ring_mask);
    cring->ring_entries = (unsigned *)(cq_ptr + p.cq_off.ring_entries);
    cring->cqes = (struct io_uring_cqe *)(cq_ptr + p.cq_off.cqes);

    return 0;
}

// Size of a regular file or block device, or -1 on error.
off_t get_file_size(int fd)
{
    struct stat st;

    if (fstat(fd, &st) < 0)
        return -1;

    if (S_ISBLK(st.st_mode)) {
        unsigned long long bytes;
        if (ioctl(fd, BLKGETSIZE64, &bytes) != 0)
            return -1;
        return bytes;
    } else if (S_ISREG(st.st_mode)) {
        return st.st_size;
    }

    return -1;
}

void output_to_console(char *buf, int len)
{
    while (len--)
        fputc(*buf++, stdout);
}

// Drain the CQ and print the data of every completed read.
void read_from_cq(struct submitter *s)
{
    struct app_io_cq_ring *cring = &s->cq_ring;
    unsigned head = *cring->head;

    while (1) {
        // Acquire load: observe the tail before reading the CQEs behind it.
        if (head == __atomic_load_n(cring->tail, __ATOMIC_ACQUIRE))
            break;

        struct io_uring_cqe *cqe = &cring->cqes[head & *cring->ring_mask];
        struct file_info *fi = (struct file_info *)(uintptr_t)cqe->user_data;

        int blocks = fi->file_sz / BLOCK_SZ;
        if (fi->file_sz % BLOCK_SZ)
            blocks++;

        if (cqe->res < 0) {
            fprintf(stderr, "error: %d\n", cqe->res);
        } else {
            // Print every block, starting from block 0.
            for (int i = 0; i < blocks; i++) {
                output_to_console(fi->iovecs[i].iov_base, fi->iovecs[i].iov_len);
                printf("################################ i : %d, blocks : %d\n", i, blocks);
            }
        }

        head++;
        printf("head : %u,tail : %u, blocks: %d\n", head, *cring->tail, blocks);
    }

    // Release store: hand the consumed CQ slots back to the kernel.
    __atomic_store_n(cring->head, head, __ATOMIC_RELEASE);
    printf("read from cq end!\n");
}

// Queue one readv covering the whole file and wait for its completion.
int submit_to_sq(char *file_path, struct submitter *s)
{
    int filefd = open(file_path, O_RDONLY);
    if (filefd < 0)
        return -1;

    off_t filesz = get_file_size(filefd);
    if (filesz < 0) {
        close(filefd);
        return -1;
    }

    struct app_io_sq_ring *sring = &s->sq_ring;
    off_t bytes_remaining = filesz;
    int blocks = filesz / BLOCK_SZ;
    if (filesz % BLOCK_SZ)
        blocks++;

    struct file_info *fi = malloc(sizeof(struct file_info) + sizeof(struct iovec) * blocks);
    if (!fi) {
        close(filefd);
        return -2;
    }
    fi->file_sz = filesz;

    // One BLOCK_SZ-aligned buffer per block; the last block may be shorter.
    unsigned current_block = 0;
    while (bytes_remaining) {
        off_t bytes_to_read = bytes_remaining;
        if (bytes_to_read > BLOCK_SZ)
            bytes_to_read = BLOCK_SZ;
        fi->iovecs[current_block].iov_len = bytes_to_read;

        void *buf;
        if (posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
            close(filefd);
            return 1;
        }
        fi->iovecs[current_block].iov_base = buf;

        current_block++;
        bytes_remaining -= bytes_to_read;
    }

    // Fill the SQE in the slot at the current tail.
    unsigned tail = *sring->tail;
    unsigned index = tail & *sring->ring_mask;
    struct io_uring_sqe *sqe = &s->sqes[index];
    memset(sqe, 0, sizeof(*sqe));
    sqe->fd = filefd;
    sqe->opcode = IORING_OP_READV;
    sqe->addr = (unsigned long)fi->iovecs;
    sqe->len = blocks;
    sqe->off = 0;
    sqe->user_data = (unsigned long long)(uintptr_t)fi;
    sring->array[index] = index;

    // Release store: the kernel must see the SQE before it sees the new tail.
    __atomic_store_n(sring->tail, tail + 1, __ATOMIC_RELEASE);

    // Submit one request and block until at least one completion is available.
    int ret = io_uring_enter(s->ring_fd, 1, 1, IORING_ENTER_GETEVENTS);
    close(filefd);
    return ret < 0 ? -1 : 0;
}

int main(int argc, char *argv[])
{
    struct submitter *s = malloc(sizeof(struct submitter));
    if (s == NULL) {
        perror("malloc fail");
        return -1;
    }
    memset(s, 0, sizeof(struct submitter));

    if (app_setup_uring(s))
        return 1;

    // Read each file named on the command line, one request at a time.
    for (int i = 1; i < argc; i++) {
        if (submit_to_sq(argv[i], s))
            return 1;
        read_from_cq(s);
    }

    return 0;
}
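Summary
Where io_uring improves on epoll is its use of shared memory: it shares not only the IO events but also the bookkeeping they need, such as the SQ and CQ rings, with user space. io_uring can handle disk IO as well as network IO.
Wrapping io_uring for disk IO takes three steps:
(1) Call io_uring_setup to create the SQ, the CQ and SQES, then mmap the SQ ring, the CQ ring and SQES into user space.
(2) Submit the file IO request to the SQ.
(3) Read the result of the file IO from the CQ.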