该分析sys_io_submit函数了,这个函数有点复杂,但是条理很清晰,先说一句就是提交异步io,具体怎么提交呢?我们知道,对于异步io,一次性可以提交多个请求,那么可以想象的就是在sys_io_submit中会把我们用户程序的多个请求分解成一个一个的请求,依次提交,这是很合理的假设,内核实际上也是这么做的,刚才的建立异步io的阶段只是建立了一个可以让异步io表演的大的环境,现在的提交请求和将来的读取数据便是大戏了,准备好了吗?马上开演!
asmlinkage long sys_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user * __user *iocbpp)
{
struct kioctx *ctx;
long ret = 0;
int i;
if (unlikely(nr < 0))
return -EINVAL;
if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
return -EFAULT;
ctx = lookup_ioctx(ctx_id);//这里查找我们刚才建立的kioctx
if (unlikely(!ctx)) {
…
for (i=0; i<nr; i++) {//这个循环实质上分解了用户请求
struct iocb __user *user_iocb;
struct iocb tmp;
if (unlikely(__get_user(user_iocb, iocbpp + i))) {
ret = -EFAULT;
break;
}
if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp))))
…
ret = io_submit_one(ctx, user_iocb, &tmp);//一次提交一个,直到全部提交完毕
if (ret)
break;
}
put_ioctx(ctx);
return i ? i : ret;
}
上面的函数提到了查找kioctx结构,这个函数猜也能猜到怎么实现的,就是在进程的mm中的ioctx_list中寻找id和sunmit参数的id相 同的kioctx,说了半天,到底什么是kioctx,它到底有什么用?它实际上正如前面说的那样,是一个大舞台,提供了一个工作体,提供了一个运行队列 等等,所有属于这个大舞台的请求必须纳入它的管理范畴,它的字段决定了谁应该被调度,以及调度后应该做甚。下面来看看完成实际工作的io_submit_one函数:
1476 int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
struct iocb *iocb)
{
struct kiocb *req;
struct file *file;
ssize_t ret;
/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
…
/* prevent overflows */
if (unlikely(
(iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
(iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
((ssize_t)iocb->aio_nbytes < 0)
)) {
…
file = fget(iocb->aio_fildes);
req = aio_get_req(ctx); /* returns with 2 references to req */
if (unlikely(!req)) {
…
req->ki_filp = file;
ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
dprintk("EFAULT: aio_key/n");
goto out_put_req;
}
req->ki_obj.user = user_iocb;
req->ki_user_data = iocb->aio_data;
req->ki_pos = iocb->aio_offset;
req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
req->ki_opcode = iocb->aio_lio_opcode;
init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);//注册唤醒时的回调函数
INIT_LIST_HEAD(&req->ki_wait.task_list);
req->ki_retried = 0;
ret = aio_setup_iocb(req);//设置retry回调函数
if (ret)
goto out_put_req;
spin_lock_irq(&ctx->ctx_lock);
aio_run_iocb(req);//首先先执行一次,没准一次就能成功,要不怕做无用功,内核真是精打细算啊
if (!list_empty(&ctx->run_list)) {
/* drain the run list */
while (__aio_run_iocbs(ctx))//如果当前异步上下文的运行队列有请求,执行之!
;
}
spin_unlock_irq(&ctx->ctx_lock);
aio_put_req(req); /* drop extra ref to req */
return 0;
…
}
我们来看看aio_run_iocb函数,实际上__aio_run_iocbs(ctx)最终也是要调用aio_run_iocb函数的:
static ssize_t aio_run_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
if (iocb->ki_retried++ > 1024*1024) {
…
}
if (!(iocb->ki_retried & 0xff)) {
}
if (!(retry = iocb->ki_retry)) {
…
}
kiocbClearKicked(iocb);
iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
spin_unlock_irq(&ctx->ctx_lock);
/* Quit retrying if the i/o has been cancelled */
if (kiocbIsCancelled(iocb)) {
…
}
BUG_ON(current->io_wait != NULL);
current->io_wait = &iocb->ki_wait;//以下3行很重要,如果当前请求未果,则可能睡眠在io_wait,当被唤醒的时候执行aio_wake_function
ret = retry(iocb);
current->io_wait = NULL;
if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
…
}
…
}
注意aio_wake_function是唤醒回调函数,这个函数本质上也是执行将上下文挂入工作队列的任务,为什么呢?为何不让它直接把任务完成呢?因为它可能在中断上下文中,这又很多限制,比如不能睡眠,于是乎就把任务挂入一个工作队列,这样就有了进程的上下文,一切变得明朗!到这基本上就完事了,不用用户进程操心了,到了实在闲来无事的时候来取数据吧!如果实在觉得这篇文章到此意尤未尽,那么请看《linux工作队列和异步io 》下面将要进行的就是取数据了。一会回来,更精彩!
本文转自 dog250 51CTO博客,原文链接:http://blog.51cto.com/dog250/1274055