rust高级 异步编程 一 future(1)https://developer.aliyun.com/article/1392108
执行器 Executor
Rust 的 Future 是惰性的。其中一个推动它的方式就是在 async 函数中使用 .await 来调用另一个 async 函数,但是这个只能解决 async 内部的问题,那么这些最外层的 async 函数由执行器 executor控制 。
执行器会管理一批 Future (最外层的 async 函数),然后通过不停地 poll 推动它们直到完成。 最开始,执行器会先 poll 一次 Future ,后面就不会主动去 poll 了,而是等待 Future 通过调用 wake 函数来通知它可以继续,它才会继续去 poll 。这种 wake 通知然后 poll 的方式会不断重复,直到 Future 完成。
构建执行器
下面我们将实现一个简单的执行器,它可以同时并发运行多个 Future 。例子中,需要用到 futures 包的 ArcWake 特征,它可以提供一个方便的途径去构建一个 Waker 。编辑 Cargo.toml ,添加下面依赖:
[dependencies] futures = "0.3"
在之前的内容中,我们在 src/lib.rs 中创建了定时器 Future ,现在在 src/main.rs 中来创建程序的主体内容,开始之前,先引入所需的包:
use { futures::{ future::{BoxFuture, FutureExt}, task::{waker_ref, ArcWake}, }, std::{ future::Future, sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, task::{Context, Poll}, time::Duration, }, // 引入之前实现的定时器模块 timer_future::TimerFuture, };
执行器需要从一个消息通道( channel )中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器 poll 。
/// 任务执行器,负责从通道中接收任务然后执行 struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner`负责创建新的`Future`然后将它发送到任务通道中 #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll` struct Task { /// 进行中的Future,在未来的某个时间点会被完成 /// /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于 /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此 /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。 /// /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell` future: Mutex<Option<BoxFuture<'static, ()>>>, /// 可以将该任务自身放回到任务通道中,等待执行器的poll task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // 任务通道允许的最大缓冲数(任务队列的最大长度) // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用 const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender }) }
下面再来添加一个方法用于生成 Future , 然后将它放入任务通道中:
impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("任务队列已满"); } }
在执行器 poll 一个 Future 之前,首先需要调用 wake 方法进行唤醒,然后再由 Waker 负责调度该任务并将其放入任务通道中。创建 Waker 的最简单的方式就是实现 ArcWake 特征,先来为我们的任务实现 ArcWake 特征,这样它们就能被转变成 Waker 然后被唤醒:
impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll` let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("任务队列已满"); } }
当任务实现了 ArcWake 特征后,它就变成了 Waker ,在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc ),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 poll 执行:
impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它 let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // 基于任务自身创建一个 `LocalWaker` let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名 // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>` if future.as_mut().poll(context).is_pending() { // Future还没执行完,因此将它放回任务中,等待下次被poll *future_slot = Some(future); } } } } }
恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 Future :
fn main() { let (executor, spawner) = new_executor_and_spawner(); // 生成一个任务 spawner.spawn(async { println!("howdy!"); // 创建定时器Future,并等待它完成 TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来 drop(spawner); // 运行执行器直到任务队列为空 // 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印 `done!` executor.run(); }
完整代码
main.rs
use { futures::{ future::{BoxFuture, FutureExt}, task::{waker_ref, ArcWake}, }, std::{ future::Future, sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, task::{Context, Poll}, time::Duration, }, // 引入之前实现的定时器模块 timer_future::TimerFuture, }; /// 任务执行器,负责从通道中接收任务然后执行 struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner`负责创建新的`Future`然后将它发送到任务通道中 #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll` struct Task { /// 进行中的Future,在未来的某个时间点会被完成 /// /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于 /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此 /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。 /// /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell` future: Mutex<Option<BoxFuture<'static, ()>>>, /// 可以将该任务自身放回到任务通道中,等待执行器的poll task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // 任务通道允许的最大缓冲数(任务队列的最大长度) // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用 const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);//设置缓冲通道 (Executor { ready_queue }, Spawner { task_sender }) } impl Spawner { // 派生任务函数 fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("任务队列已满"); } } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll` let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("任务队列已满"); } } impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它 let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // 基于任务自身创建一个 `LocalWaker` let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名 // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>` if future.as_mut().poll(context).is_pending() { // Future还没执行完,因此将它放回任务中,等待下次被poll *future_slot = Some(future); } } } } } fn main() { let (executor, spawner) = new_executor_and_spawner(); // 生成一个任务 spawner.spawn(async { println!("howdy!"); // 创建定时器Future,并等待它完成 TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来 drop(spawner); // 运行执行器直到任务队列为空 // 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印 `done!` executor.run(); }
lib.rs
use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; pub struct TimerFuture { // Arc是一种能够使得数据在线程间安全共享的智能指针.它的工作方式从本质上来讲,是对将要共享的数据进行包装,并表现为此数据的一个指针。 // Arc会追踪这个指针的所有拷贝,当最后一份拷贝离开作用域时,它就会安全释放内存。 shared_state: Arc<Mutex<SharedState>>, } /// 在Future和等待的线程间共享状态 struct SharedState { /// 定时(睡眠)是否结束 completed: bool, /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务 waker: Option<Waker>, } impl Future for TimerFuture { type Output = (); // 函数没有返回值,那么返回一个 () // 通过 ; 结尾的表达式返回一个 () fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // 通过检查共享状态,来确定定时器是否已经完成 let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { // 计算完成,弹出计算数据 Poll::Ready(()) } else { // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作, // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。 // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次, // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务 shared_state.waker = Some(cx.waker().clone()); // 设置pending状态 Poll::Pending } } } impl TimerFuture { /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成 pub fn new(duration: Duration) -> Self { // 设置状态和唤醒函数 let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // 创建新线程 let thread_shared_state = shared_state.clone(); // thread::spawn 函数的返回值类型是 JoinHandle, 通过 JoinHandle 来等待所有线程完成就可以解决上面执行不完的问题。 thread::spawn(move || { //move 闭包通常和 thread::spawn 函数一起使用,它允许你使用其它线程的数据。创建线程时,把值的所有权从一个线程转移到另一个线程。 // 睡眠指定时间实现计时功能 thread::sleep(duration); // 线程加锁 let mut shared_state = thread_shared_state.lock().unwrap(); // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了 shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); // 返回创建的对象 TimerFuture { shared_state } } }