10分钟,带你掌握C++多线程同步!-阿里云开发者社区

开发者社区> 阿里云SAP上云> 正文
登录阅读全文

10分钟,带你掌握C++多线程同步!

简介: 本文介绍了C++11中如何开启新线程,并详细讲解了线程的基础同步原语。如何采用async, packaged_task和promise实现future同步机制?怎样处理spurious wakeup?本文以质数判定服务为例为大家分享C++多线程同步措施!
摘要:本文介绍了C++11中如何开启新线程,并详细讲解了线程的基础同步原语:mutex, lock_guard, unique_lock, condition variable和semaphore等。如何采用async,  packaged_task和promise实现future同步机制?怎样处理spurious wakeup?本文以质数判定服务为例为大家分享C++多线程同步措施!

数十款阿里云产品限时折扣中,赶紧点击这里领劵开始云上实践吧!

本次直播视频精彩回顾,戳这里! 

演讲嘉宾简介:
陶云峰,阿里云高级技术专家,上海交通大学理论计算机科学博士,专注数据存储、分布式系统与计算等领域,写了20多年程序。2000年参加ACM/ICPC大赛,实现亚洲队伍进World Final前十的突破。

以下内容根据演讲嘉宾视频分享以及PPT整理而成。

本文将围绕一下几个方面进行介绍:
1. C++线程和基础同步原语
  • Thread
  • mutex, lock_guard, unique_lock
  • condition variable, semaphore
2. 高级同步原语:future and async/packaged_task/promise
3. 示例:质数判定服务

一. C++线程和基础同步原语
1. Thread
C++11中怎样开启一个线程呢?以下是一段代码示例:
99dfb2d5d7dec0ae3f9fa6538a4605ec4dc9b4b1
首先新建thread变量,变量声明中第一个参数为函数,第二个参数为函数参数。如果函数没有参数,thread对象便没有第二个参数,同理若函数有两个参数,thread变量声明中便有三个参数。上例中新建了两个thread变量,每个线程变量将输出五个“thread n”。C++11中线程分为可结合的(joinable)和分离的(detached) 。每个joinable线程都对应相应的thread对象, 并且需要使用join来等待其退出,而detached线程没有对应的thread对象,只在后台自主运行。这里不建议大家使用detached线程,因为线程运行时会访问一些对象,而主线程退出时detached线程未必退出,这时线程就非常容易崩溃。上述程序运行结果如下所示:
bc3454c9193ca3c6b7d2f4faaf72293911055981
大家使用线程是为了加速,提高程序并行度从而提高运行速度。但上述程序结果却出乎原本意料,第一行连续打印出两个thread再打印12。出现这种情况的根本原因是C++中的输出操作并不是原子的,一个线程尚未执行完成时另一个线程可以在中间加塞,上例便是线程一首先输出thread后,线程二加塞输出thread,然后线程一输出1,线程二输出2。为了避免这种情况,这里需要一些同步机制。

2. Mutex
Mutex使用如下所示:
5fbe2017f77a5bd8efe6256966154a8423d97e52
首先在线程之外声明mutex变量,在线程进入临界区之前调用该变量的lock()函数,出临界区之前调用unlock(),如此每一行输出的只有一个线程,一共10行,便不会发生上述两行交错的异常。虽然该程序得到了正确的结果,但程序本身并不正确。因为cout输出时理论上会抛出异常,一旦其抛出异常mutex变量的unlock()便不能执行。这意味着该锁没有被释放,整个程序无法进入该临界区,往往程序会挂死。该问题属于异常安全问题,在抛出异常时需要注意一些收尾操作。这也是RAII的设计目标之一,标准库提供了一种RAII锁形式,即lock_guard。

3. Lock_guard
同样首先在线程之外声明mutex变量,在线程进入临界区之前声明lock_guard变量,将mutex变量作为变量传入,在构造函数中会调用该变量的lock(),在析构函数中调用unlock(),如此无论是正常运行结束还是临界区中出现异常都会正常执行锁操作。lock_guard优势是实现简单、使用方便,适用于大多数场景,但存在的问题是使用场景过于简单,无法处理一些精细操作。此时便需要使用unique_lock。
741365813f4f5a57fc39081db869e65f64cc8138

4. unique_lock
unique_lock基本用法和lock_guard一致,在构造函数和析构函数中进行锁操作,不同的地方在于它提供了非常多构造函数。
df28b38c51eda98a5f2ddcb4827b2bf721be89e8
78cdd57afa86562f937ca3d1257f44eea2ba21d8
第一种unique_lock()是默认构造函数,不持有mutex,因此也不做锁操作。unique_lock(unique_lock&&)提供移动mutex的所有权。unique_lock(mutex_type&)持有mutex并上锁,也就是上述实例中采用的构造函数。并且可以加上参数try_to_lock_t,即后一种构造函数,这意味着可以试图上锁,如果不成功仍然持有该mutex,但没有上锁。Defer_lock_t是指持有mutex但不执行上锁操作,adopt_lock_t是指已知该mutex上锁,直接持有该mutex。另外如果该mutex是timed_mutex,可以持有该mutex并尝试上锁一段时间,或者尝试上锁到某个时间点。具体使用方法如下:
bce43a544322e1cde853690de86538a92268471a
在使用unique_lock上锁时,传入try_to_lock参数,try_to_lock在构造完成后,会使用owns_lock()检查是否实际持有这把锁,它一定持有该mutex,但未必持有这把锁。本例中,如果持有锁打印“*”号,不持有则打印“x”号。那么启动50个线程运行时,大多数情况下都持有锁,但是也存在打印“x”号的情况。

5. Condition variable
条件变量是线程间的通知机制。将通过以下示例进行讲解:
5221b84e4841ff18ea31bab0bdc35b584de245b6
条件变量必须配合mutex使用。首先新建全局变量mutex和condition variable在两个线程中使用。线程一中,首先使用unique_lock对mutex加锁,然后将lck传入cv的wait操作。Wait操作首先对锁进行unlock(),然后等待,线程阻塞直至其他线程notify该线程。线程二即为进行notify过程,它调用notify_one()方法,如果此时线程一正处于等待阶段,那么便会通知到,线程一即会醒来,然后重新对mutex上锁。那么该段程序会有几种可能的结果。结果一为上述过程中最期待的结果,线程一处于waiting状态时,线程二进行notify,线程一wake up。结果二是,线程二先开始运行,发送notify,而线程一尚未进行到waiting状态,那么在线程二的角度即没有线程在等待notify,那么该notify便会丢失。此时线程一才刚运行到waiting阶段,在这个角度看来没有其他线程通知,那么线程便会一直处于hang状态。另外一种可能的结果是,线程一运行后到waiting状态,没有notify时, 出于某种原因自行wake up,此时线程二才开始notify。这种自行wakeup的情况不是bug,而是设计中必须存在的,被称为虚假唤醒(Spurious wakeup),因此在使用时必须能够处理这种情形。对于众多可能出现的结果,程序员很难罗列完整,而这种不确定性就是并发编程的本质。每个线程的先后顺序原本就是未知的,因此有多种可能的执行结果。通常而言,大家不需要关注哪些结果是可能出现的,重点要关注哪些结果是不可能的。那么上例中,不可能出现的结果是Waiting和Notify在一行中打印。这种情况只会在线程一打印出Waiting后尚未换行,此时线程二也恰好打印出Notify,然后线程一二再换行。但由于线程中的打印受到锁保护,打印文字和换行否则一同完成,否则都不完成,因此这种情况不可能发生。

6. Semaphore
Condition variable的一个用法是实现信号量。信号量(semaphore)是一种同步机制,但在C++11中并没有原生提供该机制,那么就需要自己去实现。信号量可以想象成一种跨线程安全的资源的计数,包括两个基本操作:post,每调用一次post,这种资源就多一个;wait,每调用一次wait,这种资源便消耗掉一个。如果当前没有这种资源,那么就阻塞等待,直至有其他线程post,该线程才会wakeup。以下便是使用条件变量实现信号量的过程:
24d580687ae6762b319e582da009dab0051f6264
每个信号量带有一个mutex、一个条件变量和一个整型计数器,以及post()和wait()两个方法。
。post()操作中,首先对线程进行加锁,并且将资源数量mAvailable加一,然后通过条件变量对其他线程进行notify_one()操作,如果没有线程接收便直接丢失。需要注意的是notify_one()操作可以置于锁的临界区中,但一般不这样做,因为这会有线程被挂死的风险。Wait()操作中,同样首先对线程加锁,检查资源数量mAvailable是否为0,如果有空余,那么便消耗掉一个,如果没有空余,那么程序便进入wait状态。如果发生虚假唤醒(Spurious wakeup),程序从wait状态中自行wakeup,但仍需要进行资源数量检查,此时mAvailable仍然为0,便不会造成不恰当的消耗。在本例post()中,逻辑上notify_one()和notify_all()都可以使用,但这里使用notify_all()是不正确的,因为notify_all()是将所有等待的线程都唤醒,那么这些线程便需要从操作系统的waiting队列中移动至ready队列中,但只有一个线程能够抢到锁,剩下的所有线程仍然需要被移回waiting队列中,这是非常消耗内核CPU的,因此这里使用notify_one()即可。

二. 高级同步原语:future and async/packaged_task/promise
Future的目标是充分利用CPU的并发性,它只能通过async,promise和package_task三种方式构造。Future只能移动,不可复制,需要复制时可以使用shared_future,但通常不建议使用。调用future的get()时可能会发生阻塞,直到返回值ready。Future有三种姿势的等待:wait()即一直等待直到得到返回值;wait_for()表示设定一个超时时间;wait_until()是等待到某个时间点。Future有一特化版本future<void>,返回值为空,即不返回任何值,因此仅能用于线程间通知,但却是最常用的future。
有时某项工作很早就可以开始做(前置条件都已完备),而等待这件工作结果的任务在非常靠后的位置,这时候就需要async。换言之,如果可以尽早开始做一件事,就让其在后台运行即可,或快或慢都可以,只需在需要结果的时候运行完成就好。例如下载文件,一般文件都比较大,一个HTTP请求并不能完成。HTTP下载都是客户端通知服务器,需要某文件的从某特定位置到另一特定位置的数据。客户端收到一段数据后,需要完成两件事:一,处理这段数据(解压、存盘等);二,请求下一段数据。这两件事是可以并行处理的。
  • 一种方法是启动两个线程,一根负责通讯,一根负责处理。采用之前介绍的同步机制来沟通。
  • 另一种方法是收到一段数据后把“请求下一段数据”放进async中,然后转去处理数据。这种实现方法,数据处理的逻辑比较集中,容易阅读和理解。而通常数据处理的逻辑都比较复杂,打散后更容易出现bug。
1. async
34963e407439db16bd133ef4618f324bc3650a33
在main函数中,使用async方式调用theFinalAnswer函数。theFinalAnswer函数首先进行一段输出,然后等待一秒,再进行一段输出,最后返回一个整型值42。在主线程中,新建future<int>类型变量lazyAns获取theFinalAnswer函数返回值,然后等待100毫秒,输出lazyAns的值。主线程只等待100毫秒,而另一线程需要1秒,因此绝大可能主线程结束时theFinalAnswer尚未结束,那么输出lazyAns值时需要等待,直到另一线程结束,返回返回值。因此在结果中可以看到,首先是theFinalAnswer输出语句“theFinalAnswer is running”,其次主线程运行较快,输出“main is running”,等待theFinalAnswer运行完成,输出“answer is ready”,最后主线程才能输出返回值。因此通过async可以达到延迟计算的目标,即在前置条件满足时,可以计算某一值,而该值是在后续进行一段其他工作后才会使用,越早的计算就可以更充分利用CPU的并发性,即达到future的目标。Async另有一种推迟模式,但此处不做过多介绍。

2. package_task
使用async会将theFinalAnswer置于一独立的线程中单独运行,但很多情况下并不希望另起一个线程,因为线程是非常重要的资源。因此希望可以合理的管理线程资源,这就需要使用线程池。如何将future与线程池同时使用呢?这就需要采用package_task。
d94e49c5850879096ce312ab16988b10e1b36333
package_task本质是将一个函数包装成一个future。如上例所示,这个task类似于std::function,有输入输出,大家可以将其认为是一个异步函数,但该异步函数并不负责执行,而是将其结果预置于一个future变量中,然后交给一个线程来实际执行,此时主线程便可以得到其返回值。

3. promise
由上述示例可见,无论是async还是package_task都是将函数返回值作为写入future内容的手段,但很多情况下,设计者希望future只提供读的接口,而暴露出写的接口。这便是promise的目标,具体见下例:
293c2f93c33b76610643b0a1b1d3966541586365
首先新建promise<bool>型变量,然后得到一个future,在另一线程中将promise传入test_prime执行。test_prime函数是检查输入的参数x是否为质数,若是,就将promise设置为true,若不是,则设置为false。在promise设置完成之后,future便可以得到其值。

三. 示例:质数判定服务
首先将判定服务置于一独立线程中,然后利用request方法将一个整数传入质数判定服务,返回一个future,即判断结果是否为质数,在主线程中打印。最后需要退出程序,即发送0至request中,具体如下所示。
a78a78c8f85ddcfcdbb7293a0d02ad548d657a26
request的具体内容如下所示:
aaf7125de9d373250f189a73d33dbc490c448f9c
该服务使用队列实现,将判定结果(promise<bool>)和需要判定的数值组成tuple,放在队列中,使用mutex加锁保护,条件变量进行消息通知。在request中,新建promise,从中获取结果赋值给future,然后为队列加锁,向队列中插入元素,注意这里使用move将promise移动至队列中,如果不使用move,一旦request函数返回,prm就会失效,同样其对应的future也失效。结束后使用notify_one通知主线程,并且返回res。
prime_service的具体内容如下所示:
c46f5905a22a8254a3090e8fffacdd17291638c7
在prime_service中,首先为保护队列加锁,然后需要等待request消息,如果reqs为空,那么会处于wait状态。如果此处发生虚假唤醒,即队列为空、没有线程请求时醒来,仍然需要再次判定和等待,不会造成其他异常结果。reqs是一个二元组,第一元为promise,这里不能将其复制到res中,而是move至res中,移动之后队列中的该reqs即失效,此时需要从队列中pop出来。n为需要判定的数值,当其为0即退出,不为0时将其设置为res的值,那么res中的future便会得到判定的结果。

本文由云栖志愿小组郭雪整理,编辑百见

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享: