内核异步处理技术 —— async_schedule

简介: 内核异步处理技术 —— async_schedule

内核代码

用途

Linux内核中的异步函数调用机制,其目标是通过并行执行来减少内核启动时间。在系统启动时,一些操作(如硬件延迟和发现操作)可以以非严格的顺序并行执行,从而实现异步处理。

该机制的关键概念是“序列cookie”(可以视为单调递增的数字)。异步核心将为每个调度事件分配一个序列cookie,并将其传递给被调用的函数。

被异步调用的函数在执行全局可见的操作(如注册设备号)之前,应调用async_synchronize_cookie()函数并传递其自己的cookie。async_synchronize_cookie()函数将确保在对应于该cookie的操作之前安排的所有异步操作都已完成。

子系统/驱动程序初始化代码中,如果安排了异步探测函数,但与其他不使用异步调用功能的驱动程序/子系统共享全局资源,则需要在其init函数返回之前使用async_synchronize_full()函数进行完全同步。这是为了在异步和同步部分之间维护严格的顺序。

底层是基于工作队列,可以指定等待某个异步域(async_domain)中地哪些或者哪个任务(async_entry)执行完毕。

接口

定义异步域

Linux内核有提供了默认的异步域async_dfl_domain,此外,也提供了下面的两个宏接口来静态定义一个异步域:

/*
 * domain participates in global async_synchronize_full
   添加到这种domain中的任务,可以调用async_synchronize_full来等待其执行完毕
 */
#define ASYNC_DOMAIN(_name) \
  struct async_domain _name = { .pending = LIST_HEAD_INIT(_name.pending), \
              .registered = 1 }
/*
 * domain is free to go out of scope as soon as all pending work is
 * complete, this domain does not participate in async_synchronize_full
  添加到这种domain中的任务,不能调用async_synchronize_full来等待其执行完毕
 */
#define ASYNC_DOMAIN_EXCLUSIVE(_name) \
  struct async_domain _name = { .pending = LIST_HEAD_INIT(_name.pending), \
              .registered = 0 }

上面这两种async_domain的区别是registered的值不同:

  • 如果registered是1,表示在将任务添加到这个domain时,会把该任务挂到这个domain私有的pending链表,同时还会将其添加到全局的async_global_pending链表,这样,可以调用async_synchronize_full来等待任务执行完毕,这个接口会检查async_global_pending链表中的异步任务是否都已经执行完毕,此外也可以调用async_synchronize_full_domain来等待指定domain的私有pending链表中的异步任务全部执行完毕
  • 如果registered是0,那么就不能使用async_synchronize_full了,但是可以使用async_synchronize_full_domain

系统默认的async_dfl_domain就是采用ASYNC_DOMAIN宏来定义的,所以既可以用async_synchronize_full,也可以用async_synchronize_full_domain。

注册异步任务

接口 Domain
async_schedule 全局默认的async_dfl_domain
async_schedule_node 全局默认的async_dfl_domain,不过可以通过node来指定任务运行在哪个cpu上
async_schedule_dev 全局默认的async_dfl_domain,跟上一个不同之处是node来自传入的device参数
async_schedule_domain 自定义的domain
async_schedule_node_domain 自定义的domain,不过可以通过node来指定任务运行在哪个cpu上
async_schedule_dev_domain 自定义的domain,跟上一个不同之处是node来自传入的device参数

在异步任务成功注册后,会返回一个全局唯一的id,称之为cookie,这是一个u64位的数字,单调递增,用于唯一标识这个异步任务,下面可以用来精确地等待某个异步任务执行完毕。

异步任务按照先入先出(FIFO)的方式按顺序执行,如果不考虑回绕,那么排在前面的异步任务的ID小于排在后面的。

下面是注册异步任务的接口的实现关系:

等待任务执行完毕

接口 返回条件 Cookie
async_synchronize_full 等待全局链表async_global_pending中的异步任务全部执行完毕 ASYNC_COOKIE_MAX
async_synchronize_full_domain 等待domain的私有pending链表中的异步任务全部执行完毕 ASYNC_COOKIE_MAX
async_synchronize_cookie 排在async_dfl_domain私有的pending链表最前面的等待执行的异步任务的id大于等于传入的cookie 传入的cookie参数
async_synchronize_cookie_domain 排在指定domain私有的pending链表最前面的等待执行的异步任务的id大于等于传入的cookie 传入的cookie参数

源码分析

async_schedule_node_domain

注册异步任务。

点击查看代码

/**
 * async_schedule_node_domain - NUMA specific version of async_schedule_domain
 * @func: function to execute asynchronously
 * @data: data pointer to pass to the function
 * @node: NUMA node that we want to schedule this on or close to
 * @domain: the domain
 *
 * Returns an async_cookie_t that may be used for checkpointing later.
 * @domain may be used in the async_synchronize_*_domain() functions to
 * wait within a certain synchronization domain rather than globally.
 *
 * Note: This function may be called from atomic or non-atomic contexts.
 *
 * The node requested will be honored on a best effort basis. If the node
 * has no CPUs associated with it then the work is distributed among all
 * available CPUs.
 */
async_cookie_t async_schedule_node_domain(async_func_t func, void *data,
            int node, struct async_domain *domain)
{
  struct async_entry *entry;
  unsigned long flags;
  async_cookie_t newcookie;
  /* allow irq-off callers
     这个函数允许从原子上下文调用
  */
  entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC);
  /*
   * If we're out of memory or if there's too much work
   * pending already, we execute synchronously.
   
     如果entry分配失败,或者当前系统中等待执行的异步任务的个数大于32768时,不再使用异步处理,而是改用同步处理,然后返回这个异步任务的id,不过这个异步任务此时已经执行完毕了。
   */
  if (!entry || atomic_read(&entry_count) > MAX_WORK) {
    kfree(entry);
    spin_lock_irqsave(&async_lock, flags);
    // 获取全局唯一的cookie,这里用自旋锁保护这个全局变量
    newcookie = next_cookie++;
    spin_unlock_irqrestore(&async_lock, flags);
    /* low on memory.. run synchronously
       如果内存不足或者等待处理的异步任务过多,改用同步处理
    */
    func(data, newcookie);
    return newcookie;
  }
  INIT_LIST_HEAD(&entry->domain_list);
  INIT_LIST_HEAD(&entry->global_list);
  // 使用工作队列,设置任务的处理回调函数async_run_entry_fn
  INIT_WORK(&entry->work, async_run_entry_fn);
  entry->func = func;
  entry->data = data;
  entry->domain = domain;
  // 这里通过全局自旋锁保证了id的分配和添加到链表的原子性,实现了一个FIFO,而且排在前面的任务的id小于后面的
  spin_lock_irqsave(&async_lock, flags);
  /* allocate cookie and queue
     分配全局唯一的id,这是一个u64的数字,单调递增
  */
  newcookie = entry->cookie = next_cookie++;
  // 将异步任务添加到domain私有的pending链表,这里实现了一个FIFO,先入先出
  list_add_tail(&entry->domain_list, &domain->pending);
  // 如果这个domain的registered为1,那么同时将这个异步任务添加到全局链表async_global_pending中
  if (domain->registered)
    list_add_tail(&entry->global_list, &async_global_pending);
  // 统计等待处理的异步任务的个数
  atomic_inc(&entry_count);
  spin_unlock_irqrestore(&async_lock, flags);
  /* schedule for execution
     提交异步任务,这里node可以控制任务将来在哪些CPU上运行
  */
  queue_work_node(node, system_unbound_wq, &entry->work);
  // 返回一个唯一的id
  return newcookie;
}
EXPORT_SYMBOL_GPL(async_schedule_node_domain);

这里使用底层使用了工作队列来实现异步处理,但是仅仅利用工作队列无法确保指定的异步任务执行完毕。

async_run_entry_fn

工作任务的回调,其中会回调异步任务的处理函数。

点击查看代码

/*
 * pick the first pending entry and run it
 */
static void async_run_entry_fn(struct work_struct *work)
{
  struct async_entry *entry =
    container_of(work, struct async_entry, work);
  unsigned long flags;
  ktime_t calltime;
  /* 1) run (and print duration) */
  pr_debug("calling  %lli_%pS @ %i\n", (long long)entry->cookie,
     entry->func, task_pid_nr(current));
  calltime = ktime_get();
  // 回调异步任务的回调函数,入参data来自用户,cookie是这个任务全局唯一的id
  // 但是这个不能确保回调函数中发起的异步任务执行完毕
  entry->func(entry->data, entry->cookie);
  pr_debug("initcall %lli_%pS returned after %lld usecs\n",
     (long long)entry->cookie, entry->func,
     microseconds_since(calltime));
  /* 2) remove self from the pending queues
     异步任务执行完毕后,将异步任务结构体从domian的私有pending链表和全局的async_global_pending中删除
  */
  spin_lock_irqsave(&async_lock, flags);
  list_del_init(&entry->domain_list);
  list_del_init(&entry->global_list);
  /* 3) free the entry
     释放异步任务结构体,并且更新待处理的异步任务的个数
  */
  kfree(entry);
  atomic_dec(&entry_count);
  spin_unlock_irqrestore(&async_lock, flags);
  /* 4) wake up any waiters
     唤醒等待异步任务执行的进程
  */
  wake_up(&async_done);
}

等待异步任务执行完毕。

点击查看代码

/**
 * async_synchronize_cookie_domain - synchronize asynchronous function calls within a certain domain with cookie checkpointing
 * @cookie: async_cookie_t to use as checkpoint
 * @domain: the domain to synchronize (%NULL for all registered domains)
 *
 * This function waits until all asynchronous function calls for the
 * synchronization domain specified by @domain submitted prior to @cookie
 * have been done.
 */
void async_synchronize_cookie_domain(async_cookie_t cookie, struct async_domain *domain)
{
  ktime_t starttime;
  pr_debug("async_waiting @ %i\n", task_pid_nr(current));
  starttime = ktime_get();
  // 获取pending链表中排在最前面的尚未执行的异步任务的id,然后跟传入的cookie比较
  // 同一个pending链表中的异步任务按照FIFO的顺序排序,而且id也是小的排在前面
  // 注意:这里wait返回的条件用的是>=,即如果等于的话,也会返回,但是此时意味着
  // 这里cookie对应的异步任务还在pending,即还没有执行或者执行完毕
  // 因此,在实际使用时,传入的cookie要比实际要等待的异步任务的id大
  wait_event(async_done, lowest_in_progress(domain) >= cookie);
  pr_debug("async_continuing @ %i after %lli usec\n", task_pid_nr(current),
     microseconds_since(starttime));
}
EXPORT_SYMBOL_GPL(async_synchronize_cookie_domain);

这里需要注意:这里wait_event返回的条件用的是>=,即如果等于的话,也会返回,但是此时意味着cookie对应的异步任务还在pending,即还没有执行或者执行完毕。因此,在实际使用时,传入的cookie应该大于实际要等待的异步任务的id,下面是从内核中找到的几个示例程序:

示例1:

示例2:

而对于full这种等待,设置的cookie是ASYNC_COOKIE_MAX,对应的值是ULLONG_MAX,导致full的实际效果是,当指定的pengding链表中所有的异步任务都执行完毕后,才可以返回,参考lowest_in_progress

lowest_in_progress

获取指定domain的pending链表或者全局async_global_pending链表中第一个尚未执行的第一个异步任务的id,这些任务是以FIFO的方式在pending链表中排队,只有当任务执行完毕,才会从pending链表中删除。

点击查看代码

static async_cookie_t lowest_in_progress(struct async_domain *domain)
{
  struct async_entry *first = NULL;
  // 当pending链表中的异步任务都处理完毕,那么会返回值这个值,此时full这种形式的同步才能返回
  async_cookie_t ret = ASYNC_COOKIE_MAX;
  unsigned long flags;
  spin_lock_irqsave(&async_lock, flags);
  if (domain) { // 如果指定了domain
    if (!list_empty(&domain->pending)) // 如果非空,返回排在pending链表最开始的一个等待执行的异步任务
      first = list_first_entry(&domain->pending,
          struct async_entry, domain_list);
  } else {  // 没有指定domain,也就是full形式的等待
    if (!list_empty(&async_global_pending))// 如果非空,返回排在pending链表最开始的一个等待执行的异步任务
      first = list_first_entry(&async_global_pending,
          struct async_entry, global_list);
  }
  if (first)  // 如果找到排在最开头的等待执行的异步任务,返回它的id
    ret = first->cookie;
  spin_unlock_irqrestore(&async_lock, flags);
  return ret;
}

完。

相关文章
|
4月前
|
Kubernetes API 调度
在k8S中,Scheduler作用及实现原理是什么?
在k8S中,Scheduler作用及实现原理是什么?
|
7月前
|
存储 JavaScript 前端开发
RxJS中的调度器(Scheduler)机制
RxJS中的调度器(Scheduler)机制
200 0
|
vr&ar Swift
大师学SwiftUI第9章Part 1 - 异步并发之Task、Async、Await和错误
苹果系统借助现代处理器的多核可同步执行多条代码,提升同一时间内程序所能执行的任务。例如,一段代码从网上下载文件,另一段代码可以在屏幕上显示进度。此时,我们不能等待第一个执行完后再执行第二个,而必须要同步执行这两个任务。
205 0
|
Java API 调度
ThreadPoolTaskScheduler轻量级多线程定时任务框架
面对一些小的功能需求点,如果需要非常灵活的进行处理定时任务处理,但是又因为你的需求需要使用到页面进行定时任务配置,显然使用Spring注解定时任务,无法满足你,这时你想到了xxl-job 或者 quezy等定时任务框架,但是过于繁琐,可能成本较大。那么本文将要解决你的问题
2366 0
|
消息中间件 前端开发 JavaScript
ES8 中的 async/await —— 异步函数
ES8 中的 async/await —— 异步函数
197 0
|
Java 调度
定时任务@Scheduled 和 异步@Async
定时任务@Scheduled 和 异步@Async
|
前端开发
es6 异步处理之 Promise学习总结
es6 异步处理之 Promise学习总结
|
存储 前端开发 JavaScript
ES2017 异步函数的最佳实践(`async` /`await`)
简单来说,async函数是 promise 的 "语法糖"。它们允许我们使用更熟悉的语法来模拟同步执行,从而代替 promise 链式写法。
ES2017 异步函数的最佳实践(`async` /`await`)
|
调度 数据库
Quartz的Scheduler的关闭和挂起,并发控制(四)中
Quartz的Scheduler的关闭和挂起,并发控制(四)中
1766 0
Quartz的Scheduler的关闭和挂起,并发控制(四)中
下一篇
DataWorks