Async 编程简介
- OS 线程, 它最简单,也无需改变任何编程模型(业务/代码逻辑),因此非常适合作为语言的原生并发模型,我们在多线程章节也提到过,Rust 就选择了原生支持线程级的并发编程。但是,这种模型也有缺点,例如线程间的同步将变得更加困难,线程间的上下文切换损耗较大。使用线程池在一定程度上可以提升性能,但是对于 IO 密集的场景来说,线程池还是不够。
- 事件驱动(Event driven),如果说事件驱动常常跟回调( Callback )一起使用,相信大家就恍然大悟了。这种模型性能相当的好,但最大的问题就是存在回调地狱的风险:非线性的控制流和结果处理导致了数据流向和错误传播变得难以掌控,还会导致代码可维护性和可读性的大幅降低。
- 协程(Coroutines) Go 语言的协程设计就非常优秀,这也是 Go 语言能够迅速火遍全球的杀手锏之一。协程跟线程类似,无需改变编程模型,同时,它也跟 async 类似,可以支持大量的任务并发运行。但协程抽象层次过高,导致用户无法接触到底层的细节,这对于系统编程语言和自定义异步运行时是难以接受的
- actor 模型是 erlang 的杀手锏之一,它将所有并发计算分割成一个一个单元,这些单元被称为 actor , 单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。由于 actor 模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用
- async/await, 该模型性能高,还能支持底层编程,同时又像线程和协程那样无需过多的改变编程模型,但有得必有失,async 模型的问题就是内部实现机制过于复杂,对于用户来说,理解和使用起来也没有线程和协程简单,好在前者的复杂性开发者们已经帮我们封装好,而理解和使用起来不够简单。
对于长时间运行的 CPU 密集型任务,例如并行计算,使用线程将更有优势。 这种密集任务往往会让所在的线程持续运行,任何不必要的线程切换都会带来性能损耗,因此高并发反而在此时成为了一种多余。同时你所创建的线程数应该等于 CPU 核心数,充分利用 CPU 的并行能力,甚至还可以将线程绑定到 CPU 核心上,进一步减少线程上下文切换。
而高并发更适合 IO 密集型任务,例如 web 服务器、数据库连接等等网络服务,因为这些任务绝大部分时间都处于等待状态,如果使用多线程,那线程大量时间会处于无所事事的状态,再加上线程上下文切换的高昂代价,让多线程做 IO 密集任务变成了一件非常奢侈的事。而使用async,既可以有效的降低 CPU 和内存的负担,又可以让大量的任务并发的运行,一个任务一旦处于IO或者其他等待(阻塞)状态,就会被立刻切走并执行另一个任务,而这里的任务切换的性能开销要远远低于使用多线程时的线程上下文切换。
async/.await 简单入门
async/.await 是 Rust 内置的语言特性,可以让我们用同步的方式去编写异步的代码。
通过 async 标记的语法块会被转换成实现了Future特征的状态机。 与同步调用阻塞当前线程不同,当Future执行并遇到阻塞时,它会让出当前线程的控制权,这样其它的Future就可以在该线程中运行,这种方式完全不会导致当前线程的阻塞。
下面我们来通过例子学习 async/.await 关键字该如何使用,在开始之前,需要先引入 futures 包。编辑 Cargo.toml 文件并添加以下内容:
[dependencies] futures = "0.3"
// `block_on`会阻塞当前线程直到指定的`Future`执行完成,这种阻塞当前线程以等待任务完成的方式较为简单、粗暴, // 好在其它运行时的执行器(executor)会提供更加复杂的行为,例如将多个`future`调度到同一个线程上执行。 use futures::executor::block_on; async fn hello_world() { hello_cat().await;//等待异步方法完成 println!("hello, world!"); } async fn hello_cat() { println!("hello, kitty!"); } fn main() { let future = hello_world(); // 返回一个Future, 因此不会打印任何输出 block_on(future); // 执行`Future`并等待其运行完成,此时"hello, world!"会被打印输出 }
总之,在async fn函数中使用.await可以等待另一个异步调用的完成。但是与block_on不同,.await并不会阻塞当前的线程,而是异步的等待Future A的完成,在等待的过程中,该线程还可以继续执行其它的Future B,最终实现了并发处理的效果。
Future 执行器与任务调度
Future 特征
首先,来给出 Future 的定义:它是一个能产出值的异步计算(虽然该值可能为空,例如 () )。光看这个定义,一个简化版的 Future 特征:
trait SimpleFuture { // 设置关联类型 type Output; // 输出计算完成的结果或者输出Pending表示本次不能够完成计算 fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } // Future 需要被执行器poll(轮询)后才能运行,并不能保证在一次 poll 中就被执行完, enum Poll<T> { Ready(T), Pending, }
若在当前 poll 中, Future 可以被完成,则会返回 Poll::Ready(result) ,反之则返回 Poll::Pending, 并且安排一个 wake 函数:当未来 Future 准备好进一步执行时, 该函数会被调用,然后管理该 Future 的执行器会再次调用 poll 方法,此时 Future 就可以继续执行了。
用一个例子来说明下。考虑一个需要从 socket 读取数据的场景:如果有数据,可以直接读取数据并返回 Poll::Ready(data), 但如果没有数据,Future 会被阻塞且不会再继续执行,此时它会注册一个 wake 函数,当 socket 数据准备好时,该函数将被调用以通知执行器:我们的 Future 已经准备好了,可以继续执行。
pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // socket有数据,写入buffer中并返回 Poll::Ready(self.socket.read_buf()) } else { // socket中还没数据 // 注册一个`wake`函数,当数据可用时,该函数会被调用, // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据 self.socket.set_readable_callback(wake); Poll::Pending } } }
Future 模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个 Future或链式调用多个 Future ,也可以通过无内存分配的状态机实现,例如:
trait SimpleFuture { // 关联类型是 trait 定义中的类型占位符。定义的时候,并不定义它的具体的类型是什么。在 impl 这个 trait 的时候,才为这个关联类型赋予确定的类型。 type Output;//关联类型 fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } /// 一个SimpleFuture,它会并发地运行两个Future直到它们完成 /// /// 之所以可以并发,是因为两个Future的轮询可以交替进行,一个阻塞,另一个就可以立刻执行,反之亦然 pub struct Join<FutureA, FutureB> { // 结构体的每个字段都包含一个Future,可以运行直到完成. // 等到Future完成后,字段会被设置为 `None`. 这样Future完成后,就不会再被轮询 a: Option<FutureA>, b: Option<FutureB>, } impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { // 尝试去完成一个 Future `a` if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } } // 尝试去完成一个 Future `b` if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } } if self.a.is_none() && self.b.is_none() { // 两个 Future都已完成 - 我们可以成功地返回了 Poll::Ready(()) } else { // 至少还有一个 Future 没有完成任务,因此返回 `Poll::Pending`. // 当该 Future 再次准备好时,通过调用`wake()`函数来继续执行 Poll::Pending } } }
上面代码展示了如何同时运行多个 Future, 且在此过程中没有任何内存分配,让并发编程更加高效。 类似的,多个Future也可以一个接一个的连续运行:
/// 一个SimpleFuture, 它使用顺序的方式,一个接一个地运行两个Future // // 而真实的`Andthen`允许根据第一个`Future`的输出来创建第二个`Future`,因此复杂的多。 pub struct AndThenFut<FutureA, FutureB> { first: Option<FutureA>, second: FutureB, } impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if let Some(first) = &mut self.first { match first.poll(wake) { // 我们已经完成了第一个 Future, 可以将它移除, 然后准备开始运行第二个 Poll::Ready(()) => self.first.take(), // 第一个 Future 还不能完成 Poll::Pending => return Poll::Pending, }; } // 运行到这里,说明第一个Future已经完成,尝试去完成第二个 self.second.poll(wake) } }
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 Future 特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 Future 特征有何不同之处。
trait Future { type Output; fn poll( // 首先值得注意的地方是,`self`的类型从`&mut self`变成了`Pin<&mut Self>`: self: Pin<&mut Self>, // 其次将`wake: fn()` 修改为 `cx: &mut Context<'_>`: cx: &mut Context<'_>, ) -> Poll<Self::Output>; }
首先这里多了一个 Pin ,现在你只需要知道使用它可以创建一个无法被移动的 Future ,因为无法被移动,所以它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: struct MyFut { a: i32, ptr_to_a: *const i32 }。
其次,从 wake: fn() 变成了 &mut Context<’_> 。意味着 wake 函数可以携带数据了。
使用 Waker 来唤醒任务
对于 Future 来说,第一次被 poll 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 poll 进而继续往下执行,该通知就是通过 Waker 类型完成的。
Waker 提供了一个 wake() 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 Future 再次进行 poll 操作。
构建一个定时器
下面一起来实现一个简单的定时器 Future 。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 Future 。
注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 cargo new --lib timer_future 来创建一个新工程,在 lib 包的根路径 src/lib.rs 中添加以下内容:
use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, };
继续来实现 Future 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 Future ,由于是多线程环境,我们需要使用 Arc//> 来作为一个共享状态,用于在新线程和 Future 定时器间共享。
pub struct TimerFuture { // Arc是一种能够使得数据在线程间安全共享的智能指针.它的工作方式从本质上来讲,是对将要共享的数据进行包装,并表现为此数据的一个指针。 // Arc会追踪这个指针的所有拷贝,当最后一份拷贝离开作用域时,它就会安全释放内存。 shared_state: Arc<Mutex<SharedState>>, } /// 在Future和等待的线程间共享状态 struct SharedState { /// 定时(睡眠)是否结束 completed: bool, /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务 waker: Option<Waker>, }
下面给出 Future 的具体实现:
impl Future for TimerFuture { type Output = (); // 函数没有返回值,那么返回一个 () // 通过 ; 结尾的表达式返回一个 () fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // 通过检查共享状态,来确定定时器是否已经完成 let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { // 计算完成,弹出计算数据 Poll::Ready(()) } else { // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作, // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。 // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次, // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务 shared_state.waker = Some(cx.waker().clone()); // 设置pending状态 Poll::Pending } } }
代码很简单,只要新线程设置了 shared_state.completed = true ,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 Waker ,这样新线程就可以使用它来唤醒当前的任务。
最后,再来创建一个 API 用于构建定时器和启动计时线程:
impl TimerFuture { /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成 pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // 创建新线程 let thread_shared_state = shared_state.clone(); thread::spawn(move || { // 睡眠指定时间实现计时功能 thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了 shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } }
rust高级 异步编程 一 future(2)https://developer.aliyun.com/article/1392109