rust 多线程

简介: rust 多线程

多线程并发编程

使用多线程

use std::thread;
use std::time::Duration;
fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

有几点值得注意:

  • 线程内部的代码使用闭包来执行
  • main 线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务
  • thread::sleep 会让当前线程休眠指定的时间,随后其它线程会被调度运行,因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
在线程闭包中使用 move

move 关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move 来将所有权从一个线程转移到另外一个线程。

use std::thread;
fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    handle.join().unwrap();
    // 下面代码会报错borrow of moved value: `v`
    // println!("{:?}",v);
}
线程局部变量(Thread Local Variable)
标准库
use std::cell::RefCell;
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
FOO.with(|f| {
    assert_eq!(*f.borrow(), 1);
    *f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 3;
    });
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
    assert_eq!(*f.borrow(), 2);
});
第三方库
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;
let tls = Arc::new(ThreadLocal::new());
// 创建多个线程
for _ in 0..5 {
    let tls2 = tls.clone();
    thread::spawn(move || {
        // 将计数器加1
        let cell = tls2.get_or(|| Cell::new(0));
        cell.set(cell.get() + 1);
    }).join().unwrap();
}
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());
// 和为5
assert_eq!(total, 5);

线程间的消息传递

use std::sync::mpsc;
use std::thread;
fn main() {
    // 创建一个消息通道, 返回一个元组:(发送者,接收者)
    let (tx, rx) = mpsc::channel();
    // 创建线程,并发送消息
    thread::spawn(move || {
        // 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
        tx.send(1).unwrap();
        // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
        // tx.send(Some(1)).unwrap()
    });
    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}
  • 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
  • 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
使用多发送者
use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx.send(String::from("hi from raw tx")).unwrap();
    });
    thread::spawn(move || {
        tx1.send(String::from("hi from cloned tx")).unwrap();
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

以上代码并不复杂,但仍有几点需要注意:

  • tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender和mpsc::Receiver类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
  • 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
  • 需要使用move将tx的所有权转移到子线程的闭包中
同步和异步通道
异步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx)= mpsc::channel();
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}
同步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // 设置消息队列大小
    let (tx, rx)= mpsc::sync_channel(0);
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}
传输多种类型的数据
use std::sync::mpsc::{self, Receiver, Sender};
enum Fruit {
    Apple(u8),
    Orange(String)
}
fn main() {
    let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
    tx.send(Fruit::Orange("sweet".to_string())).unwrap();
    tx.send(Fruit::Apple(2)).unwrap();
    for _ in 0..2 {
        match rx.recv().unwrap() {
            Fruit::Apple(count) => println!("received {} apples", count),
            Fruit::Orange(flavor) => println!("received {} oranges", flavor),
        }
    }
}

线程同步:锁、Condvar 和信号量

共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:

  • 共享内存相对消息传递能节省多次内存拷贝的成本
  • 共享内存的实现简洁的多
  • 共享内存的锁竞争更多
    消息传递适用的场景很多,我们下面列出了几个主要的使用场景:
  • 需要可靠和简单的(简单不等于简洁)实现时
  • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
  • 需要一个任务处理流水线(管道)时,等等
互斥锁 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}
读写锁 RwLock
use std::sync::RwLock;
fn main() {
    let lock = RwLock::new(5);
    // 同一时间允许多个读
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // 读锁在此处被drop
    // 同一时间只允许一个写
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
        // 以下代码会panic,因为读和写不允许同时存在
        // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
        // let r1 = lock.read();
        // println!("{:?}",r1);
    }// 写锁在此处被drop
}

同时允许多个读,但最多只能有一个写

读和写不能同时存在

读可以使用read、try_read,写write、try_write, 在实际项目中,try_xxx会安全的多

线程同步:Atomic 原子类型与内存顺序

原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n {
            R.fetch_add(1, Ordering::Relaxed);
        }
    })
}
fn main() {
    let s = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);
    for _ in 0..N_THREADS {
        threads.push(add_n_times(N_TIMES));
    }
    for thread in threads {
        thread.join().unwrap();
    }
    assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
    println!("{:?}",Instant::now().sub(s));
}
Atomic 能替代锁吗

对于复杂的场景下,锁的使用简单粗暴,不容易有坑

std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型

在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar

基于 Send 和 Sync 的线程安全


相关文章
|
6月前
|
存储 Rust 安全
Rust中的线程基础:创建与管理
本文将深入探讨Rust中的线程基础,包括线程的创建、同步与通信机制,以及Rust提供的线程管理工具。通过理解这些概念,读者将能够更好地利用Rust的并发特性,编写高效且安全的多线程程序。
|
4月前
|
Rust 安全 程序员
Rust与C++的区别及使用问题之Rust解决多线程下的共享的问题如何解决
Rust与C++的区别及使用问题之Rust解决多线程下的共享的问题如何解决
|
4月前
|
Rust 编译器 程序员
Rust与C++的区别及使用问题之Rust避免多线程中的lifetime的问题如何解决
Rust与C++的区别及使用问题之Rust避免多线程中的lifetime的问题如何解决
|
Rust 监控 并行计算
用Rust构建电脑网络监控软件:内存安全性和多线程编程
在当今数字化世界中,网络安全一直是至关重要的问题。电脑网络监控软件是确保网络系统安全和高效运行的关键工具。然而,编写电脑网络监控软件需要处理复杂的多线程编程和内存安全性问题。Rust编程语言提供了一种强大的方式来构建安全的电脑网络监控软件,同时避免了许多常见的编程错误。
348 0
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
23天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
17 3
|
23天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2
|
23天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2
|
23天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
28 1
|
23天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
33 1