Rust学习笔记之并发(二)

简介: Rust学习笔记之并发(二)

使用消息传递在线程间传送数据

确保安全并发的方式是 {消息传递|message passing},这里线程或 actor 通过发送包含数据的消息来相互沟通。这个思想来源于 Go 编程语言文档中 的口号:不要通过共享内存来通讯;而是通过通讯来共享内存。

Rust 中一个实现消息传递并发的主要工具是 {通道|channel}Rust 标准库提供了其实现的编程概念。

编程中的通道有两部分组成,一个{发送者|transmitter}和一个{接收者|receiver}

  • 发送者位于上游位置,在这里可以将橡皮鸭放入河中,
  • 接收者则位于下游,橡皮鸭最终会漂流至此。

代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为通道被 关闭(closed)了。

use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
}

这里使用 mpsc::channel 函数创建一个新的通道;mpsc{多个生产者,单个消费者|multiple producer, single consumer}的缩写。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个产生值的{发送|sending}端,但只能有一个消费这些值的{接收|receiving}端。

mpsc::channel 函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。

由于历史原因,txrx 通常作为 发送者(transmitter)和 接收者(receiver)的缩写,所以这就是我们将用来绑定这两端变量的名字。这里使用了一个 let 语句和模式来解构了此元组。

将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

使用 thread::spawn 来创建一个新线程并使用 move 将 tx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有通道的发送端以便能向通道发送消息。

通道的发送端有一个 send 方法用来获取需要放入通道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。

随后,在主线程中从通道的接收端获取值。

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

通道的接收端有两个有用的方法:recvtry_recv。这里,我们使用了 recv,它是 receive 的缩写。这个方法会阻塞主线程执行直到从通道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它。当通道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。

try_recv不会阻塞,相反它立刻返回一个 Result<T, E>:Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。


通道与所有权转移

所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。防止并发编程中的错误是在 Rust 程序中考虑所有权的一大优势。

存在如下代码:

use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

尝试在通过 tx.send 发送 val 到通道中之后将其打印出来。

一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。

Rust 会给出一个错误:

error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |

发送多个值并观察接收者的等待

构建一个示例:新建线程现在会发送多个消息并在每个消息之间暂停一秒钟。

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒。

在主线程中,不再显式调用 recv 函数:而是rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当通道被关闭时,迭代器也将结束。

将看到如下输出,每一行都会暂停一秒

Got: hi
Got: from
Got: the
Got: thread

通过克隆发送者来创建多个生产者

mpscmultiple producer, single consumer 的缩写。

可以运用 mpsc 来创建向同一接收者发送值的多个线程。这可以通过克隆通道的发送端来做到。

let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];
    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});
thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];
    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});
for received in rx {
    println!("Got: {}", received);
}

在创建新线程之前,我们对通道的发送端调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的通道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向通道的接收端发送不同的消息


共享状态并发

某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将一个值传送到通道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。


互斥器一次只允许一个线程访问数据

{互斥器|mutex}mutual exclusion 的缩写,也就是说,任意时刻,其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 {锁|lock}来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 保护(guarding)其数据。

互斥锁有如下的规则:

  1. 在使用数据之前尝试获取锁。
  2. 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。

Mutex<T>的 API

use std::sync::Mutex;
fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }
    println!("m = {:?}", m);
}

像很多类型一样,我们使用关联函数new 来创建一个 Mutex<T>使用 lock 方法获取锁,以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。

如果另一个线程拥有锁,并且那个线程 panic 了,则 lock 调用会失败。在这种情况下,没人能够再获取锁,所以这里选择 unwrap 并在遇到这种情况时使线程 panic

一旦获取了锁,就可以将返回值(在这里是num)视为一个其内部数据的可变引用了。类型系统确保了我们在使用 m 中的值之前获取锁:Mutex<i32> 并不是一个 i32,所以必须获取锁才能使用这个 i32 值。

Mutex<T> 是一个智能指针。更准确的说,lock 调用返回一个叫做 MutexGuard 的智能指针。这个智能指针实现了 Deref 来指向其内部数据;其也提供了一个 Drop 实现当 MutexGuard离开作用域时自动释放锁。为此,我们不会冒忘记释放锁并阻塞互斥器为其它线程所用的风险,因为锁的释放是自动发生的

丢弃了锁之后,可以打印出互斥器的值,并发现能够将其内部的 i32 改为 6


在线程间共享 Mutex<T>

尝试使用 Mutex<T> 在多个线程间共享值。我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10。

use std::sync::Mutex;
use std::thread;
fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];
    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}

创建了一个 counter 变量来存放内含 i32Mutex<T>。接下来遍历 range 创建了 10 个线程。使用了 thread::spawn 并对所有线程使用了相同的闭包:他们每一个都将调用 lock 方法来获取 Mutex<T> 上的锁,接着将互斥器中的值加一。当一个线程结束执行,num 会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。

在主线程中,收集了所有的 join 句柄,调用它们的 join 方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果。

编译上述代码,会发生如下错误:

error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here,
in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

错误信息表明 counter 值在上一次循环中被移动了。所以 Rust 告诉我们不能将 counter 锁的所有权移动到多个线程中。


多线程和多所有权

通过使用智能指针 Rc<T> 来创建引用计数的值,以便拥有多所有者。将上述代码中的 Mutex<T> 封装进 Rc<T> 中并在将所有权移入线程之前克隆了 Rc<T>

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}

不幸的是,Rc<T> 并不能安全的在线程间共享。当 Rc<T> 管理引用计数时,它必须在每一个 clone 调用时增加计数,并在每一个克隆被丢弃时减少计数。Rc<T>没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断。


原子引用计数 Arc<T>

所幸 Arc<T> 正是 这么一个类似 Rc<T> 并可以安全的用于并发环境的类型。字母 “a” 代表 {原子性|atomic},所以这是一个{原子引用计数|atomically reference counted}类型。

Arc<T>Rc<T> 有着相同的 API,所以修改程序中的 use 行和 new 调用。

use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}


使用 Sync 和 Send trait 的可扩展并发

通过 Send 允许在线程间转移所有权

Send 标记 trait 表明类型的所有权可以在线程间传递。几乎所有的 Rust 类型都是 Send 的,不过有一些例外,包括 Rc<T>:这是不能 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。为此,Rc<T> 被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。


Sync 允许多线程访问

Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T,如果 &T(T 的引用)是 Send 的话 T 就是 Sync,这意味着其引用就可以安全的发送到另一个线程。


后记

分享是一种态度

参考资料:《Rust权威指南》

全文完,既然看到这里了,如果觉得不错,随手点个赞和“在看”吧。

相关文章
|
1月前
|
Rust 安全 云计算
Rust语言入门:安全性与并发性的完美结合
【10月更文挑战第25天】Rust 是一种系统级编程语言,以其独特的安全性和并发性保障而著称。它提供了与 C 和 C++ 相当的性能,同时确保内存安全,避免了常见的安全问题。Rust 的所有权系统通过编译时检查保证内存安全,其零成本抽象设计使得抽象不会带来额外的性能开销。Rust 还提供了强大的并发编程工具,如线程、消息传递和原子操作,确保了数据竞争的编译时检测。这些特性使 Rust 成为编写高效、安全并发代码的理想选择。
28 0
|
3月前
|
Rust 安全 调度
30天拿下Rust之并发
30天拿下Rust之并发
37 0
|
4月前
|
数据采集 Rust 安全
Rust在网络爬虫中的应用与实践:探索内存安全与并发处理的奥秘
【8月更文挑战第31天】网络爬虫是自动化程序,用于从互联网抓取数据。随着互联网的发展,构建高效、安全的爬虫成为热点。Rust语言凭借内存安全和高性能特点,在此领域展现出巨大潜力。本文探讨Rust如何通过所有权、借用及生命周期机制保障内存安全;利用`async/await`模型和`tokio`运行时处理并发请求;借助WebAssembly技术处理动态内容;并使用`reqwest`和`js-sys`库解析CSS和JavaScript,确保代码的安全性和可维护性。未来,Rust将在网络爬虫领域扮演更重要角色。
89 1
|
4月前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
76 0
|
4月前
|
安全 开发者 数据安全/隐私保护
Xamarin 的安全性考虑与最佳实践:从数据加密到网络防护,全面解析构建安全移动应用的六大核心技术要点与实战代码示例
【8月更文挑战第31天】Xamarin 的安全性考虑与最佳实践对于构建安全可靠的跨平台移动应用至关重要。本文探讨了 Xamarin 开发中的关键安全因素,如数据加密、网络通信安全、权限管理等,并提供了 AES 加密算法的代码示例。
70 0
|
4月前
|
开发框架 Android开发 iOS开发
跨平台开发的双重奏:Xamarin在不同规模项目中的实战表现与成功故事解析
【8月更文挑战第31天】在移动应用开发领域,选择合适的开发框架至关重要。Xamarin作为一款基于.NET的跨平台解决方案,凭借其独特的代码共享和快速迭代能力,赢得了广泛青睐。本文通过两个案例对比展示Xamarin的优势:一是初创公司利用Xamarin.Forms快速开发出适用于Android和iOS的应用;二是大型企业借助Xamarin实现高性能的原生应用体验及稳定的后端支持。无论是资源有限的小型企业还是需求复杂的大公司,Xamarin均能提供高效灵活的解决方案,彰显其在跨平台开发领域的强大实力。
53 0
|
4月前
|
Rust 安全 数据处理
【揭秘异步编程】Rust带你走进并发设计的神秘世界——高效、安全的并发原来是这样实现的!
【8月更文挑战第31天】《异步编程的艺术:使用Rust进行并发设计》一文探讨了如何利用Rust的`async`/`await`机制实现高效并发。Rust凭借内存安全和高性能优势,成为构建现代系统的理想选择。文章通过具体代码示例介绍了异步函数基础、并发任务执行及异步I/O操作,展示了Rust在提升程序吞吐量和可维护性方面的强大能力。通过学习这些技术,开发者可以更好地利用Rust的并发特性,构建高性能、低延迟的应用程序。
51 0
|
7月前
|
Rust 并行计算 安全
Rust中的并行与并发优化:释放多核性能
Rust语言以其内存安全和高效的并发模型在并行计算领域脱颖而出。本文深入探讨了Rust中的并行与并发优化技术,包括使用多线程、异步编程、以及并行算法等。通过理解并应用这些技术,Rust开发者可以有效地利用多核处理器,提高程序的性能和响应能力。
|
Rust JavaScript 前端开发
Rust学习笔记之并发(一)
Rust学习笔记之并发(一)
|
缓存 Rust 前端开发
GIAC-2022sh 学习笔记 | QEngine: 支付宝实时行情服务 Rust 跨端实践
GIAC-2022sh 学习笔记 | QEngine: 支付宝实时行情服务 Rust 跨端实践
218 0
GIAC-2022sh 学习笔记 | QEngine: 支付宝实时行情服务 Rust 跨端实践