2020/7/6

rust20_實作 multithread web server

  1. 學習一些 TCP 與 HTTP 知識
  2. 在 socket 上監聽 TCP connections
  3. 解析少量的 HTTP request
  4. 產生一個合適的 HTTP response
  5. 通過 thread pool 改善 server 的吞吐量

Building a Single-Threaded Web Server

產生 project heelo

$ cargo new hello
     Created binary (application) `hello` project
$ cd hello

修改 main.rs,監聽 TCP port 7878

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        println!("Connection established!");
    }
}

bind 類似 new,會回傳一個新的 TcpListener instance

bind 回傳 Result<T, E>,這代表綁定可能會失敗,例如,連接 80 端口需要管理員權限(非管理員用戶只能監聽大於 1024 的端口),所以如果不是管理員嘗試連接 80 端口,則會綁定失敗。

另外如果運行兩個此程序的實例這樣會有兩個程式監聽相同的port,綁定會失敗。因為我們是出於學習目的來編寫一個基礎的 server,將不用關心處理這類錯誤,使用 unwrap 在出現這些情況時直接停止程式。

TcpListenerincoming 方法回傳一個 iterator,它提供了一系列的 stream(更準確的說是 TcpStream 類型的流)。stream)代表一個客戶端和服務端之間打開的連線。連線connection)代表客戶端連接服務端、服務端生成 response 以及服務端關閉連接的全部 request / response 過程。為此,TcpStream 允許我們讀取它來查看客戶端發送了什麼,並可以撰寫 response。for 循環會依次處理每個連接並產生一系列的流供我們處理。

目前為止,處理流的過程包含 unwrap,如果出現任何錯誤會終止程序,如果沒有任何錯誤,則列印出信息。接下來我們將為成功的情況增加更多功能。當客戶端連接到服務端時 incoming方法回傳錯誤是可能的,因為我們實際上沒有遍歷連接,而是遍歷 連線嘗試connection attempts)。連線可能會因為很多原因不能成功,大部分是操作系統相關的。例如,很多系統限制同時打開的連接數;新連接嘗試產生錯誤,直到一些打開的連接關閉為止。

測試

在終端執行 cargo run,接著在 browser 中瀏覽 127.0.0.1:7878。瀏覽器會顯示出看起來無法連接的錯誤訊息,因為 server 目前並沒 response 任何資料。但是如果我們觀察 terminal,會發現當瀏覽器連接 server 時會印出一系列的訊息!

Connection established!
Connection established!
Connection established!

使用 ctrl-C 來停止程序。並在做出最新的修改之後執行 cargo run 重啟服務。

讀取 request

為了分離取得連線和接下來對連接的操作的相關內容,我們將開始一個新函數來處理連線。在這個新的 handle_connection 函數中,我們從 TCP 流中讀取資料並列印出來以便觀察瀏覽器發送過來的資料。

use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

// stream 參數是可變的。這是因為 TcpStream 在內部記錄了所返回的資料,內部狀態可能會改變
fn handle_connection(mut stream: TcpStream) {
    // 宣告一個 buffer 來存放讀取到的數據
    let mut buffer = [0; 512];

    // 將緩衝區中的字節轉換為字符串並打印出來。String::from_utf8_lossy 函數獲取一個 &[u8] 並產生一個 String
    // 函數名的 “lossy” 部分來源於當其遇到無效的 UTF-8 序列時的行為:它使用 �,U+FFFD REPLACEMENT CHARACTER,來代替無效序列。你可能會在緩衝區的剩餘部分看到這些替代字元
    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}

觀察 http request

Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,es;q=0.5,vi;q=0.4,de;q=0.3
DN

http 格式

Method Request-URI HTTP-Version CRLF
headers CRLF
message-body

第一行是 request line

/ 是 URI: Uniform Resource Identifier

CRLF序列 (CRLF代表 carriage return line feed,這是打字機時代的術語!)結束。

撰寫 response

HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body

第一行是 status line

回應 HTTP/1.1 200 OK\r\n\r\n

use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    let response = "HTTP/1.1 200 OK\r\n\r\n";

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

當在瀏覽器中加載 127.0.0.1:7878 時,會得到一個空頁面而不是錯誤。

回應 html

在項目根目錄創建一個新文件,hello.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

修改 handle_connection 來讀取 HTML 文件

use std::fs;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let contents = fs::read_to_string("hello.html").unwrap();

    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

在 browser 就能看到 hello.html 網頁內容

validating request and selectively responding

目前會回傳固定的 html,修改為可判斷網址

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    if buffer.starts_with(get) {
        let contents = fs::read_to_string("hello.html").unwrap();

        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    } else {
        // 其他請求
        let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
        let contents = fs::read_to_string("404.html").unwrap();

        let response = format!("{}{}", status_line, contents);

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }
}

匹配 requst GET / HTTP/1.1,並區分不同的 response

對於任何不是 / 的請求返回 404 狀態碼的 response 和錯誤頁面

404.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

refactoring

將這些區別分別提取到一行 ifelse 中,對狀態行和文件名變數賦值;然後在讀取文件和寫入 response 的代碼中無條件的使用這些變數。

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Turn into a Multithreaded Server

目前 server 會依次處理每一個請求,如果 server 正接收越來越多的請求,會使性能越來越差。如果一個請求花費很長時間來處理,隨後而來的請求則不得不等待這個長請求結束。我們需要修復這種情況,不過首先讓我們實際嘗試重現一下這個問題。

模擬 slow request

在回應 /sleep 時會暫停 5s

use std::thread;
use std::time::Duration;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}

利用 thead pool 改善效能

線程池thread pool)是一組預先分配的等待或準備處理任務的 thread。當程式收到一個新任務,線程池中的一個線程會被分配任務,這個線程會離開並處理任務。其餘的線程則可用於處理在第一個線程處理任務的同時處理其他接收到的任務。當第一個線程處理完任務時,它會返回空閒線程池中等待處理新任務。線程池允許我們並發處理連接,增加 server 的吞吐量。

我們會將 pool 線程限制為較少的數量,以防拒絕服務(Denial of Service, DoS)攻擊;如果程序為每一個接收的請求都產生一個線程,某人向 server 發起千萬級的請求時會耗盡服務器的資源並導致所有請求的處理都被終止。

不同於分配無限的線程,線程池中將有固定數量的等待線程。當新進請求時,將請求發送到線程池中做處理。線程池會維護一個接收請求的隊列。每一個線程會從隊列中取出一個請求,處理請求,接著向對隊列索取另一個請求。通過這種設計,則可以並發處理 N 個請求,其中 N 為線程數。如果每一個線程都在響應慢請求,之後的請求仍然會阻塞隊列,不過相比之前增加了能處理的慢請求的數量。

這個設計僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。

先為每一個 stream 分配了一個新線程進行處理

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

一個假想的 thread pool API

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

使用 ThreadPool::new 來產生一個新的線程池,它有一個可配置的線程數的參數,在這裡是 ˋ。這樣在 for 循環中,pool.execute 有類似 thread::spawn 的 API,它獲取一個線程池運行於每一個流的閉包。pool.execute 需要實現為獲取閉包並傳遞給池中的線程。這段 code 還不能編譯,不過編譯器會告訴我們如何修復它。

建立 ThreadPool struct

src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

src/main.rs

use hello::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

因為還沒有定義 execute method

$ cargo check
    Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^

error: aborting due to previous error

我們會在 ThreadPool 上定義 execute 函數來獲取一個閉包參數。chap13 的 “使用帶有泛型和 Fn trait 的閉包” 部分,閉包作為參數時可以使用三個不同的 trait:FnFnMutFnOnce。我們需要決定這裡該使用哪種閉包。因最終需要實現的類似於標準庫的 thread::spawn,所以我們可以觀察 thread::spawn 的簽名在其參數中使用了何種 bound。

spawn 的 API 文件是這樣

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

F 是這裡我們關心的參數;T 與回傳值有關所以我們並不關心。spawn 使用 FnOnce 作為 F 的 trait bound,這可能也是我們需要的,因為最終會將傳遞給 execute 的參數傳給 spawn。因為處理請求的線程只會執行閉包一次,這也進一步確認了 FnOnce 是我們需要的 trait,這裡符合 FnOnceOnce 的意思。

F 還有 trait bound Send 和生命週期綁定 'static,這對我們的情況也是有意義的:需要 Send來將閉包從一個線程轉移到另一個線程,而 'static 是因為不知道線程會執行多久。讓我們編寫一個使用帶有這些 bound 的泛型參數 FThreadPoolexecute 方法:

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
$ cargo check
    Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^ help: consider prefixing with an underscore: `_size`
  |
  = note: #[warn(unused_variables)] on by default

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^ help: consider prefixing with an underscore: `_f`

    Finished dev [unoptimized + debuginfo] target(s) in 1.57s

目前只能編譯,還無法運作

new 驗證 pool 中 thread 的數量

這裡仍然存在警告是因為其並沒有對 newexecute 的參數做任何操作。讓我們用期望的行為來實現這些函數。

new 開始。先前選擇使用無符號類型作為 size 參數的類型,因為線程數為負的線程池沒有意義。然而,線程數為零的線程池同樣沒有意義,不過零是一個完全有效的 u32 值。讓我們增加在返回 ThreadPool 實例之前檢查 size 是否大於零的代碼,並使用 assert! 宏在得到零時 panic,如 20-13 所示:

impl ThreadPool {
    /// 創建線程池。
    ///
    /// 線程池中線程的數量。
    ///
    /// # Panics
    ///
    /// `new` 函數在 size 為 0 時會 panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}

這裡用文檔註釋為 ThreadPool 增加了一些文件

如果要做得更好,可以改為傳回 Result

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

產生儲存 threads 的空間

現在有了一個有效的線程池線程數,就可以實際產生這些線程,並在回傳前將他們儲存在 ThreadPool 結構中。不過要如何 “儲存” 一個線程?讓我們再看看 thread::spawn 的簽名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

spawn 返回 JoinHandle<T>,其中 T 是閉包返回的類型。嘗試使用 JoinHandle 來看看會發生什麼。在我們的情況中,傳遞給線程池的閉包會處理連線並不返回任何值,所以 T 將會是單元類型 ()

20-14 中的代碼可以編譯,不過實際上還並沒有產生任何線程。我們改變了 ThreadPool 的定義來存放一個 thread::JoinHandle<()> 的 vector 實例,使用 size 容量來初始化,並設置一個 for 循環了來執行產生線程的code,並回傳包含這些線程的 ThreadPool 實例:

lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
   pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // with_capacity 與 Vec::new 做了同樣的工作,不過它為 vector 預先分配空間。因為已經知道了 vector 中需要 size 個元素,預先進行分配比僅僅 Vec::new 要稍微有效率一些
        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}

Worker struct 負責從 ThreadPool 將 code 傳給 thread

20-14 的 for 循環中留下了一個關於產生線程的註釋。實際上如何產生線程呢?這是一個難題。標準庫提供的產生線程的方法,thread::spawn,它期望獲取一些一旦產生線程就應該執行的代碼。然而,我們希望開始線程並使其等待稍後傳遞的代碼。標準庫的線程實現並沒有包含這麼做的方法;我們必須自己實現。

我們將要實現的行為是產生線程並稍後發送代碼,這會在 ThreadPool 和線程間引入一個新類別來管理這種新行為。這個資料結構稱為 Worker:這是一個 pool 中的常見概念。想像一下在餐館廚房工作的員工:員工等待來自客戶的訂單,他們負責接受這些訂單並完成它們。

不同於在線程池中儲存一個 JoinHandle<()> 實例的 vector,我們會儲存 Worker 結構的實例。每一個 Worker 會儲存一個單獨的 JoinHandle<()> 實例。接著會在 Worker 上實現一個方法,它會獲取需要允許代碼的閉包並將其發送給已經啟動的線程執行。我們還會賦予每一個 worker id,這樣就可以在日誌和呼叫中區別線程池中的不同 worker。

首先,讓我們做出產生 ThreadPool 時所需的修改。在通過如下方式設置完 Worker 之後,我們會實現向線程發送閉包的代碼:

  1. 定義 Worker 結構存放 idJoinHandle<()>
  2. 修改 ThreadPool 存放一個 Worker 實例的 vector
  3. 定義 Worker::new 函數,它獲取一個 id 數字並返回一個帶有 id 和用空閉包分配的線程的 Worker 實例
  4. ThreadPool::new 中,使用 for 循環計數生成 id,使用這個 id 新建 Worker,並儲存進 vector 中

20-15 就是一個做出了這些修改的例子:

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
   pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}

ThreadPool 中從 threads 改為 workers,因為它現在儲存 Worker 而不是 JoinHandle<()>。使用 for 循環中的計數作為 Worker::new 的參數,並將每一個新建的 Worker 儲存在叫做 workers 的 vector 中。

Worker 結構和其 new 函數是私有的,因為外部代碼(比如 src/bin/main.rs 中的 server)並不需要知道關於 ThreadPool 中使用 Worker 結構的實現細節。Worker::new 函數使用 id 參數並儲存了使用一個 closure 產生的 JoinHandle<()>

這段code 能夠編譯並用指定給 ThreadPool::new 的參數產生儲存了一系列的 Worker 實例,不過 仍然 沒有處理 execute 中得到的閉包。

透過 Channels 發送 request 給 threads

下一個需要解決的問題是傳給 thread::spawn 的閉包完全沒有做任何工作。目前,我們在 execute 方法中獲得期望執行的閉包,不過在創建 ThreadPool 的過程中產生每一個 Worker 時需要向 thread::spawn 傳遞一個閉包。

我們希望剛創建的 Worker 結構能夠從 ThreadPool 的隊列中獲取需要執行的代碼,並發送到線程中執行他們。

在 chap 16,我們學習了 通道 —— 一個溝通兩個線程的簡單手段 —— 對於這個例子來說則是絕佳的。這裡通道將充當任務隊列的作用,execute 將通過 ThreadPool 向其中線程正在尋找工作的 Worker 實例發送任務。如下是計畫:

  1. ThreadPool 會產生一個通道並當作發送端。
  2. 每個 Worker 將會充當通道的接收端。
  3. 一個 Job 結構來存放用於向通道中發送的閉包。
  4. execute 方法會在通道發送端發出期望執行的任務。
  5. 在線程中,Worker 會歷遍通道的接收端並執行任何接收到的任務。

從在 ThreadPool::new 中產生通道並讓 ThreadPool 實例充當發送端開始,如 20-16 所示。Job 是將在通道中發出的類別,目前它是一個沒有任何內容的結構體

// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

ThreadPool::new 中,產生了一個通道,並接著讓線程池在接收端等待。這段 code 能夠編譯,不過仍有警告。


嘗試在線程池產生每個 worker 時,將通道的接收端傳遞給他們。我們希望在 worker 所分配的線程中使用通道的接收端,所以將在閉包中引用 receiver 參數。20-17 中展示的 code 還不能編譯:

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}

這是一些小而直觀的修改:將通道的接收端傳遞進了 Worker::new,並接著在閉包中使用它。

Rust 所提供的通道實現是多 生產者,單 消費者 的。這意味著不能簡單的 clone 通道的消費端來解決問題。從通道隊列中取出任務涉及到修改 receiver,所以這些線程需要一個能安全的共享和修改 receiver 的方式,否則可能導致 race condition

chap 16 討論的線程安全智能指針,為了在多個線程間共享所有權並允許線程修改其值,需要使用 Arc<Mutex<T>>Arc 使得多個 worker 擁有接收端,而 Mutex 則確保一次只有一個 worker 能從接收端得到任務。

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

ThreadPool::new 中,將通道的接收端放入一個 Arc 和一個 Mutex 中。對於每一個新 worker,clone Arc 來增加引用計數,如此這些 worker 就可以共享接收端的所有權了。

通過這些修改,code可以編譯了

實作 execute

實現 ThreadPool 上的 execute 方法。同時也要修改 Job 結構:它將不再是結構,Job 將是一個有著 execute 接收到的閉包類型的 trait 對象的類別別名。 chap 19 “類別別名用來創建類別同義詞” 部分提到過,類別別名允許將長的類別變短。

// --snip--

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

為存放每一個閉包的 Box 產生一個 Job 類型別名,接著在通道中發出任務

在使用 execute 得到的閉包產生 Job instance 之後,將這些任務從通道的發送端發出。這裡呼叫 send 上的 unwrap,因為發送可能會失敗,例如這可能發生於停止了所有線程執行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執行;只要線程池存在他們就會一直執行。使用 unwrap 是因為我們知道失敗不可能發生,即便編譯器不這麼認為。

在 worker 中,傳給 thread::spawn 的閉包仍然還只是 引用 了通道的接收端。相反我們需要閉包一直循環,向通道的接收端請求任務,並在得到任務時執行他們。如 20-20 對 Worker::new 做出修改:

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

首先在 receiver 上呼叫了 lock 來獲取互斥器,接著 unwrap 在出現任何錯誤時 panic。如果互斥器處於一種叫做 被污染poisoned)的狀態時獲取鎖可能會失敗,這可能發生於其他線程在持有鎖時 panic 了且沒有釋放鎖。在這種情況下,呼叫 unwrap 使其 panic 是正確的行為。可將 unwrap 改為包含有意義錯誤信息的 expect

如果鎖定了互斥器,接著調用 recv 從通道中接收 Job。最後的 unwrap 也繞過了一些錯誤,這可能發生於持有通道發送端的線程停止的情況,類似於如果接收端關閉時 send 方法如何回傳 Err 一樣。

呼叫 recv 會阻塞當前線程,所以如果還沒有任務,其會等待直到有可用的任務。Mutex<T> 確保一次只有一個 Worker 線程嘗試請求任務。

但目前還是無法編譯


為了呼叫儲存在 Box<T> (這正是 Job 別名的類型)中的 FnOnce 閉包,該閉包需要能將自己移動 Box<T>,因為當呼叫這個閉包時,它獲取 self 的所有權。通常來說,將值移動出 Box<T> 是不被允許的,因為 Rust 不知道 Box<T> 中的值將會有多大;chap 15 能夠正常使用 Box<T> 是因為我們將未知大小的值儲存進 Box<T> 從而得到已知大小的值。

chap17 曾見過,17-15 中有使用了 self: Box<Self> 語法的方法,它允許方法獲取儲存在 Box<T> 中的 Self 值的所有權。這正是我們希望做的,然而不幸的是 Rust 不允許我們這麼做:Rust 當閉包被調用時行為的那部分並沒有使用 self: Box<Self> 實現。所以這裡 Rust 也不知道它可以使用 self: Box<Self> 來獲取閉包的所有權並將閉包移動出 Box<T>

Rust 仍在努力改進提升編譯器的過程中,未來新版的 rust,應該可讓 20-20 中的 code 正常工作。


不過目前讓我們通過一個小技巧來繞過這個問題。可以明確告訴 Rust 在這裡我們可以使用 self: Box<Self> 來獲取 Box<T> 中值的所有權,而一旦獲取了閉包的所有權就可以呼叫它了。這涉及到定義一個新 trait,它帶有一個在簽名中使用 self: Box<Self> 的方法 call_box,為任何實現了 FnOnce() 的類型定義這個 trait,修改類別別名來使用這個新 trait,並修改 Worker 使用 call_box 方法。這些修改如 20-21 所示:

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

新增一個 trait FnBox 來繞過當前 Box<FnOnce()> 的限制

新增了一個叫做 FnBox 的 trait。這個 trait 有一個方法 call_box,它類似於其他 Fn* trait 中的 call 方法,除了它獲取 self: Box<Self> 以便獲取 self 的所有權並將值從 Box<T> 中移動出來。

接下來,為任何實現了 FnOnce() trait 的類型 F 實現 FnBox trait。這實際上就等於任何 FnOnce() closure 都可以使用 call_box 方法。call_box 的實現使用 (*self)() 將閉包移動出 Box<T> 並呼叫此閉包。

現在我們需要 Job 類別別名是任何實現了新 trait FnBoxBox。這允許我們在得到 Job 值時使用 Worker 中的 call_box。為任何 FnOnce() 閉包都實現了 FnBox trait 意味著無需對實際在通道中發出的值做任何修改。現在 Rust 就能夠理解我們的行為是正確的了。

Graceful shutdown and cleanup

20-21 中的代碼如期通過使用線程池異步的響應請求。這裡有一些警告說 workersidthread 字段沒有直接被使用,這提醒了我們並沒有清理所有的內容。當使用不那麼優雅的 ctrl-C 終止主線程時,所有其他線程也會立刻停止,即便它們正處於處理請求的過程中。

現在我們要為 ThreadPool 實現 Drop trait 對線程池中的每一個線程調用 join,這樣這些線程將會執行完他們的請求。接著會為 ThreadPool 實現一個告訴線程他們應該停止接收新請求並結束的方式。為了實踐這些代碼,修改 server 在優雅停機(graceful shutdown)之前只接受兩個請求。

ThreadPool 實現 Drop Trait

當線程池被丟棄時,應該 join 所有線程以確保他們完成其操作。20-23 展示了 Drop 實現的第一次嘗試;這些代碼還不能夠編譯:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

編譯錯誤

error[E0507]: cannot move out of borrowed content
  --> src/lib.rs:61:13
   |
61 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ cannot move out of borrowed content

error: aborting due to previous error

這告訴我們並不能調用 join,因為只有每一個 worker 的可變借用,而 join 獲取其參數的所有權。為瞭解決這個問題,需要一個方法將 thread 移動出擁有其所有權的 Worker 實例以便 join可以消費這個線程。17-15 中我們曾見過這麼做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上調用 take 方法將值從 Some 成員中移動出來而對 None 成員不做處理。換句話說,正在運行的 Workerthread 將是 Some 成員值,而當需要清理 worker 時,將 Some 替換為 None,這樣 worker 就沒有可以運行的線程了。

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

當新建 Worker 時需要將 thread 值封裝進 Some

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

調用 Option 上的 takethread 移動出 worker

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

向 thread 發送訊息,使其停止接收任務

有了這些修改,code 就能編譯且沒有任何警告。不過也有壞消息,這些代碼還不能以我們期望的方式運行。問題的關鍵在於 Worker 中分配的線程所運行的閉包中的邏輯:呼叫 join 並不會關閉線程,因為他們一直 loop 來尋找任務。如果採用這個實現來嘗試丟棄 ThreadPool ,則主線程會永遠阻塞在等待第一個線程結束上。

為了修復這個問題,修改線程既監聽是否有 Job 運行也要監聽一個應該停止監聽並退出無限循環的信號。所以通道將發送這個枚舉的兩個成員之一而不是 Job 實例

enum Message {
    NewJob(Job),
    Terminate,
}

Message 要麼是存放了線程需要運行的 JobNewJob 成員,要麼是會導致線程退出循環並終止的 Terminate 成員。

同時需要修改通道來使用 Message 而不是 Job,收發 Message 值並在 Worker 收到 Message::Terminate 時退出循環

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

為了使用 Message enum 需要將兩個地方的 Job 修改為 MessageThreadPool 的定義和 Worker::new 的簽名。ThreadPoolexecute 方法需要發送封裝進 Message::NewJob 成員的任務。然後,在 Worker::new 中當從通道接收 Message 時,當獲取到 NewJob成員會處理任務而收到 Terminate 成員則會退出循環。

修改後再次能夠編譯並繼續按照期望的行為運行。不過還是會得到一個警告,因為並沒有產生任何 Terminate 成員的消息。如 20-25 所示修改 Drop 實現來修復此問題:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

在對每個 worker 線程調用 join 之前向 worker 發送 Message::Terminate

現在遍歷了 worker 兩次,一次向每個 worker 發送一個 Terminate 消息,一個呼叫每個 worker 線程上的 join。如果嘗試在同一循環中發送消息並立即 join 線程,則無法保證當前迭代的 worker 是從通道收到終止消息的 worker。

為了更好的理解為什麼需要兩個分開的循環,想像一下只有兩個 worker 的場景。如果在一個單獨的循環中遍歷每個 worker,在第一次迭代中向通道發出終止消息並對第一個 worker 線程調用 join。我們會一直等待第一個 worker 結束,不過它永遠也不會結束因為第二個線程接收了終止消息。deadlock!

為了避免此情況,首先在一個循環中向通道發出所有的 Terminate 消息,接著在另一個循環中 join 所有的線程。每個 worker 一旦收到終止消息即會停止從通道接收消息,意味著可以確保如果發送同 worker 數相同的終止消息,在 join 之前每個線程都會收到一個終止消息。

為了實踐這些代碼,如 20-26 所示修改 main 在優雅停止 server 之前只接受兩個請求:

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

你不會希望真實世界的 web server 只處理兩次請求就停機了,這只是為了展示優雅停機和清理處於正常工作狀態。

take 方法定義於 Iterator trait,這裡限制循環最多頭 2 次。ThreadPool 會在 main 的結尾離開作用域,而且還會看到 drop 實現的運行。

使用 cargo run 啟動 server,並發起三個 request。第三個request 應該會失敗

這個特定的運行過程中一個有趣的地方在於:注意我們向通道中發出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結束。與此同時,每一個線程都收到了終止消息。一旦 worker 0 結束,主線程就等待其他 worker 結束,此時他們都已經收到終止消息並能夠停止了。

以下是完整的程式

main.rs

extern crate hello;
use hello::ThreadPool;

use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

     let mut file = File::open(filename).unwrap();
     let mut contents = String::new();

     file.read_to_string(&mut contents).unwrap();

     let response = format!("{}{}", status_line, contents);

     stream.write(response.as_bytes()).unwrap();
     stream.flush().unwrap();
}

lib.rs

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

enum Message {
    NewJob(Job),
    Terminate,
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

impl ThreadPool {
    /// 創建線程池。
    ///
    /// 線程池中線程的數量。
    ///
    /// # Panics
    ///
    /// `new` 函數在 size 為 0 時會 panic。
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

References

The Rust Programming Language

中文版

中文版 2

沒有留言:

張貼留言