协程是什么
来源:协程是在C++20 标准中提出的一个新的工具。
它突破传统的程序在cpu中来回切换时需要更新和恢复PCB资源现场的耗时操作(多进程)或者COW(低级调度)操作时间。
因此,我们可以说
协程是一种轻量级的线程,
可以在同一个线程内实现多个协程的切换,从而实现并发编程。
协程可以看作是一种用户态线程,不需要操作系统的支持,可以自己实现调度和切换。
在C++中,协程是通过协程库来实现的,可以使用关键字co_await和co_yield来实现协程的切换。
协程库提供了协程的创建、销毁、切换等操作,可以方便地实现异步编程和并发编程。
协程的优点是轻量级、高效、可控,可以提高程序的性能和可维护性。
接下来我们介绍:协程和我们熟悉的线程、进程之间有什么区别。然后我们就可以介绍什么时候使用协程了。
协程和线程、进程是什么关系
协程、进程、线程是三种不同的并发编程模型。
- 协程是一种轻量级的线程,可以在同一个线程内实现多个协程的切换,不需要操作系统的支持,可以自己实现调度和切换。
- 进程是操作系统中的一个独立的执行单元,拥有自己的地址空间、文件描述符等资源,需要操作系统的支持来进行调度和切换。
- 线程是进程中的一个执行单元,共享进程的地址空间和文件描述符等资源,需要操作系统的支持来进行调度和切换。
在实际应用中,应该根据具体的需求来选择使用哪种并发编程模型。
- 如果需要高并发、高性能的应用,可以选择使用协程,
- 因为协程的切换开销比线程和进程小,
- 可以提高程序的性能和可维护性。
- 如果需要独立的执行环境和资源隔离,可以选择使用进程。
- 如果需要共享资源和数据,可以选择使用线程。
协程的优点是
- 轻量级、高效、可控,
- 可以提高程序的性能和可维护性。
- 协程的效率优点主要是
- 因为协程的切换开销比线程和进程小,
- 因为协程的切换是在用户态完成的,不需要操作系统的介入,所以开销比较小。
- 此外,协程可以避免 线程切换时的上下文切换和内核态和用户态之间的切换,也可以避免线程之间的竞争和死锁等问题。
进程的优点是
- 独立的执行环境和资源隔离,可以提高程序的安全性和可靠性。
- 进程的效率优点主要是
- 因为进程之间的资源隔离,可以避免进程之间的竞争和死锁等问题。
- 此外,进程可以利用多核处理器的优势,进行并行计算。
线程的优点是
- 共享资源和数据,可以提高程序的效率和可维护性。
- 线程的效率优点主要是
- 因为线程之间的共享资源和数据,可以避免进程之间的通信和同步等问题。
- 此外,线程可以利用多核处理器的优势,进行并行计算。
协程的缺点是
- 不能利用多核处理器的优势,不能进行并行计算。
- 此外,协程的实现比较复杂,需要使用协程库来实现。
进程的缺点是
- 创建和销毁的开销比较大,需要操作系统的支持来进行调度和切换。
- 此外,进程之间的通信和同步比较复杂,需要使用进程间通信机制来实现。
线程的缺点是
共享资源和数据可能会导致竞争和死锁等问题。
此外,线程之间的通信和同步也比较复杂,需要使用线程间通信机制来实现。
为什么要用协程
- 以下是使用协程的7个理由:
- 高效性:协程的切换开销比线程和进程小,因为协程的切换是在用户态完成的,不需要操作系统的介入,所以开销比较小。
- 可维护性:协程的代码结构比线程和进程简单,易于维护和调试。
- 可控性:协程可以自己实现调度和切换,可以根据具体的需求来控制协程的执行顺序和优先级。
- 资源占用:协程的资源占用比线程和进程小,因为协程可以在同一个线程内实现多个协程的切换,不需要创建多个线程或进程。
- 并发性:协程可以实现高并发,因为可以在同一个线程内实现多个协程的切换,可以充分利用CPU资源。
- 可移植性:协程的代码可以在不同的操作系统和平台上运行,不需要进行大量的修改和适配。
- 可扩展性:协程可以根据具体的需求进行扩展和优化,可以实现更高效、更灵活的并发编程模型。
怎么使用协程
以下是一个我简单封装的使用C++协程库封装协程的类Coroutine:
#include <coroutine> #include <functional> class Coroutine { public: Coroutine(std::function<void()> func) : m_func(func) {} void start() { m_handle = std::coroutine_handle::from_promise(m_promise.get_return_object()); m_handle.resume(); } void resume() { m_handle.resume(); } void stop() { m_handle.destroy(); } private: struct promise_type { auto get_return_object() { return std::coroutine_handle<promise_type>::from_promise(*this); } auto initial_suspend() { return std::suspend_never{}; } auto final_suspend() noexcept { return std::suspend_never{}; } void unhandled_exception() { std::terminate(); } }; std::function<void()> m_func; std::coroutine_handle<promise_type> m_handle; std::promise<void> m_promise; };
这个类封装了协程的重要操作,包括启动、恢复和停止。它使用了C++20的协程库,通过std::coroutine_handle和std::promise来实现协程的管理和调度。
下面是一个测试样例,它创建了两个协程,分别输出数字1到5和6到10:
#include <iostream> void print_numbers(int start, int end) { for (int i = start; i <= end; i++) { std::cout << i << std::endl; co_yield; } } int main() { Coroutine c1([&]() { print_numbers(1, 5); }); Coroutine c2([&]() { print_numbers(6, 10); }); c1.start(); c2.start(); while (c1 || c2) { if (c1) { c1.resume(); } if (c2) { c2.resume(); } } return 0; }
这个测试样例创建了两个协程,分别输出数字1到5和6到10。它使用了Coroutine类来管理协程的启动、恢复和停止,通过while循环来轮流执行两个协程,直到它们都执行完毕。
正如我们之前所说的,协程是C++20 的工具,所以不理解上面已经封装过后的类实现也无妨,我们可以更细致一点介绍。
这个工具具体是什么样子?
这里套用C++指导手册内容:
如果函数的定义包含以下任何一项,则该函数就是协程:
co_await表达式 — 暂停执行,直到恢复 task<> tcp_echo_server() { char data[1024]; while (true) { [std::size_t](https://en.cppreference.com/w/cpp/types/size_t) n = co_await socket.async_read_some(buffer(data)); co_await async_write(socket, buffer(data, n)); } }
co_yield表达式 — 暂停执行返回值 generator<int> iota(int n = 0) { while (true) co_yield n++; }
co_return语句 — 完成返回值的执行 lazy<int> f() { co_return 7; }
每个协程都必须具有满足许多要求的返回类型,如下所述。
限制
协程不能使用可变参数、纯返回语句或占位符返回类型(auto 或 Concept)。
Consteval 函数、constexpr 函数、构造函数、析构函数和 main 函数不能是协程。
执行
每个协程都拥有
- 承诺对象,从协程内部操作。协程通过此对象提交其结果或异常。
- 协程_句柄,从协程_外部操作。这是一个非拥有句柄,用于恢复协程的执行或销毁协程帧。
- 协程状态,即内部动态分配的存储(除非优化分配),包含以下内容的对象
- 承诺对象
- 参数(全部按值复制)
- 当前暂停点的一些表示形式,以便恢复知道从哪里继续,销毁知道范围内的局部变量
- 生存期跨越当前挂起点的局部变量和临时变量。
当协程开始执行时,它将执行以下操作:
- 使用运算符 new 分配协程状态对象。
- 将所有函数参数复制到协程状态:按值参数被移动或复制,按引用参数保持引用(因此,如果在引用对象的生存期结束后恢复协程,则可能会变得悬而未决 — 有关示例,请参见下文)。
- 调用承诺对象的构造函数。如果 promise 类型具有采用所有协程参数的构造函数,则使用复制后协程参数调用该构造函数。否则,将调用默认构造函数。
- 调用承诺。get_return_object()并将结果保存在局部变量中。当协程首次挂起时,该调用的结果将返回给调用方。在此步骤之前引发并包含此步骤的任何异常都会传播回调用方,而不是放在承诺中。
- 调用承诺。initial_suspend()及其结果。典型类型要么返回 std::suspend_always(对于延迟启动的协程)或 std::suspend_never(对于急切启动的协程)。co_await``Promise
- 什么时候co_await承诺。initial_suspend()恢复,开始执行协程的主体。
参数悬空的一些示例:
#include <coroutine> #include <iostream> struct promise; struct coroutine : std::coroutine_handle<promise> { using promise_type = ::promise; }; struct promise { coroutine get_return_object() { return {coroutine::from_promise(*this)}; } std::suspend_always initial_suspend() noexcept { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; struct S { int i; coroutine f() { std::cout << i; co_return; } }; void bad1() { coroutine h = S{0}.f(); // S{0} destroyed h.resume(); // resumed coroutine executes std::cout << i, uses S::i after free h.destroy(); } coroutine bad2() { S s{0}; return s.f(); // returned coroutine can't be resumed without committing use after free } void bad3() { coroutine h = [i = 0]() -> coroutine // a lambda that's also a coroutine { std::cout << i; co_return; }(); // immediately invoked // lambda destroyed h.resume(); // uses (anonymous lambda type)::i after free h.destroy(); } void good() { coroutine h = [](int i) -> coroutine // make i a coroutine parameter { std::cout << i; co_return; }(0); // lambda destroyed h.resume(); // no problem, i has been copied to the coroutine // frame as a by-value parameter h.destroy(); }
当协程达到暂停点时
- 如有必要,在隐式转换为协程的返回类型后,之前获取的返回对象将返回给调用方/恢复器。
- 当协程到达 co_return 语句时,它将执行以下操作:
- 调用承诺。return_void()为
- co_return;
- co_return expr;哪里埃克普尔具有类型空
- 从协程的末尾掉下来。如果类型没有,则行为未定义Promise承诺::return_void()在这种情况下,成员函数。
- 或致电承诺。return_value(EXPR)为co_return expr;哪里埃克普尔具有非空隙型
- 以创建变量的相反顺序销毁具有自动存储持续时间的所有变量。
- 调用承诺。final_suspend()并co_await结果。
如果协程以未捕获的异常结束,它将执行以下操作:
- 捕获异常并调用承诺。unhandled_exception()从捕获块内
- 调用承诺。final_suspend()并co_await结果(例如,恢复延续或发布结果)。从这一点恢复协程是未定义的行为。
当协程状态因通过co_return或未捕获异常终止,或者因为它通过其句柄销毁而被销毁时,它将执行以下操作:
调用承诺对象的析构函数。
调用函数参数副本的析构函数。
调用运算符 Delete 以释放协程状态使用的内存。
将执行传输回调用方/恢复方。
动态分配
协程状态通过非数组运算符 new 动态分配。
如果类型定义了类级替换,则将使用它,否则将使用全局运算符 new。Promise
如果类型定义了采用其他参数的运算符 new 的放置形式,并且它们与参数列表匹配,其中第一个参数是请求的大小(类型为 std::size_t),其余参数是协程函数参数,则这些参数将传递给运算符 new(这使得对协程使用前导分配器约定成为可能)。Promise
在以下情况下,可以优化对运算符 new 的调用(即使使用自定义分配器)
- 协程状态的生存期严格嵌套在调用方的生存期内,并且
- 协程帧的大小在调用站点是已知的。
在这种情况下,协程状态嵌入在调用方的堆栈帧(如果调用方是普通函数)或协程状态(如果调用方是协程)中。
如果分配失败,协程将抛出 std::bad_alloc,除非类型定义了成员函数 Promise::get_return_object_on_allocation_failure()。如果定义了该成员函数,则分配使用运算符 new 的 nothrow 形式,并且在分配失败时,协程会立即将从 Promise::get_return_object_on_allocation_failure() 获得的对象返回给调用方,例如:Promise
struct Coroutine::promise_type { /* ... */ // ensure the use of non-throwing operator-new static Coroutine get_return_object_on_allocation_failure() { std::cerr << "get_return_object_on_allocation_failure()\n"; throw std::bad_alloc(); // or, return Coroutine(nullptr); } // custom non-throwing overload of new void* operator new(std::size_t n) noexcept { if (void* mem = std::malloc(n)) return mem; return nullptr; // allocation failure } };
承诺
类型由编译器使用 std::coroutine_traits 从协程的返回类型确定。Promise
形式上,let 和 design 分别表示协程的返回类型和参数类型列表,并且(如果有的话)分别表示协程所属的类类型及其 cv 限定 如果它被定义为非静态成员函数,则其类型由下式确定:R``Args...``ClassT``_cv-qual_``Promise
- std::coroutine_traits::promise_type,如果协程未定义为非静态成员函数,
- std::coroutine_traits::promise_type,如果协程定义为非 rvalue-reference 限定的非静态成员函数,
- std::coroutine_traits::promise_type,如果协程定义为受右值引用限定的非静态成员函数。
- 如果协程定义为… 那么它的类型是…Promise
task foo(int x); std::coroutine_traits, int>::promise_type
task Bar::foo(int x) const; std::coroutine_traits, const Bar&, int>::promise_type
task bar::foo(int x) &&; std::coroutine_traits, Bar&&, int>::promise_type
co_await
一元运算符co_await挂起协程并将控制权返回给调用方。其操作数是一个表达式,其中 (1) 是定义成员运算符的类类型co_await或可以传递给非成员运算符co_await,或者 (2) 可通过当前协程转换为此类类型承诺::await_transform.
**co_await**
co_await表达式只能出现在正则函数体内的潜在计算表达式中,而不能出现
- 在异常处理程序中,
- 在声明语句中,除非它出现在该声明语句的初始值设定项中,
- 在 init 语句的简单声明中(请参阅 if、switch、for 和 range-for),除非它出现在该 init 语句的初始值设定项中,
- 在默认参数中,或
- 在具有静态或线程存储持续时间的块范围变量的初始值设定项中。
首先,expr 转换为可等待的,如下所示:
- 如果 expr 由初始挂起点、最终挂起点或屈服表达式生成,则“等待”按原样为 EXPR。
- 否则,如果当前协程的类型具有成员函数,则Promise``await_transform承诺。await_transform(扩展).
- 否则,可等待的是 EXPR,按原样。
然后,获取等待者对象,如下所示:
- 如果运算符 co_await 的重载解析提供了单个最佳重载,则等待程序是该调用的结果:
- 等待。运算符 co_await()对于杆件过载,
- 运算符co_await(static_cast<可等待&>(可等待))对于非成员重载。
- 否则,如果重载解析未找到运算符co_await,则等待程序按原样等待。
- 否则,如果重载解析不明确,则程序格式不正确。
如果上面的表达式是 prvalue,则等待者对象是从它实现的临时实例化。否则,如果上面的表达式是 glvalue,则等待者对象是它引用的对象。
然后等待者。await_ready()调用(如果已知结果已准备就绪或可以同步完成,这是避免挂起成本的快捷方式)。如果其结果,上下文转换为布尔值假然后
协程挂起(其协程状态由局部变量和当前挂起点填充)。
等待者。await_suspend(手柄)调用,其中句柄是表示当前协程的协程句柄。在该函数中,可以通过该句柄观察到挂起的协程状态,并且此函数负责安排它以在某个执行器上恢复或销毁(返回错误计数作为调度)
如果返回 void,则立即将控制权返回到当前协程的调用方/恢复器(此协程保持挂起状态),否则await_suspend
- 如果返回布尔值,await_suspend
- 价值真将控制权返回给当前协程的调用方/恢复方
- 价值假恢复当前协程。
- 如果返回某个其他协程的协程句柄,则恢复该句柄(通过调用await_suspend处理。简历())(请注意,这可能会链最终导致当前协程恢复)。
- 如果引发异常,则会捕获异常,恢复协程,并立即重新引发异常。await_suspend
最后等待者。await_resume()被调用(无论协程是否暂停),其结果是整体的结果co_await expr表达。
如果协程在 co_await 表达式中挂起,并且稍后恢复,则恢复点紧接在调用等待者。await_resume().
请注意,因为协程在进入之前已完全挂起等待者。await_suspend(),该函数可以自由地跨线程传输协程句柄,而无需额外的同步。例如,它可以将其放在回调中,该回调计划在异步 I/O 操作完成时在线程池上运行。在这种情况下,由于当前协程可能已恢复并因此执行了等待者对象的析构函数,因此所有内容都同时作为await_suspend()继续在当前线程上执行,await_suspend()应该治疗*本已销毁,并且在句柄发布到其他线程后无法访问它。
例
运行此代码
#include <coroutine> #include <iostream> #include <stdexcept> #include <thread> auto switch_to_new_thread(std::jthread& out) { struct awaitable { std::jthread* p_out; bool await_ready() { return false; } void await_suspend(std::coroutine_handle<> h) { std::jthread& out = *p_out; if (out.joinable()) throw std::runtime_error("Output jthread parameter not empty"); out = std::jthread([h] { h.resume(); }); // Potential undefined behavior: accessing potentially destroyed *this // std::cout << "New thread ID: " << p_out->get_id() << '\n'; std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK } void await_resume() {} }; return awaitable{&out}; } struct task { struct promise_type { task get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; }; task resuming_on_new_thread(std::jthread& out) { std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n'; co_await switch_to_new_thread(out); // awaiter destroyed here std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n'; } int main() { std::jthread out; resuming_on_new_thread(out); }
可能的输出:
Coroutine started on thread: 139972277602112 New thread ID: 139972267284224 Coroutine resumed on thread: 139972267284224
注意:等待者对象是协程状态的一部分(作为生存期跨越挂起点的临时对象),并在co_await表达式完成之前销毁。它可用于根据某些异步 I/O API 的要求维护每个操作的状态,而无需诉诸其他动态分配。
标准库定义了两个简单的 await:std::suspend_always 和 std::suspend_never。
co_yield
co_yield表达式向调用方返回一个值并挂起当前协程:它是可恢复生成器函数的通用构建块。
**co_yield** expr
**co_yield** braced-init-list
它相当于
co_await promise.yield_value(expr)
典型的生成器将存储(复制/移动或仅存储地址,因为参数的生存期越过 )其参数内的暂停点)到生成器对象中并返回 std::suspend_always,将控制权转移给调用方/恢复器。yield_value``co_await
运行此代码
#include <coroutine> #include <cstdint> #include <exception> #include <iostream> template <typename T> struct Generator { // The class name 'Generator' is our choice and it is not required for coroutine // magic. Compiler recognizes coroutine by the presence of 'co_yield' keyword. // You can use name 'MyGenerator' (or any other name) instead as long as you include // nested struct promise_type with 'MyGenerator get_return_object()' method. struct promise_type; using handle_type = std::coroutine_handle<promise_type>; struct promise_type // required { T value_; std::exception_ptr exception_; Generator get_return_object() { return Generator(handle_type::from_promise(*this)); } std::suspend_always initial_suspend() { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void unhandled_exception() { exception_ = std::current_exception(); } // saving // exception template <std::convertible_to<T> From> // C++20 concept std::suspend_always yield_value(From&& from) { value_ = std::forward<From>(from); // caching the result in promise return {}; } void return_void() { } }; handle_type h_; Generator(handle_type h) : h_(h) { } ~Generator() { h_.destroy(); } explicit operator bool() { fill(); // The only way to reliably find out whether or not we finished coroutine, // whether or not there is going to be a next value generated (co_yield) // in coroutine via C++ getter (operator () below) is to execute/resume // coroutine until the next co_yield point (or let it fall off end). // Then we store/cache result in promise to allow getter (operator() below // to grab it without executing coroutine). return !h_.done(); } T operator()() { fill(); full_ = false; // we are going to move out previously cached // result to make promise empty again return std::move(h_.promise().value_); } private: bool full_ = false; void fill() { if (!full_) { h_(); if (h_.promise().exception_) std::rethrow_exception(h_.promise().exception_); // propagate coroutine exception in called context full_ = true; } } }; Generator<std::uint64_t> fibonacci_sequence(unsigned n) { if (n == 0) co_return; if (n > 94) throw std::runtime_error("Too big Fibonacci sequence. Elements would overflow."); co_yield 0; if (n == 1) co_return; co_yield 1; if (n == 2) co_return; std::uint64_t a = 0; std::uint64_t b = 1; for (unsigned i = 2; i < n; i++) { std::uint64_t s = a + b; co_yield s; a = b; b = s; } } int main() { try { auto gen = fibonacci_sequence(10); // max 94 before uint64_t overflows for (int j = 0; gen; j++) std::cout << "fib(" << j << ")=" << gen() << '\n'; } catch (const std::exception& ex) { std::cerr << "Exception: " << ex.what() << '\n'; } catch (...) { std::cerr << "Unknown exception.\n"; } }
输出:
fib(0)=0 fib(1)=1 fib(2)=1 fib(3)=2 fib(4)=3 fib(5)=5 fib(6)=8 fib(7)=13 fib(8)=21 fib(9)=34
综述
好了,经过这么多的摘录折磨,实际我们都是云里雾里的,所以接下来我们用自己的话说一下。
只要你在代码中发现co_await、co_yield、co_return 这几个关键词,就代表你遇到协程了。
协程的单元是函数,当我们说这是协程的时候,是指这个函数是协程函数。使用协程,是指写一个函数作为协程函数。
注意事项
我们喜爱的auto关键字在协程中不能使用了。
协程不能使用可变参数、纯返回语句或占位符返回类型(auto 或 Concept)。
Consteval 函数、constexpr 函数、构造函数、析构函数和 main 函数不能是协程。
Consteval 函数、constexpr 函数、构造函数、析构函数和 main 函数不能是协程,是因为它们都有特殊的语义和限制,与协程的语义和限制不兼容。例如,Consteval 函数和 constexpr 函数必须在编译时求值,而协程是在运行时执行的;构造函数和析构函数必须按照特定的顺序执行,而协程的执行顺序是不确定的;main 函数是程序的入口,而协程是在程序运行过程中创建和销毁的。因此,将这些函数定义为协程是不合适的。
实际应用场景
这一章节将从实际使用过程中向读者介绍从无到有的使用协程应该经过哪些环节,跟着步骤可以从无到有的实现使用协程完成一个包含zeroMQ和redis的实际应用。
环境准备
首先,我们准备一台服务器(当然虚拟机也可以)。
在上面安装 centos 7 操作系统。(小系统版本在这里不对我们的使用造成影响,不细纠)
照着网上教程 安装 redis 、hiredis、 zeroMQ、zmq.h 和 czmq.h 这些依赖库和辅助服务。这里可以参照我的其他文章,我已经介绍了这部份内容。
开始安装C++ 20 操作环境
首先使用 gcc --version 命令在shell 命令窗口命令行下执行,查看当前的gcc版本。
注意: gcc 的版本需要为10.x.x 以上。8.x.x的gcc版本已经支持C++20,但是C++20的协程库是在gcc 10及以上版本中才被支持的。所以你需要升级你的gcc版本到10或以上才能使用C++20的协程库。
升级gcc版本需要谨慎操作,建议先备份重要数据。你可以通过以下步骤升级gcc版本,并且保证可回退版本:
添加CentOS 7的EPEL仓库:
sudo yum install epel-release
安装devtoolset-10:
sudo yum install centos-release-scl sudo yum install devtoolset-10
启用devtoolset-10:
scl enable devtoolset-10 bash
验证gcc版本:
gcc --version
如果输出的信息中包含了gcc 10.x.x,则说明升级成功。
注意:启用devtoolset-10只对当前终端有效,如果需要永久启用,可以将scl enable devtoolset-10 bash添加到~/.bashrc文件中。如果需要回退版本,可以通过禁用devtoolset-10来实现:
scl disable devtoolset-10
这样就可以回到系统默认的gcc版本了。
一个简单的协程测试
这里的简单测试让我们不必理解代码内容,专注于编译时应当注重的内容
简单demo :
/* * @Author : Zry && 978524088@qq.com * @Date : 2023-06-13 18:09:34 * @LastEditors : Zry && 978524088@qq.com * @LastEditTime : 2023-06-13 18:21:55 * @FilePath : /zryTest/src/include/Mysql/coroutine.cpp * @Description : * * Copyright (c) 2023 by 978524088@qq.com, All Rights Reserved. */ #include <coroutine> #include <iostream> using namespace std; struct CoRet { struct promise_type { int _out; int _res; suspend_never initial_suspend() { return {}; } suspend_always final_suspend() noexcept { return {}; } void unhandled_exception() {} CoRet get_return_object() { return {coroutine_handle<promise_type>::from_promise(*this)}; } suspend_always yield_value(int r) { _out = r; return {}; } void return_value(int r) { _res = r; } }; coroutine_handle<promise_type> _h; // _h.resume(), _h() }; struct Note { int guess; }; struct Input { Note &_in; bool await_ready() { return false; } void await_suspend(coroutine_handle<CoRet::promise_type> h) {} int await_resume() { return _in.guess; } }; CoRet Guess(Note ¬e) { // CoRet::promise_type promise; // CoRet ret = promise.get_return_object(); // co_await promise.initial_suspend(); int res = (rand() % 30) + 1; Input input{note}; int g = co_await input; cout << "coroutine: You guess " << g << endl; co_yield (res > g ? 1 : (res == g ? 0 : -1)); // co_await promise.yield_value() co_return res; // co_await promise.final_suspend(); } int main() { srand(time(nullptr)); Note note; auto ret = Guess(note); cout << "main: make a guess ..." << endl; note.guess = 10; ret._h.resume(); // resume from co_await cout << "main: result is " << ((ret._h.promise()._out == 1) ? "larger" : ((ret._h.promise()._out == 0) ? "the same" : "smaller")) << endl; ret._h.resume(); // resume from co_yield if (ret._h.done()) { cout << "main: the result is " << ret._h.promise()._res << endl; } }
编译方式
使用以下命令编译包含#include 和#include 头文件的C++代码:
g++ -std=c++20 -fcoroutines -o output_file input_file.cpp
其中,-std=c++20表示使用C++20标准,-fcoroutines表示启用协程支持,-o表示指定输出文件名,input_file.cpp表示输入文件名。如果需要链接其他库,可以使用-L和-l选项。例如,如果需要链接libboost_system库,可以使用以下命令:
g++ -std=c++20 -fcoroutines -o output_file input_file.cpp -L/path/to/lib -lboost_system
其中,-L指定库文件所在的路径,-l指定要链接的库名。
注意: 因为我们使用的是devtoolset-10 安装的C++ 20 环境。所以在系统默认路径下找不到 线程库依赖,所以,我们要指明路径。
devtoolset-10安装成功后,对应的gcc库文件在以下路径:
/opt/rh/devtoolset-10/root/usr/lib/gcc/x86_64-redhat-linux/10/
其中,x86_64-redhat-linux是系统架构,10是gcc版本号。在该路径下,可以找到各种gcc库文件,例如libstdc++.so、libgcc_s.so等。
VsCode 怎样设置
作为有一些代码强迫症的人,vscode 纠错显示的一片飘红的波浪线也是很影响心情的。
因此,在使用VsCode 远程开发时,由于devtoolset-10 安装的C++ 20 库信息,在默认路径下找不到,所以 诸如suspend_always、suspend_never是不能识别的。
因此我们需要修改 环境查询路径如下:
![[Pasted image 20230614102209.png]]
将协程、ZeroMQ、Redis 结合起来实现一个高效的分布式通讯和数据处理
实现方法
将协程、ZeroMQ和Redis结合起来可以实现高效的分布式通信和数据处理。具体实现方法如下:
使用C++20标准库中的协程支持,编写协程异步网络通信代码,例如使用boost.asio库或者libuv库。
使用ZeroMQ作为消息队列,将协程异步通信的消息发送到消息队列中。
在另外的协程中,从消息队列中读取消息,并将消息处理结果存储到Redis中。
在需要使用处理结果的地方,从Redis中读取数据并进行处理。
需要注意的是,协程异步通信和消息队列的使用需要考虑线程安全和并发性能等问题,可以使用锁、线程池等技术进行优化。同时,需要根据具体的业务需求和系统架构进行设计和实现。
代码实现
好的,这是封装后的代码,并添加了一些新的功能:
#include <iostream> #include <coroutine> //协程 #include <czmq.h> //zeroMQ #include <hiredis/hiredis.h> //redis using namespace std; // 封装协程异步通信代码 class AsyncCommunication { public: AsyncCommunication(zsock_t* socket, redisContext* redis) : socket_(socket), redis_(redis) {} void operator()(coroutine_handle<> handle) { zmsg_t* msg = zmsg_recv(socket_); if (msg) { // 将消息发送到消息队列中 redisReply* reply = (redisReply*)redisCommand(redis_, "LPUSH message_queue %s", zmsg_popstr(msg)); freeReplyObject(reply); zmsg_destroy(&msg); } handle.resume(); } private: zsock_t* socket_; redisContext* redis_; }; // 封装处理消息代码 class ProcessMessage { public: ProcessMessage(redisContext* redis) : redis_(redis) {} void operator()() { while (true) { redisReply* reply = (redisReply*)redisCommand(redis_, "BRPOP message_queue 0"); if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) { string message = reply->element[1]->str; // 处理消息并将结果存储到Redis中 redisReply* result = (redisReply*)redisCommand(redis_, "SET result_key %s", "result"); freeReplyObject(result); } freeReplyObject(reply); } } private: redisContext* redis_; }; // 封装协程异步通信任务 template<typename Func> void async_communication(zsock_t* socket, redisContext* redis, Func&& func) { coroutine_handle<> handle = coroutine_handle<>::from_address(nullptr); while (true) { AsyncCommunication async_communication(socket, redis); async_communication(handle); func(); } } int main() { // 初始化ZeroMQ和Redis zctx_t* context = zctx_new(); zsock_t* socket = zsock_new_pull("tcp://*:5555"); redisContext* redis = redisConnect("localhost", 6379); // 创建处理消息任务 ProcessMessage process_message(redis); std::thread process_thread(process_message); // 创建协程异步通信任务 async_communication(socket, redis, [&process_thread]() { // 在每次通信完成后检查处理消息任务是否存活,如果已经结束则重新启动 if (process_thread.joinable()) { process_thread.join(); process_thread = std::thread(process_message); } }); // 清理资源 process_thread.join(); redisFree(redis); zsock_destroy(&socket); zctx_destroy(&context); return 0; }
这个版本的代码将协程异步通信和处理消息的代码封装成了两个类,并且将协程异步通信任务的循环放在了一个函数中,使得代码更加简洁和易于理解。同时,还添加了一个新的功能,即在每次通信完成后检查处理消息任务是否存活,如果已经结束则重新启动,以保证处理消息任务一直在运行。