异步IO由于它的非阻塞特性和强大的并发能力,非常适合用在要求高并发和高吞吐率的场景,比如用在提供SAN存储的块设备读写的实现上。和传统IO模式类似,异步IO提供了一次提交一个IO请求的模式,还提供了一次提交一组IO请求的方式。下面将分别介绍这两种模式的使用方法和差异。
-
单个处理模式
下面对一个长度为BUF_LEN的buffer,提交了MAX_IO个单个处理的IO请求,每个请求处理BUF_LEN/MAX_IO这么长的数据。为了便于读者实际上机验证,下面贴出了具体处理的代码:
..............................................................................................
int main(int argc, char * argv[])
{
io_context_t ctx_id = 0;
struct iocb mycb[MAX_AIO];
struct iovec myiov[MAX_AIO];
struct io_event events[MAX_AIO];
const int aioto = 5000000;
io_callback_t cb = my_callback;
unsigned nr_events = MAX_AIO;
int i, lenperIo, done, cnt = 0;
char buf[BUF_LEN];
memset(mycb, 0, sizeof(mycb));
memset(myiov, 0, sizeof(myiov));
int fd = open("./text.txt", O_CREAT | O_RDWR);
assert(fd >= 0);
memset(buf, 'a', BUF_LEN);
io_setup(nr_events, &ctx_id);
lenperIo = BUF_LEN/MAX_AIO;
for (i = 0; i < MAX_AIO; i++) {
io_prep_pwrite(&mycb[i], fd, lenperIo * i + (char *)buf, lenperIo, lenperIo * i);
io_set_callback(&mycb[i], my_callback);
mycb[i].key = i;
events[i].obj = &mycb[i];
}
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = aioto * 1000;
while (cnt < MAX_AIO) {
done = io_getevents(ctx_id, 1, MAX_AIO, events, &ts);
cnt += done;
printf("Finish %d coming IO!\n", done);
sleep(1);
}
for (i = 0; i < MAX_AIO; i++) {
cb = (io_callback_t)(events[i].data);
cb(ctx_id, (struct iocb *)events[i].obj, i, 0);
printf("events[%d]: obj = %p res = %ld res2 = %ld\n", i, events[i].obj, events[i].res, events[i].res2);
}
printf("My callback address : %p\n", (void *)my_callback);
close(fd);
}
..............................................................................................
可以看到,上面是多次提交io_prep_pwrite(),然后通过检查是否收到相应数目的event来判断IO是否完成。
2. 批量处理模式
初始化的代码差不多,主要的差别代码在下面
..............................................................................................
io_setup(nr_events, &ctx_id);
lenperIo = BUF_LEN/MAX_AIO;
for (i = 0; i < MAX_AIO; i++) {
myiov[i].iov_len = lenperIo;
myiov[i].iov_base = lenperIo * i + (char *)buf;
pmycb[i] = &mycb[i];
}
io_prep_pwritev(&mycb[0], fd, myiov, MAX_AIO, 32);
io_set_callback(&mycb[0], cb);
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = aioto * 1000;
while (cnt < 1) {
done = io_getevents(ctx_id, 1, 1, events, &ts);
cnt += done;
printf("Finish %d coming IO!\n", done);
sleep(2);
}
..............................................................................................
可以看到,在批量处理模式,先需要初始化一个io vector数组,在数组里面再指定IO操作在内存中的数据起始地址和长度,然后调用一次io_prep_pweritev(),最后等待唯一的一个event就可以了。
3. 两种处理方式的比较
下面的表格,总结了使用上面两种处理方式完成相同IO任务的实现上的差异:
模式 |
使用函数 |
是否必需用iovec |
io_setup调用次数 |
IO提交函数调用次数 |
io_getevents 需调用的次数 |
生成的io events数目 |
单个处理
|
io_prep_pwrite io_prep_pread |
不是 |
多次 |
多次 |
很可能多次 |
多个 |
批量处理 |
io_prep_pwritev io_prep_preadv |
是 |
1次 |
1次 |
最少一次 |
1 个 |
这里常常容易混淆的地方就是误以为每个io操作对于一个IO event,其实不然:是每个iocb对应一个IO event, 因为它IO event数据结构内部的obj和callback 都只有一份和iocb 数据结构里面的相对应,关于这点的详细说明可以参考我关于异步IO的上篇博客《C中异步IO浅析之三:深入理解异步IO的基本数据结构》。
4. 注意事项
单个IO处理的模式很好理解,而对批量处理,个人认为有一处man手册和头文件中都没有说明白的地方,那就是: io_prep_pwritev/ io_prep_preadv函数的最后一个参数offset的含义,它表示的是io vecotr里面最早执行的那个IO开始执行时读写操作在磁盘或文件上的物理偏移, 而下一个IO在磁盘或文件上读写的起始地址,就是这个偏移再加上刚完成IO操作的长度。因此,IO操作的总长度是IO vector里面所有成员的iov_len字段之和。