多线程并发编程
使用多线程
use std::thread; use std::time::Duration; fn main() { thread::spawn(|| { for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } }
有几点值得注意:
- 线程内部的代码使用闭包来执行
- main 线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务
- thread::sleep 会让当前线程休眠指定的时间,随后其它线程会被调度运行,因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
在线程闭包中使用 move
move 关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move 来将所有权从一个线程转移到另外一个线程。
use std::thread; fn main() { let v = vec![1, 2, 3]; let handle = thread::spawn(move || { println!("Here's a vector: {:?}", v); }); handle.join().unwrap(); // 下面代码会报错borrow of moved value: `v` // println!("{:?}",v); }
线程局部变量(Thread Local Variable)
标准库
use std::cell::RefCell; use std::thread; thread_local!(static FOO: RefCell<u32> = RefCell::new(1)); FOO.with(|f| { assert_eq!(*f.borrow(), 1); *f.borrow_mut() = 2; }); // 每个线程开始时都会拿到线程局部变量的FOO的初始值 let t = thread::spawn(move|| { FOO.with(|f| { assert_eq!(*f.borrow(), 1); *f.borrow_mut() = 3; }); }); // 等待线程完成 t.join().unwrap(); // 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2 FOO.with(|f| { assert_eq!(*f.borrow(), 2); });
第三方库
use thread_local::ThreadLocal; use std::sync::Arc; use std::cell::Cell; use std::thread; let tls = Arc::new(ThreadLocal::new()); // 创建多个线程 for _ in 0..5 { let tls2 = tls.clone(); thread::spawn(move || { // 将计数器加1 let cell = tls2.get_or(|| Cell::new(0)); cell.set(cell.get() + 1); }).join().unwrap(); } // 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和 let tls = Arc::try_unwrap(tls).unwrap(); let total = tls.into_iter().fold(0, |x, y| x + y.get()); // 和为5 assert_eq!(total, 5);
线程间的消息传递
use std::sync::mpsc; use std::thread; fn main() { // 创建一个消息通道, 返回一个元组:(发送者,接收者) let (tx, rx) = mpsc::channel(); // 创建线程,并发送消息 thread::spawn(move || { // 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理 tx.send(1).unwrap(); // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误 // tx.send(Some(1)).unwrap() }); // 在主线程中接收子线程发送的消息并输出 println!("receive {}", rx.recv().unwrap()); }
- 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
- 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
使用多发送者
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { tx.send(String::from("hi from raw tx")).unwrap(); }); thread::spawn(move || { tx1.send(String::from("hi from cloned tx")).unwrap(); }); for received in rx { println!("Got: {}", received); } }
以上代码并不复杂,但仍有几点需要注意:
- tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender和mpsc::Receiver类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
- 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
- 需要使用move将tx的所有权转移到子线程的闭包中
同步和异步通道
异步通道
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx)= mpsc::channel(); let handle = thread::spawn(move || { println!("发送之前"); tx.send(1).unwrap(); println!("发送之后"); }); println!("睡眠之前"); thread::sleep(Duration::from_secs(3)); println!("睡眠之后"); println!("receive {}", rx.recv().unwrap()); handle.join().unwrap(); }
同步通道
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { // 设置消息队列大小 let (tx, rx)= mpsc::sync_channel(0); let handle = thread::spawn(move || { println!("发送之前"); tx.send(1).unwrap(); println!("发送之后"); }); println!("睡眠之前"); thread::sleep(Duration::from_secs(3)); println!("睡眠之后"); println!("receive {}", rx.recv().unwrap()); handle.join().unwrap(); }
传输多种类型的数据
use std::sync::mpsc::{self, Receiver, Sender}; enum Fruit { Apple(u8), Orange(String) } fn main() { let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel(); tx.send(Fruit::Orange("sweet".to_string())).unwrap(); tx.send(Fruit::Apple(2)).unwrap(); for _ in 0..2 { match rx.recv().unwrap() { Fruit::Apple(count) => println!("received {} apples", count), Fruit::Orange(flavor) => println!("received {} oranges", flavor), } } }
线程同步:锁、Condvar 和信号量
共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:
- 共享内存相对消息传递能节省多次内存拷贝的成本
- 共享内存的实现简洁的多
- 共享内存的锁竞争更多
消息传递适用的场景很多,我们下面列出了几个主要的使用场景: - 需要可靠和简单的(简单不等于简洁)实现时
- 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
- 需要一个任务处理流水线(管道)时,等等
互斥锁 Mutex
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
读写锁 RwLock
use std::sync::RwLock; fn main() { let lock = RwLock::new(5); // 同一时间允许多个读 { let r1 = lock.read().unwrap(); let r2 = lock.read().unwrap(); assert_eq!(*r1, 5); assert_eq!(*r2, 5); } // 读锁在此处被drop // 同一时间只允许一个写 { let mut w = lock.write().unwrap(); *w += 1; assert_eq!(*w, 6); // 以下代码会panic,因为读和写不允许同时存在 // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中 // let r1 = lock.read(); // println!("{:?}",r1); }// 写锁在此处被drop }
同时允许多个读,但最多只能有一个写
读和写不能同时存在
读可以使用read、try_read,写write、try_write, 在实际项目中,try_xxx会安全的多
线程同步:Atomic 原子类型与内存顺序
原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。
use std::ops::Sub; use std::sync::atomic::{AtomicU64, Ordering}; use std::thread::{self, JoinHandle}; use std::time::Instant; const N_TIMES: u64 = 10000000; const N_THREADS: usize = 10; static R: AtomicU64 = AtomicU64::new(0); fn add_n_times(n: u64) -> JoinHandle<()> { thread::spawn(move || { for _ in 0..n { R.fetch_add(1, Ordering::Relaxed); } }) } fn main() { let s = Instant::now(); let mut threads = Vec::with_capacity(N_THREADS); for _ in 0..N_THREADS { threads.push(add_n_times(N_TIMES)); } for thread in threads { thread.join().unwrap(); } assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed)); println!("{:?}",Instant::now().sub(s)); }
Atomic 能替代锁吗
对于复杂的场景下,锁的使用简单粗暴,不容易有坑
std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型
在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar