linux内核分析--异步io(一)

简介:

linux2.6的内核增加了异步io,这个改动可以体现内核架构的重要性,连同epoll的内核实现,提升了io性能。碰巧的是,这两个特性都源自于同 一个本源,那就是睡眠队列的唤醒函数中增加了回调函数,这就可以让用户实现自己的唤醒策略,结果是异步io和epoll都用到了唤醒回调函数,只是实现不 同,本文先讨论异步io,下一篇文章讨论epoll。 
本人文笔不甚好,前面的话我自己都感觉不知所云,还是代码可以说明一切问题(在此小声说一句,哪位志同道合的要想研究内核,千万别买国产的内核分析之类的书,全是垃圾!还是自己牢牢实实读代码的好!) 
先说一下睡眠队列,它被定义为:

typedef struct __wait_queue wait_queue_t
struct __wait_queue { 
unsigned int flags
#define WQ_FLAG_EXCLUSIVE 0x01 
void *private
wait_queue_func_t func
struct list_head task_list; 
}; 
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int sync, void *key); 
这个func就是前面所说的回调函数,至于怎么实现,就要看你的需求了,如果一个进程位于一个睡眠队列__wait_queue_head中,当它被唤醒的时候,过程是这样的:调用wake_up 
#define wake_up(x__wake_up(xTASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL
void fastcall __wake_up(wait_queue_head_t *q, unsigned int mode
int nr_exclusive, void *key

unsigned long flags
spin_lock_irqsave(&q->lockflags); 
__wake_up_common(q, mode, nr_exclusive, 0, key); 
spin_unlock_irqrestore(&q->lockflags); 

这里调用了__wake_up_common 3183 static void __wake_up_common(wait_queue_head_t *q, unsigned int mode
int nr_exclusive, int sync, void *key

struct list_head *tmp, *next
list_for_each_safe(tmpnext, &q->task_list) { 
wait_queue_t *curr; 
unsigned flags
curr = list_entry(tmpwait_queue_t, task_list); 
flags = curr->flags
if (curr->func(curr, modesynckey) && 
(flags & WQ_FLAG_EXCLUSIVE) && 
!--nr_exclusive) 
break; 


第3193行就是唤醒函数的调用,在进程因为种种苦衷必须休息一下的时候,调用 
__add_wait_queue_tail 
static inline void __add_wait_queue_tail(wait_queue_head_t *head
wait_queue_t *new

list_add_tail(&new->task_list, &head->task_list); 

其中的参数new就是一个睡眠体,他的唤醒函数就是上面说的可以自定义的唤醒函数。现在整体过程已经了解了,总结起来一句话,该休息时就睡眠,该工作时被唤醒,拒绝soho,拒绝杂乱! 
现在看看怎么实现异步io,所谓异步io,就是不同步的io---废话!此时的多么惭愧,一句很容易理解的话硬是不知道该怎么说,是这样的:异步io和非阻塞io是根本不同的,非阻塞的意思是当阻塞的时候返回,但是实际上什么也没有做,仅仅得到了一个信息:我现在很忙。言外之意就是:你等会再来。实际上你得一直询问:你还忙吗?直到数据到位了,你再询问,才能真正返回你要的结果,整个过程都是你自己单枪匹马在劳作,别人只给你信息,并不会帮你;而异步io不是这样,异步io当遇到阻塞时也是直接返回,这点和非阻塞io一致,但是它在返回前要交待一下自己的要求,为什么呢?自己的要求给谁交待呢?为何要交待呢?难道要将任务转手,自己的事情推给别人不成,就像我们公司的某某人,某某现在正在写博客的人!是的,正是这样,进程要把请求交待给内核,让内核帮它,其实并不一定非得交待给内核,按照posix的异步io的语义,只要不是它自己干都行,随便交待给一个别的可运行实体都是可以的。 
以上就是二者的区别,从内核的发展来看,Linux内核越来越往posix靠拢。说的不少了,还是分析代码吧: 
异步io涉及几个系统调用: 
io_setup 
io_submit 
io_getevents 
io_destroy 
大家可以自己在网上的man手册里查看以上函数的用法。以上函数对应4个系统调用: 
sys_io_setup 
sys_io_submit 
sys_io_getevents sys_io_destroy 
从分析sys_io_setup开始,一直到sys_io_getevents结束,整个架构就了解了。 
asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t __user *ctxp) 

struct kioctx *ioctx = NULL
unsigned long ctx
long ret; 
ret = get_user(ctx, ctxp); 
if (unlikely(ret)) 
goto out
ret = -EINVAL
if (unlikely(ctx || nr_events == 0)) { 
… 

ioctx = ioctx_alloc(nr_events); 
ret = PTR_ERR(ioctx); 
if (!IS_ERR(ioctx)) { 
ret = put_user(ioctx->user_id, ctxp); 
if (!ret) 
return 0; 
get_ioctx(ioctx); /* io_destroy() expects us to hold a ref */ 
io_destroy(ioctx); 

out
return ret; 

ioctx = ioctx_alloc(nr_events)是这个函数的重点,它分配了一个kioctx结构并且初始化了一些字段 
static struct kioctx *ioctx_alloc(unsigned nr_events) 

struct mm_struct *mm; 
struct kioctx *ctx
/* Prevent overflows */ 
if ((nr_events > (0x10000000U / sizeof(struct io_event))) || 
(nr_events > (0x10000000U / sizeof(struct kiocb)))) { 
… 

if ((unsigned long)nr_events > aio_max_nr
return ERR_PTR(-EAGAIN); 
ctx = kmem_cache_alloc(kioctx_cachepGFP_KERNEL); 
… 
memset(ctx, 0, sizeof(*ctx)); 
ctx->max_reqs = nr_events; 
mm = ctx->mm = current->mm; 
atomic_inc(&mm->mm_count); 
atomic_set(&ctx->users, 1); 
spin_lock_init(&ctx->ctx_lock); 
spin_lock_init(&ctx->ring_info.ring_lock); 
init_waitqueue_head(&ctx->wait); 
INIT_LIST_HEAD(&ctx->active_reqs); 
INIT_LIST_HEAD(&ctx->run_list); 
INIT_WORK(&ctx->wq, aio_kick_handlerctx);//声明了一个工作用于插入一个已经专门为aio定义好的工作队列,aio_kick_handler就是其执行工作函数 
if (aio_setup_ring(ctx) < 0) 
goto out_freectx; 
/* limit the number of system wide aios */ 
spin_lock(&aio_nr_lock); 
if (aio_nr + ctx->max_reqs > aio_max_nr || 
aio_nr + ctx->max_reqs < aio_nr
ctx->max_reqs = 0; 
else 
aio_nr += ctx->max_reqs; 
spin_unlock(&aio_nr_lock); 
if (ctx->max_reqs == 0) 
goto out_cleanup; 
write_lock(&mm->ioctx_list_lock); 
ctx->next = mm->ioctx_list;//这一句和下面的一句将这个kioctx连接到当前进程的mm->ioctx_list结构 
mm->ioctx_list = ctx
write_unlock(&mm->ioctx_list_lock); 
return ctx
… 

static int aio_setup_ring(struct kioctx *ctx

struct aio_ring *ring
struct aio_ring_info *info = &ctx->ring_info
unsigned nr_events = ctx->max_reqs; 
unsigned long size
int nr_pages; 
/* Compensate for the ring buffer's head/tail overlap entry */ 
nr_events += 2; /* 1 is required, 2 for good luck */ 
size = sizeof(struct aio_ring); 
size += sizeof(struct io_event) * nr_events; 
nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT
if (nr_pages < 0) 
return -EINVAL
nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event); 
info->nr = 0; 
info->ring_pages = info->internal_pages; 
if (nr_pages > AIO_RING_PAGES) { 
info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL); 
if (!info->ring_pages) 
return -ENOMEM

info->mmap_size = nr_pages * PAGE_SIZE
dprintk("attempting mmap of %lu bytes/n", info->mmap_size); 
down_write(&ctx->mm->mmap_sem); 
info->mmap_base = do_mmap(NULL, 0, info->mmap_size, 
PROT_READ|PROT_WRITEMAP_ANON|MAP_PRIVATE
0);//在当前进程的地址空间映射一块虚拟区间 
if (IS_ERR((void *)info->mmap_base)) { 
… 

dprintk("mmap address: 0x%08lx/n", info->mmap_base); 
info->nr_pages = get_user_pages(currentctx->mm, 
info->mmap_base, nr_pages, 
1, 0, info->ring_pages, NULL);//得到刚才映射的区间的实际物理页的页描述符 
up_write(&ctx->mm->mmap_sem); 
if (unlikely(info->nr_pages != nr_pages)) { 
aio_free_ring(ctx); 
return -EAGAIN

ctx->user_id = info->mmap_base
info->nr = nr_events; /* trusted copy */ 
ring = kmap_atomic(info->ring_pages[0], KM_USER0); 
ring->nr = nr_events; /* user copy */ 
ring->id = ctx->user_id; 
ring->head = ring->tail = 0; 
ring->magic = AIO_RING_MAGIC
ring->compat_features = AIO_RING_COMPAT_FEATURES
ring->incompat_features = AIO_RING_INCOMPAT_FEATURES
ring->header_length = sizeof(struct aio_ring); 
kunmap_atomic(ring, KM_USER0); 
return 0; 

以上的这些足以建立了前序工作,submit可以开始了,但是在说submit之前还得说一句,aio_setup_ring函数是很重要的,因为它实现物理基础设施,如果说整个异步io是一座城市的管理方针发展战略的话,那么这个aio_ring就是城市的工厂,下水道,车站等设施。 
本来准备把所有这些整合到一篇的,可是写着写着发现自己累了,休息一下......



 本文转自 dog250 51CTO博客,原文链接:http://blog.51cto.com/dog250/1274056

相关文章
|
1天前
|
Linux Windows 编译器
|
2天前
|
存储 算法 Linux
【Linux】线程的内核级理解&&详谈页表以及虚拟地址到物理地址之间的转化
【Linux】线程的内核级理解&&详谈页表以及虚拟地址到物理地址之间的转化
|
2天前
|
安全 Linux
【Linux】详解用户态和内核态&&内核中信号被处理的时机&&sigaction信号自定义处理方法
【Linux】详解用户态和内核态&&内核中信号被处理的时机&&sigaction信号自定义处理方法
|
2天前
|
存储 Linux
【Linux】对信号产生的内核级理解
【Linux】对信号产生的内核级理解
|
2天前
|
消息中间件 算法 Linux
【Linux】对system V本地通信的内核级理解
【Linux】对system V本地通信的内核级理解
|
3天前
|
Linux 编译器 调度
xenomai内核解析--双核系统调用(二)--应用如何区分xenomai/linux系统调用或服务
本文介绍了如何将POSIX应用程序编译为在Xenomai实时内核上运行的程序。
19 1
xenomai内核解析--双核系统调用(二)--应用如何区分xenomai/linux系统调用或服务
|
3天前
|
算法 Linux 调度
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
9 1
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(一)--实时端socket创建流程
|
3天前
|
Linux 调度 数据库
|
3天前
|
存储 缓存 Linux
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(三)--实时与非实时数据交互
本文介绍了Xenomai中的XDDP(Xenomai Distributed Data Protocol)通信机制,XDDP用于实时和非实时进程之间的数据交换。XDDP在Xenomai内核中涉及的数据结构和管理方式,以及创建XDDP通道后的实时端和非实时端连接过程。
8 0
xenomai内核解析--xenomai与普通linux进程之间通讯XDDP(三)--实时与非实时数据交互
|
4天前
|
缓存 安全 网络协议
Linux内核详解,什么是linux内核?
Linux内核详解,什么是linux内核?
14 0