内核代码
用途
Linux内核中的异步函数调用机制,其目标是通过并行执行来减少内核启动时间。在系统启动时,一些操作(如硬件延迟和发现操作)可以以非严格的顺序并行执行,从而实现异步处理。
该机制的关键概念是“序列cookie”(可以视为单调递增的数字)。异步核心将为每个调度事件分配一个序列cookie,并将其传递给被调用的函数。
被异步调用的函数在执行全局可见的操作(如注册设备号)之前,应调用async_synchronize_cookie()函数并传递其自己的cookie。async_synchronize_cookie()函数将确保在对应于该cookie的操作之前安排的所有异步操作都已完成。
子系统/驱动程序初始化代码中,如果安排了异步探测函数,但与其他不使用异步调用功能的驱动程序/子系统共享全局资源,则需要在其init函数返回之前使用async_synchronize_full()函数进行完全同步。这是为了在异步和同步部分之间维护严格的顺序。
底层是基于工作队列,可以指定等待某个异步域(async_domain)中地哪些或者哪个任务(async_entry)执行完毕。
接口
定义异步域
Linux内核有提供了默认的异步域async_dfl_domain,此外,也提供了下面的两个宏接口来静态定义一个异步域:
/* * domain participates in global async_synchronize_full 添加到这种domain中的任务,可以调用async_synchronize_full来等待其执行完毕 */ #define ASYNC_DOMAIN(_name) \ struct async_domain _name = { .pending = LIST_HEAD_INIT(_name.pending), \ .registered = 1 } /* * domain is free to go out of scope as soon as all pending work is * complete, this domain does not participate in async_synchronize_full 添加到这种domain中的任务,不能调用async_synchronize_full来等待其执行完毕 */ #define ASYNC_DOMAIN_EXCLUSIVE(_name) \ struct async_domain _name = { .pending = LIST_HEAD_INIT(_name.pending), \ .registered = 0 }
上面这两种async_domain的区别是registered的值不同:
- 如果registered是1,表示在将任务添加到这个domain时,会把该任务挂到这个domain私有的pending链表,同时还会将其添加到全局的async_global_pending链表,这样,可以调用async_synchronize_full来等待任务执行完毕,这个接口会检查async_global_pending链表中的异步任务是否都已经执行完毕,此外也可以调用async_synchronize_full_domain来等待指定domain的私有pending链表中的异步任务全部执行完毕
- 如果registered是0,那么就不能使用async_synchronize_full了,但是可以使用async_synchronize_full_domain
系统默认的async_dfl_domain就是采用ASYNC_DOMAIN宏来定义的,所以既可以用async_synchronize_full,也可以用async_synchronize_full_domain。
注册异步任务
接口 | Domain | |
async_schedule | 全局默认的async_dfl_domain | |
async_schedule_node | 全局默认的async_dfl_domain,不过可以通过node来指定任务运行在哪个cpu上 | |
async_schedule_dev | 全局默认的async_dfl_domain,跟上一个不同之处是node来自传入的device参数 | |
async_schedule_domain | 自定义的domain | |
async_schedule_node_domain | 自定义的domain,不过可以通过node来指定任务运行在哪个cpu上 | |
async_schedule_dev_domain | 自定义的domain,跟上一个不同之处是node来自传入的device参数 | |
在异步任务成功注册后,会返回一个全局唯一的id,称之为cookie,这是一个u64位的数字,单调递增,用于唯一标识这个异步任务,下面可以用来精确地等待某个异步任务执行完毕。
异步任务按照先入先出(FIFO)的方式按顺序执行,如果不考虑回绕,那么排在前面的异步任务的ID小于排在后面的。
下面是注册异步任务的接口的实现关系:
等待任务执行完毕
接口 | 返回条件 | Cookie |
async_synchronize_full | 等待全局链表async_global_pending中的异步任务全部执行完毕 | ASYNC_COOKIE_MAX |
async_synchronize_full_domain | 等待domain的私有pending链表中的异步任务全部执行完毕 | ASYNC_COOKIE_MAX |
async_synchronize_cookie | 排在async_dfl_domain私有的pending链表最前面的等待执行的异步任务的id大于等于传入的cookie | 传入的cookie参数 |
async_synchronize_cookie_domain | 排在指定domain私有的pending链表最前面的等待执行的异步任务的id大于等于传入的cookie | 传入的cookie参数 |
源码分析
async_schedule_node_domain
注册异步任务。
点击查看代码
/** * async_schedule_node_domain - NUMA specific version of async_schedule_domain * @func: function to execute asynchronously * @data: data pointer to pass to the function * @node: NUMA node that we want to schedule this on or close to * @domain: the domain * * Returns an async_cookie_t that may be used for checkpointing later. * @domain may be used in the async_synchronize_*_domain() functions to * wait within a certain synchronization domain rather than globally. * * Note: This function may be called from atomic or non-atomic contexts. * * The node requested will be honored on a best effort basis. If the node * has no CPUs associated with it then the work is distributed among all * available CPUs. */ async_cookie_t async_schedule_node_domain(async_func_t func, void *data, int node, struct async_domain *domain) { struct async_entry *entry; unsigned long flags; async_cookie_t newcookie; /* allow irq-off callers 这个函数允许从原子上下文调用 */ entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC); /* * If we're out of memory or if there's too much work * pending already, we execute synchronously. 如果entry分配失败,或者当前系统中等待执行的异步任务的个数大于32768时,不再使用异步处理,而是改用同步处理,然后返回这个异步任务的id,不过这个异步任务此时已经执行完毕了。 */ if (!entry || atomic_read(&entry_count) > MAX_WORK) { kfree(entry); spin_lock_irqsave(&async_lock, flags); // 获取全局唯一的cookie,这里用自旋锁保护这个全局变量 newcookie = next_cookie++; spin_unlock_irqrestore(&async_lock, flags); /* low on memory.. run synchronously 如果内存不足或者等待处理的异步任务过多,改用同步处理 */ func(data, newcookie); return newcookie; } INIT_LIST_HEAD(&entry->domain_list); INIT_LIST_HEAD(&entry->global_list); // 使用工作队列,设置任务的处理回调函数async_run_entry_fn INIT_WORK(&entry->work, async_run_entry_fn); entry->func = func; entry->data = data; entry->domain = domain; // 这里通过全局自旋锁保证了id的分配和添加到链表的原子性,实现了一个FIFO,而且排在前面的任务的id小于后面的 spin_lock_irqsave(&async_lock, flags); /* allocate cookie and queue 分配全局唯一的id,这是一个u64的数字,单调递增 */ newcookie = entry->cookie = next_cookie++; // 将异步任务添加到domain私有的pending链表,这里实现了一个FIFO,先入先出 list_add_tail(&entry->domain_list, &domain->pending); // 如果这个domain的registered为1,那么同时将这个异步任务添加到全局链表async_global_pending中 if (domain->registered) list_add_tail(&entry->global_list, &async_global_pending); // 统计等待处理的异步任务的个数 atomic_inc(&entry_count); spin_unlock_irqrestore(&async_lock, flags); /* schedule for execution 提交异步任务,这里node可以控制任务将来在哪些CPU上运行 */ queue_work_node(node, system_unbound_wq, &entry->work); // 返回一个唯一的id return newcookie; } EXPORT_SYMBOL_GPL(async_schedule_node_domain);
这里使用底层使用了工作队列来实现异步处理,但是仅仅利用工作队列无法确保指定的异步任务执行完毕。
async_run_entry_fn
工作任务的回调,其中会回调异步任务的处理函数。
点击查看代码
/* * pick the first pending entry and run it */ static void async_run_entry_fn(struct work_struct *work) { struct async_entry *entry = container_of(work, struct async_entry, work); unsigned long flags; ktime_t calltime; /* 1) run (and print duration) */ pr_debug("calling %lli_%pS @ %i\n", (long long)entry->cookie, entry->func, task_pid_nr(current)); calltime = ktime_get(); // 回调异步任务的回调函数,入参data来自用户,cookie是这个任务全局唯一的id // 但是这个不能确保回调函数中发起的异步任务执行完毕 entry->func(entry->data, entry->cookie); pr_debug("initcall %lli_%pS returned after %lld usecs\n", (long long)entry->cookie, entry->func, microseconds_since(calltime)); /* 2) remove self from the pending queues 异步任务执行完毕后,将异步任务结构体从domian的私有pending链表和全局的async_global_pending中删除 */ spin_lock_irqsave(&async_lock, flags); list_del_init(&entry->domain_list); list_del_init(&entry->global_list); /* 3) free the entry 释放异步任务结构体,并且更新待处理的异步任务的个数 */ kfree(entry); atomic_dec(&entry_count); spin_unlock_irqrestore(&async_lock, flags); /* 4) wake up any waiters 唤醒等待异步任务执行的进程 */ wake_up(&async_done); }
async_synchronize_cookie_domain
等待异步任务执行完毕。
点击查看代码
/** * async_synchronize_cookie_domain - synchronize asynchronous function calls within a certain domain with cookie checkpointing * @cookie: async_cookie_t to use as checkpoint * @domain: the domain to synchronize (%NULL for all registered domains) * * This function waits until all asynchronous function calls for the * synchronization domain specified by @domain submitted prior to @cookie * have been done. */ void async_synchronize_cookie_domain(async_cookie_t cookie, struct async_domain *domain) { ktime_t starttime; pr_debug("async_waiting @ %i\n", task_pid_nr(current)); starttime = ktime_get(); // 获取pending链表中排在最前面的尚未执行的异步任务的id,然后跟传入的cookie比较 // 同一个pending链表中的异步任务按照FIFO的顺序排序,而且id也是小的排在前面 // 注意:这里wait返回的条件用的是>=,即如果等于的话,也会返回,但是此时意味着 // 这里cookie对应的异步任务还在pending,即还没有执行或者执行完毕 // 因此,在实际使用时,传入的cookie要比实际要等待的异步任务的id大 wait_event(async_done, lowest_in_progress(domain) >= cookie); pr_debug("async_continuing @ %i after %lli usec\n", task_pid_nr(current), microseconds_since(starttime)); } EXPORT_SYMBOL_GPL(async_synchronize_cookie_domain);
这里需要注意:这里wait_event返回的条件用的是>=,即如果等于的话,也会返回,但是此时意味着cookie对应的异步任务还在pending,即还没有执行或者执行完毕。因此,在实际使用时,传入的cookie应该大于实际要等待的异步任务的id,下面是从内核中找到的几个示例程序:
示例1:
示例2:
而对于full这种等待,设置的cookie是ASYNC_COOKIE_MAX,对应的值是ULLONG_MAX,导致full的实际效果是,当指定的pengding链表中所有的异步任务都执行完毕后,才可以返回,参考lowest_in_progress
lowest_in_progress
获取指定domain的pending链表或者全局async_global_pending链表中第一个尚未执行的第一个异步任务的id,这些任务是以FIFO的方式在pending链表中排队,只有当任务执行完毕,才会从pending链表中删除。
点击查看代码
static async_cookie_t lowest_in_progress(struct async_domain *domain) { struct async_entry *first = NULL; // 当pending链表中的异步任务都处理完毕,那么会返回值这个值,此时full这种形式的同步才能返回 async_cookie_t ret = ASYNC_COOKIE_MAX; unsigned long flags; spin_lock_irqsave(&async_lock, flags); if (domain) { // 如果指定了domain if (!list_empty(&domain->pending)) // 如果非空,返回排在pending链表最开始的一个等待执行的异步任务 first = list_first_entry(&domain->pending, struct async_entry, domain_list); } else { // 没有指定domain,也就是full形式的等待 if (!list_empty(&async_global_pending))// 如果非空,返回排在pending链表最开始的一个等待执行的异步任务 first = list_first_entry(&async_global_pending, struct async_entry, global_list); } if (first) // 如果找到排在最开头的等待执行的异步任务,返回它的id ret = first->cookie; spin_unlock_irqrestore(&async_lock, flags); return ret; }
完。