文本为《Rust Atomics and Locks》翻译文章,英文原文:https://marabos.nl/atomics/basics.html
仅供学习使用
早在多核处理器普及之前,操作系统就允许一台计算机同时运行多个程序。这是通过在进程之间快速切换来实现的,允许每个进程一个接一个地重复地取得一点点进展。如今,几乎我们所有的电脑,甚至我们的手机和手表都有多核处理器,这可以真正地并行执行多个进程。
操作系统尽可能地将进程彼此隔离,允许程序在完全不知道其他进程在做什么的情况下做自己的事情。例如,如果不事先询问操作系统内核,一个进程通常不能访问另一个进程的内存,或以任何方式与其通信。
然而,一个程序可以产生额外的执行线程,作为同一个进程的一部分。同一进程内的线程并不是相互隔离的。线程共享内存,并可以通过该内存相互交互。
本章将解释Rust中线程是如何生成的,以及围绕线程的所有基本概念,比如如何在多个线程之间安全共享数据。本章解释的概念是本书其余部分的基础。
如果你已经熟悉Rust的这些部分,可以跳过前面的内容。然而,在你继续下一章之前,确保你对线程、内部可变性、“发送”和“同步”有很好的理解,并且知道互斥量、条件变量和线程停放是什么。
Rust中的线程概念
每个程序都从一个线程开始:主线程。这个线程将执行你的“main”函数,如果需要的话,可以用来生成更多的线程。
在Rust中,使用标准库中的’ std::thread::spawn ‘函数来生成新线程。它只有一个参数:新线程将要执行的函数。一旦这个函数返回,线程就会停止。
让我们来看一个例子:
1 | use std::thread; |
我们生成两个线程,它们都将执行’ f ‘作为它们的主函数。这两个线程都将打印一条消息并显示它们的thread id,而主线程也将打印它自己的消息。
Thread ID
Rust标准库为每个线程分配一个唯一的标识符。这个标识符可以通过’ Thread::id() ‘访问,类型为’ ThreadId ‘。没有太多你可以做一个’ ThreadId ‘除了复制它周围,并检查是否相等。不能保证这些id是连续分配的,只能保证每个线程的id是不同的。
如果您多次运行上面的示例程序,您可能会注意到每次运行的输出是不同的。这是我在机器上运行时得到的输出:
1 | Hello from the main thread. |
令人惊讶的是,部分输出似乎缺失了。
这里发生的情况是,主线程在新生成的线程完成执行它们的函数之前完成了’ main ‘函数的执行。
从’ main ‘返回将退出整个程序,即使其他线程仍在运行。
在这个特殊的例子中,一个新生成的线程只有足够的时间来处理第二条消息的一半,然后程序就被主线程关闭了。
如果我们想要确保线程在我们从’ main ‘返回之前完成,我们可以通过“加入”它们来等待它们。为此,我们必须使用’ spawn ‘函数返回的’ JoinHandle ‘:
1 | fn main() { |
‘ .join() ‘方法会一直等待,直到线程完成执行,并返回一个’ std::thread::Result ‘。如果线程因为恐慌而未能成功完成其功能,则该文件将包含恐慌消息。我们可以尝试处理这种情况,或者在加入一个恐慌线程时调用’ .unwrap() ‘来引发恐慌。
运行这个版本的程序将不再导致截断的输出:
1 | Hello from the main thread. |
唯一在运行之间仍然会改变的是消息打印的顺序:
1 | Hello from the main thread. |
Output Locking(输出锁定)
println宏使用’ std::io::Stdout::lock() ‘来确保它的输出不会被中断。println!() 表达式将等待任何并发运行的表达式完成后再写入任何输出。如果不是这样的话,我们可以得到更多的交错输出,比如:
1 | Hello fromHello from another thread! |
与其像上面的例子一样将函数名传递给 std::thread::spawn
,更常见的做法是传递一个 闭包。这允许我们捕获值并将其移动到新线程中:
1 | let numbers = vec![1, 2, 3]; |
在这里,由于我们使用了 move
闭包,因此将 numbers
的所有权转移到了新生成的线程。如果我们没有使用 move
关键字,则该闭包会通过引用捕获 numbers
。这将导致编译器错误,因为新线程可能会超出该变量的生命周期。 由于线程可能一直运行到程序执行结束,所以 spawn
函数在其参数类型上有一个 static
生命周期限制。换句话说,它只接受可以永久保留的函数。通过引用捕获本地变量的闭包可能无法永久保留,因为该引用将在本地变量停止存在时失效。 从线程中获取返回值是通过从闭包中返回它来完成的。可以从 join
方法返回的结果中获取此返回值:
1 | let numbers = Vec::from_iter(0..=1000); |
这里,线程闭包返回的值(1)通过 join
方法(2)发送回主线程。
如果 numbers
为空,线程在尝试除以零时会出现 panic(1),join
将返回该 panic 消息,导致主线程也因为 unwrap
而 panic(2)。
Thread Builder
std::thread::spawn
函数实际上只是 std::thread::Builder::new().spawn().unwrap()
的简便写法。 std::thread::Builder
允许您在生成新线程之前设置一些设置。 您可以使用它来配置新线程的堆栈大小并为其命名。 线程名称可通过 std :: thread :: current() .name()
获得,在恐慌消息中将被使用,并且在大多数平台上的监视和调试工具中可见。 此外,Builder
的 spawn
函数返回一个 std :: io :: Result
, 允许您处理生成新线程失败的情况。 如果操作系统耗尽内存或者资源限制已应用于程序,则可能会发生这种情况。 如果无法生成新线程,则 std :: thread :: spawn
函数仅会引发 panic。
作用域Threads
如果我们确定一个生成的线程肯定不会超出某个范围,那么该线程可以安全地借用一些不会永久存在的东西,比如局部变量,只要它们在该范围内存活。 Rust标准库提供了std::thread::scope
函数来生成这样的作用域线程。它允许我们生成不能超出传递给该函数的闭包作用域的线程,从而使得安全地借用局部变量成为可能。 最好通过示例来说明其工作原理:
1 | let numbers = vec![1, 2, 3]; |
1 | 我们使用闭包调用 std::thread::scope 函数。我们的闭包直接执行并获得一个参数 s ,表示作用域。 |
---|---|
2 | 我们使用 s 来生成线程。闭包可以借用像 numbers 这样的局部变量。 |
3 | 当作用域结束时,所有尚未加入的线程将自动加入。 |
这种模式保证了在作用域中生成的线程都不会超出该作用域的生命周期。因此,这个有范围限制的 spawn
方法没有对其参数类型施加 static
约束,允许我们引用任何东西只要它在作用域内存在,比如 numbers
。 在上面的例子中,两个新线程都同时访问 numbers
。这是可以的,因为它们(以及主线程)都没有修改它。如果我们将第一个线程更改为修改 numbers
,就像下面所示那样,则编译器将不允许我们再次生成使用 numbers
的另一个线程:
1 | let mut numbers = vec![1, 2, 3]; |
确切的错误信息取决于 Rust 编译器的版本,因为它经常改进以产生更好的诊断结果,但尝试编译上述代码将导致类似于以下内容的结果:
1 | error[E0499]: cannot borrow `numbers` as mutable more than once at a time |
The Leakpocalypse
在 Rust 1.0 之前,标准库有一个名为 std::thread::scoped
的函数,它可以直接生成线程,就像 std::thread::spawn
一样。它允许非 'static
捕获,因为它返回的是一个 JoinGuard
而不是 JoinHandle
,在其被丢弃时会加入线程。任何借用的数据只需要比这个 JoinGuard
存活时间更长即可。这似乎很安全,只要确保在某个时刻丢弃了该 JoinGuard
.
就在 Rust 1.0 发布之前,在“泄漏启示录”中逐渐清楚地表明无法保证某些东西将被丢弃。有许多方法可以使人们忘记某些东西或者泄漏掉而没有释放它。
最终得出结论:(安全)接口设计不能依赖于对象总是在生命周期结束时被删除的假设。泄漏对象可能合理地导致更多对象泄漏(例如泄漏 Vec 将同时泄漏其元素),但可能不会导致未定义行为。由于这个结论,“std :: thread :: scoped” 不再被认为是安全的,并从标准库中移除了。“std :: mem :: forget”也从“unsafe”函数升级到了safe函数以强调遗忘(或泄漏)总是有可能的。
直到 Rust 1.63,才添加了一个新的 std::thread::scope
函数,其设计不依赖于 Drop
来保证正确性。
共享所有权和引用计数
到目前为止,我们已经学习了如何使用 move
闭包(”Rust 中的线程”)将值的所有权转移给线程,并从寿命更长的父线程中借用数据(”作用域线程”)。当在两个不保证互相存活的线程之间共享数据时,它们都不能成为该数据的所有者。任何在它们之间共享的数据都需要与最长寿命的线程一样长。
Statics(静态)
有几种方法可以创建一个不属于单个线程的东西。最简单的方法是使用“静态”值,它由整个程序而不是单个线程“拥有”。在以下示例中,两个线程都可以访问X
,但没有一个线程拥有它:
1 | static X: [i32; 3] = [1, 2, 3]; |
static
项目具有恒定的初始化程序,永远不会被丢弃,并且在程序的主函数甚至开始之前就已经存在。每个线程都可以借用它,因为它保证始终存在。
Leaking(泄露)
另一种共享所有权的方式是通过泄露分配。使用Box::leak
,可以释放对Box
的所有权,并承诺永远不会丢弃它。从那时起,该 Box
将永远存在,没有所有者,允许任何线程在程序运行期间借用它。
1 | let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3])); |
move
闭包可能会让我们觉得我们正在将所有权移动到线程中,但仔细查看x
的类型会发现,我们只是给线程一个对数据的引用。
引用是Copy
的,这意味着当您“移动”它们时,原始副本仍然存在,就像整数或布尔值一样。
请注意,static
生命周期并不意味着该值从程序开始就一直存在,而只是表示它在程序结束之前一直存在。过去根本不重要。
泄漏Box
的缺点是我们正在泄漏内存。 我们分配了某些东西,但从未删除和释放它。 如果这种情况仅发生有限次数,则可以接受。 但如果我们继续这样做,则程序将慢慢耗尽内存。
Reference Counting(引用计数)
为了确保共享数据被丢弃和释放,我们不能完全放弃其所有权。相反,我们可以共享所有权。通过跟踪拥有者的数量,我们可以确保只有在没有剩余拥有者时才会删除该值。
Rust标准库通过std::rc::Rc
类型提供此功能,简称“引用计数”。它与Box
非常相似,但是克隆它不会分配任何新内容,而是增加存储在所包含值旁边的计数器。原始的和克隆的 Rc
都将指向同一内存分配; 它们共享所有权。
1 | use std::rc::Rc; |
放弃一个 Rc
将会减少计数器。只有最后的 Rc
,它将看到计数器降至零,才是放弃和释放包含数据的那个。
然而,如果我们尝试将一个 Rc
发送到另一个线程中,则会遇到以下编译器错误:
1 | error[E0277]: `Rc` cannot be sent between threads safely |
事实证明,Rc
不是线程安全的(更多信息请参见“Thread Safety: Send and Sync”)。如果多个线程都有一个指向相同分配的 Rc
,它们可能会尝试同时修改引用计数器,这可能会导致不可预测的结果。 相反,我们可以使用 std::sync::Arc
,它代表”原子引用计数”。它与 Rc
相同,只是保证对引用计数进行的修改是不可分割的原子操作,使其能够安全地与多个线程一起使用(更多信息请参见第2章 )。
1 | use std::sync::Arc; |
1 | 我们将一个数组和一个引用计数器放在新的分配中,该计数器从一开始。 |
---|---|
2 | 克隆 Arc 将引用计数增加到两个,并为我们提供了第二个指向同一分配的 Arc 。 |
3 | 两个线程都通过自己的 Arc 访问共享数组。当它们丢弃其 Arc 时,都会减少引用计数器。最后一个丢弃其 Arc 的线程将看到计数器降至零,并将是释放和回收数组的线程。 |
克隆命名
不得不为每个 Arc
的克隆体分配一个不同的名称,这可能会使代码变得混乱且难以跟踪。虽然每个 Arc
的克隆体都是单独的对象,但每个克隆体代表着相同的共享值,这并不能通过给它们命名来很好地反映出来。
Rust 允许(并鼓励)您通过定义具有相同名称的新变量来 遮盖 变量。如果在同一作用域中执行此操作,则原始变量将无法再被命名。但是通过打开一个新作用域,在该作用域内可以使用类似于 let a = a.clone();
这样的语句重复使用相同的名称,并在作用域外保留原始变量。
通过将闭包包装在新范围内(使用 {}
),我们可以在将其移动到闭包中之前对变量进行克隆,而无需重新命名它们。
| let a = Arc::new([1, 2, 3]); let b = a.clone(); thread::spawn(move || { dbg!(b); }); dbg!(a);
The clone of the Arc
lives in the same scope. Each thread gets its own clone with a different name. | let a = Arc::new([1, 2, 3]); thread::spawn({ let a = a.clone(); move || { dbg!(a); } }); dbg!(a);
The clone of the Arc
lives in a different scope. We can use the same name in each thread. |
| ———————————————————— | ———————————————————— |
| | |
因为所有权是共享的,引用计数指针(Rc<T>
和 Arc<T>
)与共享引用(&T
)具有相同的限制。它们不会给您对其包含值的可变访问权限,因为该值可能同时被其他代码借用。 例如,如果我们尝试对 Arc<[i32]>
中的整数切片进行排序,则编译器将阻止我们这样做,并告诉我们不允许修改数据:
1 | error[E0596]: cannot borrow data in an `Arc` as mutable |
Borrowing and Data Races(借用和数据竞争)
在 Rust 中,值可以通过两种方式进行借用:
不可变借用 使用
&
借用某个东西会得到一个 不可变引用。这样的引用是可以复制的。对其所指向数据的访问在所有此类引用副本之间共享。正如名称所示,编译器通常不允许您通过这样的引用来 改变 某些东西,因为那可能会影响当前正在借用相同数据的其他代码。可变借用 使用
&mut
借出某个东西会得到一个 可变引用。可变借贷保证它是该数据唯一活动租赁者。这确保了修改数据不会更改其他代码正在查看的任何内容。这两个概念结合起来完全防止了 数据竞争:其中一个线程在修改数据而另一个线程同时访问它的情况。 数据竞争通常是 未定义行为,这意味着编译器无需考虑这些情况,并且只需假定它们不会发生即可。
为了澄清其含义,让我们看一个例子,在此例中编译器可以使用借贷规则做出有益假设:
1 | fn f(a: &i32, b: &mut i32) { |
在这里,我们获得了一个不可变的整数引用,并存储了 b
所引用的整数在增加之前和之后的值。编译器可以自由地假设借用和数据竞争方面的基本规则得到遵守,这意味着 b
不可能指向与 a
相同的整数。实际上,在 a
借用该整数期间,程序中没有任何东西可以对其进行可变借用。因此,编译器可以轻松地推断出 *a
不会改变,并且 if 语句条件永远不会为真,并完全将调用 x 的代码作为优化从程序中删除。 除非使用 unsafe 块禁止一些编译器安全检查,否则无法编写破坏 Rust 编译器假设的程序。
Undefined Behavior(未定义行为)
像C、C++和Rust这样的语言有一组需要遵循的规则,以避免出现所谓的未定义行为。例如,Rust的一个规则是任何对象都不能有多个可变引用。
在Rust中,只有使用unsafe
代码时才可能违反这些规则。 “不安全”并不意味着代码是错误或永远不安全使用,而是编译器没有验证代码是否安全。如果代码确实违反了这些规则,则称其为不完整。
编译器可以假设这些规则从未被破坏,而无需检查。当破坏时,会导致所谓的未定义行为,我们必须尽一切努力避免它。如果我们允许编译器做出实际上并非如此的假设,则很容易对您代码中其他部分产生更多错误结论,并影响整个程序。
作为具体示例,请看下面一个小片段,在其中使用了slice上的get_unchecked
方法:
1 | let a = [123, 456, 789]; |
get_unchecked
方法可以通过索引获取切片中的元素,就像 a[index]
一样,但是允许编译器假设索引始终在边界内,而不进行任何检查。 这意味着在此代码段中,因为a
的长度为3,编译器可以假设index
小于三。我们需要确保它的假设成立。 如果我们打破了这个假设,例如将index
设置为3,则可能会发生任何事情。它可能导致从存储在a
右侧字节中的内存读取任何内容。它可能导致程序崩溃。它可能最终执行某些完全无关的程序部分。它会造成各种混乱。 也许令人惊讶的是,在未定义行为甚至可以 “穿越时间”,影响到先前代码中出现问题。要理解如何发生这种情况,请想象我们之前有一个 match
语句:
1 | match index { |
由于不安全的代码,编译器可以假设 index
只会是 0、1 或 2。它可能会逻辑上得出结论,我们 match
语句的最后一个分支只能匹配到数字 2,并且因此 z
函数只被调用为 z(2)
。这个结论不仅可以优化 match
,还可以优化 z
函数本身。这包括抛弃未使用的代码部分。
如果我们将其执行时传入了一个值为3 的index
,则程序可能尝试执行已经被优化掉的部分,导致完全无法预测的行为,在我们到达最后一行的 unsafe
块之前就发生了。就像那样,未定义行为可能通过整个程序向前和向后传播,并以通常非常意外的方式表现出来。
在调用任何一个带有 unsafe
标记函数时,请仔细阅读其文档并确保您充分理解其安全要求:作为调用者需要遵守哪些假设才能避免未定义行为。
Interior Mutability(内部可变性)
在前面的章节中介绍了借用规则,这些规则很简单,但是在涉及多个线程时可能会非常限制——特别是当没有数据可以被多个线程访问和修改时。遵循这些规则使得线程之间的通信极其有限且几乎不可能。
幸运的是,有一个逃生口:内部可变性。具有内部可变性的数据类型略微弯曲了借用规则。在某些条件下,这些类型可以允许通过“不可变”引用进行突变。
在[“引用计数”](#Reference Counting(引用计数))一节中,我们已经看到了一个涉及内部可变性的微妙示例。无论是否存在多个克隆使用相同的引用计数器,Rc
和Arc
都会突变引用计数器。
一旦涉及到具有内部可变类型,则调用引用为“不可变”或“可变”的术语将会令人困惑和不准确,因为某些东西可以通过两者进行改动。更准确地说,“共享”和“独占”才是更精确的术语:共享引用(&T
)可以复制并与其他人分享,而独占引用(&mut T
)则保证它是该 T
的唯一 独占借用。对于大多数类型,共享引用不允许修改,但也有例外情况。由于在本书中我们将主要使用这些异常情况,因此在本书的其余部分中我们将使用更准确的术语。
请记住,内部可变性仅弯曲了共享借用规则以允许在共享时进行突变。它并不改变任何关于独占借用的事情。无论是否具有内部可变性,在某个地方导致超过一个活动独占引用的不安全代码总是会调用未定义行为。
让我们看一下几种具有内部可变性的类型及其如何通过共享引用允许突变而不会导致未定义行为。
Cell
std::cell::Cell<T>
简单地包装了一个 T
,但允许通过共享引用进行突变。为避免未定义行为,它只允许您复制值(如果 T
是 Copy
),或者整体替换另一个值。此外,它只能在单个线程中使用。 接下来看一个类似前面章节示例的示例,但这次使用的是 Cell<i32>
而不是 i32
:
1 | use std::cell::Cell; |
与上次不同,现在if
条件可能为真。因为Cell<i32>
具有内部可变性,编译器不能再假设只要我们拥有共享引用就不会改变其值。 a
和b
都可能引用相同的值,这样通过b
进行突变也可能影响到 a
. 但是它仍然可以假定没有其他线程同时访问单元格。
对于一个 Cell
, 其限制并不总是易于使用。由于它无法直接让我们借用其持有的值,因此我们需要将一个值移出(留下另一个东西),修改它,然后将其放回以更改其内容:
1 | fn f(v: &Cell<Vec<i32>>) { |
RefCell(引用计数单元)
与常规的 Cell
不同,std::cell::RefCell
允许您借用其内容,但会带来一些运行时成本。 RefCell<T>
不仅保存了一个 T
,还保存了一个计数器来跟踪任何未完成的借用。如果在它已经被可变地借用(或反之亦然)时尝试进行借用,则会引发 panic,从而避免了未定义的行为。就像 Cell
一样,只能在单个线程中使用 RefCell
。 通过调用 borrow
或 borrow_mut
来借用 RefCell
的内容:
1 | use std::cell::RefCell; |
虽然 Cell
和 RefCell
可以非常有用,但当我们需要在多个线程中执行某些操作时,它们变得相当无用。因此,让我们继续介绍与并发相关的类型。
Mutex and RwLock(互斥锁和读写锁)
一个 RwLock
或 读写锁 是 RefCell
的并发版本。一个 RwLock<T>
持有一个类型为 T
的值,并跟踪任何未完成的借用。然而,与 RefCell
不同的是,在出现冲突借用时它不会 panic。相反,它会阻塞当前线程——将其置于睡眠状态——等待冲突借用消失。我们只需耐心地等待轮到我们使用数据,其他线程使用完后再进行。
从一个 RwLock
中获取内容被称为 锁定 。通过 锁定 它,我们暂时阻止了并发的冲突借用,使得我们可以在不引起数据竞争的情况下进行借用。
一个 Mutex
非常类似,但概念上稍微简单一些。它不像 RwLock
一样跟踪共享和独占式借用的数量,而是仅允许独占式借用。
关于这些类型更详细的信息,请参见 “锁:Mutexes 和 RwLocks”。
Atomics
原子类型代表了Cell
的并发版本,是第2章和第3章的主要内容。与Cell
一样,它们通过让我们整体复制值来避免未定义行为,而不直接借用其内容。 但与Cell
不同的是,它们不能是任意大小。因此,并没有通用的适用于任何T类型的Atomic
UnsafeCell(unsafe单元)
一个 UnsafeCell
是内部可变性的基本构建块。
一个 UnsafeCell<T>
包装了一个 T
,但没有任何条件或限制来避免未定义行为。相反,它的 get()
方法只是给出了它包装的值的原始指针,这个指针只能在 unsafe
块中有意义地使用。它把如何使用留给用户,在不引起任何未定义行为的情况下使用。
最常见的情况是,一个 UnsafeCell
不会直接使用,而是被包含在另一种类型中,并通过有限接口提供安全性,例如 Cell
或者 Mutex
. 所有具有内部可变性(包括上面讨论过的所有类型)都建立在 UnsafeCell
的基础之上。
线程安全:发送和同步
在本章中,我们看到了几种不是线程安全的类型,这些类型只能在单个线程上使用,例如Rc
、Cell
等。由于需要这种限制以避免未定义行为,因此编译器需要理解并检查您是否可以使用这些类型而无需使用“unsafe”块。
语言使用两个特殊的trait来跟踪哪些类型可以安全地跨越线程使用:
Send
如果一个值得所有权可以转移到另一个线程,则该类型是“Send”。例如,“Arc
”是“Send”,但“Rc ”不是。 Sync
如果一个共享引用到该类型,“&T”,也是“Send”,则该类型就是“Sync”。例如,“i32” 是 “Sync”,但 “Cell
” 不是。(然而,“Cell ” 是 “Send”的。) 所有基本数据类型如 i32
,bool
, 和str
都同时实现了Send
和Sync
. 这两个 trait 都属于 自动 trait ,这意味着它们会根据字段自动实现你的自定义结构体。如果一个结构体的所有字段都满足了Send
和Sync
, 则该结构体本身也具有相应特性。 要退出其中任何一项,请向您的类型添加未实现该 trait 的字段。为此,通常会用到特殊的 std::marker::PhantomData类型。该类型被编译器视为 T
,但实际上在运行时并不存在。它是一个零大小的类型,不占用空间。 让我们看一下以下结构体:
1 | use std::marker::PhantomData; |
在这个例子中,如果 handle
是它唯一的字段,则 X
将同时是 Send
和 Sync
。然而,我们添加了一个大小为零的 PhantomData<Cell<()>>
字段,该字段被视为一个 Cell<()>
。由于 Cell<()>
不是可同步的(not Sync),因此 X 也不是可同步的(not Sync)。但它仍然是可发送的(Send),因为它所有的字段都实现了 Send。
原始指针(*const T
和 *mut T`)既不是 Send 也不是 Sync,因为编译器对其所代表内容知之甚少。
选择加入任何其他 trait 的方式相同;使用 impl 块来实现您类型上要用到的 trait:
1 | struct X { |
请注意,实现这些特性需要使用 unsafe
关键字,因为编译器无法检查它是否正确。这是您向编译器做出的承诺,它只能信任您。 如果您尝试将某个东西移动到另一个不是 Send
的线程中,则编译器会礼貌地阻止您这样做。以下是一个小例子来演示:
1 | fn main() { |
在这里,我们试图将一个 Rc<i32>
发送到一个新线程中,但是与 Arc<i32>
不同,Rc<i32>
没有实现 Send
。 如果我们尝试编译上面的示例,则会遇到类似于以下内容的错误:
1 | error[E0277]: `Rc<i32>` cannot be sent between threads safely |
thread::spawn
”函数要求其参数为“Send”,而闭包只有在其所有捕获的内容都是“Send”时才是“Send”的。如果我们尝试捕获某些不是“Send”的东西,就会被发现错误,从而保护我们免受未定义行为的影响。
锁定:互斥锁和读写锁
在线程之间共享(可变)数据的最常用工具是互斥锁,简称“mutex”。互斥锁的作用是通过暂时阻止试图同时访问它的其他线程来确保线程对某些数据具有独占访问权。
从概念上讲,互斥锁只有两种状态:已锁定和未锁定。当一个线程将一个未锁定的互斥锁加锁时,该互斥锁被标记为已锁定,并且该线程可以立即继续执行。然后,当另一个线程尝试去获取已经被加了锁的互斥量时,这个操作就会阻塞。在等待解除阻塞期间,该线程将进入睡眠状态。仅在已经加了锁的情况下才能解除阻塞,并且应由相同的线程进行解除阻塞操作。如果其他线程正在等待获取该互斥量,则解除阻塞将唤醒其中一个等待中的线程以再次尝试获取并继续其任务。
使用mutex保护数据只需所有参与者达成一致意见:他们只会在拥有mutex时才能访问数据。这样一来,任何两个或多个不同的进城都无法同时访问该数据,从而避免了数据竞争。
Rust的互斥锁
Rust标准库通过std::sync::Mutex<T>
提供此功能。它是针对类型T
的泛型,该类型是互斥锁所保护数据的类型。通过将这个T
作为互斥锁的一部分,数据只能通过互斥锁访问,从而实现了安全接口,并确保所有线程都遵守协议。
为确保被锁定的互斥锁只能由锁定它的线程解除锁定,它没有一个名为“unlock()” 的方法。相反,其“lock()”方法返回一种特殊类型称为“MutexGuard”。该guard表示我们已经成功地获取了互斥锁。 它通过 DerefMut
trait 表现得像独占引用, 使我们可以独占地访问受到mutex保护的数据。释放guard时会解除mutex上的加锁状态。当我们放弃使用guard时,就失去了访问数据的权利,“Drop” guard 的实现将解开mutex。
下面看一个例子来看看如何在实践中使用mutex:
1 | use std::sync::Mutex; |
在这里,我们有一个 Mutex<i32>
,它是保护整数的互斥锁,并且我们生成十个线程来每次将整数增加一百次。每个线程都会首先锁定互斥锁以获取 MutexGuard
,然后使用该 guard
访问并修改整数。当变量超出作用域时,guard
会被隐式释放。
在线程完成后,我们可以通过 into_inner()
安全地从整数中删除保护。into_inner()
方法拥有互斥锁的所有权,这保证了没有其他东西可以再引用该互斥锁了,因此不需要进行加锁操作。
即使增量按照步长为1发生,在观察整数的线程只能看到100的倍数值, 因为它只能在解除互斥锁时查看该整数。有效地说, 多个一百次递增现在成为单个不可分割 - 原子 - 操作得益于互斥体。
要清楚地看到互斥体的效果,请让每个线程等待一秒钟才解除互斥体:
1 | use std::time::Duration; |
现在运行程序,你会发现它需要大约10秒才能完成。每个线程只等待一秒钟,但互斥锁确保同一时间只有一个线程可以这样做。
如果我们在睡眠一秒钟之前放弃保护(因此解锁互斥锁),我们将看到它并行发生:
1 | fn main() { |
通过这个改变,该程序现在只需要大约一秒钟的时间,因为现在10个线程可以同时执行它们的一秒睡眠。这表明了保持互斥锁锁定时间尽可能短的重要性。将互斥锁锁定时间超过必要时间会完全抵消并行性带来的任何好处,有效地强制所有事情按顺序发生。
锁中毒
上面示例中的 unwrap()
调用与 锁中毒 有关。
在 Rust 中,当一个线程在持有锁时发生 panic 时,Mutex
就会被标记为 已中毒。这种情况下,Mutex
不再被锁定,但调用其 lock
方法将导致返回一个 Err
来指示它已经被中毒了。
这是一种机制来防止保护互斥量所保护的数据处于不一致状态。在我们上面的示例中,如果一个线程在递增整数少于100次后发生 panic,则互斥量将解锁,并且整数将处于意外状态,在那里它不再是100的倍数,可能破坏其他线程所做出的假设。自动标记互斥量为已污染可以强制用户处理此可能性。
对受感染的互斥体调用 lock()
仍然会锁定该互斥体。由 lock()
返回的 Err
包含了 MutexGuard
, 允许我们根据需要纠正不一致状态。
虽然看起来像是强大机制, 但实际上从潜在不一致状态恢复并不常见。大多数代码要么忽略 poison 或使用 unwrap() 在 lock 被污染时 panic,从而将 panic 传播给互斥量的所有用户。
MutexGuard 的生命周期
虽然隐式地丢弃 guard 以解锁互斥体很方便,但有时会导致微妙的意外。如果我们使用 let
语句为 guard 分配一个名称(就像上面的示例中一样),那么相对来说比较容易看出它何时被丢弃,因为局部变量在定义它们的作用域结束时被丢弃。尽管如此,不显式地放弃 guard 可能会导致保持互斥体锁定时间超过必要时间,在上面的示例中已经演示了这一点。
在不给 guard 分配名称的情况下使用它也是可能的,并且有时非常方便。由于 MutexGuard
表现得像受保护数据的独占引用,因此我们可以直接使用它而无需先将其分配给一个变量名。例如,如果您有一个 Mutex<Vec<i32>>
,则可以在单个语句中锁定互斥体、将项目推入到 Vec
中并再次解锁互斥体:
1 | list.lock().unwrap().push(1); |
在较大的表达式中产生的任何临时变量,例如lock()
返回的保护条件,都将在语句结束时被删除。虽然这似乎很明显和合理,但它会导致一个常见陷阱,通常涉及到match
、 if let
或者 while let
语句。以下是一个遇到此问题的示例:
1 | if let Some(item) = list.lock().unwrap().pop() { |
如果我们的意图是锁定列表、弹出一个项目、解锁列表,然后在解锁列表之后处理该项目,那么我们在这里犯了一个微妙但重要的错误。临时保护程序直到整个 if let
语句结束才会被释放,这意味着我们在处理该项时不必要地持有锁。
令人惊讶的是,在类似于此示例中的类似 if
语句中不会发生这种情况:
1 | if list.lock().unwrap().pop() == Some(1) { |
这里,在执行 if
语句体之前,临时守卫确实会被删除。原因是常规 if
语句的条件始终是一个普通布尔值,不能借用任何东西。没有理由将从条件到语句末尾的临时变量寿命延长。然而,对于 if let
语句可能不是这种情况。例如,如果我们使用了 front()
而不是 pop()
,则 item 将从列表中借用,这使得必须保留守卫。由于借用检查器只是一个检查,并不影响事物何时以及以什么顺序被删除,即使我们使用了 pop() ,同样也会发生这种情况。
我们可以通过将弹出操作移动到单独的 let 语句中来避免这种情况。然后在该声明结束之前放弃 guard,在 if let 中:
1 | let item = list.lock().unwrap().pop(); |
读者-写者锁
互斥锁只关心独占访问。即使我们只想查看数据并且共享引用(&T)已经足够,MutexGuard
也会为我们提供对受保护数据的独占引用(&mut T)。
读者-写者锁是互斥锁的稍微复杂一点的版本,它理解排他和共享访问之间的区别,并可以提供任何一种类型。它有三个状态:未加锁、由单个写入器(用于独占访问)加锁以及由任意数量的读取器(用于共享访问)加锁。它通常用于多线程频繁读取但偶尔更新数据。
Rust标准库通过 std::sync::RwLock<T>
类型提供了这种类型的锁。它与标准 Mutex
的工作方式类似,但其接口大部分被拆分成两部分。它具有一个 read()
和一个 write()
方法来进行阻塞式地以读或写模式进行加锁操作。 它带有两种警戒类型,一种是针对读取器而言,另一种则是针对编写器而言:RwLockReadGuard
和 RwLockWriteGuard
. 前者仅实现了Deref以表现为受保护数据的共享引用,而后者还实现了DerefMut以表现为独占引用。
它有效地是 RefCell
的多线程版本,动态跟踪引用数量以确保借用规则得到遵守。
无论是 Mutex<T>
还是 RwLock<T>
都需要T为Send,因为它们可以被用于将T发送到另一个线程。此外,RwLock<T>
还要求 T 也实现 Sync ,因为它允许多个线程持有对受保护数据的共享引用(&T)。 (严格来说,您可以创建一个不满足这些要求的 T 的锁定器,但您将无法在线程之间共享该锁定器本身不会实施同步)。
Rust标准库仅提供一种通用目的的 RwLock
类型,但其实现取决于操作系统。读者-写者锁实现之间存在许多微妙差异。当等待编写器时大多数情况下都会阻止新读取器加入即使已经处于读取状态下也一样。这样做是为了防止编写程序饥饿(writer starvation),即许多读取器集体阻止锁从未解除并永远不允许任何编写程序更新数据的情况。
其他语言中的互斥锁
Rust 的标准 Mutex
和 RwLock
类型与其他语言(如 C 或 C++)中的类型有所不同。
最大的区别在于 Rust 的 Mutex<T>
包含 它所保护的数据。例如,在 C++ 中,std::mutex
并不包含它所保护的数据,也不知道它正在保护什么。这意味着用户需要记住哪些数据受到保护以及由哪个互斥锁进行了保护,并确保每次访问“受保护”数据时都正确地锁定相应的互斥锁。当阅读涉及其他语言中互斥锁代码或与不熟悉 Rust 的程序员交流时,这一点很有用。一个 Rust 程序员可能会谈论“互斥体内部的数据”,或者说像“将其包装在一个互斥体中”,这对那些只熟悉其他语言中互斥体而非 Rust 时可能会感到困惑。
如果您真正需要一个独立于任何内容且并未包含任何东西的 mutex,例如用于保护某些外部硬件,则可以使用 Mutex<()>
。但即使是在这种情况下,您也最好定义一个(可能为零大小)类型来接口该硬件,并将其包装在 Mutex
中。这样,您仍然需要在与硬件交互之前锁定互斥锁。
等待:停车和条件变量
当数据被多个线程改变时,有许多情况需要等待某些事件发生,等待一些关于数据的条件成为真。例如,如果我们有一个互斥锁来保护 Vec
,我们可能希望等到它包含任何内容。
虽然互斥锁允许线程等待直到它解锁,但它不提供等待任何其他条件的功能。如果只有互斥锁可用,我们将不得不继续锁定互斥锁以重复检查是否已经在 Vec
中包含了任何东西。
线程停车
一种从另一个线程中等待通知的方法称为线程停车。一个线程可以 park 自己,这会使其进入睡眠状态,并阻止其消耗任何 CPU 周期。然后另一个线程可以 unpark 已经停放的线程并唤醒它。
通过 std::thread::park()
函数可以实现线程停车。对于取消停放,则调用表示要取消停放的线 程序所代表的 Thread
对象上的 unpark()
方法即可完成操作。这样的对象可以通过由 spawn
返回的 join 句柄获得或者通过当前正在运行该函数本身获取(使用 std :: thread :: current())。
下面我们来看一个使用互斥锁在两个线程之间共享队列的示例。在下面的示例中,新生成的线程将从队列中消耗项目,而主线程将每秒向队列插入一个新项目。当队列为空时,使用线程停车使得消费者线程等待。
1 | use std::collections::VecDeque; |
消费线程运行一个无限循环,从队列中弹出项目并使用“dbg”宏显示它们。当队列为空时,它停止并使用“park()”函数休眠。如果被唤醒,则“park()”调用返回,“loop”继续,再次从队列中弹出项目直到为空为止。
生产线程每秒钟生成一个新数字,并将其推入队列中。每次添加项时,它都会在引用消费线程的Thread对象上使用unpark()方法来取消挂起。这样,消费线程就会被唤醒以处理新元素。
这里需要注意的一点是:即使我们删除了parking操作,该程序仍然在理论上是正确的但效率低下。这很重要,因为”park()”不能保证只有匹配的”unpark()”才能返回。虽然罕见, 但可能存在虚假唤醒 。我们的示例可以很好地处理这个问题, 因为消费者线程将锁定队列、检查是否为空, 然后直接解锁并重新进入休眠状态。
线程停车的一个重要属性是,在线程自己停车之前调用unpark()
不会丢失请求. 请求取消挂起仍然记录在案,并且下一次尝试让该线程进入休眠状态时清除该请求,并直接继续而不实际进入休眠状态。为了看到这对正确操作的关键性,让我们通过两个线程执行的步骤可能的排序来进行说明:
- 消费线程(称之为C)锁定队列。
- C尝试从队列中弹出一个项目,但它是空的,结果返回“None”。
- C解锁队列。
- 生产线程(我们将其称为P)锁定队列。
- P将新项推送到队列中。
- P再次解锁该队列。
- P调用
unpark()
通知C有新项可用. - C调用
park()
进入休眠状态, 等待更多项目。
虽然在第3步释放队列和第8步停车之间只有非常短暂的时刻,但第4至7步可能会在该时刻发生。如果’unpark()’如果线程没有挂起,则什么也不做,则通知将丢失。即使在队列中有一个项目, 消费者线程仍然会等待。由于取消挂起请求被保存以供未来调用’park()’, 我们无需担心这一点。
但是, 取消挂起请求并不堆叠。连续两次调用’unpark()’, 然后再连续两次调用’park()’, 仍然导致该线程进入睡眠状态。第一个’park()’清除请求并直接返回,但第二个则像往常一样进入睡眠状态。
这意味着在上面的示例中,重要的是只有在看到队列为空时才将线程挂起,而不是在处理每个项目后都将其挂起。虽然由于巨大(一秒钟)的休眠时间,在此示例中极不可能发生, 但多次’unpark()’调用可能会唤醒仅单个’park()’调用.
不幸的是,这意味着如果在队列被锁定并清空之前,park()
返回后立即调用了unpark()
,则unpark()
调用是不必要的但仍会导致下一个park()
调用立即返回。这将导致(空)队列多次被锁定和解锁。虽然这不影响程序的正确性,但它确实影响了其效率和性能。
对于像我们示例中那样简单的情况,此机制运作良好,但当事情变得更加复杂时很快就会崩溃。例如,如果我们有多个消费者线程从同一队列中取出项目,则生产者线程将无法知道哪个消费者正在等待并应该唤醒。生产者将必须准确地知道何时有消费者在等待以及它正在等待什么条件。
条件变量
条件变量是等待由互斥锁保护的数据发生某些事情的更常用选项。它们有两个基本操作:wait和notify。线程可以在条件变量上等待,然后当另一个线程通知同一条件变量时,它们可以被唤醒。多个线程可以在同一条件变量上等待,并且通知可以发送给一个等待线程或所有等待线程。
这意味着我们可以为特定事件或我们感兴趣的条件创建一个条件变量,例如队列非空,并在该条件下进行等待。任何导致该事件或条件发生的线程都会通知该条件变量,而无需知道哪些或多少个线程对该通知感兴趣。
为了避免在解锁互斥锁并等待条件变量之间短暂时刻内错过通知问题, 条件变量提供了一种以原子方式解锁互斥锁并开始等待的方法。这意味着根本没有可能让通知丢失。
Rust标准库提供了std::sync::Condvar
作为一个条件变量。其wait
方法需要一个证明我们已经锁定互斥锁的 MutexGuard
。它首先解除互斥锁并进入休眠状态,在稍后被唤醒时重新获取互斥锁并返回一个新的MutexGuard
(证明互斥锁再次被锁定)。
它有两个通知函数: notify_one
用于唤醒等待线程中的一个(如果有),而 notify_all
则将它们全部唤醒。
让我们修改我们用于线程停车的示例,改为使用 Condvar
:
1 | use std::sync::Condvar; |
我们不得不改变一些东西:
- 现在我们不仅有一个包含队列的
Mutex
,还有一个用于通信“非空”条件的Condvar
。 - 我们不再需要知道要唤醒哪个线程,因此我们不再存储从
spawn
返回的值。相反,我们通过条件变量使用notify_one
方法通知消费者。 - 解锁、等待和重新锁定都由
wait
方法完成。为了能够将守卫传递给wait
方法,同时在处理项目之前放弃它,我们必须稍微重构控制流程。
现在我们可以生成尽可能多的消费线程,甚至稍后再生成更多线程,而无需更改任何内容。条件变量负责将通知传递给感兴趣的任何线程。
如果我们有一个更复杂的系统,其中不同条件下感兴趣的线程,则可以为每个条件定义一个“Condvar”。例如,我们可以定义一个指示队列非空和另一个指示队列为空的“Condvar”。然后每个线程都会等待与其正在执行任务相关联的条件。
通常,“Condvar”仅与单个“Mutex”一起使用。如果两个线程尝试使用两个不同互斥锁并发地等待条件变量,则可能会导致恐慌。
“Condvar”的缺点是它只能在与“Mutex”一起使用时才有效,但对于大多数用例来说这完全没问题,因为已经用于保护数据了。
thread::park()
和 Condvar::wait()
还具有带时间限制的变体:thread::park_timeout()
和 Condvar::wait_timeout()
。这些需要额外提供持续时间(Duration)参数,在此之后应放弃等待通知并无条件唤醒。
总结
- 多个线程可以在同一个程序中同时运行,并且可以随时生成。
- 当主线程结束时,整个程序也会结束。
- 数据竞争是未定义的行为,在 Rust 的类型系统中完全被防止(在安全代码中)。
- 可以将“Send”数据发送到其他线程,“Sync”数据可在多个线程之间共享。
- 常规线程可能会一直运行到程序结束,因此只能借用
static
数据,例如静态变量和泄漏分配内存等。 - 引用计数 (
Arc
) 可以用于共享所有权,确保数据至少有一个线程正在使用它的时间与其生命周期相同。 - 作用域限定的线程对于限制线程寿命以允许其借用非
static
数据(例如局部变量)非常有用。 &T
是共享引用。&mut T
是独占引用。普通类型不允许通过共享引用进行突变操作。- 由于
UnsafeCell
的存在,某些类型具有内部可变性,这使得通过共享引用进行突变成为可能。 Cell
和RefCell
是单线程内部可变性的标准类型。原子、互斥锁和读写锁则是它们的多线程等效物品- Cell 和原子仅允许整体替换值,而 RefCell、Mutex 和 RwLock 允许通过动态执行访问规则直接修改值。
- 线程停车可以是等待某些条件的便捷方式。
- 当条件涉及由
Mutex
保护的数据时,使用Condvar
比线程停车更方便,并且可能更有效。