[笔记]C++并发编程实战 《四》同步并发操作(四)

简介: [笔记]C++并发编程实战 《四》同步并发操作(四)

4.4.4 持续性连接

假设你有一些列耗费时间的任务要完成,并且想要使用多线程的方式异步完成这些任务,从而减轻主线程上的计算压力。例如:有用户登录了你的应用时,需要将登录凭证发送给后台;然后,对身份信息进行验证后,进一步从后台获取用户的账户信息;最后,当索引到相关信息后,使用获取到的信息对显示进行更新。串行执行的话,可以写成如下的方式:

清单4.18 处理用户登录——串行函数

void process_login(std::string const& username, std::string const& password)
{
  try{
    user_id const id = backend.authenticate_user(username,
    password);
    user_data const info_to_display =
    backend.request_current_info(id);
    update_display(info_to_display);
  } catch(std::exception& e){
    display_error(e);
  }
}

不过,你不想要串行代码吧;你想要的是一段异步代码,所以不想阻塞UI线程。使用 std::async 将另一个列表全部放在后台线程上,不过这依旧会阻塞UI线程,在等待这些任务完成的同时,会消耗大量的资源。如果有很多这样的任务,可以结束一些只在等待的线程,从而节省资源。

清单4.19 处理用户登录——异步方式

std::future<void> process_login(std::string const &username, std::string const &password)
{
    return std::async(std::launch::async, [=]()
    {
        try{
        user_id consst id = backend.authenticate_user(username,
        password);
        user_data const info_to_display =
        backend.request_current_info(id);
        update_display(info_to_display);
        } catch(std::exception& e){
        display_error(e);
    } 
    });
}

为了避免阻塞相应线程,需要有机制对每个完成的任务进行连接:持续性。下面的代码清单展示的处理过程大体相同,但这次将整个任务分成了一系列任务,并且每个任务在完成的时候回连接到前一个任务上。

清单4.20 处理用户登录——持续性方式

std::experimental::future<void> process_login(std::string const &username, std::string const &password)
{
    return spawn_async([=](){ return backend.authenticate_user(username, password); })
        .then([](std::experimental::future<user_id> id)
              { return backend.request_current_info(id.get()); })
        .then([](std::experimental::future<user_data>info_to_display)
              {
                try{
                    update_display(info_to_display.get());
                } catch(std::exception& e){
                    display_error(e);
                } 
        }
    );
}

需要注意的是,每个持续性函数都有一个 std::experimental::future 作为独立参数,然后使用 .get() 来获取其拥有的值。这意味着异常会沿着这个链条进行传播,如果有函数抛出异常,那么就会在调用info_to_display.get()时抛出,捕获结构可以处理所有的异常类型,就如清单4.18的catch那样。

因为需要等待消息通过网络或数据操作进行传输,所函数内部会对后端模块进行调用,但这时前端的任务可能还没有完成。虽然已经将任务进行分割成独立的小任务,但它们仍然会阻塞调用,这样就会阻塞线程的运行,这些需要在后端任务完成时,前端处理就已经准备好了,而不是对线程进行阻塞。

这样的话,backend.async_authenticate_user(username,password)返回 std::experimental::future<user_id> 会比返回user_id更加合适。

你可能觉得这段代码比较复杂,因为持续函数返回的期望值类型为 future<future<some_value>> ,否则只能将调用 .then 的语句放置在持续函数中。如果这么想,就错了;因为持续性支持一种极为精妙的特性,叫做期望值展开(future-unwrapping)。

当你向 .then() 传递了持续性函数,并且返回一个future类型的值时,相应的 .then() 的返回值类型也是future。最终的代码可能如下所示,这样在异步函数链上就不会存在阻塞了。

清单4.21 处理用户登录——全异步操作

std::experimental::future<void> process_login(
    std::string const &username, std::string const &password)
{
    return backend.async_authenticate_user(username,
                                           password)
        .then(
            [](std::experimental::future<user_id> id)
            {
                return backend.async_request_current_info(id.get());
            })
        .then([](std::experimental::future<user_data>
                     info_to_display)
              {
                try{
                update_display(info_to_display.get());
                } catch(std::exception& e){
                display_error(e);
              } 
        }
    );
}

这和清单4.18的代码几乎一模一样,区别就是Lambda表达式和将相应的功能包裹在 .then 的调用中。如果所用编译器支持C++14泛型Lambda表达式,那么Lambda表达式的参数列表中的类型可以使用auto替换,例如:

return backend.async_authenticate_user(username, password).then(
[](auto id){
  return backend.async_request_current_info(id.get());
});

如果需要比较简单的线性控制流,来控制比较复杂的东西,可以使用Lambda表达式来实现一些逻辑功能;如果控制流是真的很复杂,就需要单独写一个函数来完成这件事了。

目前,我们一直将注意力放在支持持续性的 std::experimental::future 上。 std::experimental::shared_future 同样支持持续性。二者的区别在于, std::experimental::shared_future 对象可以具有多个持续性对象,并且持续性参数是 std::experimental::shared_future ,而不是 std::experimental::future 。 std::experimental::shared_future 脱离了共享的本性——因为多个对象可以引用相同的共享状态,如果只允许一个延续,那么多个线程的情况下就会产生条件竞争,每个线程都试图将持续性对象添加到在自己的 std::experimental::shared_future 对象中。这种情况的确很糟糕,所以才允许多持续性的存在。当使用多持续性时,可以通过同一个 std::experimental::shared_future 对象对其进行

添加。另外,当只打算给第二个持续性传递对象时,不能给第一个持续性的传递一个临时std::experimental::shared_future 对象。因此,传递给延续性函数的参数也必须是 std::experimental::shared_future 对象。

auto fut = spawn_async(some_function).share();
auto fut2 = fut.then([]
(
  std::experimental::shared_future<some_data> data){
    do_stuff(data);
});
auto fut3 = fut.then([]
( 
  std::experimental::shared_future<some_data> data){
  return do_other_stuff(data);
});

由于调用了 share() ,fut是一个 std::experimental::share_future 实例,这是因为持续性函数必须将 std::experimental::shared_future 对象作为参数。不过,持续性返回的值为 std::experimental::future ——目前这个值无法共享——所以fut2和fut3的类型都是 std::experimental::future 。

在并发技术扩展规范中,持续性只是增强期望值能力的一种方式,不过这可能是最重要的方式。另外还提供了两个超载函数,并等待其中任意一个期望值状态为就绪,或是等待所有期望值状态为就绪。

4.4.5 等待多个期望值

假设你有很多的数据需要处理,并且每个数据都可以单独的进行处理。这是利用硬件的好机会,可以产生异步任务组来处理数据项,每个任务通过期望值来返回处理过后的数据。不过,当需要等待所有任务完成,才能得到最终的结果,对于逐个的对期望值进行收集,然后在整理结果,这总觉得不是很爽。如果打算用一个异步任务来收集结果,那就先要生成这个异步任务,这样的话就会占用一个线程的资源,并且需要不断的对期望值进行轮询,当所有期望值状态为就绪时,生成新的任务。

下面就展示了一个这样的例子:

清单4.22 使用 std::async 从多个期望值中收集结果

std::future<FinalResult> process_data(std::vector<MyData> &vec)
{
    size_t const chunk_size = whatever;
    std::vector<std::future<ChunkResult> > results;
    for (auto begin = vec.begin(), end = vec.end(); beg != end;)
    {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size,
                                                chunk_size);
        results.push_back(
            std::async(process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }
    return std::async([all_results = std::move(results)]()
                    {
                        std::vector<ChunkResult> v;
                        v.reserve(all_results.size());
                        for (auto& f : all_results)
                        {
                        v.push_back(f.get()); // 1
                        }
                        return gather_results(v); 
                    });
}

这段代码会生成一个异步任务来等待处理结果,在所有处理结果都就绪的情况下,对结果进行整合。不过,每个任务都是独立的,因此当结果可用前,调度程序会在①处反复的进行唤醒,当发现有没有变为就绪态的结果时,再次回到休眠的状态。这样的方式不仅会占用线程资源,而且在之后对期望值的操作中会增加上下文切换频率,从而使应用增加了很多额外的开销。

可以使用 std::experimental::when_all 来避免这里的等待和切换,可以将一组需要等待的期望值传入when_all函数中,其会返回一个新的期望值——当传入的所有期望值的状态都为就绪时,这个新的期望值状态就会被置为就绪,这个期望值可以和持续性配合起来一起处理其他的任务。

下面的代码就展示了这样的一种方式:

清单4.23 使用 std::experimental::when_all 从多个期望值中收集结果

std::experimental::future<FinalResult> process_data(
    std::vector<MyData> &vec)
{
    size_t const chunk_size = whatever;
    std::vector<std::experimental::future<ChunkResult> > results;
    for (auto begin = vec.begin(), end = vec.end(); beg != end)
    {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size,
                                                chunk_size);
        results.push_back(
            spawn_async(
                process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }
    return std::experimental::when_all(
               results.begin(), results.end())
        .then( // 1
            [](std::future<std::vector<std::experimental::future<ChunkResult> > > ready_results)
            {
                std::vector<std::experimental::future<ChunkResult> >
                    all_results = ready_results.get();
                std::vector<ChunkResult> v;
                v.reserve(all_results.size());
                for (auto &f : all_results)
                {
                    v.push_back(f.get()); // 2
                }
                return gather_results(v);
            });
}

这个例子中,可以看到when_all函数会等待所有期望值的状态变为就绪,然后再用 .then 调用调度函数①,而不是使用async。虽然Lambda表达式表面上看上去是一样的,但这里将results的vector作为参数(包装到期望值中),而不是放在捕获器中,并在之后对每个期望值使用get②,从而无阻塞的获得所有处理后的结果。这不需要对代码做太多的修改,就能介绍系统的负担。

为了补全when_all,我们也有when_any。其也会产生一个期望值,当期望值组中任意一个期望为就绪态,那么这个新期望值的状态即为就绪。这对于并发性任务是一个不错的选择,不过就需要为第一个为就绪的线程找点事情来做。

4.4.6 使用when_any等待第一个期望值

假设你在一大堆数据里面找一个符合要求的值,不过符合这样要求的值有很多,找到任何一个都可以。这种任务是可以并行的,可以多线程完成,每个任务去检查数据的一个子集;如果有线程找到了合适的值,那么这个线程会设置一个标志,让其他线程停止搜索,并返回结果。这种情况下,即使其他任务还没有完成清理,还是希望第一个完成搜索任务的线程对数据进行进一步的处理。

这就可以使用 std::experimental::when_any 将期望值收集在一起,并提供了一种新的特性,就是当期望值有一个为就绪时,任务即为完成。when_all会根据传入的期望值集合返回一个新的期望值,when_any会添加额外的层,并将集合和索引值组合在一起,这里的索引用于表示触发就绪的期望值,并将这个期望值添加到std::experimental::when_any_result 类模板实例中。

清单中展示如何使用when_any。

清单4.24 使用 std::experimental::when_any 处理第一个被找到的值

std::experimental::future<FinalResult>
find_and_process_value(std::vector<MyData> &data)
{
    unsigned const concurrency =
        std::thread::hardware_concurrency();
    unsigned const num_tasks = (concurrency > 0) ? concurrency : 2; std::vector<std::experimental::future<MyData *> > results;
    auto const chunk_size = (data.size() + num_tasks - 1) /
                            num_tasks;
    auto chunk_begin = data.begin();
    std::shared_ptr<std::atomic<bool> > done_flag =
        std::make_shared<std::atomic<bool> >(false);
    for (unsigned i = 0; i < num_tasks; ++i)
    { // 1
        auto chunk_end =
            (i < (num_tasks - 1) ? chunk_begin + chunk_size : data.end());
        results.push_back(spawn_async([=] { // 2
            for (auto entry = chunk_begin;
                 !*done_flag && (entry != chunk_end);
                 ++entry)
            {
                if (matches_find_criteria(*entry))
                {
                    *done_flag = true;
                    return &*entry;
                }
            }
            return (MyData *)nullptr;
        }));
        chunk_begin = chunk_end;
    }
    std::shared_ptr<std::experimental::promise<FinalResult> >
        final_result =
            std::make_shared<std::experimental::promise<FinalResult> >();
    struct DoneCheck
    {
        std::shared_ptr<std::experimental::promise<FinalResult> >
            final_result;
        DoneCheck(
            std::shared_ptr<std::experimental::promise<FinalResult> >
                final_result_)
            : final_result(std::move(final_result_)) {}
        void operator()( // 4
            std::experimental::future<std::experimental::when_any_result<
                std::vector<std::experimental::future<MyData *> > > >
                results_param)
        {
            auto results = results_param.get();
            MyData *const ready_result =
                results.futures[results.index].get(); // 5
            if (ready_result)
                final_result->set_value( // 6
                    process_found_value(*ready_result));
            else
            {
                results.futures.erase(
                    results.futures.begin() + results.index); // 7
                if (!results.futures.empty())
                {
                    std::experimental::when_any( // 8
                        results.futures.begin(), results.futures.end())
                        .then(std::move(*this));
                }
                else
                {
                    final_result->set_exception(
                        std::make_exception_ptr( // 9
                            std::runtime_error(“Not found”)));
                }
            }
        };
        std::experimental::when_any(results.begin(), results.end())
            .then(DoneCheck(final_result)); // 3
        return final_result->get_future();  // 10
}

初始化循环①会产生num_tasks个异步任务,每个任务都会执行②处的Lambda表达式。这个Lambda表达式的捕获方式是拷贝,所以每个任务都有自己的chunk_begin和chunk_end,这里同样也拷贝了共享指针done_flag。这就避免了生命周期所带来的问题。

当所有任务都已经产生,希望对任务的返回结果进行处理。可以调用when_any③通过连接持续性完成。这次可将持续性以类的方式去编写,因为想要对其进行递归复用。当其中一个任务完成初始化,DoneCheck的函数操作符会被调用④。首先,已经准备好从就绪的期望值中获取值⑤,并且当符合条件的值被找到,可以对结果进行处理,并对最终结果进行设置⑥。

否则,就需要从集合中丢弃就绪的期望值⑦,当还有很多期望值需要检查时,会产生对

when_any的再次调用⑧,要再触发其持续性,需要等待下个期望值到达就绪态。如果没有剩下任何其他期望值,就说明这个值没找到,那么将会在期望值中存储一个异常⑨。函数的返回值是一个期望值,其包含有最终的结果⑩。当然,这个问题还有其他解法,不过就想在这里展示一下如何使用when_any。

这两个使用when_all和when_any的例子中,都使用了重载版的迭代器范围,其使用一堆迭代器来表示一组处于等待状态期望值的开始和末尾。这两个函数也可以以变量的形式出现,可以将一组期望值作为参数直接进行传入。这个例子中,期望值中存储的是一个元组(或when_any_result持有一个元组),而不是一个vector:

std::experimental::future<int> f1=spawn_async(func1);
std::experimental::future<std::string> f2=spawn_async(func2);
std::experimental::future<double> f3=spawn_async(func3);
std::experimental::future<
std::tuple<
std::experimental::future<int>,
std::experimental::future<std::string>,
std::experimental::future<double>>> result=
std::experimental::when_all(std::move(f1),std::move(f2),std::mov
e(f3));

这个例子强调了when_any和when_all语法的重要性——可以通过容器中的任

意 std::experimental::future 实例进行移动,并且通过值获取参数,因此需要显式的将期望值传入,或是传递一个临时变量。

有时所等待的事件是一组线程,或是要达到代码的特定点,或是需要配合着处理了一定量的数据。这种情况下,最好使用锁存器或栅栏机制,而不是期望值。现在,让我们来了解一下并发技术扩展规范所提供的锁存器和栅栏机制。

4.4.7 并发技术扩展规范中的锁存器和栅栏机制

首先,来思考一下,我们所说的锁存器或是栅栏机制是什么意思。锁存器是一种同步对象,当它的计数器减为0时,它就准备就绪了。锁存器这个名称是基于其输出特性——当处于就绪态时,其就会保持就绪态,直到被销毁。因此,锁存器是为同步一系列事件发生的轻量级装置。

另外,栅栏机制是一种可复用的同步装置,其用于一组线程间的内部同步。虽然,锁存器不在乎是哪个线程使得计数器递减——同一个线程可以对计数器递减多次,或多个线程对计数器递减一次,再或是其中有些线程对计数器有两次的递减——对于栅栏机制来说,每一个线程只能在每个周期到达栅栏一次。当线程都抵达栅栏时,会对线程进行阻塞,直到所有线程都达到栅栏处,这时阻塞将会被解除。栅栏可以复用——线程可以再次到达栅栏处,等待下一个周期的所有线程。

锁存器其实要比栅栏简单很多,我们就先从简单 std::experimental::latch 说起。

4.4.8 std::experimental::latch:基础的锁存器类型

std::experimental::latch 声明在 <experimental/latch> 头文件中。构

造 std::experimental::latch 时,将计数器的值作为构造函数的唯一参数。之后,当等待的事件发生,就会调用锁存器count_down成员函数;当计数器为0时,锁存器状态变为就绪。可以调用wait成员函数对锁存器进行阻塞,直到等待的锁存器处于就绪状态时释放;如果需要对锁存器是否就绪的状态进行检查时,可调用is_ready成员函数。想要减少计数器1并阻塞直至它抵达0,则可以调用count_down_and_wait成员函数。

下面代码清单展示一个简单的例子:

清单4.25 使用 std::experimental::latch 等待所有事件

void foo()
{
    unsigned const thread_count = ...;
    latch done(thread_count); // 1
    my_data data[thread_count];
    std::vector<std::future<void> > threads;
    for (unsigned i = 0; i < thread_count; ++i)
        threads.push_back(std::async(std::launch::async, [&, i] { // 2
            data[i] = make_data(i);
            done.count_down(); // 3
            do_more_stuff();   // 4
        }));
    done.wait();                      // 5
    process_data(data, thread_count); // 6
} // 7

使用需要等待的事件数量对done的构造进行初始化①,并且使用 std::async 产生适量的线程②。在进行下一步之前④,每个线程生成了相应的数据块时,都会对锁存器的计数器进行递减③。在处理生成的数据⑥之前,主线程只需要等待锁存器成为就绪态即可⑤。⑥处的数据处理可能会与对线程的最终处理同步进行④——所以这在函数末尾 std::future 析构之前⑦,无法保证所有线程都已完成。

需要注意的是,在②传递给 std::async Lambda表达式中,是通过引用的方式对除了i之外的所有内容进行捕获,而i是通过值捕获的方式进行传递。这是因为i是这里的循环计数器,如果通过引用捕获将会导致数据竞争和未定义的行为,而数据和完成状态是我们需要共享访问的东西。此外,在这种情况下,只需要一个锁存器就够了,因为线程在数据准备好之后,还有其他任务要做;否则,就需要在处理数据前,等待所有期望值,从确保所有任务都已经完成。

process_data中对data的访问时安全的⑥,即便这个值是其他线程上的任务存储的,因为锁存器是一个同步对象,所以线程调用cound_down改变计数器的行为是可见的,从而保证对wait的调用和返回在同一个锁存器对象上为可见。本质上,对count_down的调用与对wait的调用同步——第5章中了解了底层内存需和同步约束之后,就会明白这意味着什么了。

除了锁存器之外,并发技术扩展规范还为我们提供了用于同步一组线程的可复用的同步对象——栅栏机制。接下来就让我们一起来了解下这个机制。

4.4.9 std::experimental::barrier:简单的栅栏机制

并发技术扩展规范提供了两种栅栏机制, <experimental/barrier> 头文件中分别

为: std::experimental::barrier 和 std::experimental::flex_barrier 。前者更简单,所以

开销更低;后者更灵活,但是开销较大。

假设有一组线程对某些数据进行处理。每个线程都在处理独立的任务,因此在处理过程中无需同步,但当所有线程都必须处理下一个数据项前,完成当前的任务。 std::experimental::barrier 正是针对这样的情况而设计。这里可以为同步组,指定线程

的数量,并为这组线程构造栅栏。当每个线程完成其处理任务时,都会到达栅栏处,并且通过调用栅栏对象的arrive_and_wait成员函数,等待小组的其他成员线程。当最后一个线程抵达时,所有线程将被释放,并且栅栏会被重置。组中的线程可以继续接下来的任务,或是处理下一个数据项,或是进入下一个处理阶段。

锁存器一旦就绪就会保持状态,不会有释放等待线程,重置,以及复用的过程。栅栏机制也只能用于一组线程内的同步——除非组中只有一个线程,否则无法等待栅栏就绪。可以通过显式调用栅栏对象的arrive_and_drop成员函数让线程退出组,这样线程就不用再受栅栏的约束,这样下一个周期到达的线程数就必须要比当前周期到达的线程数少一个了。

清单4.26 std::experimental::barrier 的用法

result_chunk process(data_chunk);
std::vector<data_chunk>
divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source &source, data_sink &sink)
{
    unsigned const concurrency =
        std::thread::hardware_concurrency();
    unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
    std::experimental::barrier sync(num_threads);
    std::vector<joining_thread> threads(num_threads);
    std::vector<data_chunk> chunks;
    result_block result;
    for (unsigned i = 0; i < num_threads; ++i)
    {
        threads[i] = joining_thread([&, i]                           {
    while (!source.done()) { // 6
      if (!i) { // 1
      data_block current_block =
      source.get_next_data_block();
      chunks = divide_into_chunks(current_block, num_threads);
    }
    sync.arrive_and_wait(); // 2
    result.set_chunk(i, num_threads, process(chunks[i])); //  3
    sync.arrive_and_wait(); // 4
    if (!i) { // 5
      sink.write_data(std::move(result));
    }
   } 
});
}
} // 7

清单4.26中展示了,如何使用栅栏来对一组线程进行同步。这里的数据来源是source,并且输出是sink,不过为了并发运行,需要将数据划分成num_threads块。这个操作是串行的,所以需要在初始化数据块①是进行,并且初始化过程只运行在i为0的线程上。并行执行任务之前,所有线程都会在栅栏处等待数据划分完成②,而后每个线程都会处理属于自己的数据块,并且再次同步之前④,将结果更新到result中③。然后就会到达下一个需要串行处理域,这里只有0号线程可以将结果输出到sink中⑤。这时,所有线程都会循环等待,直到将source中的任务全部处理完(done)⑥。当线程进入循环时,串行部分与循环是连接在一起的;因为在串行部分,只有0号线程会执行,所以也没什么问题,在第一个栅栏处②,会将所有线程进行同步。当所有的处理都结束了,就意味着所有线程将会退出循环,并等待所有joining_thread对象的外部函数结束时,对这些对象进行析构⑦(joining_thread在第2章的清单2.7中有过介绍)。

需要着重注意的是,arrive_and_wait函数的调用位置。所有线程就绪前,确定没有线程在运行是很重要的。第一个同步点,所有线程都在等待0号线程到达;而第二个同步点,情况刚好相反,0号线程在等待其他线程都到达之后,才能将完成的结果写入sink中。

并发技术扩展规范不止提供了一种栅栏类型,与 std::experimental::barrier 相同,也可以使用 std::experimental::flex_barrier ,不过这个类型的栅栏更加的灵活。灵活之处在于栅栏拥有完成阶段,一旦参与线程集中的所有线程都到达同步点,则由参与线程之一执行完成阶段。

4.4.10 std::experimental::flex_barrier—更灵活和友好版

std::experimental::barrier

std::experimental::flex_barrier 与 std::experimental::barrier 有一点不同:

  • 其有一个额外的构造函数,需要传递传入一个完整的函数和线程数量。当所有线程都到达栅栏处,那么这个函数就由其中一个线程运行。其不仅指定了一种串行代码块的运行方式,并且还提供了一种修改需要在下一个周期到达栅栏处线程个数的方式。对于线程的技术可以修改成任何数字,无论这个数字比当前数字高或低;因为这个功能,开发者就能保证下一次到达栅栏处的线程数量时正确无误的。

下面的代码清单中,展示了使用 std::experimental::flex_barrier 如何对清单4.26的代码进行重写:

清单4.27 使用 std::experimental::flex_barrier 来管理串行部分

void process_data(data_source &source, data_sink &sink)
{
    unsigned const concurrency =
        std::thread::hardware_concurrency();
    unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
    std::vector<data_chunk> chunks;
    auto split_source = [&] { // 1
        if (!source.done())
        {
            data_block current_block = source.get_next_data_block();
            chunks = divide_into_chunks(current_block, num_threads);
        }
    };
    split_source(); // 2
    result_block result;
    std::experimental::flex_barrier sync(num_threads, [&] { // 3
        sink.write_data(std::move(result));
        split_source(); // 4
        return -1;      // 5
    });
    std::vector<joining_thread> threads(num_threads);
    for (unsigned i = 0; i < num_threads; ++i)
    {
        threads[i] = joining_thread([&, i]                       {
    while (!source.done()) { // 6
      result.set_chunk(i, num_threads, process(chunks[i]));
      sync.arrive_and_wait(); // 7
    } });
    }
}

与清单4.26的:

  • 第一个不同在于这里使用一个Lambda表达式对数据进行拆分①。这个Lambda表达式会在运行前被调用②,并封装在迭代开始时的0号线程上运行。
  • 第二个区别在于,sync对象的类型为 std::experimental::flex_barrier ,并且需要将一个完整的函数和线程数量对实例进行构造③。该函数会在所有线程抵达栅栏处的时候,运行在0号线程上,然后由0号线程调用Lambda表达式对数据进行拆分,当拆分结束后,下一轮迭代开始④。返回值-1表示线程数目保持不变,返回值为0或其他数值则指定的是下一个周期中参与迭代的线程数量。

主循环⑥就简单了:其只包含了并行部分的代码,所以只要有一个同步点就够了⑦。使

用 std::experimental::flex_barrier 能够很好的对代码进行简化。

使用完整函数作为串行段是一种很强大的功能,因为这能够改变参与并行的线程数量。

例如:流水线类型代码在运行时,当流水线的各级都在进行处理时,线程的数量在初始阶段和执行阶段要少于主线程处理阶段。

总结

同步操作对于使用并发编写应用来说,是很重要的一部分:如果没有同步,线程基本上就是独立的,也可写成单独的应用,因其任务之间的相关性,它们才可作为一个群体直接执行。

本章讨论了各式各样的同步操作,有:

  • 条件变量、
  • 期望值、
  • 承诺值、
  • 打包任务、
  • 锁存器
  • 栅栏机制。

也讨论了替代同步的解决方案:

  • 函数化模式编程,完全独立执行的函数,不会受到外部环境的影响;
  • 还有,消息传递模式,以消息子系统为中介,向线程异步的发送消息;
  • 以及持续性方式,其指定了操作的后续任务,并由系统负责调度。

已经讨论了很多C++中的高层工具,现在我们来看一下底层工具是如何工作的:

  • C++内存模型
  • 原子操作。


相关文章
|
5天前
|
C++
在C和C++中,指针的算术操作
在C和C++中,指针的算术操作
|
12天前
|
算法 C++ 容器
黑马c++ STL常用算法 笔记(6) 常用集合算法
黑马c++ STL常用算法 笔记(6) 常用集合算法
|
12天前
|
算法 C++ 容器
黑马c++ STL常用算法 笔记(5) 常用算术生成算法
黑马c++ STL常用算法 笔记(5) 常用算术生成算法
|
12天前
|
算法 C++ 容器
黑马c++ STL常用算法 笔记(4) 常用拷贝和替换算法
黑马c++ STL常用算法 笔记(4) 常用拷贝和替换算法
|
12天前
|
存储 算法 搜索推荐
黑马c++ STL常用算法 笔记(3) 排序算法
黑马c++ STL常用算法 笔记(3) 排序算法
|
12天前
|
算法 C++
黑马c++ STL常用算法 笔记(2) 查找算法
黑马c++ STL常用算法 笔记(2) 查找算法
|
12天前
|
算法 C++ 容器
黑马c++ STL常用算法 笔记(1) 遍历算法
黑马c++ STL常用算法 笔记(1) 遍历算法
|
12天前
|
安全 Go 对象存储
C++多线程编程:并发与同步的实战应用
本文介绍了C++中的多线程编程,包括基础知识和实战应用。C++借助`&lt;thread&gt;`库支持多线程,通过`std::thread`创建线程执行任务。文章探讨了并发与同步的概念,如互斥锁(Mutex)用于保护共享资源,条件变量(Condition Variable)协调线程等待与通知,以及原子操作(Atomic Operations)保证线程安全。实战部分展示了如何使用多线程进行并发计算,利用`std::async`实现异步任务并获取结果。多线程编程能提高效率,但也需注意数据竞争和同步问题,以确保程序的正确性。
|
5天前
|
存储 Serverless 数据安全/隐私保护
C++ 类的成员函数和数据成员的技术性探讨
C++ 类的成员函数和数据成员的技术性探讨
14 0
|
1天前
|
存储 编译器 C语言
【C++语言2】类和对象(上)
【C++语言2】类和对象(上)