使用消息传递在线程间传送数据
确保安全并发的方式是 {消息传递|message passing},这里线程或 actor
通过发送包含数据的消息来相互沟通。这个思想来源于 Go 编程语言文档中
的口号:不要通过共享内存来通讯;而是通过通讯来共享内存。
Rust
中一个实现消息传递并发的主要工具是 {通道|channel},Rust
标准库提供了其实现的编程概念。
编程中的通道有两部分组成,一个{发送者|transmitter}和一个{接收者|receiver}。
- 发送者位于上游位置,在这里可以将橡皮鸭放入河中,
- 接收者则位于下游,橡皮鸭最终会漂流至此。
代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为通道被 关闭(closed
)了。
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); }
这里使用 mpsc::channel
函数创建一个新的通道;mpsc
是 {多个生产者,单个消费者|multiple producer, single consumer}的缩写。简而言之,Rust
标准库实现通道的方式意味着一个通道可以有多个产生值的{发送|sending}端,但只能有一个消费这些值的{接收|receiving}端。
mpsc::channel
函数返回一个元组
:第一个元素是发送端,而第二个元素是接收端。
由于历史原因,tx
和 rx
通常作为 发送者(transmitter
)和 接收者(receiver
)的缩写,所以这就是我们将用来绑定这两端变量的名字。这里使用了一个 let
语句和模式来解构了此元组。
将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
使用 thread::spawn 来创建一个新线程并使用 move 将 tx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有通道的发送端以便能向通道发送消息。
通道的发送端有一个 send 方法用来获取需要放入通道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。
随后,在主线程中从通道的接收端获取值。
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
通道的接收端有两个有用的方法:recv
和 try_recv
。这里,我们使用了 recv
,它是 receive
的缩写。这个方法会阻塞主线程执行直到从通道中接收一个值。一旦发送了一个值,recv
会在一个 Result<T, E>
中返回它。当通道发送端关闭,recv
会返回一个错误表明不会再有新的值到来了。
try_recv
不会阻塞,相反它立刻返回一个 Result<T, E>:Ok
值包含可用的信息,而 Err
值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv
很有用:可以编写一个循环来频繁调用 try_recv
,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。
通道与所有权转移
所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。防止并发编程中的错误是在 Rust
程序中考虑所有权的一大优势。
存在如下代码:
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); println!("val is {}", val); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
尝试在通过 tx.send
发送 val
到通道中之后将其打印出来。
一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。
Rust 会给出一个错误:
error[E0382]: use of moved value: `val` --> src/main.rs:10:31 | 9 | tx.send(val).unwrap(); | --- value moved here 10 | println!("val is {}", val); | ^^^ value used here after move |
发送多个值并观察接收者的等待
构建一个示例:新建线程现在会发送多个消息并在每个消息之间暂停一秒钟。
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
在新建线程中有一个字符串 vector
希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个 Duration
值调用 thread::sleep
函数来暂停一秒。
在主线程中,不再显式调用 recv
函数:而是将 rx
当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当通道被关闭时,迭代器也将结束。
将看到如下输出,每一行都会暂停一秒:
Got: hi Got: from Got: the Got: thread
通过克隆发送者来创建多个生产者
mpsc
是 multiple producer, single consumer
的缩写。
可以运用
mpsc
来创建向同一接收者发送值的多个线程。这可以通过克隆通道的发送端来做到。
let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); }
在创建新线程之前,我们对通道的发送端调用了 clone
方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的通道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向通道的接收端发送不同的消息。
共享状态并发
某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将一个值传送到通道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。
互斥器一次只允许一个线程访问数据
{互斥器|mutex}是 mutual exclusion
的缩写,也就是说,任意时刻,其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 {锁|lock}来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 保护(guarding
)其数据。
互斥锁有如下的规则:
- 在使用数据之前尝试获取锁。
- 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。
Mutex<T>
的 API
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println!("m = {:?}", m); }
像很多类型一样,我们使用关联函数new
来创建一个 Mutex<T>
。使用 lock
方法获取锁,以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。
如果另一个线程拥有锁,并且那个线程 panic
了,则 lock
调用会失败。在这种情况下,没人能够再获取锁,所以这里选择 unwrap
并在遇到这种情况时使线程 panic
。
一旦获取了锁,就可以将返回值(在这里是num
)视为一个其内部数据的可变引用了。类型系统确保了我们在使用 m
中的值之前获取锁:Mutex<i32>
并不是一个 i32
,所以必须获取锁才能使用这个 i32
值。
Mutex<T>
是一个智能指针。更准确的说,lock
调用返回一个叫做 MutexGuard
的智能指针。这个智能指针实现了 Deref
来指向其内部数据;其也提供了一个 Drop
实现当 MutexGuard
离开作用域时自动释放锁。为此,我们不会冒忘记释放锁并阻塞互斥器为其它线程所用的风险,因为锁的释放是自动发生的。
丢弃了锁之后,可以打印出互斥器的值,并发现能够将其内部的 i32
改为 6
。
在线程间共享 Mutex<T>
尝试使用 Mutex<T>
在多个线程间共享值。我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10。
use std::sync::Mutex; use std::thread; fn main() { let counter = Mutex::new(0); let mut handles = vec![]; for _ in 0..10 { let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
创建了一个 counter
变量来存放内含 i32
的 Mutex<T>
。接下来遍历 range
创建了 10
个线程。使用了 thread::spawn
并对所有线程使用了相同的闭包:他们每一个都将调用 lock
方法来获取 Mutex<T>
上的锁,接着将互斥器中的值加一。当一个线程结束执行,num
会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。
在主线程中,收集了所有的 join
句柄,调用它们的 join
方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果。
编译上述代码,会发生如下错误:
error[E0382]: use of moved value: `counter` --> src/main.rs:9:36 | 9 | let handle = thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 10 | let mut num = counter.lock().unwrap(); | ------- use occurs due to use in closure
错误信息表明 counter
值在上一次循环中被移动了。所以 Rust
告诉我们不能将 counter
锁的所有权移动到多个线程中。
多线程和多所有权
通过使用智能指针 Rc<T>
来创建引用计数的值,以便拥有多所有者。将上述代码中的 Mutex<T>
封装进 Rc<T>
中并在将所有权移入线程之前克隆了 Rc<T>
。
use std::rc::Rc; use std::sync::Mutex; use std::thread; fn main() { let counter = Rc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Rc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
不幸的是,Rc<T>
并不能安全的在线程间共享。当 Rc<T>
管理引用计数时,它必须在每一个 clone
调用时增加计数,并在每一个克隆被丢弃时减少计数。Rc<T>
并没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断。
原子引用计数 Arc<T>
所幸 Arc<T>
正是 这么一个类似 Rc<T>
并可以安全的用于并发环境的类型。字母 “a”
代表 {原子性|atomic},所以这是一个{原子引用计数|atomically reference counted}类型。
Arc<T>
和 Rc<T>
有着相同的 API,所以修改程序中的 use
行和 new
调用。
use std::sync::{Mutex, Arc}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
使用 Sync 和 Send trait 的可扩展并发
通过 Send 允许在线程间转移所有权
Send
标记 trait
表明类型的所有权可以在线程间传递。几乎所有的 Rust
类型都是 Send
的,不过有一些例外,包括 Rc<T>
:这是不能 Send
的,因为如果克隆了 Rc<T>
的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。为此,Rc<T>
被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。
Sync 允许多线程访问
Sync
标记 trait
表明一个实现了 Sync
的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T
,如果 &T
(T 的引用)是 Send
的话 T
就是 Sync
的,这意味着其引用就可以安全的发送到另一个线程。
后记
分享是一种态度。
参考资料:《Rust权威指南》
全文完,既然看到这里了,如果觉得不错,随手点个赞和“在看”吧。