安全高速處理 concurrent programming 是 rust 的另一個主要目標。
concurrent programming 就是程式不同部分獨立執行
parallel programming 就是程式不同部分同時執行
因為在 context 中處理這些問題很容易出錯,rust 想解決這個問題
owner 與 類別系統是解決 memory leak 與 concurrency 的有效工具,使得 rust 很多 concurrency error 都變成編譯期的錯誤。因此我們可在開發時,就解決問題,可稱為 fearless concurrency。
(以下說到 concurrency 同時有兩個意思:concurrent 及 parallel programming)
Erlang 有著優雅的消息傳遞並發功能,但只有模糊不清的在線程間共享狀態的方法。高階語言只實現可能解決方案的子集合是合理的策略,因為高級語言強調犧牲一些控制,換取更高的抽象性。
- 如何產生 thread
- message passing concurrency,其中 channel 用來在 thread 之間傳遞資料
- shared state concurrency,其中多個 threads 可存取同一片資料
Sync
與Send
trait,可讓 rust 的 concurrency 擴充到自訂的的與 std library 提供的類別中
以 Thread 同時執行程式
大部分 OS 將程式運作於一個 process 中,而 OS 負責管理 process。程式內部也可以有多個同時運作的獨立部分,稱為 threads
將程式拆分多個 threads 可改善效能,但也會增加複雜性。因 theads 同時運作,無法保證不同 thread 間程式的運作順序,導致發生以下問題
- race condition: 多個 thread 以不同順序存取資料或資源
- deadlock: 兩個 threads 互相等待對方停止使用資源,而無法繼續運作
- 只會發生在特定情況,且無法重現與修復的 bug
rust 嘗試解決使用 thread 的負面影響。
不同程式語言用不同方法實作 thread,很多 OS 提供了產生 thread 的 API。由 programming language 呼叫 OS API 產生 thread 的方法也稱為 1:1,一個 OS thread 對應一個 language thread
有些語言自己提供特殊的 thread,被稱為 green thread,使用 green thread 的語言會在不同數量的 OS thread context 中執行它們。因此 green thread model 被稱為 M:N model,M 個 green thread 對應 N 個 OS thread
對 rust 來說,最重要的取捨是 runtime support。runtime 在不同 context 有不同含意。
在目前的 context 中,runtime 代表 binary file 中包含語言自己提供的 code。這些 code 可大可小,但所有 non-assembly language 都有一定數量的 runtime code。更小的 runtime code 有較少功能,但 binary 也比較小,更容易在更多 context 中跟其他語言整合。雖然很多語言覺得增加 runtime code 換取功能是可接受的,但 rust 目標要做到幾乎沒有 runtime code,保持高效能且要能呼叫 C 語言。
rust 不使用 green thread,採用 1:1 thread model,有其他 crate 實現了 M:N thread model,那些 thread 犧牲了效能,但換來更好的 thread control 與更低的 context switching cost
以 spawn
產生 thead
呼叫 thread::spawn
並傳遞 closure 參數,用以產生 thread,該 closure 中包含希望在新 thread 中運作的 code
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
// main thread 列印其他資料
// 當 main thread 結束,整個程式都會結束
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
thread::sleep
強制 thread 停止執行一小段時間,這會允許其他不同的線程運行。這些線程可能會輪流運行,不過並不保證如此:這跟操作系統如何切換thread 有關。即使新建的線程 code 列印直到 i
等於 9 ,但它在主線程結束之前也只列印到了 5。
hi number 1 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
使用 join 等待所有 thread 結束
由於 main thread 結束,所有 thread 就會被中止
可透過 thread::spawn
的回傳值,儲存到變數中,用來修復 thread 沒有被完全執行的問題。
thread::spawn
的回傳值類別是 JoinHandle
。JoinHandle
是一個擁有所有權的值,當對其呼叫 join
方法時,它會等待其線程結束。
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
// handle 的 join 會 blocking main thread 直到 handle 所代表的線程結束
handle.join().unwrap();
}
如果把 join 提到 for 前面, main thread 會先等待 new thread 完成,再繼續 main thread,所以列印就不會交錯出現
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
在 thread 使用 move
closure
move
closure 常常跟 thread::spawn
一起使用,可讓我們在一個 thread 使用另一個 thread 的資料。
chap 13 提到可在參數列表前,使用 move
強制 closure 獲取其使用環境值的所有權。這個方法在將 closure 傳遞給 new thread 以便將資料移動到 new thread 中最實用。
以下是一個嘗試在主線程中產生一個 vector 並用於新建線程的例子
closure 使用了 v
,所以 closure 會捕獲 v
並使其成為 closure 環境的一部分。因為 thread::spawn
在一個新線程中執行這個 closure,所以可以在新線程中使用 v
。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}
但發生編譯錯誤
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src/main.rs:6:32
|
6 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
7 | println!("Here's a vector: {:?}", v);
| - `v` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/main.rs:6:18
|
6 | let handle = thread::spawn(|| {
| __________________^
7 | | println!("Here's a vector: {:?}", v);
8 | | });
| |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
|
6 | let handle = thread::spawn(move || {
| ^^^^^^^
Rust 會 推斷 如何捕獲 v
,因為 println!
只需要 v
的引用,closure 嘗試借用 v
。然而這有一個問題:Rust 不知道這個新線程會執行多久,所以無法知道 v
的引用是否一直有效。
以下是 v
的引用有可能不再有效的狀況
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});
drop(v); // oh no!
handle.join().unwrap();
}
透過在閉包之前增加 move
關鍵字,我們強制 closure 獲取其使用的值的所有權,而不是任由 Rust 推斷它應該借用值。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}
一旦 move
v 的所有權,就無法在 main thread 中再使用 v
了,也就是不能在 main thread 將 v 丟棄 drop(v);
使用 Message Passing 在 Thread 間傳遞資料
確保安全的 concurrency 方法是 message passing。Go 的口號:Do not communicate by sharing memory; instead, share memory by communicating.
rust 實現 messaging passing 的方法是 channel,可想像為一條河,將小船放在河中。
channel 有兩個部分:transmitter, receiver。
範例
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
使用 mpsc::channel
函數產生一個新的通道;mpsc
是 多個生產者,單個消費者(multiple producer, single consumer)的縮寫。換句話說,一個 channel 可以有多個產生值的發送端,但只能有一個接收端。
可在 main thread 接收 channel 的值
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
接收端有 recv
與 try_recv
兩種 method,recv
會 blocking 程式直到接收到值: Result<T, E>
,當 channel 被關閉,recv
會收到 error
try_recv
不會 blocking 程式,會立刻回傳 Result<T,E>
: Ok
表示有資料,Err
表示沒有資料。
Channels and Ownership Transference
ownership 規則在訊息傳遞中,有助於編寫安全的 concurrency code,避免錯誤。
以下嘗試在 new thread 中發送 val
後再使用它,結果編譯時就發生錯誤,一但把值發送到另一個 thread,就不能再使用它。
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val);
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
編譯錯誤
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:31
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `std::string::String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
發送多個值
main thread 中沒有任何暫停的 code,而是等待 channel 被關閉,會自動結束 for 迴圈
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
// 每次發送後暫停 1s
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
利用 cloning transmitter 產生多個 producers
mpsc
是 multiple producer, single consumer 的縮寫,可透過 clone 發送端,產生多個 producer
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 以 clone 產生另一個 tx,在不同 thread 使用不同的 tx
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1 hi"),
String::from("1 from"),
String::from("1 the"),
String::from("1 thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("2 more"),
String::from("2 messages"),
String::from("2 for"),
String::from("2 you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
執行結果
Got: 1 hi
Got: 2 more
Got: 1 from
Got: 2 messages
Got: 2 for
Got: 1 the
Got: 1 thread
Got: 2 you
Shared-State Concurrency
雖然 message passing 是處理 concurrency 很好的方法,但不是唯一一種方法。
channel 都是單所有權,因為一旦將一個值傳送到通道中,將無法再使用這個值。共享內存有多所有權:多個線程可以同時使用相同的memory。rust 以 mutex
達到這個功能。
以 mutexes 一次允許一個 thread 存取資料
mutex 就是 mutual exclusion 的縮寫
為了存取 mutex 的資料,thread 必須先取得 lock,lock 是 mutex 中一部分資料結構,會記錄誰有該資料的使用權。換句話說, mutex 是利用 locking system 保護資料。
mutex 很難使用,因此很多人還是使用 channel,要記住兩個原則:
- 在使用資料前,要先取得 lock
- 用完 mutex 保護的資料後,要 release lock
不過 rust 的類別及 ownership 系統,讓我們不會在 locking, unlocking 出錯
Mutex<T>
的 API
從 single thread context 使用 mutex 開始測試
use std::sync::Mutex;
fn main() {
// 用 new 產生 mutex
let m = Mutex::new(5);
{
// 用 lock 獲取鎖,使用資料,這是 blocking function call
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
如果另一個線程擁有鎖,並且那個線程 panic 了,則呼叫 lock
會失敗。在這種情況下,沒人能夠再獲取鎖,所以這裡選擇 unwrap
並在遇到這種情況時使線程 panic。
Mutex<T>
是一個智能指針。lock
會回傳一個叫做 MutexGuard
的智能指針。這個智能指針實現了 Deref
來指向其內部數據;其也提供了一個 Drop
實現當 MutexGuard
離開作用域時自動釋放鎖,這正發生內部作用域的結尾。我們不會忘記釋放鎖並阻塞互斥器為其它線程,因為鎖的釋放是自動發生的。
丟棄了鎖之後,可以列印出互斥器的值,並發現能夠將其內部的 i32
改為 6。
在多個 threads 之間分享 Mutex<T>
以下嘗試用 Mutex<T>
在十個線程中對同一個計數器值加一,這樣計數器將從 0 變為 10。注意,接下來的幾個例子會出現編譯錯誤
use std::sync::Mutex;
use std::thread;
fn main() {
// 產生 counter 變數來存放內含 i32 的 Mutex<T>
let counter = Mutex::new(0);
let mut handles = vec![];
// 產生 10 個 threads,所有 thread 使用相同的 closure
// closure 會呼叫 lock 獲取 Mutex<T> 的 lock,然後 +1
// 離開 scope 時,會自動 release lock
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
// 以 join 等待所有 thread 結束
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
編譯錯誤
error[E0382]: use of moved value: `counter`
--> src/main.rs:12:36
|
6 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
12 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
13 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
訊息是說, counter 被移入了 closure,但這是我們需要的功能,卻發生編譯錯誤
接下來嘗試簡化程式,只產生兩個 threads
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
let handle2 = thread::spawn(move || {
let mut num2 = counter.lock().unwrap();
*num2 += 1;
});
handles.push(handle2);
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
編譯錯誤
error[E0382]: use of moved value: `counter`
--> src/main.rs:15:33
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
8 | let handle = thread::spawn(move || {
| ------- value moved into closure here
9 | let mut num = counter.lock().unwrap();
| ------- variable moved due to use in closure
...
15 | let handle2 = thread::spawn(move || {
| ^^^^^^^ value used here after move
16 | let mut num2 = counter.lock().unwrap();
| ------- use occurs due to use in closure
error[E0382]: borrow of moved value: `counter`
--> src/main.rs:26:29
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
15 | let handle2 = thread::spawn(move || {
| ------- value moved into closure here
16 | let mut num2 = counter.lock().unwrap();
| ------- variable moved due to use in closure
...
26 | println!("Result: {}", *counter.lock().unwrap());
| ^^^^^^^ value borrowed here after move
第一個錯誤為 counter 被移入 handle 的 thread closure 中,因此無法在第二個 thread 呼叫 lock
rust 明確說明,無法將 counter 的 ownership 移動到多個 threads 裡面,接下來透過 chap15 的 multiple-ownership 方法解決
在 multiple threads 使用 multiple ownership
chap 15 用 smart pointer Rc<T>
產生引用計數的值,以便擁有多個 owner。
嘗試將 Mutex<T>
封裝到 Rc<T>
,並在 ownership 移入 thread 之前,clone Rc<T>
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
再次發生編譯錯誤
error[E0277]: `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:22
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
|
= help: within `[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
= note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
= note: required by `std::thread::spawn`
Rc<T>
並不能安全的在線程間共享。當 Rc<T>
管理引用計數時,它必須在每一個 clone
呼叫時增加計數,並在每一個克隆被丟棄時減少計數。Rc<T>
並沒有使用任何 concurrency 處理,來確保改變計數的操作不會被其他線程打斷。在計數出錯時可能會導致詭異的 bug,比如可能會造成 memory leak,或在使用結束之前就丟棄一個值。我們所需要的是一個完全類似 Rc<T>
,又以一種線程安全的方式改變引用計數的類型。
Atomic Reference Counting with Arc<T>
Arc<T>
是類似 Rc<T>
但又能安全用在 multi-thread 環境的類別。這是一個 atomically reference counted 類別,可查閱 std::sync::atomic
文件得到更多資訊
因為 Arc<T>
是thread-safe,所以性能較差,因此不是所有原始類別都是 atomic。
回到剛剛的例子,將 Arc<T>
替換掉 Rc<T>
就可以編譯與執行
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
RefCell<T>
/Rc<T>
與 Mutex<T>
/Arc<T>
的相似處
因為 counter 不可變,但可獲取內部值的可變引用,這表示 Mutex<T>
提供內部可變性,類似 Cell
類別。
使用 RefCell<T>
可改變 Rc<T>
的內容,同樣可使用 Mutex<T>
改變 Arc<T>
的內容
另一個要注意的是,rust 無法避免使用 Mutex<T>
全部的邏輯錯誤。在 chap15 使用 Rc<T>
可能造成引用循環,兩個 Rc<T>
互相引用,就會發生 memory leak。
同理 Mutex<T>
也有可能造成 deadlock,如果一個操作要鎖定兩個資源,而兩個 thread 各持有一個 lock,就會發生 deadlock
可參閱 Mutex<T>
與 MutexGuard
的文件說明
以 Sync 與 Send Trait 擴充 Concurrency
rust 的 concurrency 處理,在程式語言層面來說,不需要知道太多 concurrency 處理的問題。
不過有兩個 concurrency 概念是內嵌於程式語言中:std::marker
中的 Sync
和 Send
trait。
Send
可在 threads 之間轉移 ownership
Send
標記 trait 表示類別的所有權可以在線程間傳遞。幾乎所有的 Rust 類型都有實作 Send
,不過有一些例外,包括 Rc<T>
:這是不能 Send
的,因為如果clone了 Rc<T>
的值並嘗試將clone的所有權轉移到另一個線程,這兩個線程都可能同時更新引用計數。因此,Rc<T>
用於單線程,這時不需要為擁有線程安全的引用計數而付出性能代價。
Rust 類別系統和 trait bound 確保永遠也不會意外的將不安全的 Rc<T>
在線程間發送。當嘗試在示例 16-14 中這麼做的時候,會得到錯誤 the trait Send is not implemented for Rc<Mutex<i32>>
。而使用標記為 Send
的 Arc<T>
時,就沒有問題了。
任何完全由 Send
的類型組成的類型也會自動被標記為 Send
。也就是說,幾乎所有基本類型都是 Send
的,除了 chap19 將會討論的 raw pointer。
Sync
可允許多個 threads 使用資料
Sync
標記 trait 表是一個實作了 Sync
的類別,可以安全的在多個線程中擁有其值的引用。換一種方式來說,對於任意類型 T
,如果 &T
(T
的引用)是 Send
的話 T
就是 Sync
的,這意味著其引用就可以安全的發送到另一個線程。類似於 Send
的情況,基本類別是 Sync
的,完全由 Sync
的類別組成的類別也是 Sync
的。
智能指針 Rc<T>
也不是 Sync
的,出於其不是 Send
相同的原因。RefCell<T>
和 Cell<T>
系列類別不是 Sync
的。RefCell<T>
在運行時所進行的借用檢查也不是線程安全的。
Mutex<T>
是 Sync
的,正如 “在線程間共享 Mutex<T>
” 部分所講的它可以被用來在多線程中共享使用。
直接實作 Send
與 Sync
是不安全的
通常並不需要實作 Send
和 Sync
trait,因為由 Send
和 Sync
的類別組成的類別,自動就是 Send
和 Sync
的。因為他們是標記 trait,甚至都不需要實現任何方法。他們只是用來加強並發相關的不可變性的。
實作這些標記 trait 涉及到編寫不安全的 Rust 代碼, chap 19 將會講述具體的方法;當前重要的是,在產生新的由不是 Send
和 Sync
的部分構成的並發類別時需要多加小心,以確保維持其安全保證。The Nomicon 中有更多關於這些保證以及如何維持他們的資訊。
Summary
因為 Rust 本身很少有處理並發的內容,有很多的並發方案都由 crate 實現。他們比標準庫要發展的更快;請在網上搜索當前最新的用於多線程場景的 crate。
Rust 提供了用於消息傳遞的通道,和像 Mutex<T>
和 Arc<T>
這樣可以安全的用於並發上下文的智能指針。類別系統和借用檢查器會確保這些場景中的代碼,不會出現 race condition 和無效的引用。一旦代碼可以編譯了,我們就可以堅信這些代碼可以正確的運行於多線程環境,而不會出現其他語言中經常出現的那些難以追蹤的 bug。
沒有留言:
張貼留言