rust 多线程

简介: rust 多线程

多线程并发编程

使用多线程

use std::thread;
use std::time::Duration;
fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

有几点值得注意:

  • 线程内部的代码使用闭包来执行
  • main 线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务
  • thread::sleep 会让当前线程休眠指定的时间,随后其它线程会被调度运行,因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
在线程闭包中使用 move

move 关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move 来将所有权从一个线程转移到另外一个线程。

use std::thread;
fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    handle.join().unwrap();
    // 下面代码会报错borrow of moved value: `v`
    // println!("{:?}",v);
}
线程局部变量(Thread Local Variable)
标准库
use std::cell::RefCell;
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
FOO.with(|f| {
    assert_eq!(*f.borrow(), 1);
    *f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 3;
    });
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
    assert_eq!(*f.borrow(), 2);
});
第三方库
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;
let tls = Arc::new(ThreadLocal::new());
// 创建多个线程
for _ in 0..5 {
    let tls2 = tls.clone();
    thread::spawn(move || {
        // 将计数器加1
        let cell = tls2.get_or(|| Cell::new(0));
        cell.set(cell.get() + 1);
    }).join().unwrap();
}
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());
// 和为5
assert_eq!(total, 5);

线程间的消息传递

use std::sync::mpsc;
use std::thread;
fn main() {
    // 创建一个消息通道, 返回一个元组:(发送者,接收者)
    let (tx, rx) = mpsc::channel();
    // 创建线程,并发送消息
    thread::spawn(move || {
        // 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
        tx.send(1).unwrap();
        // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
        // tx.send(Some(1)).unwrap()
    });
    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}
  • 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
  • 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
使用多发送者
use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx.send(String::from("hi from raw tx")).unwrap();
    });
    thread::spawn(move || {
        tx1.send(String::from("hi from cloned tx")).unwrap();
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

以上代码并不复杂,但仍有几点需要注意:

  • tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender和mpsc::Receiver类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
  • 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
  • 需要使用move将tx的所有权转移到子线程的闭包中
同步和异步通道
异步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx)= mpsc::channel();
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}
同步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // 设置消息队列大小
    let (tx, rx)= mpsc::sync_channel(0);
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}
传输多种类型的数据
use std::sync::mpsc::{self, Receiver, Sender};
enum Fruit {
    Apple(u8),
    Orange(String)
}
fn main() {
    let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
    tx.send(Fruit::Orange("sweet".to_string())).unwrap();
    tx.send(Fruit::Apple(2)).unwrap();
    for _ in 0..2 {
        match rx.recv().unwrap() {
            Fruit::Apple(count) => println!("received {} apples", count),
            Fruit::Orange(flavor) => println!("received {} oranges", flavor),
        }
    }
}

线程同步:锁、Condvar 和信号量

共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:

  • 共享内存相对消息传递能节省多次内存拷贝的成本
  • 共享内存的实现简洁的多
  • 共享内存的锁竞争更多
    消息传递适用的场景很多,我们下面列出了几个主要的使用场景:
  • 需要可靠和简单的(简单不等于简洁)实现时
  • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
  • 需要一个任务处理流水线(管道)时,等等
互斥锁 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}
读写锁 RwLock
use std::sync::RwLock;
fn main() {
    let lock = RwLock::new(5);
    // 同一时间允许多个读
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // 读锁在此处被drop
    // 同一时间只允许一个写
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
        // 以下代码会panic,因为读和写不允许同时存在
        // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
        // let r1 = lock.read();
        // println!("{:?}",r1);
    }// 写锁在此处被drop
}

同时允许多个读,但最多只能有一个写

读和写不能同时存在

读可以使用read、try_read,写write、try_write, 在实际项目中,try_xxx会安全的多

线程同步:Atomic 原子类型与内存顺序

原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n {
            R.fetch_add(1, Ordering::Relaxed);
        }
    })
}
fn main() {
    let s = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);
    for _ in 0..N_THREADS {
        threads.push(add_n_times(N_TIMES));
    }
    for thread in threads {
        thread.join().unwrap();
    }
    assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
    println!("{:?}",Instant::now().sub(s));
}
Atomic 能替代锁吗

对于复杂的场景下,锁的使用简单粗暴,不容易有坑

std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型

在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar

基于 Send 和 Sync 的线程安全


相关文章
|
8月前
|
存储 Rust 安全
Rust中的线程基础:创建与管理
本文将深入探讨Rust中的线程基础,包括线程的创建、同步与通信机制,以及Rust提供的线程管理工具。通过理解这些概念,读者将能够更好地利用Rust的并发特性,编写高效且安全的多线程程序。
|
6月前
|
Rust 安全 程序员
Rust与C++的区别及使用问题之Rust解决多线程下的共享的问题如何解决
Rust与C++的区别及使用问题之Rust解决多线程下的共享的问题如何解决
|
6月前
|
Rust 编译器 程序员
Rust与C++的区别及使用问题之Rust避免多线程中的lifetime的问题如何解决
Rust与C++的区别及使用问题之Rust避免多线程中的lifetime的问题如何解决
|
Rust 监控 并行计算
用Rust构建电脑网络监控软件:内存安全性和多线程编程
在当今数字化世界中,网络安全一直是至关重要的问题。电脑网络监控软件是确保网络系统安全和高效运行的关键工具。然而,编写电脑网络监控软件需要处理复杂的多线程编程和内存安全性问题。Rust编程语言提供了一种强大的方式来构建安全的电脑网络监控软件,同时避免了许多常见的编程错误。
358 0
|
2月前
|
Rust 安全 Java
探索Rust语言的并发编程模型
探索Rust语言的并发编程模型
|
2月前
|
Rust 安全 区块链
探索Rust语言:系统编程的新选择
【10月更文挑战第27天】Rust语言以其安全性、性能和并发性在系统编程领域受到广泛关注。本文介绍了Rust的核心特性,如内存安全、高性能和强大的并发模型,以及开发技巧和实用工具,展示了Rust如何改变系统编程的面貌,并展望了其在WebAssembly、区块链和嵌入式系统等领域的未来应用。
|
2月前
|
Rust 安全 Java
编程语言新宠:Rust语言的特性、优势与实战入门
【10月更文挑战第27天】Rust语言以其独特的特性和优势在编程领域迅速崛起。本文介绍Rust的核心特性,如所有权系统和强大的并发处理能力,以及其性能和安全性优势。通过实战示例,如“Hello, World!”和线程编程,帮助读者快速入门Rust。
90 1
|
2月前
|
Rust 安全 编译器
编程语言新宠:Rust语言的特性、优势与实战入门
【10月更文挑战第26天】Rust语言诞生于2006年,由Mozilla公司的Graydon Hoare发起。作为一门系统编程语言,Rust专注于安全和高性能。通过所有权系统和生命周期管理,Rust在编译期就能消除内存泄漏等问题,适用于操作系统、嵌入式系统等高可靠性场景。
116 2
|
2月前
|
Rust 安全
深入理解Rust语言的所有权系统
深入理解Rust语言的所有权系统
41 0
|
2月前
|
Rust 安全 前端开发
探索Rust语言的异步编程模型
探索Rust语言的异步编程模型