bthread源码剖析(二): 工作窃取与TaskGroup的run_main_task()

简介: 上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。

上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。


TG的主要成员


讲到TG先看TG的主要成员:


size_t _steal_seed;
    size_t _steal_offset;
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;


每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。


每个TG有两个TM的队列:rq和remote_rq,它们之间有啥区别呢?


rq 与 remote_rq


通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。


再看一下ready_to_run_remote注释:


// Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);


在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。


再看下ready_to_run_remote()的调用的地方。


在butex_wake()中:


TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }


在start_background()中:


template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
...
...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}


start_background<>()模板参数为true的时候被调用ready_to_run_remote()。而在start_from_non_worker()中,会调用start_background<true>()


好了,言归正传。


TaskGroup::run_main_task()


run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。


void TaskGroup::run_main_task() {
    ...
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        ...
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}


死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task()的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);


由此观之,我们发现三个关键函数:wait_task()sched_to()task_runner()


简述一下他们的基本功能:


  1. wait_task():找到一个任务。其中会涉及工作窃取(work stealing)。


  1. sched_to():进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。


  1. task_runner():执行任务。


现在我们的观察视角终于可以切入到“work stealing”了。


工作窃取(work stealing)


首先声明,work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡说的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。


20世纪90年代,MIT的Charles E. Leiserson 教授发起并指导了CILK项目。该项目发表了许多论文,启发了各种使用“工作窃取”的基于任务的调度器。


TaskGroup::wait_task()


看源码,这里简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关


bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
    } while (true);
}


_pl是ParkingLot*类型,_last_plstate是pl中的state。关于它俩的更多介绍,后面会有其他文章。


_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。


TaskGoup::steal_task()
    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
        _last_pl_state = _pl->get_state();
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }


首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。

视角从TG跳出,来看一看TC的steal_task()


TaskControl::steal_task


bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }
    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}


可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。


通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。


那么当前TG的rq是什么时候被消费的呢?


在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。


TaskGroup::task_runner()


void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
     ...
     }


在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):


do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;
        ... 
        // 执行TM(bthread)中的回调函数
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        // Group is probably changed
        g = tls_task_group;
        // TODO: Save thread_return
        (void)thread_return;
        ... 日志
        // 清理 线程局部变量(下面是原注释)
        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }
        // 累加版本号,且版本号不能为0(下面是原注释)
        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        // 唤醒joiner
        butex_wake_except(m->version_butex, 0);
        // _nbthreads减1(注意_nbthreads不是整型)
        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        // 查找下一个任务,并切换到其对应的运行时上下文
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);


do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。


在ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。


然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。


好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,下一篇文章,我来带大家解读sched_to()。之所以把它单独成篇,是因为会涉及一些汇编的知识,读起来可能晦涩艰深。大家做好准备!

相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
6月前
|
消息中间件
每日一博 - 图解进程(Process)和线程(Thread)区别联系
每日一博 - 图解进程(Process)和线程(Thread)区别联系
38 0
|
3月前
|
Java
多线程中的run方法和start方法有什么区别?
多线程中的run方法和start方法有什么区别?
|
7月前
|
安全 Java API
为什么启动线程不直接调用run(),而要调用start(),如果调用两次start()方法会有什么后果
1位工作6年的小伙伴去某里P6一面,被问到这样一道面试题,说,为什么启动一个线程不直接调用run()方法,而要调用start()方法来启动,如果调用两次start()会有什么后果?
61 0
|
10月前
|
Java
Java线程中的run()和start()区别
Java线程中的run()和start()区别
62 0
|
调度 C语言 芯片
RT-Thread记录(二、RT-Thread内核启动流程 — 启动文件和源码分析)
今天就在前面我们RT-Thread Studio工程基础之上讲一讲RT-Thread内核启动流程
541 0
RT-Thread记录(二、RT-Thread内核启动流程 — 启动文件和源码分析)
|
存储 Java 编译器
进程和线程的区别&&run和start区别与联系
进程和线程的区别&&run和start区别与联系
|
测试技术
loadrunner 技巧-模拟Run Logic中的随机Action运行
loadrunner 技巧-模拟Run Logic中的随机Action运行
61 0
|
Java API
多线程中run()和start()的异同详细分析(全)
目录前言1. 定义2. 代码区别 前言 为什么不直接调用线程中的run方法,而要通过调用start方法才可以实现线程的异步执行互不干扰? run方法和start的调用方法区别在于哪? 相信看到此处的人们,都会有类似的疑问,这篇文章就为你打开新世界,铭记脑海中 1. 定义 查看其官方的api接口定义 start方法 “ “Causes this thread to begin execution; the Java Virtual Machine calls the run method of this t
129 0
多线程中run()和start()的异同详细分析(全)
|
Oracle Java 关系型数据库
聊一聊多线程的 run() 和 start(),挖一挖start0
聊一聊多线程的 run() 和 start(),挖一挖start0
119 0
聊一聊多线程的 run() 和 start(),挖一挖start0
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
248 0