2020/6/8

rust16 Concurrency

安全高速處理 concurrent programming 是 rust 的另一個主要目標。


concurrent programming 就是程式不同部分獨立執行


parallel programming 就是程式不同部分同時執行


因為在 context 中處理這些問題很容易出錯,rust 想解決這個問題


owner 與 類別系統是解決 memory leak 與 concurrency 的有效工具,使得 rust 很多 concurrency error 都變成編譯期的錯誤。因此我們可在開發時,就解決問題,可稱為 fearless concurrency。


(以下說到 concurrency 同時有兩個意思:concurrent 及 parallel programming)


Erlang 有著優雅的消息傳遞並發功能,但只有模糊不清的在線程間共享狀態的方法。高階語言只實現可能解決方案的子集合是合理的策略,因為高級語言強調犧牲一些控制,換取更高的抽象性。


  • 如何產生 thread
  • message passing concurrency,其中 channel 用來在 thread 之間傳遞資料
  • shared state concurrency,其中多個 threads 可存取同一片資料
  • SyncSend trait,可讓 rust 的 concurrency 擴充到自訂的的與 std library 提供的類別中

以 Thread 同時執行程式


大部分 OS 將程式運作於一個 process 中,而 OS 負責管理 process。程式內部也可以有多個同時運作的獨立部分,稱為 threads


將程式拆分多個 threads 可改善效能,但也會增加複雜性。因 theads 同時運作,無法保證不同 thread 間程式的運作順序,導致發生以下問題


  • race condition: 多個 thread 以不同順序存取資料或資源
  • deadlock: 兩個 threads 互相等待對方停止使用資源,而無法繼續運作
  • 只會發生在特定情況,且無法重現與修復的 bug

rust 嘗試解決使用 thread 的負面影響。


不同程式語言用不同方法實作 thread,很多 OS 提供了產生 thread 的 API。由 programming language 呼叫 OS API 產生 thread 的方法也稱為 1:1,一個 OS thread 對應一個 language thread


有些語言自己提供特殊的 thread,被稱為 green thread,使用 green thread 的語言會在不同數量的 OS thread context 中執行它們。因此 green thread model 被稱為 M:N model,M 個 green thread 對應 N 個 OS thread


對 rust 來說,最重要的取捨是 runtime support。runtime 在不同 context 有不同含意。


在目前的 context 中,runtime 代表 binary file 中包含語言自己提供的 code。這些 code 可大可小,但所有 non-assembly language 都有一定數量的 runtime code。更小的 runtime code 有較少功能,但 binary 也比較小,更容易在更多 context 中跟其他語言整合。雖然很多語言覺得增加 runtime code 換取功能是可接受的,但 rust 目標要做到幾乎沒有 runtime code,保持高效能且要能呼叫 C 語言。


rust 不使用 green thread,採用 1:1 thread model,有其他 crate 實現了 M:N thread model,那些 thread 犧牲了效能,但換來更好的 thread control 與更低的 context switching cost


spawn 產生 thead


呼叫 thread::spawn 並傳遞 closure 參數,用以產生 thread,該 closure 中包含希望在新 thread 中運作的 code


use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // main thread 列印其他資料
    // 當 main thread 結束,整個程式都會結束
    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

thread::sleep 強制 thread 停止執行一小段時間,這會允許其他不同的線程運行。這些線程可能會輪流運行,不過並不保證如此:這跟操作系統如何切換thread 有關。即使新建的線程 code 列印直到 i 等於 9 ,但它在主線程結束之前也只列印到了 5。


hi number 1 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

使用 join 等待所有 thread 結束


由於 main thread 結束,所有 thread 就會被中止


可透過 thread::spawn 的回傳值,儲存到變數中,用來修復 thread 沒有被完全執行的問題。


thread::spawn 的回傳值類別是 JoinHandleJoinHandle 是一個擁有所有權的值,當對其呼叫 join 方法時,它會等待其線程結束。


use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // handle 的 join 會 blocking main thread 直到 handle 所代表的線程結束
    handle.join().unwrap();
}

如果把 join 提到 for 前面, main thread 會先等待 new thread 完成,再繼續 main thread,所以列印就不會交錯出現


use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

在 thread 使用 move closure


move closure 常常跟 thread::spawn 一起使用,可讓我們在一個 thread 使用另一個 thread 的資料。


chap 13 提到可在參數列表前,使用 move 強制 closure 獲取其使用環境值的所有權。這個方法在將 closure 傳遞給 new thread 以便將資料移動到 new thread 中最實用。


以下是一個嘗試在主線程中產生一個 vector 並用於新建線程的例子


closure 使用了 v,所以 closure 會捕獲 v 並使其成為 closure 環境的一部分。因為 thread::spawn 在一個新線程中執行這個 closure,所以可以在新線程中使用 v


use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

但發生編譯錯誤


error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

Rust 會 推斷 如何捕獲 v,因為 println! 只需要 v 的引用,closure 嘗試借用 v。然而這有一個問題:Rust 不知道這個新線程會執行多久,所以無法知道 v 的引用是否一直有效。


以下是 v 的引用有可能不再有效的狀況


use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

透過在閉包之前增加 move 關鍵字,我們強制 closure 獲取其使用的值的所有權,而不是任由 Rust 推斷它應該借用值。


use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

一旦 move v 的所有權,就無法在 main thread 中再使用 v 了,也就是不能在 main thread 將 v 丟棄 drop(v);


使用 Message Passing 在 Thread 間傳遞資料


確保安全的 concurrency 方法是 message passing。Go 的口號:Do not communicate by sharing memory; instead, share memory by communicating.


rust 實現 messaging passing 的方法是 channel,可想像為一條河,將小船放在河中。


channel 有兩個部分:transmitter, receiver。


範例


use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

使用 mpsc::channel 函數產生一個新的通道;mpsc多個生產者,單個消費者multiple producer, single consumer)的縮寫。換句話說,一個 channel 可以有多個產生值的發送端,但只能有一個接收端。




可在 main thread 接收 channel 的值


use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

接收端有 recvtry_recv 兩種 method,recv 會 blocking 程式直到接收到值: Result<T, E>,當 channel 被關閉,recv 會收到 error


try_recv 不會 blocking 程式,會立刻回傳 Result<T,E>: Ok 表示有資料,Err 表示沒有資料。


Channels and Ownership Transference


ownership 規則在訊息傳遞中,有助於編寫安全的 concurrency code,避免錯誤。


以下嘗試在 new thread 中發送 val 後再使用它,結果編譯時就發生錯誤,一但把值發送到另一個 thread,就不能再使用它。


use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

編譯錯誤


error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `std::string::String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

發送多個值


main thread 中沒有任何暫停的 code,而是等待 channel 被關閉,會自動結束 for 迴圈


use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            // 每次發送後暫停 1s
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

利用 cloning transmitter 產生多個 producers


mpsc 是 multiple producer, single consumer 的縮寫,可透過 clone 發送端,產生多個 producer


use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 以 clone 產生另一個 tx,在不同 thread 使用不同的 tx
    let tx1 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("1 hi"),
            String::from("1 from"),
            String::from("1 the"),
            String::from("1 thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("2 more"),
            String::from("2 messages"),
            String::from("2 for"),
            String::from("2 you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

執行結果


Got: 1 hi
Got: 2 more
Got: 1 from
Got: 2 messages
Got: 2 for
Got: 1 the
Got: 1 thread
Got: 2 you

Shared-State Concurrency


雖然 message passing 是處理 concurrency 很好的方法,但不是唯一一種方法。


channel 都是單所有權,因為一旦將一個值傳送到通道中,將無法再使用這個值。共享內存有多所有權:多個線程可以同時使用相同的memory。rust 以 mutex 達到這個功能。


以 mutexes 一次允許一個 thread 存取資料


mutex 就是 mutual exclusion 的縮寫


為了存取 mutex 的資料,thread 必須先取得 lock,lock 是 mutex 中一部分資料結構,會記錄誰有該資料的使用權。換句話說, mutex 是利用 locking system 保護資料。


mutex 很難使用,因此很多人還是使用 channel,要記住兩個原則:


  1. 在使用資料前,要先取得 lock
  2. 用完 mutex 保護的資料後,要 release lock

不過 rust 的類別及 ownership 系統,讓我們不會在 locking, unlocking 出錯


Mutex<T>的 API


從 single thread context 使用 mutex 開始測試


use std::sync::Mutex;

fn main() {
    // 用 new 產生 mutex
    let m = Mutex::new(5);

    {
        // 用 lock 獲取鎖,使用資料,這是 blocking function call
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

如果另一個線程擁有鎖,並且那個線程 panic 了,則呼叫 lock 會失敗。在這種情況下,沒人能夠再獲取鎖,所以這裡選擇 unwrap 並在遇到這種情況時使線程 panic。


Mutex<T> 是一個智能指針。lock 會回傳一個叫做 MutexGuard 的智能指針。這個智能指針實現了 Deref 來指向其內部數據;其也提供了一個 Drop實現當 MutexGuard 離開作用域時自動釋放鎖,這正發生內部作用域的結尾。我們不會忘記釋放鎖並阻塞互斥器為其它線程,因為鎖的釋放是自動發生的。


丟棄了鎖之後,可以列印出互斥器的值,並發現能夠將其內部的 i32 改為 6。


在多個 threads 之間分享 Mutex<T>


以下嘗試用 Mutex<T> 在十個線程中對同一個計數器值加一,這樣計數器將從 0 變為 10。注意,接下來的幾個例子會出現編譯錯誤


use std::sync::Mutex;
use std::thread;

fn main() {
    // 產生 counter 變數來存放內含 i32 的 Mutex<T>
    let counter = Mutex::new(0);
    let mut handles = vec![];

    // 產生 10 個 threads,所有 thread 使用相同的 closure
    // closure 會呼叫 lock 獲取 Mutex<T> 的 lock,然後 +1
    // 離開 scope 時,會自動 release lock
    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    // 以 join 等待所有 thread 結束
    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

編譯錯誤


error[E0382]: use of moved value: `counter`
  --> src/main.rs:12:36
   |
6  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
12 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
13 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

訊息是說, counter 被移入了 closure,但這是我們需要的功能,卻發生編譯錯誤




接下來嘗試簡化程式,只產生兩個 threads


use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();

        *num += 1;
    });
    handles.push(handle);

    let handle2 = thread::spawn(move || {
        let mut num2 = counter.lock().unwrap();

        *num2 += 1;
    });
    handles.push(handle2);

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

編譯錯誤


error[E0382]: use of moved value: `counter`
  --> src/main.rs:15:33
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
9  |         let mut num = counter.lock().unwrap();
   |                       ------- variable moved due to use in closure
...
15 |     let handle2 = thread::spawn(move || {
   |                                 ^^^^^^^ value used here after move
16 |         let mut num2 = counter.lock().unwrap();
   |                        ------- use occurs due to use in closure

error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:26:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
15 |     let handle2 = thread::spawn(move || {
   |                                 ------- value moved into closure here
16 |         let mut num2 = counter.lock().unwrap();
   |                        ------- variable moved due to use in closure
...
26 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move

第一個錯誤為 counter 被移入 handle 的 thread closure 中,因此無法在第二個 thread 呼叫 lock


rust 明確說明,無法將 counter 的 ownership 移動到多個 threads 裡面,接下來透過 chap15 的 multiple-ownership 方法解決


在 multiple threads 使用 multiple ownership


chap 15 用 smart pointer Rc<T> 產生引用計數的值,以便擁有多個 owner。


嘗試將 Mutex<T> 封裝到 Rc<T>,並在 ownership 移入 thread 之前,clone Rc<T>


use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

再次發生編譯錯誤


error[E0277]: `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
   = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
   = note: required by `std::thread::spawn`

Rc<T> 並不能安全的在線程間共享。當 Rc<T> 管理引用計數時,它必須在每一個 clone 呼叫時增加計數,並在每一個克隆被丟棄時減少計數。Rc<T> 並沒有使用任何 concurrency 處理,來確保改變計數的操作不會被其他線程打斷。在計數出錯時可能會導致詭異的 bug,比如可能會造成 memory leak,或在使用結束之前就丟棄一個值。我們所需要的是一個完全類似 Rc<T>,又以一種線程安全的方式改變引用計數的類型。


Atomic Reference Counting with Arc<T>


Arc<T> 是類似 Rc<T> 但又能安全用在 multi-thread 環境的類別。這是一個 atomically reference counted 類別,可查閱 std::sync::atomic 文件得到更多資訊


因為 Arc<T> 是thread-safe,所以性能較差,因此不是所有原始類別都是 atomic。


回到剛剛的例子,將 Arc<T> 替換掉 Rc<T> 就可以編譯與執行


use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 的相似處


因為 counter 不可變,但可獲取內部值的可變引用,這表示 Mutex<T> 提供內部可變性,類似 Cell 類別。


使用 RefCell<T> 可改變 Rc<T> 的內容,同樣可使用 Mutex<T> 改變 Arc<T> 的內容


另一個要注意的是,rust 無法避免使用 Mutex<T> 全部的邏輯錯誤。在 chap15 使用 Rc<T> 可能造成引用循環,兩個 Rc<T> 互相引用,就會發生 memory leak。


同理 Mutex<T> 也有可能造成 deadlock,如果一個操作要鎖定兩個資源,而兩個 thread 各持有一個 lock,就會發生 deadlock


可參閱 Mutex<T>MutexGuard 的文件說明


以 Sync 與 Send Trait 擴充 Concurrency


rust 的 concurrency 處理,在程式語言層面來說,不需要知道太多 concurrency 處理的問題。


不過有兩個 concurrency 概念是內嵌於程式語言中:std::marker 中的 SyncSend trait。


Send 可在 threads 之間轉移 ownership


Send 標記 trait 表示類別的所有權可以在線程間傳遞。幾乎所有的 Rust 類型都有實作 Send ,不過有一些例外,包括 Rc<T>:這是不能 Send 的,因為如果clone了 Rc<T> 的值並嘗試將clone的所有權轉移到另一個線程,這兩個線程都可能同時更新引用計數。因此,Rc<T> 用於單線程,這時不需要為擁有線程安全的引用計數而付出性能代價。


Rust 類別系統和 trait bound 確保永遠也不會意外的將不安全的 Rc<T> 在線程間發送。當嘗試在示例 16-14 中這麼做的時候,會得到錯誤 the trait Send is not implemented for Rc<Mutex<i32>>。而使用標記為 SendArc<T>時,就沒有問題了。


任何完全由 Send 的類型組成的類型也會自動被標記為 Send。也就是說,幾乎所有基本類型都是 Send 的,除了 chap19 將會討論的 raw pointer。


Sync 可允許多個 threads 使用資料


Sync 標記 trait 表是一個實作了 Sync 的類別,可以安全的在多個線程中擁有其值的引用。換一種方式來說,對於任意類型 T,如果 &TT 的引用)是 Send 的話 T 就是 Sync 的,這意味著其引用就可以安全的發送到另一個線程。類似於 Send 的情況,基本類別是 Sync 的,完全由 Sync 的類別組成的類別也是 Sync 的。


智能指針 Rc<T> 也不是 Sync 的,出於其不是 Send 相同的原因。RefCell<T>Cell<T> 系列類別不是 Sync 的。RefCell<T> 在運行時所進行的借用檢查也不是線程安全的。


Mutex<T>Sync 的,正如 “在線程間共享 Mutex<T>” 部分所講的它可以被用來在多線程中共享使用。


直接實作 SendSync 是不安全的


通常並不需要實作 SendSync trait,因為由 SendSync 的類別組成的類別,自動就是 SendSync 的。因為他們是標記 trait,甚至都不需要實現任何方法。他們只是用來加強並發相關的不可變性的。


實作這些標記 trait 涉及到編寫不安全的 Rust 代碼, chap 19 將會講述具體的方法;當前重要的是,在產生新的由不是 SendSync 的部分構成的並發類別時需要多加小心,以確保維持其安全保證。The Nomicon 中有更多關於這些保證以及如何維持他們的資訊。


Summary


因為 Rust 本身很少有處理並發的內容,有很多的並發方案都由 crate 實現。他們比標準庫要發展的更快;請在網上搜索當前最新的用於多線程場景的 crate。


Rust 提供了用於消息傳遞的通道,和像 Mutex<T>Arc<T> 這樣可以安全的用於並發上下文的智能指針。類別系統和借用檢查器會確保這些場景中的代碼,不會出現 race condition 和無效的引用。一旦代碼可以編譯了,我們就可以堅信這些代碼可以正確的運行於多線程環境,而不會出現其他語言中經常出現的那些難以追蹤的 bug。


References


The Rust Programming Language


中文版


中文版 2

沒有留言:

張貼留言