22 KiB
并发
Rust 的主要目标之一是处理并发编程,保证程序的内存安全和高效运行。通过利用 Rust 的所有权和类型检查系统,使许多并发错误可以在编译时检测到,从而避免了运行时错误。Rust 的 无畏并发(fearless concurrency
) 使得编写代码变得更加容易,而且易于重构,不会引入新的错误。
使用线程同时运行代码
Rust 中的线程(threads)是用于同时运行程序中独立部分的功能。在多线程上下文中编程需要仔细思考,因为线程可以同时运行,所以不同线程上代码的运行顺序无法预知,可能会导致问题,例如竞争状态(线程按照不一致的顺序访问数据或资源)、死锁(两个线程彼此等待,阻止两个线程继续执行)或 bug。通过使用 Rust 的线程实现模型,可以减轻使用线程的负面影响。Rust 的标准库使用了一种 1:1 的线程实现模型,即程序使用一个操作系统线程来支持一个语言线程。此外,还有其他线程模型的 crates,这些线程模型做出了与 1:1 模型不同的权衡。
绿色线程与OS线程
绿色线程
是由语言运行时或编程语言本身实现的线程,而系统线程
则是由操作系统内核提供的线程。在使用绿色线程
时,通常需要将它们映射到系统线程
上执行。具体来说,这些绿色线程通常在运行时的线程池中运行,线程池由少量的系统线程
负责管理,这些系统线程
负责调度和执行绿色线程
。
绿色线程
模式被称为 M:N 模型,其中 M 个绿色线程
对应 N 个操作系统
线程,这里 M 和 N 不必相同。
使用 spawn 创建一个线程
创建一个新线程,我们需要调用 thread::spawn
函数并传递一个闭包,包含我们想要在新线程中运行的代码。
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 0..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
当 Rust 程序的主线程完成时,无论它们是否已完成运行,所有生成的线程都会被关闭,故线程并没有打印到9。输出结果类似下图:
扩大主函数循环次数后,运行图:
thread::sleep
函数是Rust标准库中的一个函数,用于使当前线程休眠(暂停执行)一段时间,以便让其他线程有机会运行。它接受一个时间参数,以指定休眠的时间长度,可以使用Duration
类型或者std::time::Duration
模块的方法来指定休眠时间。在多线程编程中,使用thread::sleep
函数可以帮助我们控制线程执行的顺序和时间片分配,从而更好地利用系统资源。但需要注意的是,使用thread::sleep
并不能保证线程的执行顺序,这取决于操作系统的调度算法和硬件资源的可用情况。thread::sleep
函数是阻塞的,不应该在async函数(异步函数)中使用。
join 等待所有线程完成
为防止主线程结束时,子线程未运行完成问题。使用join
方法,等待线程完成。
添加join方法后,主函数会在for运行后,等待子线程运行完毕。
移动join代码位置(join调用放在主线程的for循环之前):
主线程将等待子线程完成,然后运行其for循环,因此输出将不再交错。
在线程中使用 move 闭包
move
关键字,使闭包获得它使用的值的所有权,可通过 move
将值的所有权从一个线程转移到另一个线程。
当子线程中使用主线程数据,但未使用 move
关键字:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}
运行图:
错误信息:
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src\main.rs:5:32
|
5 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
6 | println!("Here's a vector: {:?}", v);
| - `v` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src\main.rs:5:18
|
5 | let handle = thread::spawn(|| {
| __________________^
6 | | println!("Here's a vector: {:?}", v);
7 | | });
| |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
|
5 | let handle = thread::spawn(move || {
| ++++
For more information about this error, try `rustc --explain E0373`.
error: could not compile `move_thread` due to previous error
因为 Rust 不知道这个新建线程会执行多久,所以无法知晓 v
的引用是否一直有效。例如以下情况:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
drop(v); // oh no!
handle.join().unwrap();
}
根据错误提示修改代码:
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
|
5 | let handle = thread::spawn(move || {
使用 move
将所有权转移至闭包中:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
// drop(v); // error: value used here after move
handle.join().unwrap();
}
运行图:
消息传递(message passing):在线程中传递信息
通道
是一种保证并发安全的消息传递机制,其核心思想是通过消息传递
来共享内存,而不是通过共享内存来进行通信。可以把通道
看作一个水道,其中有发送器
和接收器
两个部分。发送器可以向通道中发送数据,而接收器可以从通道中接收数据。通道的一个重要特点是可以保证线程之间的同步
,避免了数据竞争的问题。通道不仅可以用于线程
间通信,还可以用于不同进程
之间的通信。同时,通道的使用也可以避免一些并发编程中常见的陷阱和错误。
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
// 不可编译,因未做任何事,不知 通道 发送类型
// error: cannot infer type of the type parameter `T` declared on the function `channel`
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
创建一个新的异步通道,并返回发送端(tx
)和接收端(rx
)。通道的实现方式允许一个通道有多个发送端,但只能有一个接收端
。当至少有一个发送端存在(包括克隆的发送端),recv
操作将阻塞,直到有可用的消息。try_recv
不会阻塞,会立即返回Result<T,E>
:有消息则返回一个Ok
值,没有消息则返回一个Err
值。
如果在使用发送端进行发送时接收端已经断开连接,send
方法将返回一个SendError
。类似地,如果在接收消息时发送端已经断开连接,recv
方法将返回一个RecvError
。
use std::sync::mpsc;
use std::thread;
use std::sync::mpsc::RecvError;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap()
});
let receive = rx.recv().unwrap();
println!("Got: {}", receive);
assert_eq!(Err(RecvError), rx.recv());
}
通道与所有权转移
并发编程中因所有权的存在会避免很多错误的产生。以下为错误案例(原因为:转移变量所有权后仍使用变量):
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val); // 不可继续使用val
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
错误信息:
error[E0382]: borrow of moved value: `val`
--> src\main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
发送多个值
为证明并发性,使线程发送多个消息,并在每个消息之间暂停一秒钟。以观察到两个线程在通道上进行交互的情况。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
主线程中,不再显式调用 recv
函数,将 rx 视为一个迭代器来读取值。因为for
循环中没有任何暂停或延迟的代码,可知主线程正等待从线程接收值。
通过克隆创建多个发送端
mpsc是多生产者、单消费者的缩写。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
String::from("thread2"),
String::from("thread3"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals{
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
调用clone
函数来克隆发送端。获得一个新的发送端,将其传递给第一个线程。同时将原始的发送端传递给第二个生成的线程。这样,我们就创建了两个线程,每个线程都向同一个接收者发送不同的消息。
因为并发性,输出的顺序可能会有所不同,这取决于系统。当在不同的线程中使用不同的thread::sleep
值进行时,每次运行的结果将更加不确定。
共享状态并发性
编程语言中的通道(channels)类似于单一所有权(single ownership),因为一旦将值传输到通道中,就无法再使用该值。而共享内存并发性则类似于多重所有权(multiple ownership):多个线程可以同时访问相同的内存位置。
使用互斥锁实现逐个线程访问数据
互斥锁(Mutex)是互斥(mutual exclusion)的缩写,它在任何给定时间只允许一个线程访问某些数据。要访问互斥锁中的数据,线程必须首先发出请求以获取互斥锁的锁。锁是互斥锁的一部分,它是一个数据结构,用于跟踪当前谁拥有对数据的独占访问权限。因此,互斥锁通过锁定系统,保护其持有的数据。
互斥锁的两个规则:
-
在使用数据之前,必须尝试获取锁。
-
在使用完互斥锁保护的数据后,必须解锁数据,以便其他线程可以获取锁。
Mutex API
单线程环境中使用互斥锁:
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
要访问互斥锁内部的数据,需使用lock
方法获取锁。这个调用会阻塞当前线程,直到获得锁为止。如果在调用lock
时,有另一个线程持有了锁并发生了panic,那么当前线程无法获取到锁。为了处理这种情况,代码中选择使用unwrap
函数来解包(unwrap)LockResult
类型,如果发生了上述情况,当前线程会发生恐慌。
因为m
的类型是Mutex<i32>
,而不是直接的i32
类型,因此我们必须调用lock
方法来获取对i32
值的访问权限。这是为了确保在使用m
中的值之前先获取到了锁。如果我们忘记调用lock
,类型系统将不允许我们访问内部的i32
值。Mutex<T>
实际上是一个智能指针。当我们调用lock
方法时,它返回一个叫做MutexGuard
的智能指针,它包装了被互斥锁保护的数据。MutexGuard
实现了Deref
trait,使得我们可以通过它访问内部数据。此外,MutexGuard
还实现了Drop
trait,在MutexGuard
超出作用域时自动释放锁,这发生在内部作用域的末尾。这样一来,我们就不会遗忘手动释放锁,从而阻止其他线程使用互斥锁,因为锁的释放是自动进行的。
Mutex<T>
在多个线程之间共享一个值
创建10个线程,让它们每个线程将计数器的值增加1,使计数器从0增加到10。(代码无法通过编译)
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
错误信息:
error[E0382]: use of moved value: `counter`
--> src\main.rs:9:36
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
10 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
For more information about this error, try `rustc --explain E0382`.
因线程是同时运行的,并且它们在获取锁和修改计数器值的时候没有进行同步。这就导致了数据竞争,使得编译器无法保证计数器的值在运行时是正确的。
要解决这个问题,需要在多线程中正确地使用互斥锁。可以通过引入一个Arc
(原子引用计数)来解决问题。Arc
允许多个线程共享同一个计数器,而不会造成数据竞争。然后,我们在每个线程中使用Arc
的克隆来处理计数器。
多线程和多重所有权
通过使用智能指针Rc<T>
创建引用计数值来使一个值拥有多个所有者。使用Rc<T>
将Mutex<T>
进行包装,并在将所有权移交给线程之前克隆Rc<T>
use std::sync::Mutex;
use std::rc::Rc;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
错误信息:
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src\main.rs:10:36
|
10 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `[closure@src\main.rs:10:36: 10:43]`
| | |
| | required by a bound introduced by this call
11 | | let mut num = counter.lock().unwrap();
12 | | *num += 1;
13 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `[closure@src\main.rs:10:36: 10:43]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
关键信息:Rc<Mutex<i32>>
cannot be sent between threads safely 即Rc<Mutex<i32>>
不能安全地在线程之间发送。Rc<Mutex<i32>>
没有实现Send
trait。
原子引用计数 Arc
Arc<T>
是一种类似于Rc<T>
的类型,可以在并发情况下安全使用。其中的Arc
代表原子引用计数(Atomic Reference Counting)
。有关原子性可查看std::sync::atomic
文档。原子引用计数类型的工作方式类似于普通类型,但它可以安全地在线程之间共享。线程安全性会带来性能损失。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
// Result: 10
RefCell
/Rc
与 Mutex
/Arc
的相似性
当我们使用 Mutex<T>
时,我们可以通过获取锁来获得对被保护数据的可变引用,即使 Mutex<T>
本身是不可变的。这就是 Mutex<T>
提供的 内部可变性
特性,类似于 Cell
系列一样。正如第十五章中使用 RefCell
可以改变 Rc
中的内容那样,同样的可以使用 Mutex
来改变 Arc
中的内容。
需要注意的是 Rust 并不能避免所有可能的逻辑错误。就像在第15章使用 Rc<T>
时可能出现引用循环(两个 Rc<T>
值相互引用导致内存泄漏)一样,使用 Mutex<T>
也可能导致死锁。死锁发生在两个或多个线程互相等待对方所拥有的锁,导致它们无法继续执行。这是程序设计中常见的并发问题之一。
可扩展的并发:Sync
和 Send
trait
Rust 语言本身的并发功能非常少。到目前为止,在本章中讨论的几乎所有并发特性都是标准库中的,而不是语言本身的一部分。然而,两个并发概念内置在语言中:std::marker
中的 Sync
和 Send
标记trait。
标记 trait(Marker Trait)是一种特殊的 trait,它不包含方法定义,主要用于为类型附加特定的意义或属性,通过静态检查或约束保证类型的安全和正确性。而普通 trait 则定义了具体的行为,通过实现具体的方法来描述类型的行为和功能。
通过 Send
允许线程之间转移所有权
Send
标记trait 表明类型的所有权可以在线程间传递。几乎所有的 Rust 类型都是 Send
的。任何完全由 Send
类型组成的类型也会自动被标记为 Send
。几乎所有的基本类型都是 Send
,除了裸指针。
但 Rc<T>
不是 Send
类型,因为Rc<T>
被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。在多线程下转移 Rc<T>
所有权会得到:the trait Send is not implemented for Rc<Mutex<i32>>
。Arc<T>
是 Send
的,故可以在并发情况下安全使用。
Sync
允许多线程访问
Sync
标记trait 表明一个实现了 Sync
的类型可以安全的在多个线程中拥有其值的引用。换句话说,任意类型 T
如果 &T
(T
的不可变引用)是 Send
的,那么 T
就是 Sync
的,这意味着该引用可以安全地发送到另一个线程。与 Send
类似,原始类型是 Sync
的,完全由 Sync
类型组成的类型也是 Sync
。
智能指针 Rc<T>
、RefCell<T>
和 Cell<T>
不满足 Sync
特性的要求,因为它们的内部状态更新不是线程安全的。而 Mutex<T>
是 Sync
的,可以在多个线程中安全地共享访问。在多线程编程中,Sync
特性确保了类型可以在多个线程之间安全共享访问,避免了数据竞争和并发问题。
手动实现 Send 和 Sync 是不安全的
由 Send
和 Sync
特性组成的类型也是 Send
和 Sync
的,故不必手动实现这些特性。因为他们是标记 trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的。创建新的不由 Send
和 Sync
组成的新的并发类型需要多加小心,以确保维持其安全保证。