上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。
TG的主要成员
讲到TG先看TG的主要成员:
size_t _steal_seed; size_t _steal_offset; ContextualStack* _main_stack; bthread_t _main_tid; WorkStealingQueue<bthread_t> _rq; RemoteTaskQueue _remote_rq;
每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。
每个TG有两个TM的队列:rq和remote_rq,它们之间有啥区别呢?
rq 与 remote_rq
通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。
再看一下ready_to_run_remote注释:
// Push a bthread into the runqueue from another non-worker thread. void ready_to_run_remote(bthread_t tid, bool nosignal = false);
在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。
再看下ready_to_run_remote()的调用的地方。
在butex_wake()中:
TaskGroup* g = tls_task_group; if (g) { TaskGroup::exchange(&g, bbw->tid); } else { bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid); }
在start_background()中:
template <bool REMOTE> int TaskGroup::start_background(bthread_t* __restrict th, const bthread_attr_t* __restrict attr, void * (*fn)(void*), void* __restrict arg) { ... ... if (REMOTE) { ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL)); } else { ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL)); } return 0; }
start_background<>()
模板参数为true的时候被调用ready_to_run_remote()
。而在start_from_non_worker()
中,会调用start_background<true>()
!
好了,言归正传。
TaskGroup::run_main_task()
run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。
void TaskGroup::run_main_task() { ... TaskGroup* dummy = this; bthread_t tid; while (wait_task(&tid)) { TaskGroup::sched_to(&dummy, tid); DCHECK_EQ(this, dummy); DCHECK_EQ(_cur_meta->stack, _main_stack); if (_cur_meta->tid != _main_tid) { TaskGroup::task_runner(1/*skip remained*/); } ... } // Don't forget to add elapse of last wait_task. current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns; }
死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task()的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);
由此观之,我们发现三个关键函数:wait_task()
、sched_to()
、 task_runner()
简述一下他们的基本功能:
- wait_task():找到一个任务。其中会涉及工作窃取(work stealing)。
- sched_to():进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。
- task_runner():执行任务。
现在我们的观察视角终于可以切入到“work stealing”了。
工作窃取(work stealing)
首先声明,work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡说的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。
20世纪90年代,MIT的Charles E. Leiserson 教授发起并指导了CILK项目。该项目发表了许多论文,启发了各种使用“工作窃取”的基于任务的调度器。
TaskGroup::wait_task()
看源码,这里简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关
bool TaskGroup::wait_task(bthread_t* tid) { do { if (_last_pl_state.stopped()) { return false; } _pl->wait(_last_pl_state); if (steal_task(tid)) { return true; } } while (true); }
_pl是ParkingLot*类型,_last_plstate是pl中的state。关于它俩的更多介绍,后面会有其他文章。
_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。
TaskGoup::steal_task() bool steal_task(bthread_t* tid) { if (_remote_rq.pop(tid)) { return true; } _last_pl_state = _pl->get_state(); return _control->steal_task(tid, &_steal_seed, _steal_offset); }
首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。
视角从TG跳出,来看一看TC的steal_task()
TaskControl::steal_task
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { // 1: Acquiring fence is paired with releasing fence in _add_group to // avoid accessing uninitialized slot of _groups. const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/); if (0 == ngroup) { return false; } // NOTE: Don't return inside `for' iteration since we need to update |seed| bool stolen = false; size_t s = *seed; for (size_t i = 0; i < ngroup; ++i, s += offset) { TaskGroup* g = _groups[s % ngroup]; // g is possibly NULL because of concurrent _destroy_group if (g) { if (g->_rq.steal(tid)) { stolen = true; break; } if (g->_remote_rq.pop(tid)) { stolen = true; break; } } } *seed = s; return stolen; }
可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。
通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。
那么当前TG的rq是什么时候被消费的呢?
在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。
TaskGroup::task_runner()
void TaskGroup::task_runner(intptr_t skip_remained) { TaskGroup* g = tls_task_group; if (!skip_remained) { while (g->_last_context_remained) { RemainedFn fn = g->_last_context_remained; g->_last_context_remained = NULL; fn(g->_last_context_remained_arg); g = tls_task_group; } ... }
在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):
do { // Meta and identifier of the task is persistent in this run. TaskMeta* const m = g->_cur_meta; ... // 执行TM(bthread)中的回调函数 void* thread_return; try { thread_return = m->fn(m->arg); } catch (ExitException& e) { thread_return = e.value(); } // Group is probably changed g = tls_task_group; // TODO: Save thread_return (void)thread_return; ... 日志 // 清理 线程局部变量(下面是原注释) // Clean tls variables, must be done before changing version_butex // otherwise another thread just joined this thread may not see side // effects of destructing tls variables. KeyTable* kt = tls_bls.keytable; if (kt != NULL) { return_keytable(m->attr.keytable_pool, kt); // After deletion: tls may be set during deletion. tls_bls.keytable = NULL; m->local_storage.keytable = NULL; // optional } // 累加版本号,且版本号不能为0(下面是原注释) // Increase the version and wake up all joiners, if resulting version // is 0, change it to 1 to make bthread_t never be 0. Any access // or join to the bthread after changing version will be rejected. // The spinlock is for visibility of TaskGroup::get_attr. { BAIDU_SCOPED_LOCK(m->version_lock); if (0 == ++*m->version_butex) { ++*m->version_butex; } } // 唤醒joiner butex_wake_except(m->version_butex, 0); // _nbthreads减1(注意_nbthreads不是整型) g->_control->_nbthreads << -1; g->set_remained(TaskGroup::_release_last_context, m); // 查找下一个任务,并切换到其对应的运行时上下文 ending_sched(&g); } while (g->_cur_meta->tid != g->_main_tid);
do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。
在ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。
然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。
好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,下一篇文章,我来带大家解读sched_to()。之所以把它单独成篇,是因为会涉及一些汇编的知识,读起来可能晦涩艰深。大家做好准备!