手撸一款简单高效的线程池(五)

简介: 在之前的内容中,我们给大家介绍了 C++实现线程池过程中的一些常用线优化方案,并分析了不同机制使用时的利弊。这一篇,是线程池系列的最后一章。我们会介绍一下 CGraph 中的 threadpool 如何使用,给出性能对比,并对接下来的工作做一些展望。让我们在线程池性能优化和功能提升的道路上,越走越远。

在之前的内容中,我们给大家介绍了 C++实现线程池过程中的一些常用线优化方案,并分析了不同机制使用时的利弊。这一篇,是线程池系列的最后一章。我们会介绍一下 CGraph 中的 threadpool 如何使用,给出性能对比,并对接下来的工作做一些展望。让我们在线程池性能优化和功能提升的道路上,越走越远。


大家好,我是不会写代码的纯序员——Chunel Feng。各位绅士们,很高兴又在这里跟大家见面了。


在之前的几篇文章里,我们先是讨论了原生 C++对多线程编程支持的匮乏,然后又分别从线程调度层面和工程实现层面,给介绍了一些线程池优化的实用思路,主要包括:local-thread 机制、lock-free 机制、work-stealing 机制、自动扩缩容机制、批量处理机制、负载均衡机制、避免 busy-waiting、分支预测优化和减少无用 copy。真心希望以上内容,会对大家有所帮助。


本章内容,是这个线程池系列的最后一章。我们会先介绍一下 CGraph 中 threadpool 的使用 demo,并提供出来实测的一些数据,来佐证我们之前的各种努力,的确达到了优化的效果。同时也会畅想一下今后可能的优化方向和扩展点,算是从起因、经过、结果和展望这几个大的方面,形成一个完整的“闭环”吧。


首先,还是照例,先上源码链接:CGraph 源码链接[1] 其中,线程池的实现在 /src/UtilsCtrl/ThreadPool/ 文件夹中


使用 Demo


首先,来看一个简单的使用 demo 吧。


我们在线程池优化第一章中,立 flag 的时候说过,threadpool 要可以支持任意入参和返回值的任务执行,并且简单好上手。下面这段代码,分别定义了 4 种不同类型的函数:普通函数,静态函数,类成员函数,类成员静态函数


int add(int i, int j) {    return i + j;}static float minusBy5(float i) {    return i - 5.0f;}class myFunction {public:    std::string pow2(std::string& str) const {        int result = 1;        int pow = power_;        while (pow--) {            result *= (int)atoi(str.c_str());        }        return "pow2 result is : " + std::to_string(result);    }    static int divide(int i, int j) {        if (0 == j) {            return 0;        }        return i / j;    }    int power_ = 2;};


下面一段代码,主要展示了如何通过 CGraph 中的 threadpool 来执行以上几种函数,并且说明了如何做阻塞等待。


void tutorial_threadpool() {    UThreadPoolPtr tp = UThreadPoolSingleton::get();    // 通过单例方式获取    int i = 6, j = 3;    std::string str = "5";    myFunction mf;    /**     * 可以通过lambda表达式传递函数     * 也可以传入任意多个参数     * 方法返回值也可以是任意类型     */    auto r1 = tp->commit([i, j] { return add(i, j); });    auto r2 = tp->commit(std::bind(minusBy5, 8.5f));    auto r3 = tp->commit(std::bind(&myFunction::pow2, mf, str));    std::future<int> r4 = tp->commit(std::bind(&myFunction::divide, i, j));    // commit()返回值,实际上是一个std::future<T>类型    /**     * 返回值可以是int、string等各种类型     * 调用get()方法,表示阻塞等待该函数执行完毕     * 不调用get()方法,表示不阻塞等待     */    std::cout << r1.get() << std::endl;    // std::cout << r2.get() << std::endl;    // 不阻塞等待该函数执行完毕    std::cout << r3.get() << std::endl;    std::cout << r4.get() << std::endl;}


代码行数不多哈,整体也比较 easy,也没啥好说的。从我实际做(B)开(U)发(G)的经验来说,支持以上这几种函数,就可以完成基本所有方法的调度了。


需要强调的一点就是,针对 threadpool 这一块,我进行过亿次以上的任务写入测试,功能也都是稳定可靠的。


22.png


性能对比


一波骚操作之后,我们终于还是要出一下性能测试报告的。在这里捏,我最后说一遍,多线程实际执行的情况复杂,实际性能数据需要带入实际环境大量实测。


我们接下来的测试数据,主要就是跟最常见的one input, one output的 threadpool 写法进行对比,具体链接我就不上了,大家 github 或者*乎上直接搜索一下就好。


测试的方法呢,主要就是高并发大批量空跑return 0;的任务,从而考察线程池的调度能力。还有要说明的,就是实验结果并非单纯测试线程池本身,而是通过在 CGraph 框架中,并发跑 并发数 个任务,然后等待批量任务执行完毕后再跑下一轮,一共跑 执行次数 轮。


void cgraph_threadpool_test() {    CGRAPH_ECHO("cur thread size is [%d], and max thread size is [%d].", CGRAPH_DEFAULT_THREAD_SIZE,CGRAPH_MAX_THREAD_SIZE);    GPipelinePtr pipeline = GPipelineFactory::create();    int thd = 2;    int times = 5000000;    for (int i = 0; i < thd; i++) {        GElementPtr ptr = nullptr;        // 其中,HelloWorldNode执行内容为:return 0;        pipeline->registerGElement<HelloWorldNode>(&ptr);    }    time_t start = time(nullptr);    pipeline->process(times); // 运⾏pipeline    time_t end = time(nullptr);    CGRAPH_ECHO("[%d] node for one time, run [%d] times, ts is [%d].", thd, times, end-start);    GPipelineFactory::destroy(pipeline); }


由于常规线程池并不包含自动扩缩容机制,在测试过程中,也是保证 CGraph 中的线程数量 max 和 default 值也设置为相同的。


23.png


测试的结果,更多是在比较 CGraph 在使用传统线程池和优化后的线程池时,空跑调度任务的性能差距。旨在说明以上提出的优化方案切实有效。以上所有结果,均验证 2 次以上,不同配置的机器上可能略有差别。


24.png


在这里,也给大家推荐其他几个 C++常用的并行库:openmptbb等,还有很多国内互联网巨头公司提供出来的相关开源组件,使用起来都很简单,大家也可以去尝试看看。


void myPrint(int x) {    std::cout << x << std::endl;}void openmp_version() {    /* openmp版本,并发打印0~99 */    #pragma omp parallel for num_threads(4)    for(int i = 0; i < 100; i++) {        myPrint(i);    }}void tbb_version(){    /* tbb版本,并发打印0~99 */    tbb::parallel_for(0, 100, 1, myPrint);}void cgraph_version() {    /* CGraph版本,并发打印0~99 */    UThreadPoolPtr tp = UThreadPoolSingleton::get();    for (int i = 0; i < 100; i++) {        tp->commit([i] { myPrint(i); });    }}


并发编程是一个很常见的话题。通用的方法很多,也各有优劣,建议大家在选择和使用的时候多思考,多比较。在成长的过程中,不是很建议陷入那种“这个项目之前就是用的 xx 方法,所以我也就用了”的怪圈。当然,如果在现实工作中,考虑到功能稳定性、一致性、快速迭代和可维护性的话,那就是另外的话题了。


TODO


以上说了这么多,都是在说好的地方:做了哪些优化,加了哪些功能。在快要结束的时候,也要总结一下,CGraph 中的 threadpool 中还有哪些功能的缺失和不足


任务优先级


我们之前提过,local-thread 机制和批量执行机制,会使得 threadpool 中的任务执行顺序,和实际写入顺序变的不一致。关于这一点,我们提供了CGRAPH_FAIR_LOCK_ENABLE参数(默认为false),如果设置为true则任务执行顺序和任务写入顺序保持一致。


不过,还有一个很实际的问题:有些情况下,不同任务的优先级就是不同的,优先级高的任务,理应优先被执行。这个时候,线程池中任务的存储方式就应该从queue改为priority_queue了,当然肯定还会牵扯到一些其他的改动,有兴趣的话可以尝试去实现一下。


拒绝策略


任务写入 threadpool 中,是瞬间的动作,但是有些任务执行起来就需要很长的时间,比如:sleep(100);。当线程池中源源不断的写入大量任务,却无法及时消费的时候,是可能引发各种意想不到的问题,甚至程序崩溃的。所以,一个优秀的线程池中还应该有拒绝策略,比如:内部未执行任务超过 1 爽(约 208w)之后,就外部任务就无法继续写入了。


拒绝策略,又可以区分为严格的拒绝策略和宽松的拒绝策略。严格,主要体现在写入任务的瞬间,如果 pool 中的任务数量正好是 1 爽的时候,就拒绝;宽松,就可以是 pool 中的任务数量超过 1 爽之后的若干秒后,pool 开始拒绝外部写入,直到其中任务被消费到小于 1 爽之后的若干秒后,又恢复正常。


严格和宽松,没有绝对的优劣之分,只有各自适合和不适合的场景。如果有时间,可以尝试去实现一下。


加入 cas 机制


我们之前也提过,cas 属于无锁编程技术的一种。再次申明,并不是通过无锁编程的实现方式,性能就一定高于有锁编程。不过,这也是一个很值得尝试的优化点,而且理论上有相对更优的性能。等排期哦哦哦,哈哈。


线程绑定运行


我们之前聊过一些线程亲和性的问题。没提到的一点是,一个线程是可以绑定在特定 cpu 核上运行的,这样做也可以一定程度上提升亲和性。这个功能在 Linux 上实现相对简单,但是我并没有找到什么跨平台的方法来实现这个功能。有兴趣的朋友也可以自己试试看。


25.png


本章小结


本章节是我们关于线程池优化的最后一篇了,我把个人对线程池的理解,基本上都写在了这几篇文章里了。并发编程本身博大精深,而个人水平和时间都有限,有些地方没能讲解的非常深入。在写文章的过程中,也竭尽全力对自己的每一处内容做了验证,力求正确无误,却仍可能存在一些认知的盲区和错误的描述,希望大家不吝指教。


虽然还存在诸多的不完美,但还是非常开心能从一而终的把代码和文章写完,并在 github 上和这里跟大家分享。C++ 线程池我之前在工作中用到过,但是并没有做过什么优化和改进。通过这一个月自己闲暇时间的摸索和学习吧,我个人是有了一些自己的积累和进步,也希望可以通过分享,对大家有一丝的帮助和启发。


在这个过程中,也有幸得到了阿里云、搜狗和旷视等一流公司资深大佬的一些指导,在这里再次深表感谢。  如果说自己的既有行动是“99%的努力”的话,那高人的点拨更像是“1%的灵感”,让我受益匪浅,足以提升一个层次,今后还请多多指教。


也要感谢我们产品的童鞋,你并没有什么超前的意识,所以也提不出来什么过分的需求,让我在日常工作之余还有一些自己的时间,来做相关的事情。还要感谢我身边的一位同事,在间歇性架构升级和持续性相互撕逼的常规操作中,默默的帮我挡掉了一些小小的需求。今后的日子里,两位多多保重,去你们的(更广阔的舞台)吧,哈哈。最后,更要感谢我的女神,在这期间一直不回我微信,也约不出来,好让我有了足够多的时间来撸代码和写文章。


26.png


最近,也通过写博客和公众号,认识了不少新朋友。很高兴认识大家,很感谢大家的关注和一键三连。同样感谢几位朋友的打赏,就像一杯午后的咖啡,不贵,但很暖心。

最后的最后,下面是我的微信二维码,欢迎大家加我微信,随时交流聊天,多多指教。添加的时候,请加上简单个人备注信息,不然的话,一律标注为“大姨妈介绍的第 N 个相亲对象” ——我会时不时去撩你的哦,嘿嘿。


推荐阅读


如何实现一个图化框架?代码已开源!

一文看懂开源图化框架中的循环设计逻辑!

手撸一款简单高效的线程池(一)

手撸一款简单高效的线程池(二)

手撸一款简单高效的线程池(三)

手撸一款简单高效的线程池(四)


引用链接


[1] CGraph 源码链接: https://github.com/ChunelFeng/CGraph

目录
相关文章
|
4小时前
|
缓存 Java 程序员
程序员的金三银四:创建线程池有哪几种方式?
程序员的金三银四:创建线程池有哪几种方式?
18 0
|
4小时前
|
存储 缓存 Oracle
Java线程池,白话文vs八股文,原来是这么回事!
一、线程池原理 1、白话文篇 1.1、正式员工(corePoolSize) 正式员工:这些是公司最稳定和最可靠的长期员工,他们一直在工作,不会被解雇或者辞职。他们负责处理公司的核心业务,比如生产、销售、财务等。在Java线程池中,正式员工对应于核心线程(corePoolSize),这些线程会一直存在于线程池中。他们负责执行线程池中的任务,如果没有任务,他们会等待新的任务到来。 1.2、所有员工(maximumPoolSize) 所有员工:这些是公司所有的员工,包括正式员工和外包员工。他们共同组成了公司的团队,协作完成公司的各种业务。在Java线程池中,所有员工对应于所有线程(maxim
|
4小时前
|
存储 安全 Java
手撕线程池与性能测试
手撕线程池与性能测试
67 0
|
10月前
|
SQL 算法 Java
直击灵魂!美团大牛手撸并发原理笔记,由浅入深剖析JDK源码
并发编程这四个字想必大家最近都在网上看到过有很多的帖子在讨论。我们都知道并发编程可选择的方式有多进程、多线程和多协程。在Java中,并发就是多线程模式。而多线程编程也一直是一个被广泛而深入讨论的领域。如果遇到复杂的多线程编程场景,大多数情况下我们就需要站在巨人的肩膀上利用并发编程框架——JDK Concurrent包来解决相关线程问题。
|
消息中间件 JavaScript 小程序
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
面试官:小伙子我们先来唠唠并发编程的几大核心知识点
并发编程算是Java的一个难点,经常做业务相关的程序员基本上用不到juc的包,但是这些知识点十分重要,所以不管在哪里,时刻保持学习真的很重要。
|
消息中间件 存储 前端开发
面试官让我手写队列,差点没写出来,回来后赶忙把重点记下来
栈和队列是一对好兄弟,前面我们介绍过一篇栈的文章(栈,不就后进先出),栈的机制相对简单,后入先出,就像进入一个狭小的山洞,山洞只有一个出入口,只能后进先出(在外面的先出去,堵在里面先进去的就有点倒霉)。而队列就好比是一个隧道,后面的人跟着前面走,前面人先出去(先入先出)。日常的排队就是队列运转形式的一个描述!
89 0
面试官让我手写队列,差点没写出来,回来后赶忙把重点记下来
|
设计模式 安全 Java
重生之我在人间敲代码_Java并发基础_浅析并发编程
并发编程可以抽象为三个核心问题:分工、同步、互斥。 所谓分工指的是如何高效地拆解任务并分配给线程,而同步指的是线程之间如何协作,互斥则是保证同一时刻只允许一个线程访问共享资源。
|
人工智能 负载均衡 算法
手撸一款简单高效的线程池(四)
在前几章的内容中,我们给大家介绍了一些 C++线程池中的优化思路和实现方案。这一章中,我们来聊一聊在编程实现过程中,一些工程层面的优化。让我们的代码执行的速度,跟得上自己的思路。
215 0
手撸一款简单高效的线程池(四)
|
缓存 负载均衡 前端开发
手撸一款简单高效的线程池(一)
线程池大家应该都用过,不过如何从 0 到 1 的设计一款简单好用且性能较好的线程池?我们在接下来的几篇文章中,为您一一介绍。
443 0
手撸一款简单高效的线程池(一)