bthread源码剖析(二): 工作窃取与TaskGroup的run_main_task()

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。

上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。


TG的主要成员


讲到TG先看TG的主要成员:


size_t _steal_seed;
    size_t _steal_offset;
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;


每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。


每个TG有两个TM的队列:rq和remote_rq,它们之间有啥区别呢?


rq 与 remote_rq


通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。


再看一下ready_to_run_remote注释:


// Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);


在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。


再看下ready_to_run_remote()的调用的地方。


在butex_wake()中:


TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }


在start_background()中:


template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
...
...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}


start_background<>()模板参数为true的时候被调用ready_to_run_remote()。而在start_from_non_worker()中,会调用start_background<true>()


好了,言归正传。


TaskGroup::run_main_task()


run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。


void TaskGroup::run_main_task() {
    ...
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        ...
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}


死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task()的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);


由此观之,我们发现三个关键函数:wait_task()sched_to()task_runner()


简述一下他们的基本功能:


  1. wait_task():找到一个任务。其中会涉及工作窃取(work stealing)。


  1. sched_to():进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。


  1. task_runner():执行任务。


现在我们的观察视角终于可以切入到“work stealing”了。


工作窃取(work stealing)


首先声明,work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡说的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。


20世纪90年代,MIT的Charles E. Leiserson 教授发起并指导了CILK项目。该项目发表了许多论文,启发了各种使用“工作窃取”的基于任务的调度器。


TaskGroup::wait_task()


看源码,这里简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关


bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
    } while (true);
}


_pl是ParkingLot*类型,_last_plstate是pl中的state。关于它俩的更多介绍,后面会有其他文章。


_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。


TaskGoup::steal_task()
    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
        _last_pl_state = _pl->get_state();
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }


首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。

视角从TG跳出,来看一看TC的steal_task()


TaskControl::steal_task


bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }
    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}


可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。


通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。


那么当前TG的rq是什么时候被消费的呢?


在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。


TaskGroup::task_runner()


void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
     ...
     }


在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):


do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;
        ... 
        // 执行TM(bthread)中的回调函数
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        // Group is probably changed
        g = tls_task_group;
        // TODO: Save thread_return
        (void)thread_return;
        ... 日志
        // 清理 线程局部变量(下面是原注释)
        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }
        // 累加版本号,且版本号不能为0(下面是原注释)
        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        // 唤醒joiner
        butex_wake_except(m->version_butex, 0);
        // _nbthreads减1(注意_nbthreads不是整型)
        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        // 查找下一个任务,并切换到其对应的运行时上下文
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);


do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。


在ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。


然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。


好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,下一篇文章,我来带大家解读sched_to()。之所以把它单独成篇,是因为会涉及一些汇编的知识,读起来可能晦涩艰深。大家做好准备!

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
相关文章
|
安全 项目管理
一文搞懂需求流程规范的制定方法和落地技巧
随着业务和产品的发展、团队的不断扩大,很多团队都不可避免的会遇到需求流程混乱的问题。虽然有的团队也编写了一些“需求流程规范”的文档,但最终却流于纸面,难以在团队真正落地。如何科学制定并有效落实需求管理规范呢?对此,云效产品经理陈逊进行了非常详细的直播分享,本文是他经验的文字总结。
103495 19
|
Java 编译器 API
protobuf万字语法详解
当用protocol buffer编译器来运行.proto文件时,编译器将生成所选择语言的代码,这些代码可以操作在.proto文件中定义的消息类型,包括获取、设置字段值,将消息序列化到一个输出流中,以及从一个输入流中解析消息。
179 0
protobuf万字语法详解
|
网络协议 安全 Android开发
软件丨李跳跳们现在该如何跳呢?
前段时间,李跳跳等软件被某大厂发了律师函,之后,好些个跳广告软件都相继发布公众号说明,停止维护软件,并且下架了相关软件,那我们还能跳吗?该怎么跳呢?
970 0
软件丨李跳跳们现在该如何跳呢?
|
5月前
|
人工智能 自然语言处理 API
MCP与A2A协议比较:人工智能系统互联与协作的技术基础架构
本文深入解析了人工智能领域的两项关键基础设施协议:模型上下文协议(MCP)与代理对代理协议(A2A)。MCP由Anthropic开发,专注于标准化AI模型与外部工具和数据源的连接,降低系统集成复杂度;A2A由Google发布,旨在实现不同AI代理间的跨平台协作。两者虽有相似之处,但在设计目标与应用场景上互为补充。文章通过具体示例分析了两种协议的技术差异及适用场景,并探讨了其在企业工作流自动化、医疗信息系统和软件工程中的应用。最后,文章强调了整合MCP与A2A构建协同AI系统架构的重要性,为未来AI技术生态系统的演进提供了方向。
804 62
|
机器学习/深度学习 PyTorch 算法框架/工具
RNN、LSTM、GRU神经网络构建人名分类器(三)
这个文本描述了一个使用RNN(循环神经网络)、LSTM(长短期记忆网络)和GRU(门控循环单元)构建的人名分类器的案例。案例的主要目的是通过输入一个人名来预测它最可能属于哪个国家。这个任务在国际化的公司中很重要,因为可以自动为用户注册时提供相应的国家或地区选项。
|
负载均衡 监控 Java
Eureka介绍与使用
Eureka介绍与使用
|
关系型数据库 MySQL Linux
关系型数据库mysql的跨平台支持
【6月更文挑战第12天】
524 1
|
SQL 消息中间件 存储
【一文看懂】使用hape部署分布式版Havenask
本次分享内容为使用hape部署分布式版Havenask,共2个部分组成(部署分布式版Havenask集群、 分布式相关问题排查),希望可以帮助大家更好了解和使用Havenask。
165232 6
|
存储 算法 程序员
bthread源码剖析(一): 基本概念与TaskControl初始化
bthread源码剖析(一): 基本概念与TaskControl初始化
851 0
bthread源码剖析(一): 基本概念与TaskControl初始化
|
Prometheus Kubernetes 负载均衡
Kruise Rollout v0.3.0:教你玩转 Deployment 分批发布和流量灰度
Kruise Rollout v0.3.0:教你玩转 Deployment 分批发布和流量灰度
Kruise Rollout v0.3.0:教你玩转 Deployment 分批发布和流量灰度