接下来,我们再创建一个描述银行的Bank类。
// 09_deadlock_bank_transfer.cpp class Bank { public: void addAccount(Account* account) { mAccounts.insert(account); } bool transferMoney(Account* accountA, Account* accountB, double amount) { lock_guard guardA(*accountA->getLock()); // ① lock_guard guardB(*accountB->getLock()); if (amount > accountA->getMoney()) { // ② return false; } accountA->changeMoney(-amount); // ③ accountB->changeMoney(amount); return true; } double totalMoney() const { double sum = 0; for (auto a : mAccounts) { sum += a->getMoney(); } return sum; } private: set<Account*> mAccounts; };
银行类中记录了所有的账号,并且提供了一个方法用来查询整个银行的总金额。这其中,我们最主要要关注转账的实现:transferMoney。该方法的几个关键点如下:
- 为了保证线程安全,在修改每个账号之前,需要获取相应的锁。
- 判断转出账户金额是否足够,如果不够此次转账失败。
- 进行转账。
有了银行和账户结构之后就可以开发转账系统了,同样的,由于是为了演示所用,我们的转账系统也会尽可能的简单:
// 09_deadlock_bank_transfer.cpp void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; } else { cout << "Transfer failed, " << accountA->getName() << " has only $" << accountA->getMoney() << ", but " << randomMoney << " required" << endl; } } }
这里每次生成一个随机数,然后通过银行进行转账。
最后我们在main函数中创建两个线程,互相在两个账号之间来回转账:
// 09_deadlock_bank_transfer.cpp int main() { Account a("Paul", 100); Account b("Moira", 100); Bank aBank; aBank.addAccount(&a); aBank.addAccount(&b); thread t1(randomTransfer, &aBank, &a, &b); thread t2(randomTransfer, &aBank, &b, &a); t1.join(); t2.join(); return 0; }
至此,我们的银行转账系统就开发完成了。然后编译并运行,其结果可能像下面这样:
... Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200 Transfer failed, Moira has only $34.7581, but 66.3208 required Transfer failed, Moira has only $34.7581, but Transfer 93.191 from 53.9176 required Transfer 60.6146 from Moira to Paul, Bank totalMoney: 200 Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney: Transfer failed, Moira has only $17.6041, but 18.1186 required Transfer failed, Moira has only $17.6041, but 18.893 required Transfer failed, Moira has only $17.6041, but 34.7078 required Transfer failed, Moira has only $17.6041, but 33.9569 required Transfer 12.7899 from 200 Moira to Paul, Bank totalMoney: 200 Transfer failed, Moira has only $63.9373, but 80.9038 required Transfer 50.933 from Moira to Paul, Bank totalMoney: 200 Transfer failed, Moira has only $13.0043, but 30.2056 required Transfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200 Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required
如果你运行了这个程序,你会发现很快它就卡住不动了。为什么?
因为发生了死锁。
我们仔细思考一下这两个线程的逻辑:这两个线程可能会同时获取其中一个账号的锁,然后又想获取另外一个账号的锁,此时就发生了死锁。如下图所示:
当然,发生死锁的原因远不止上面这一种情况。如果两个线程互相join就可能发生死锁。还有在一个线程中对一个不可重入的互斥体(例如mutex而非recursive_mutex)多次加锁也会死锁。
你可能会觉得,我可不会这么傻,写出这样的代码。但实际上,很多时候是由于代码的深层次嵌套导致了死锁的发生,由于调用关系的复杂导致发现这类问题并不容易。
如果仔细看一下上面的输出,我们会发现还有另外一个问题:这里的输出是乱的。两个线程的输出混杂在一起了。究其原因也很容易理解:两个线程可能会同时输出,没有做好隔离。
下面我们就来逐步解决上面的问题。
对于输出混乱的问题很好解决,专门用一把锁来保护输出逻辑即可:
// 10_improved_bank_transfer.cpp mutex sCoutLock; void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { sCoutLock.lock(); cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; sCoutLock.unlock(); } else { sCoutLock.lock(); cout << "Transfer failed, " << accountA->getName() << " has only " << accountA->getMoney() << ", but " << randomMoney << " required" << endl; sCoutLock.unlock(); } } }
请思考一下两处lock和unlock调用,并考虑为什么不在while(true)下面写一次整体的加锁和解锁。
通用锁定算法
- 主要API
要避免死锁,需要仔细的思考和设计业务逻辑。
有一个比较简单的原则可以避免死锁,即:对所有的锁进行排序,每次一定要按照顺序来获取锁,不允许乱序。例如:要获取某个玩具,一定要先拿到锁A,再拿到锁B,才能玩玩具。这样就不会死锁了。
这个原则虽然简单,但却不容易遵守。因为数据常常是分散在很多地方的。
不过好消息是,C++ 11标准中为我们提供了一些工具来避免因为多把锁而导致的死锁。我们只要直接调用这些接口就可以了。这个就是上面提到的两个函数。它们都支持传入多个Lockable对象。
接下来我们用它来改造之前死锁的转账系统:
// 10_improved_bank_transfer.cpp bool transferMoney(Account* accountA, Account* accountB, double amount) { lock(*accountA->getLock(), *accountB->getLock()); // ① lock_guard lockA(*accountA->getLock(), adopt_lock); // ② lock_guard lockB(*accountB->getLock(), adopt_lock); // ③ if (amount > accountA->getMoney()) { return false; } accountA->changeMoney(-amount); accountB->changeMoney(amount); return true; }
这里只改动了3行代码。
- 这里通过lock函数来获取两把锁,标准库的实现会保证不会发生死锁。
- lock_guard在下面我们还会详细介绍。这里只要知道它会在自身对象生命周期的范围内锁定互斥体即可。创建lock_guard的目的是为了在transferMoney结束的时候释放锁,lockB也是一样。但需要注意的是,这里传递了 adopt_lock表示:现在是已经获取到互斥体了的状态了,不用再次加锁(如果不加adopt_lock就是二次锁定了)。
运行一下这个改造后的程序,其输出如下所示:
... Transfer failed, Paul has only $1.76243, but 17.5974 required Transfer failed, Paul has only $1.76243, but 59.2104 required Transfer failed, Paul has only $1.76243, but 49.6379 required Transfer failed, Paul has only $1.76243, but 63.6373 required Transfer failed, Paul has only $1.76243, but 51.8742 required Transfer failed, Paul has only $1.76243, but 50.0081 required Transfer failed, Paul has only $1.76243, but 86.1041 required Transfer failed, Paul has only $1.76243, but 51.3278 required Transfer failed, Paul has only $1.76243, but 66.5754 required Transfer failed, Paul has only $1.76243, but 32.1867 required Transfer failed, Paul has only $1.76243, but 62.0039 required Transfer failed, Paul has only $1.76243, but 98.7819 required Transfer failed, Paul has only $1.76243, but 27.046 required Transfer failed, Paul has only $1.76243, but 62.9155 required Transfer 98.8478 from Moira to Paul, Bank totalMoney: 200 Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200 Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200 Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200 Transfer failed, Moira has only $10.0142, but 61.3033 required Transfer failed, Moira has only $10.0142, but 24.5595 required ...
现在这个转账程序会一直运行下去,不会再死锁了。输出也是正常的了。
通用互斥管理
- 主要API
互斥体(mutex相关类)提供了对于资源的保护功能,但是手动的锁定(调用lock或者try_lock)和解锁(调用unlock)互斥体是要耗费比较大的精力的,我们需要精心考虑和设计代码才行。因为我们需要保证,在任何情况下,解锁要和加锁配对,因为假设出现一条路径导致获取锁之后没有正常释放,就会影响整个系统。如果考虑方法还可以会抛出异常,这样的代码写起来会很费劲。
鉴于这个原因,标准库就提供了上面的这些API。它们都使用了叫做RAII的编程技巧,来简化我们手动加锁和解锁的“体力活”。
请看下面的例子:
// https://en.cppreference.com/w/cpp/thread/lock_guard #include <thread> #include <mutex> #include <iostream> int g_i = 0; std::mutex g_i_mutex; // ① void safe_increment() { std::lock_guard<std::mutex> lock(g_i_mutex); // ② ++g_i; std::cout << std::this_thread::get_id() << ": " << g_i << '\n'; // ③ } int main() { std::cout << "main: " << g_i << '\n'; std::thread t1(safe_increment); // ④ std::thread t2(safe_increment); t1.join(); t2.join(); std::cout << "main: " << g_i << '\n'; }
这段代码中:
- 全局的互斥体g_i_mutex用来保护全局变量g_i
- 这是一个设计为可以被多线程环境使用的方法。因此需要通过互斥体来进行保护。这里没有调用lock方法,而是直接使用lock_guard来锁定互斥体。
- 在方法结束的时候,局部变量std::lock_guard lock会被销毁,它对互斥体的锁定也就解除了。
- 在多个线程中使用这个方法。
RAII
上面的几个类(lock_guard,unique_lock,shared_lock,scoped_lock)都使用了一个叫做RAII的编程技巧。
RAII全称是Resource Acquisition Is Initialization,直译过来就是:资源获取即初始化。
RAII是一种C++编程技术,它将必须在使用前请求的资源(例如:分配的堆内存、执行线程、打开的套接字、打开的文件、锁定的互斥体、磁盘空间、数据库连接等——任何存在受限供给中的事物)的生命周期与一个对象的生存周期相绑定。RAII保证资源可用于任何会访问该对象的函数。它亦保证所有资源在其控制对象的生存期结束时,以获取顺序的逆序释放。类似地,若资源获取失败(构造函数以异常退出),则为已构造完成的对象和基类子对象所获取的所有资源,会以初始化顺序的逆序释放。这有效地利用了语言特性以消除内存泄漏并保证异常安全。
RAII 可总结如下:
- 将每个资源封装入一个类,其中:
- 构造函数请求资源,并建立所有类不变式,或在它无法完成时抛出异常,
- 析构函数释放资源并决不抛出异常;
- 始终经由 RAII 类的实例使用满足要求的资源,该资源
- 自身拥有自动存储期或临时生存期,或
- 具有与自动或临时对象的生存期绑定的生存期
回想一下上文中的transferMoney方法中的三行代码:
lock(*accountA->getLock(), *accountB->getLock()); lock_guard lockA(*accountA->getLock(), adopt_lock); lock_guard lockB(*accountB->getLock(), adopt_lock);
如果使用unique_lock这三行代码还有一种等价的写法:
unique_lock lockA(*accountA->getLock(), defer_lock); unique_lock lockB(*accountB->getLock(), defer_lock); lock(*accountA->getLock(), *accountB->getLock());
请注意这里lock方法的调用位置。这里先定义unique_lock指定了defer_lock,因此实际没有锁定互斥体,而是到第三行才进行锁定。
最后,借助scoped_lock,我们可以将三行代码合成一行,这种写法也是等价的。
scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());
scoped_lock会在其生命周期范围内锁定互斥体,销毁的时候解锁。同时,它可以锁定多个互斥体,并且避免死锁。
目前,只还有shared_lock我们没有提到。它与其他几个类的区别在于:它是以共享的方式锁定互斥体。
条件变量
| API | C++标准 | 说明 |
| condition_variable | C++ 11 | 提供与 std::unique_lock 关联的条件变量 |
| condition_variable_any | C++ 11 |提供与任何锁类型关联的条件变量 |
| notify_all_at_thread_exit |C++ 11 | 安排到在此线程完全结束时对 notify_all 的调用 |
| cv_status | C++ 11 |列出条件变量上定时等待的可能结果 |
至此,我们还有一个地方可以改进。那就是:转账金额不足的时候,程序直接返回了false。这很难说是一个好的策略。因为,即便虽然当前账号金额不足以转账,但只要别的账号又转账进来之后,当前这个转账操作也许就可以继续执行了。
这在很多业务中是很常见的一个需求:每一次操作都要正确执行,如果条件不满足就停下来等待,直到条件满足之后再继续。而不是直接返回。
条件变量提供了一个可以让多个线程间同步协作的功能。这对于生产者-消费者模型很有意义。在这个模型下:
- 生产者和消费者共享一个工作区。这个区间的大小是有限的。
- 生产者总是产生数据放入工作区中,当工作区满了。它就停下来等消费者消费一部分数据,然后继续工作。
- 消费者总是从工作区中拿出数据使用。当工作区中的数据全部被消费空了之后,它也会停下来等待生产者往工作区中放入新的数据。
从上面可以看到,无论是生产者还是消费者,当它们工作的条件不满足时,它们并不是直接报错返回,而是停下来等待,直到条件满足。
下面我们就借助于条件变量,再次改造之前的银行转账系统。
这个改造主要在于账号类。我们重点是要调整changeMoney方法。
// 11_bank_transfer_wait_notify.cpp class Account { public: Account(string name, double money): mName(name), mMoney(money) {}; public: void changeMoney(double amount) { unique_lock lock(mMoneyLock); // ② mConditionVar.wait(lock, [this, amount] { // ③ return mMoney + amount > 0; // ④ }); mMoney += amount; mConditionVar.notify_all(); // ⑤ } string getName() { return mName; } double getMoney() { return mMoney; } private: string mName; double mMoney; mutex mMoneyLock; condition_variable mConditionVar; // ① };
这几处改动说明如下:
- 这里声明了一个条件变量,用来在多个线程之间协作。
- 这里使用的是unique_lock,这是为了与条件变量相配合。因为条件变量会解锁和重新锁定互斥体。
- 这里是比较重要的一个地方:通过条件变量进行等待。此时:会通过后面的lambda表达式判断条件是否满足。如果满足则继续;如果不满足,则此处会解锁互斥体,并让当前线程等待。解锁这一点非常重要,因为只有这样,才能让其他线程获取互斥体。
- 这里是条件变量等待的条件。如果你不熟悉lambda表达式,请自行网上学习,或者阅读我之前写的文章。
- 此处也很重要。当金额发生变动之后,我们需要通知所有在条件变量上等待的其他线程。此时所有调用wait线程都会再次唤醒,然后尝试获取锁(当然,只有一个能获取到)并再次判断条件是否满足。除了notify_all还有notify_one,它只通知一个等待的线程。wait和notify就构成了线程间互相协作的工具。
请注意:wait和notify_all虽然是写在一个函数中的,但是在运行时它们是在多线程环境中执行的,因此对于这段代码,需要能够从不同线程的角度去思考代码的逻辑。这也是开发并发系统比较难的地方。
有了上面的改动之后,银行的转账方法实现起来就很简单了,不用再考虑数据保护的问题了:
// 11_bank_transfer_wait_notify.cpp void Bank::transferMoney(Account* accountA, Account* accountB, double amount) { accountA->changeMoney(-amount); accountB->changeMoney(amount); }
当然,转账逻辑也会变得简单,不用再管转账失败的情况发生。
// 11_bank_transfer_wait_notify.cpp mutex sCoutLock; void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; { lock_guard guard(sCoutLock); cout << "Try to Transfer " << randomMoney << " from " << accountA->getName() << "(" << accountA->getMoney() << ") to " << accountB->getName() << "(" << accountB->getMoney() << "), Bank totalMoney: " << bank->totalMoney() << endl; } bank->transferMoney(accountA, accountB, randomMoney); } }
修改完之后的程序运行输出如下:
... Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200 Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200 Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200 Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200 Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200 Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200 Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200 Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200 Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002 Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200 Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200 Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200 ...
这下比之前都要好了。
但是细心的读者会发现,Bank totalMoney的输出有时候是200,有时候不是。但不管怎样,即便这一次不是,下一次又是了。关于这一点,请读者自行思考一下为什么,以及如何改进。
future
这一小节中,我们来熟悉更多的可以在并发环境中使用的工具,它们都位于头文件中。
async
很多语言都提供了异步的机制。异步使得耗时的操作不影响当前主线程的执行流。
在C++11中,async便是完成这样的功能的。下面是一个代码示例:
// 12_async_task.cpp static const int MAX = 10e8; static double sum = 0; void worker(int min, int max) { for (int i = min; i <= max; i++) { sum += sqrt(i); } } int main() { sum = 0; auto f1 = async(worker, 0, MAX); cout << "Async task triggered" << endl; f1.wait(); cout << "Async task finish, result: " << sum << endl << endl; }
这仍然是我们之前熟悉的例子。这里有两个地方需要说明:
- 这里以异步的方式启动了任务。它会返回一个future对象。future用来存储异步任务的执行结果,关于future我们在后面packaged_task的例子中再详细说明。在这个例子中我们仅仅用它来等待任务执行完成。
- 此处是等待异步任务执行完成。
需要注意的是,默认情况下,async是启动一个新的线程,还是以同步的方式(不启动新的线程)运行任务,这一点标准是没有指定的,由具体的编译器决定。如果希望一定要以新的线程来异步执行任务,可以通过launch::async来明确说明。launch中有两个常量:
- async:运行新线程,以异步执行任务。
- deferred:调用方线程上第一次请求其结果时才执行任务,即惰性求值。
除了通过函数来指定异步任务,还可以lambda表达式的方式来指定。如下所示:
// 12_async_task.cpp int main() { double result = 0; cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl; auto f2 = async(launch::async, [&result]() { cout << "Lambda task in thread: " << this_thread::get_id() << endl; for (int i = 0; i <= MAX; i++) { result += sqrt(i); } }); f2.wait(); cout << "Async task with lambda finish, result: " << result << endl << endl; return 0; }
在上面这段代码中,我们使用一个lambda表达式来编写异步任务的逻辑,并通过launch::async明确指定要通过独立的线程来执行任务,同时我们打印出了线程的id。
这段代码输出如下:
Async task with lambda triggered, thread: 0x11290d5c0 Lambda task in thread: 0x700007aa1000 Async task with lambda finish, result: 2.10819e+13
对于面向对象编程来说,很多时候肯定希望以对象的方法来指定异步任务。下面是一个示例:
// 12_async_task.cpp class Worker { public: Worker(int min, int max): mMin(min), mMax(max) {} // ① double work() { // ② mResult = 0; for (int i = mMin; i <= mMax; i++) { mResult += sqrt(i); } return mResult; } double getResult() { return mResult; } private: int mMin; int mMax; double mResult; }; int main() { Worker w(0, MAX); cout << "Task in class triggered" << endl; auto f3 = async(&Worker::work, &w); // ③ f3.wait(); cout << "Task in class finish, result: " << w.getResult() << endl << endl; return 0; }
这段代码有三处需要说明:
- 这里通过一个类来描述任务。这个类是对前面提到的任务的封装。它包含了任务的输入参数,和输出结果。
- work函数是任务的主体逻辑。
- 通过async执行任务:这里指定了具体的任务函数以及相应的对象。请注意这里是&w,因此传递的是对象的指针。如果不写&将传入w对象的临时复制。
packaged_task
在一些业务中,我们可能会有很多的任务需要调度。这时我们常常会设计出任务队列和线程池的结构。此时,就可以使用packaged_task来包装任务。
如果你了解设计模式,你应该会知道命令模式。
packaged_task绑定到一个函数或者可调用对象上。当它被调用时,它就会调用其绑定的函数或者可调用对象。并且,可以通过与之相关联的future来获取任务的结果。调度程序只需要处理packaged_task,而非各个函数。
packaged_task对象是一个可调用对象,它可以被封装成一个std::fucntion,或者作为线程函数传递给std::thread,或者直接调用。
下面是一个代码示例:
// 13_packaged_task.cpp double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum; } double concurrent_task(int min, int max) { vector<future<double>> results; // ① unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { // ② packaged_task<double(int, int)> task(concurrent_worker); // ③ results.push_back(task.get_future()); // ④ int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); // ⑤ t.detach(); min = range + 1; } cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); ⑥ } return sum; } int main() { auto start_time = chrono::steady_clock::now(); double r = concurrent_task(0, MAX); auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl; return 0; }
在这段代码中:
- 首先创建一个集合来存储future对象。我们将用它来获取任务的结果。
- 同样的,根据CPU的情况来创建线程的数量。
- 将任务包装成packaged_task。请注意,由于concurrent_worker被包装成了任务,我们无法直接获取它的return值。而是要通过future对象来获取。
- 获取任务关联的future对象,并将其存入集合中。
- 通过一个新的线程来执行任务,并传入需要的参数。
- 通过future集合,逐个获取每个任务的计算结果,将其累加。这里r.get()获取到的就是每个任务中concurrent_worker的返回值。
为了简单起见,这里的示例只使用了我们熟悉的例子和结构。但在实际上的工程中,调用关系通常更复杂,你可以借助于packaged_task将任务组装成队列,然后通过线程池的方式进行调度:
promise与future
在上面的例子中,concurrent_task的结果是通过return返回的。但在一些时候,我们可能不能这么做:在得到任务结果之后,可能还有一些事情需要继续处理,例如清理工作。
这个时候,就可以将promise与future配对使用。这样就可以将返回结果和任务结束两个事情分开。
下面是对上面代码示例的改写:
// 14_promise_future.cpp double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum; } void concurrent_task(int min, int max, promise<double>* result) { // ① vector<future<double>> results; unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { packaged_task<double(int, int)> task(concurrent_worker); results.push_back(task.get_future()); int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); t.detach(); min = range + 1; } cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); } result->set_value(sum); // ② cout << "concurrent_task finish" << endl; } int main() { auto start_time = chrono::steady_clock::now(); promise<double> sum; // ③ concurrent_task(0, MAX, &sum); auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed." << endl; cout << "Result: " << sum.get_future().get() << endl; // ④ return 0; }
这段代码和上面的示例在很大程度上是一样的。只有小部分内容做了改动:
- concurrent_task不再直接返回计算结果,而是增加了一个promise对象来存放结果。
- 在任务计算完成之后,将总结过设置到promise对象上。一旦这里调用了set_value,其相关联的future对象就会就绪。
- 这里是在main中创建一个promoise来存放结果,并以指针的形式传递进concurrent_task中。
- 通过sum.get_future().get()来获取结果。第2点中已经说了:一旦调用了set_value,其相关联的future对象就会就绪。
需要注意的是,future对象只有被一个线程获取值。并且在调用get()之后,就没有可以获取的值了。如果从多个线程调用get()会出现数据竞争,其结果是未定义的。
如果真的需要在多个线程中获取future的结果,可以使用shared_future。
并行算法
从C++17开始。和 头文件的中的很多算法都添加了一个新的参数:sequenced_policy。
借助这个参数,开发者可以直接使用这些算法的并行版本,不用再自己创建并发系统和划分数据来调度这些算法。
sequenced_policy可能的取值有三种,它们的说明如下:
注意:本文的前面已经提到,目前clang编译器还不支持这个功能。因此想要编译这部分代码,你需要使用gcc 9.0或更高版本,同时还需要安装Intel Threading Building Blocks。
下面还是通过一个示例来进行说明:
// 15_parallel_algorithm.cpp void generateRandomData(vector<double>& collection, int size) { random_device rd; mt19937 mt(rd()); uniform_real_distribution<double> dist(1.0, 100.0); for (int i = 0; i < size; i++) { collection.push_back(dist(mt)); } } int main() { vector<double> collection; generateRandomData(collection, 10e6); // ① vector<double> copy1(collection); // ② vector<double> copy2(collection); vector<double> copy3(collection); auto time1 = chrono::steady_clock::now(); // ③ sort(execution::seq, copy1.begin(), copy1.end()); // ④ auto time2 = chrono::steady_clock::now(); auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count(); cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤ auto time3 = chrono::steady_clock::now(); sort(execution::par, copy2.begin(),copy2.end()); // ⑥ auto time4 = chrono::steady_clock::now(); duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count(); cout << "Parallel sort consuming " << duration << "ms." << endl; auto time5 = chrono::steady_clock::now(); sort(execution::par_unseq, copy2.begin(),copy2.end()); // ⑦ auto time6 = chrono::steady_clock::now(); duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count(); cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl; }
这段代码很简单:
- 通过一个函数生成1000,000个随机数。
- 将数据拷贝3份,以备使用。
- 接下来将通过三个不同的parallel_policy参数来调用同样的sort算法。每次调用记录开始和结束的时间。
- 第一次调用使用std::execution::seq参数。
- 输出本次测试所使用的时间。
- 第二次调用使用std::execution::par参数。
- 第三次调用使用std::execution::par_unseq参数。
该程序的输出如下
Sequenced sort consuming 4464ms. Parallel sort consuming 459ms. Parallel unsequenced sort consuming 168ms.
可以看到,性能最好的和最差的相差了超过26倍。