You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

22 KiB

并发

Rust 的主要目标之一是处理并发编程,保证程序的内存安全和高效运行。通过利用 Rust 的所有权和类型检查系统,使许多并发错误可以在编译时检测到,从而避免了运行时错误。Rust 的 无畏并发fearless concurrency) 使得编写代码变得更加容易,而且易于重构,不会引入新的错误。

使用线程同时运行代码

Rust 中的线程(threads)是用于同时运行程序中独立部分的功能。在多线程上下文中编程需要仔细思考,因为线程可以同时运行,所以不同线程上代码的运行顺序无法预知,可能会导致问题,例如竞争状态(线程按照不一致的顺序访问数据或资源)、死锁(两个线程彼此等待,阻止两个线程继续执行)或 bug。通过使用 Rust 的线程实现模型,可以减轻使用线程的负面影响。Rust 的标准库使用了一种 1:1 的线程实现模型,即程序使用一个操作系统线程来支持一个语言线程。此外,还有其他线程模型的 crates,这些线程模型做出了与 1:1 模型不同的权衡。

绿色线程与OS线程

绿色线程是由语言运行时或编程语言本身实现的线程,而系统线程则是由操作系统内核提供的线程。在使用绿色线程时,通常需要将它们映射到系统线程上执行。具体来说,这些绿色线程通常在运行时的线程池中运行,线程池由少量的系统线程负责管理,这些系统线程负责调度和执行绿色线程

绿色线程模式被称为 M:N 模型,其中 M 个绿色线程对应 N 个操作系统线程,这里 M 和 N 不必相同。

使用 spawn 创建一个线程

创建一个新线程,我们需要调用 thread::spawn 函数并传递一个闭包,包含我们想要在新线程中运行的代码。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    for i in 0..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

当 Rust 程序的主线程完成时,无论它们是否已完成运行,所有生成的线程都会被关闭,故线程并没有打印到9。输出结果类似下图:

image-20230511170548126

扩大主函数循环次数后,运行图:

image-20230511172627602

thread::sleep函数是Rust标准库中的一个函数,用于使当前线程休眠(暂停执行)一段时间,以便让其他线程有机会运行。它接受一个时间参数,以指定休眠的时间长度,可以使用Duration类型或者std::time::Duration模块的方法来指定休眠时间。在多线程编程中,使用thread::sleep函数可以帮助我们控制线程执行的顺序和时间片分配,从而更好地利用系统资源。但需要注意的是,使用thread::sleep并不能保证线程的执行顺序,这取决于操作系统的调度算法和硬件资源的可用情况。thread::sleep函数是阻塞的,不应该在async函数(异步函数)中使用。

join 等待所有线程完成

为防止主线程结束时,子线程未运行完成问题。使用join方法,等待线程完成。

image-20230512093253398

添加join方法后,主函数会在for运行后,等待子线程运行完毕。

移动join代码位置(join调用放在主线程的for循环之前):

image-20230512095104846

主线程将等待子线程完成,然后运行其for循环,因此输出将不再交错。

在线程中使用 move 闭包

move 关键字,使闭包获得它使用的值的所有权,可通过 move 将值的所有权从一个线程转移到另一个线程。

当子线程中使用主线程数据,但未使用 move 关键字:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(|| {
       println!("Here's a vector: {:?}", v);
    });
    handle.join().unwrap();
}

运行图:

image-20230512103423445

错误信息:

error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src\main.rs:5:32
  |
5 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
6 |        println!("Here's a vector: {:?}", v);
  |                                          - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src\main.rs:5:18
  |
5 |       let handle = thread::spawn(|| {
  |  __________________^
6 | |        println!("Here's a vector: {:?}", v);
7 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
5 |     let handle = thread::spawn(move || {
  |                                ++++

For more information about this error, try `rustc --explain E0373`.
error: could not compile `move_thread` due to previous error

因为 Rust 不知道这个新建线程会执行多久,所以无法知晓 v 的引用是否一直有效。例如以下情况:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

根据错误提示修改代码:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
5 |     let handle = thread::spawn(move || {

使用 move 将所有权转移至闭包中:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
       println!("Here's a vector: {:?}", v);
    });
    // drop(v);  // error: value used here after move
    handle.join().unwrap();
}

运行图:

image-20230512110106611

消息传递(message passing):在线程中传递信息

通道是一种保证并发安全的消息传递机制,其核心思想是通过消息传递来共享内存,而不是通过共享内存来进行通信。可以把通道看作一个水道,其中有发送器接收器两个部分。发送器可以向通道中发送数据,而接收器可以从通道中接收数据。通道的一个重要特点是可以保证线程之间的同步,避免了数据竞争的问题。通道不仅可以用于线程间通信,还可以用于不同进程之间的通信。同时,通道的使用也可以避免一些并发编程中常见的陷阱和错误。

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    // 不可编译,因未做任何事,不知 通道 发送类型
    // error: cannot infer type of the type parameter `T` declared on the function `channel`
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) 创建一个新的异步通道,并返回发送端(tx)和接收端(rx)。通道的实现方式允许一个通道有多个发送端,但只能有一个接收端。当至少有一个发送端存在(包括克隆的发送端),recv操作将阻塞,直到有可用的消息。try_recv不会阻塞,会立即返回Result<T,E>:有消息则返回一个Ok值,没有消息则返回一个Err值。

如果在使用发送端进行发送时接收端已经断开连接,send方法将返回一个SendError。类似地,如果在接收消息时发送端已经断开连接,recv方法将返回一个RecvError

use std::sync::mpsc;
use std::thread;
use std::sync::mpsc::RecvError;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap()
    });
    let receive = rx.recv().unwrap();
    println!("Got: {}", receive);
    assert_eq!(Err(RecvError), rx.recv());
}

通道与所有权转移

并发编程中因所有权的存在会避免很多错误的产生。以下为错误案例(原因为:转移变量所有权后仍使用变量):

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);    // 不可继续使用val 
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

错误信息:

error[E0382]: borrow of moved value: `val`
  --> src\main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

发送多个值

为证明并发性,使线程发送多个消息,并在每个消息之间暂停一秒钟。以观察到两个线程在通道上进行交互的情况。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

image-20230608114743104

主线程中,不再显式调用 recv 函数,将 rx 视为一个迭代器来读取值。因为for循环中没有任何暂停或延迟的代码,可知主线程正等待从线程接收值。

通过克隆创建多个发送端

mpsc是多生产者、单消费者的缩写。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
            String::from("thread2"),
            String::from("thread3"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals{
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

调用clone函数来克隆发送端。获得一个新的发送端,将其传递给第一个线程。同时将原始的发送端传递给第二个生成的线程。这样,我们就创建了两个线程,每个线程都向同一个接收者发送不同的消息。

因为并发性,输出的顺序可能会有所不同,这取决于系统。当在不同的线程中使用不同的thread::sleep值进行时,每次运行的结果将更加不确定。

image-20230608172654544

共享状态并发性

编程语言中的通道(channels)类似于单一所有权(single ownership),因为一旦将值传输到通道中,就无法再使用该值。而共享内存并发性则类似于多重所有权(multiple ownership):多个线程可以同时访问相同的内存位置。

使用互斥锁实现逐个线程访问数据

互斥锁(Mutex)是互斥(mutual exclusion)的缩写,它在任何给定时间只允许一个线程访问某些数据。要访问互斥锁中的数据,线程必须首先发出请求以获取互斥锁的锁。锁是互斥锁的一部分,它是一个数据结构,用于跟踪当前谁拥有对数据的独占访问权限。因此,互斥锁通过锁定系统,保护其持有的数据。

互斥锁的两个规则:

  1. 在使用数据之前,必须尝试获取锁。

  2. 在使用完互斥锁保护的数据后,必须解锁数据,以便其他线程可以获取锁。

Mutex API

单线程环境中使用互斥锁:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }
    println!("m = {:?}", m);
}

要访问互斥锁内部的数据,需使用lock方法获取锁。这个调用会阻塞当前线程,直到获得锁为止。如果在调用lock时,有另一个线程持有了锁并发生了panic,那么当前线程无法获取到锁。为了处理这种情况,代码中选择使用unwrap函数来解包(unwrap)LockResult类型,如果发生了上述情况,当前线程会发生恐慌。

因为m的类型是Mutex<i32>,而不是直接的i32类型,因此我们必须调用lock方法来获取对i32值的访问权限。这是为了确保在使用m中的值之前先获取到了锁。如果我们忘记调用lock,类型系统将不允许我们访问内部的i32值。Mutex<T>实际上是一个智能指针。当我们调用lock方法时,它返回一个叫做MutexGuard的智能指针,它包装了被互斥锁保护的数据。MutexGuard实现了Deref trait,使得我们可以通过它访问内部数据。此外,MutexGuard还实现了Drop trait,在MutexGuard超出作用域时自动释放锁,这发生在内部作用域的末尾。这样一来,我们就不会遗忘手动释放锁,从而阻止其他线程使用互斥锁,因为锁的释放是自动进行的。

image-20230628093116722

Mutex<T>在多个线程之间共享一个值

创建10个线程,让它们每个线程将计数器的值增加1,使计数器从0增加到10。(代码无法通过编译)

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

错误信息:

error[E0382]: use of moved value: `counter`
  --> src\main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

For more information about this error, try `rustc --explain E0382`.

因线程是同时运行的,并且它们在获取锁和修改计数器值的时候没有进行同步。这就导致了数据竞争,使得编译器无法保证计数器的值在运行时是正确的。

要解决这个问题,需要在多线程中正确地使用互斥锁。可以通过引入一个Arc(原子引用计数)来解决问题。Arc允许多个线程共享同一个计数器,而不会造成数据竞争。然后,我们在每个线程中使用Arc的克隆来处理计数器。

image-20230728151937984

多线程和多重所有权

通过使用智能指针Rc<T>创建引用计数值来使一个值拥有多个所有者。使用Rc<T>Mutex<T>进行包装,并在将所有权移交给线程之前克隆Rc<T>

use std::sync::Mutex;
use std::rc::Rc;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}

错误信息:

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
   --> src\main.rs:10:36
    |
10  |           let handle = thread::spawn(move || {
    |                        ------------- ^------
    |                        |             |
    |  ______________________|_____________within this `[closure@src\main.rs:10:36: 10:43]`
    | |                      |
    | |                      required by a bound introduced by this call
11  | |             let mut num = counter.lock().unwrap();
12  | |             *num += 1;
13  | |         });
    | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
    |
    = help: within `[closure@src\main.rs:10:36: 10:43]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`

关键信息:Rc<Mutex<i32>> cannot be sent between threads safelyRc<Mutex<i32>>不能安全地在线程之间发送。Rc<Mutex<i32>>没有实现Send trait。

原子引用计数 Arc

Arc<T>是一种类似于Rc<T>的类型,可以在并发情况下安全使用。其中的Arc代表原子引用计数(Atomic Reference Counting)。有关原子性可查看std::sync::atomic文档。原子引用计数类型的工作方式类似于普通类型,但它可以安全地在线程之间共享。线程安全性会带来性能损失。

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}


// Result: 10

RefCell/RcMutex/Arc 的相似性

当我们使用 Mutex<T> 时,我们可以通过获取锁来获得对被保护数据的可变引用,即使 Mutex<T> 本身是不可变的。这就是 Mutex<T> 提供的 内部可变性 特性,类似于 Cell 系列一样。正如第十五章中使用 RefCell 可以改变 Rc 中的内容那样,同样的可以使用 Mutex 来改变 Arc 中的内容。

需要注意的是 Rust 并不能避免所有可能的逻辑错误。就像在第15章使用 Rc<T> 时可能出现引用循环(两个 Rc<T> 值相互引用导致内存泄漏)一样,使用 Mutex<T> 也可能导致死锁。死锁发生在两个或多个线程互相等待对方所拥有的锁,导致它们无法继续执行。这是程序设计中常见的并发问题之一。

可扩展的并发:SyncSend trait

Rust 语言本身的并发功能非常少。到目前为止,在本章中讨论的几乎所有并发特性都是标准库中的,而不是语言本身的一部分。然而,两个并发概念内置在语言中:std::marker 中的 SyncSend 标记trait。

标记 trait(Marker Trait)是一种特殊的 trait,它不包含方法定义,主要用于为类型附加特定的意义或属性,通过静态检查或约束保证类型的安全和正确性。而普通 trait 则定义了具体的行为,通过实现具体的方法来描述类型的行为和功能。

通过 Send 允许线程之间转移所有权

Send 标记trait 表明类型的所有权可以在线程间传递。几乎所有的 Rust 类型都是 Send 的。任何完全由 Send 类型组成的类型也会自动被标记为 Send。几乎所有的基本类型都是 Send,除了裸指针。

Rc<T> 不是 Send 类型,因为Rc<T> 被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。在多线程下转移 Rc<T> 所有权会得到:the trait Send is not implemented for Rc<Mutex<i32>>Arc<T>Send 的,故可以在并发情况下安全使用。

Sync 允许多线程访问

Sync 标记trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换句话说,任意类型 T 如果 &TT 的不可变引用)是 Send 的,那么 T 就是 Sync 的,这意味着该引用可以安全地发送到另一个线程。与 Send 类似,原始类型是 Sync 的,完全由 Sync 类型组成的类型也是 Sync

智能指针 Rc<T>RefCell<T>Cell<T> 不满足 Sync 特性的要求,因为它们的内部状态更新不是线程安全的。而 Mutex<T>Sync 的,可以在多个线程中安全地共享访问。在多线程编程中,Sync 特性确保了类型可以在多个线程之间安全共享访问,避免了数据竞争和并发问题。

手动实现 Send 和 Sync 是不安全的

SendSync 特性组成的类型也是 SendSync的,故不必手动实现这些特性。因为他们是标记 trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的。创建新的不由 SendSync 组成的新的并发类型需要多加小心,以确保维持其安全保证。