- 學習一些 TCP 與 HTTP 知識
- 在 socket 上監聽 TCP connections
- 解析少量的 HTTP request
- 產生一個合適的 HTTP response
- 通過 thread pool 改善 server 的吞吐量
Building a Single-Threaded Web Server
產生 project heelo
$ cargo new hello
Created binary (application) `hello` project
$ cd hello
修改 main.rs,監聽 TCP port 7878
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!("Connection established!");
}
}
bind 類似 new,會回傳一個新的 TcpListener instance
bind
回傳 Result<T, E>
,這代表綁定可能會失敗,例如,連接 80 端口需要管理員權限(非管理員用戶只能監聽大於 1024 的端口),所以如果不是管理員嘗試連接 80 端口,則會綁定失敗。
另外如果運行兩個此程序的實例這樣會有兩個程式監聽相同的port,綁定會失敗。因為我們是出於學習目的來編寫一個基礎的 server,將不用關心處理這類錯誤,使用 unwrap
在出現這些情況時直接停止程式。
TcpListener
的 incoming
方法回傳一個 iterator,它提供了一系列的 stream(更準確的說是 TcpStream
類型的流)。流(stream)代表一個客戶端和服務端之間打開的連線。連線(connection)代表客戶端連接服務端、服務端生成 response 以及服務端關閉連接的全部 request / response 過程。為此,TcpStream
允許我們讀取它來查看客戶端發送了什麼,並可以撰寫 response。for
循環會依次處理每個連接並產生一系列的流供我們處理。
目前為止,處理流的過程包含 unwrap
,如果出現任何錯誤會終止程序,如果沒有任何錯誤,則列印出信息。接下來我們將為成功的情況增加更多功能。當客戶端連接到服務端時 incoming
方法回傳錯誤是可能的,因為我們實際上沒有遍歷連接,而是遍歷 連線嘗試(connection attempts)。連線可能會因為很多原因不能成功,大部分是操作系統相關的。例如,很多系統限制同時打開的連接數;新連接嘗試產生錯誤,直到一些打開的連接關閉為止。
測試
在終端執行 cargo run
,接著在 browser 中瀏覽 127.0.0.1:7878
。瀏覽器會顯示出看起來無法連接的錯誤訊息,因為 server 目前並沒 response 任何資料。但是如果我們觀察 terminal,會發現當瀏覽器連接 server 時會印出一系列的訊息!
Connection established!
Connection established!
Connection established!
使用 ctrl-C 來停止程序。並在做出最新的修改之後執行 cargo run
重啟服務。
讀取 request
為了分離取得連線和接下來對連接的操作的相關內容,我們將開始一個新函數來處理連線。在這個新的 handle_connection
函數中,我們從 TCP 流中讀取資料並列印出來以便觀察瀏覽器發送過來的資料。
use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
// stream 參數是可變的。這是因為 TcpStream 在內部記錄了所返回的資料,內部狀態可能會改變
fn handle_connection(mut stream: TcpStream) {
// 宣告一個 buffer 來存放讀取到的數據
let mut buffer = [0; 512];
// 將緩衝區中的字節轉換為字符串並打印出來。String::from_utf8_lossy 函數獲取一個 &[u8] 並產生一個 String
// 函數名的 “lossy” 部分來源於當其遇到無效的 UTF-8 序列時的行為:它使用 �,U+FFFD REPLACEMENT CHARACTER,來代替無效序列。你可能會在緩衝區的剩餘部分看到這些替代字元
stream.read(&mut buffer).unwrap();
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
觀察 http request
Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,es;q=0.5,vi;q=0.4,de;q=0.3
DN
http 格式
Method Request-URI HTTP-Version CRLF
headers CRLF
message-body
第一行是 request line
/
是 URI: Uniform Resource Identifier
以 CRLF序列 (CRLF代表 carriage return line feed,這是打字機時代的術語!)結束。
撰寫 response
HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body
第一行是 status line
回應 HTTP/1.1 200 OK\r\n\r\n
use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let response = "HTTP/1.1 200 OK\r\n\r\n";
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
當在瀏覽器中加載 127.0.0.1:7878 時,會得到一個空頁面而不是錯誤。
回應 html
在項目根目錄創建一個新文件,hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
修改 handle_connection
來讀取 HTML 文件
use std::fs;
// --snip--
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let contents = fs::read_to_string("hello.html").unwrap();
let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
在 browser 就能看到 hello.html 網頁內容
validating request and selectively responding
目前會回傳固定的 html,修改為可判斷網址
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
if buffer.starts_with(get) {
let contents = fs::read_to_string("hello.html").unwrap();
let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
} else {
// 其他請求
let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
let contents = fs::read_to_string("404.html").unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
}
匹配 requst GET / HTTP/1.1
,並區分不同的 response
對於任何不是 / 的請求返回 404
狀態碼的 response 和錯誤頁面
404.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
refactoring
將這些區別分別提取到一行 if
和 else
中,對狀態行和文件名變數賦值;然後在讀取文件和寫入 response 的代碼中無條件的使用這些變數。
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
Turn into a Multithreaded Server
目前 server 會依次處理每一個請求,如果 server 正接收越來越多的請求,會使性能越來越差。如果一個請求花費很長時間來處理,隨後而來的請求則不得不等待這個長請求結束。我們需要修復這種情況,不過首先讓我們實際嘗試重現一下這個問題。
模擬 slow request
在回應 /sleep
時會暫停 5s
use std::thread;
use std::time::Duration;
// --snip--
fn handle_connection(mut stream: TcpStream) {
// --snip--
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
// --snip--
}
利用 thead pool 改善效能
線程池(thread pool)是一組預先分配的等待或準備處理任務的 thread。當程式收到一個新任務,線程池中的一個線程會被分配任務,這個線程會離開並處理任務。其餘的線程則可用於處理在第一個線程處理任務的同時處理其他接收到的任務。當第一個線程處理完任務時,它會返回空閒線程池中等待處理新任務。線程池允許我們並發處理連接,增加 server 的吞吐量。
我們會將 pool 線程限制為較少的數量,以防拒絕服務(Denial of Service, DoS)攻擊;如果程序為每一個接收的請求都產生一個線程,某人向 server 發起千萬級的請求時會耗盡服務器的資源並導致所有請求的處理都被終止。
不同於分配無限的線程,線程池中將有固定數量的等待線程。當新進請求時,將請求發送到線程池中做處理。線程池會維護一個接收請求的隊列。每一個線程會從隊列中取出一個請求,處理請求,接著向對隊列索取另一個請求。通過這種設計,則可以並發處理 N
個請求,其中 N
為線程數。如果每一個線程都在響應慢請求,之後的請求仍然會阻塞隊列,不過相比之前增加了能處理的慢請求的數量。
這個設計僅僅是多種改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和單線程異步 I/O 模型。
先為每一個 stream 分配了一個新線程進行處理
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
一個假想的 thread pool API
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
使用 ThreadPool::new
來產生一個新的線程池,它有一個可配置的線程數的參數,在這裡是 ˋ。這樣在 for
循環中,pool.execute
有類似 thread::spawn
的 API,它獲取一個線程池運行於每一個流的閉包。pool.execute
需要實現為獲取閉包並傳遞給池中的線程。這段 code 還不能編譯,不過編譯器會告訴我們如何修復它。
建立 ThreadPool struct
src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
src/main.rs
use hello::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
因為還沒有定義 execute method
$ cargo check
Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| ^^^^^^^
error: aborting due to previous error
我們會在 ThreadPool
上定義 execute
函數來獲取一個閉包參數。chap13 的 “使用帶有泛型和 Fn
trait 的閉包” 部分,閉包作為參數時可以使用三個不同的 trait:Fn
、FnMut
和 FnOnce
。我們需要決定這裡該使用哪種閉包。因最終需要實現的類似於標準庫的 thread::spawn
,所以我們可以觀察 thread::spawn
的簽名在其參數中使用了何種 bound。
spawn 的 API 文件是這樣
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
F
是這裡我們關心的參數;T
與回傳值有關所以我們並不關心。spawn
使用 FnOnce
作為 F
的 trait bound,這可能也是我們需要的,因為最終會將傳遞給 execute
的參數傳給 spawn
。因為處理請求的線程只會執行閉包一次,這也進一步確認了 FnOnce
是我們需要的 trait,這裡符合 FnOnce
中 Once
的意思。
F
還有 trait bound Send
和生命週期綁定 'static
,這對我們的情況也是有意義的:需要 Send
來將閉包從一個線程轉移到另一個線程,而 'static
是因為不知道線程會執行多久。讓我們編寫一個使用帶有這些 bound 的泛型參數 F
的 ThreadPool
的 execute
方法:
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
$ cargo check
Checking hello v0.1.0 (/Users/charley/project/idea/rust/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^ help: consider prefixing with an underscore: `_size`
|
= note: #[warn(unused_variables)] on by default
warning: unused variable: `f`
--> src/lib.rs:8:30
|
8 | pub fn execute<F>(&self, f: F)
| ^ help: consider prefixing with an underscore: `_f`
Finished dev [unoptimized + debuginfo] target(s) in 1.57s
目前只能編譯,還無法運作
在 new
驗證 pool 中 thread 的數量
這裡仍然存在警告是因為其並沒有對 new
和 execute
的參數做任何操作。讓我們用期望的行為來實現這些函數。
從 new
開始。先前選擇使用無符號類型作為 size
參數的類型,因為線程數為負的線程池沒有意義。然而,線程數為零的線程池同樣沒有意義,不過零是一個完全有效的 u32
值。讓我們增加在返回 ThreadPool
實例之前檢查 size
是否大於零的代碼,並使用 assert!
宏在得到零時 panic,如 20-13 所示:
impl ThreadPool {
/// 創建線程池。
///
/// 線程池中線程的數量。
///
/// # Panics
///
/// `new` 函數在 size 為 0 時會 panic。
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
這裡用文檔註釋為 ThreadPool
增加了一些文件
如果要做得更好,可以改為傳回 Result
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
產生儲存 threads 的空間
現在有了一個有效的線程池線程數,就可以實際產生這些線程,並在回傳前將他們儲存在 ThreadPool
結構中。不過要如何 “儲存” 一個線程?讓我們再看看 thread::spawn
的簽名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
spawn
返回 JoinHandle<T>
,其中 T
是閉包返回的類型。嘗試使用 JoinHandle
來看看會發生什麼。在我們的情況中,傳遞給線程池的閉包會處理連線並不返回任何值,所以 T
將會是單元類型 ()
。
20-14 中的代碼可以編譯,不過實際上還並沒有產生任何線程。我們改變了 ThreadPool
的定義來存放一個 thread::JoinHandle<()>
的 vector 實例,使用 size
容量來初始化,並設置一個 for
循環了來執行產生線程的code,並回傳包含這些線程的 ThreadPool
實例:
lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// with_capacity 與 Vec::new 做了同樣的工作,不過它為 vector 預先分配空間。因為已經知道了 vector 中需要 size 個元素,預先進行分配比僅僅 Vec::new 要稍微有效率一些
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
Worker
struct 負責從 ThreadPool 將 code 傳給 thread
20-14 的 for
循環中留下了一個關於產生線程的註釋。實際上如何產生線程呢?這是一個難題。標準庫提供的產生線程的方法,thread::spawn
,它期望獲取一些一旦產生線程就應該執行的代碼。然而,我們希望開始線程並使其等待稍後傳遞的代碼。標準庫的線程實現並沒有包含這麼做的方法;我們必須自己實現。
我們將要實現的行為是產生線程並稍後發送代碼,這會在 ThreadPool
和線程間引入一個新類別來管理這種新行為。這個資料結構稱為 Worker
:這是一個 pool 中的常見概念。想像一下在餐館廚房工作的員工:員工等待來自客戶的訂單,他們負責接受這些訂單並完成它們。
不同於在線程池中儲存一個 JoinHandle<()>
實例的 vector,我們會儲存 Worker
結構的實例。每一個 Worker
會儲存一個單獨的 JoinHandle<()>
實例。接著會在 Worker
上實現一個方法,它會獲取需要允許代碼的閉包並將其發送給已經啟動的線程執行。我們還會賦予每一個 worker id
,這樣就可以在日誌和呼叫中區別線程池中的不同 worker。
首先,讓我們做出產生 ThreadPool
時所需的修改。在通過如下方式設置完 Worker
之後,我們會實現向線程發送閉包的代碼:
- 定義
Worker
結構存放 id
和 JoinHandle<()>
- 修改
ThreadPool
存放一個 Worker
實例的 vector
- 定義
Worker::new
函數,它獲取一個 id
數字並返回一個帶有 id
和用空閉包分配的線程的 Worker
實例
- 在
ThreadPool::new
中,使用 for
循環計數生成 id
,使用這個 id
新建 Worker
,並儲存進 vector 中
20-15 就是一個做出了這些修改的例子:
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
將 ThreadPool
中從 threads
改為 workers
,因為它現在儲存 Worker
而不是 JoinHandle<()>
。使用 for
循環中的計數作為 Worker::new
的參數,並將每一個新建的 Worker
儲存在叫做 workers
的 vector 中。
Worker
結構和其 new
函數是私有的,因為外部代碼(比如 src/bin/main.rs 中的 server)並不需要知道關於 ThreadPool
中使用 Worker
結構的實現細節。Worker::new
函數使用 id
參數並儲存了使用一個 closure 產生的 JoinHandle<()>
。
這段code 能夠編譯並用指定給 ThreadPool::new
的參數產生儲存了一系列的 Worker
實例,不過 仍然 沒有處理 execute
中得到的閉包。
透過 Channels 發送 request 給 threads
下一個需要解決的問題是傳給 thread::spawn
的閉包完全沒有做任何工作。目前,我們在 execute
方法中獲得期望執行的閉包,不過在創建 ThreadPool
的過程中產生每一個 Worker
時需要向 thread::spawn
傳遞一個閉包。
我們希望剛創建的 Worker
結構能夠從 ThreadPool
的隊列中獲取需要執行的代碼,並發送到線程中執行他們。
在 chap 16,我們學習了 通道 —— 一個溝通兩個線程的簡單手段 —— 對於這個例子來說則是絕佳的。這裡通道將充當任務隊列的作用,execute
將通過 ThreadPool
向其中線程正在尋找工作的 Worker
實例發送任務。如下是計畫:
ThreadPool
會產生一個通道並當作發送端。
- 每個
Worker
將會充當通道的接收端。
- 一個
Job
結構來存放用於向通道中發送的閉包。
execute
方法會在通道發送端發出期望執行的任務。
- 在線程中,
Worker
會歷遍通道的接收端並執行任何接收到的任務。
從在 ThreadPool::new
中產生通道並讓 ThreadPool
實例充當發送端開始,如 20-16 所示。Job
是將在通道中發出的類別,目前它是一個沒有任何內容的結構體
// --snip--
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
在 ThreadPool::new
中,產生了一個通道,並接著讓線程池在接收端等待。這段 code 能夠編譯,不過仍有警告。
嘗試在線程池產生每個 worker 時,將通道的接收端傳遞給他們。我們希望在 worker 所分配的線程中使用通道的接收端,所以將在閉包中引用 receiver
參數。20-17 中展示的 code 還不能編譯:
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
這是一些小而直觀的修改:將通道的接收端傳遞進了 Worker::new
,並接著在閉包中使用它。
Rust 所提供的通道實現是多 生產者,單 消費者 的。這意味著不能簡單的 clone 通道的消費端來解決問題。從通道隊列中取出任務涉及到修改 receiver
,所以這些線程需要一個能安全的共享和修改 receiver
的方式,否則可能導致 race condition
chap 16 討論的線程安全智能指針,為了在多個線程間共享所有權並允許線程修改其值,需要使用 Arc<Mutex<T>>
。Arc
使得多個 worker 擁有接收端,而 Mutex
則確保一次只有一個 worker 能從接收端得到任務。
use std::sync::Arc;
use std::sync::Mutex;
// --snip--
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
}
}
在 ThreadPool::new
中,將通道的接收端放入一個 Arc
和一個 Mutex
中。對於每一個新 worker,clone Arc
來增加引用計數,如此這些 worker 就可以共享接收端的所有權了。
通過這些修改,code可以編譯了
實作 execute
實現 ThreadPool
上的 execute
方法。同時也要修改 Job
結構:它將不再是結構,Job
將是一個有著 execute
接收到的閉包類型的 trait 對象的類別別名。 chap 19 “類別別名用來創建類別同義詞” 部分提到過,類別別名允許將長的類別變短。
// --snip--
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
為存放每一個閉包的 Box
產生一個 Job
類型別名,接著在通道中發出任務
在使用 execute
得到的閉包產生 Job
instance 之後,將這些任務從通道的發送端發出。這裡呼叫 send
上的 unwrap
,因為發送可能會失敗,例如這可能發生於停止了所有線程執行的情況,這意味著接收端停止接收新消息了。不過目前我們無法停止線程執行;只要線程池存在他們就會一直執行。使用 unwrap
是因為我們知道失敗不可能發生,即便編譯器不這麼認為。
在 worker 中,傳給 thread::spawn
的閉包仍然還只是 引用 了通道的接收端。相反我們需要閉包一直循環,向通道的接收端請求任務,並在得到任務時執行他們。如 20-20 對 Worker::new
做出修改:
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
(*job)();
}
});
Worker {
id,
thread,
}
}
}
首先在 receiver
上呼叫了 lock
來獲取互斥器,接著 unwrap
在出現任何錯誤時 panic。如果互斥器處於一種叫做 被污染(poisoned)的狀態時獲取鎖可能會失敗,這可能發生於其他線程在持有鎖時 panic 了且沒有釋放鎖。在這種情況下,呼叫 unwrap
使其 panic 是正確的行為。可將 unwrap
改為包含有意義錯誤信息的 expect
。
如果鎖定了互斥器,接著調用 recv
從通道中接收 Job
。最後的 unwrap
也繞過了一些錯誤,這可能發生於持有通道發送端的線程停止的情況,類似於如果接收端關閉時 send
方法如何回傳 Err
一樣。
呼叫 recv
會阻塞當前線程,所以如果還沒有任務,其會等待直到有可用的任務。Mutex<T>
確保一次只有一個 Worker
線程嘗試請求任務。
但目前還是無法編譯
為了呼叫儲存在 Box<T>
(這正是 Job
別名的類型)中的 FnOnce
閉包,該閉包需要能將自己移動 出 Box<T>
,因為當呼叫這個閉包時,它獲取 self
的所有權。通常來說,將值移動出 Box<T>
是不被允許的,因為 Rust 不知道 Box<T>
中的值將會有多大;chap 15 能夠正常使用 Box<T>
是因為我們將未知大小的值儲存進 Box<T>
從而得到已知大小的值。
chap17 曾見過,17-15 中有使用了 self: Box<Self>
語法的方法,它允許方法獲取儲存在 Box<T>
中的 Self
值的所有權。這正是我們希望做的,然而不幸的是 Rust 不允許我們這麼做:Rust 當閉包被調用時行為的那部分並沒有使用 self: Box<Self>
實現。所以這裡 Rust 也不知道它可以使用 self: Box<Self>
來獲取閉包的所有權並將閉包移動出 Box<T>
。
Rust 仍在努力改進提升編譯器的過程中,未來新版的 rust,應該可讓 20-20 中的 code 正常工作。
不過目前讓我們通過一個小技巧來繞過這個問題。可以明確告訴 Rust 在這裡我們可以使用 self: Box<Self>
來獲取 Box<T>
中值的所有權,而一旦獲取了閉包的所有權就可以呼叫它了。這涉及到定義一個新 trait,它帶有一個在簽名中使用 self: Box<Self>
的方法 call_box
,為任何實現了 FnOnce()
的類型定義這個 trait,修改類別別名來使用這個新 trait,並修改 Worker
使用 call_box
方法。這些修改如 20-21 所示:
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<FnBox + Send + 'static>;
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
新增一個 trait FnBox
來繞過當前 Box<FnOnce()>
的限制
新增了一個叫做 FnBox
的 trait。這個 trait 有一個方法 call_box
,它類似於其他 Fn*
trait 中的 call
方法,除了它獲取 self: Box<Self>
以便獲取 self
的所有權並將值從 Box<T>
中移動出來。
接下來,為任何實現了 FnOnce()
trait 的類型 F
實現 FnBox
trait。這實際上就等於任何 FnOnce()
closure 都可以使用 call_box
方法。call_box
的實現使用 (*self)()
將閉包移動出 Box<T>
並呼叫此閉包。
現在我們需要 Job
類別別名是任何實現了新 trait FnBox
的 Box
。這允許我們在得到 Job
值時使用 Worker
中的 call_box
。為任何 FnOnce()
閉包都實現了 FnBox
trait 意味著無需對實際在通道中發出的值做任何修改。現在 Rust 就能夠理解我們的行為是正確的了。
Graceful shutdown and cleanup
20-21 中的代碼如期通過使用線程池異步的響應請求。這裡有一些警告說 workers
、id
和 thread
字段沒有直接被使用,這提醒了我們並沒有清理所有的內容。當使用不那麼優雅的 ctrl-C 終止主線程時,所有其他線程也會立刻停止,即便它們正處於處理請求的過程中。
現在我們要為 ThreadPool
實現 Drop
trait 對線程池中的每一個線程調用 join
,這樣這些線程將會執行完他們的請求。接著會為 ThreadPool
實現一個告訴線程他們應該停止接收新請求並結束的方式。為了實踐這些代碼,修改 server 在優雅停機(graceful shutdown)之前只接受兩個請求。
為 ThreadPool
實現 Drop
Trait
當線程池被丟棄時,應該 join 所有線程以確保他們完成其操作。20-23 展示了 Drop
實現的第一次嘗試;這些代碼還不能夠編譯:
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
編譯錯誤
error[E0507]: cannot move out of borrowed content
--> src/lib.rs:61:13
|
61 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ cannot move out of borrowed content
error: aborting due to previous error
這告訴我們並不能調用 join
,因為只有每一個 worker
的可變借用,而 join
獲取其參數的所有權。為瞭解決這個問題,需要一個方法將 thread
移動出擁有其所有權的 Worker
實例以便 join
可以消費這個線程。17-15 中我們曾見過這麼做的方法:如果 Worker
存放的是 Option<thread::JoinHandle<()>
,就可以在 Option
上調用 take
方法將值從 Some
成員中移動出來而對 None
成員不做處理。換句話說,正在運行的 Worker
的 thread
將是 Some
成員值,而當需要清理 worker 時,將 Some
替換為 None
,這樣 worker 就沒有可以運行的線程了。
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
當新建 Worker
時需要將 thread
值封裝進 Some
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
Worker {
id,
thread: Some(thread),
}
}
}
調用 Option
上的 take
將 thread
移動出 worker
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
向 thread 發送訊息,使其停止接收任務
有了這些修改,code 就能編譯且沒有任何警告。不過也有壞消息,這些代碼還不能以我們期望的方式運行。問題的關鍵在於 Worker
中分配的線程所運行的閉包中的邏輯:呼叫 join
並不會關閉線程,因為他們一直 loop
來尋找任務。如果採用這個實現來嘗試丟棄 ThreadPool
,則主線程會永遠阻塞在等待第一個線程結束上。
為了修復這個問題,修改線程既監聽是否有 Job
運行也要監聽一個應該停止監聽並退出無限循環的信號。所以通道將發送這個枚舉的兩個成員之一而不是 Job
實例
enum Message {
NewJob(Job),
Terminate,
}
Message
要麼是存放了線程需要運行的 Job
的 NewJob
成員,要麼是會導致線程退出循環並終止的 Terminate
成員。
同時需要修改通道來使用 Message
而不是 Job
,收發 Message
值並在 Worker
收到 Message::Terminate
時退出循環
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
// --snip--
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
為了使用 Message
enum 需要將兩個地方的 Job
修改為 Message
:ThreadPool
的定義和 Worker::new
的簽名。ThreadPool
的 execute
方法需要發送封裝進 Message::NewJob
成員的任務。然後,在 Worker::new
中當從通道接收 Message
時,當獲取到 NewJob
成員會處理任務而收到 Terminate
成員則會退出循環。
修改後再次能夠編譯並繼續按照期望的行為運行。不過還是會得到一個警告,因為並沒有產生任何 Terminate
成員的消息。如 20-25 所示修改 Drop
實現來修復此問題:
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
在對每個 worker 線程調用 join
之前向 worker 發送 Message::Terminate
現在遍歷了 worker 兩次,一次向每個 worker 發送一個 Terminate
消息,一個呼叫每個 worker 線程上的 join
。如果嘗試在同一循環中發送消息並立即 join 線程,則無法保證當前迭代的 worker 是從通道收到終止消息的 worker。
為了更好的理解為什麼需要兩個分開的循環,想像一下只有兩個 worker 的場景。如果在一個單獨的循環中遍歷每個 worker,在第一次迭代中向通道發出終止消息並對第一個 worker 線程調用 join
。我們會一直等待第一個 worker 結束,不過它永遠也不會結束因為第二個線程接收了終止消息。deadlock!
為了避免此情況,首先在一個循環中向通道發出所有的 Terminate
消息,接著在另一個循環中 join 所有的線程。每個 worker 一旦收到終止消息即會停止從通道接收消息,意味著可以確保如果發送同 worker 數相同的終止消息,在 join 之前每個線程都會收到一個終止消息。
為了實踐這些代碼,如 20-26 所示修改 main
在優雅停止 server 之前只接受兩個請求:
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
你不會希望真實世界的 web server 只處理兩次請求就停機了,這只是為了展示優雅停機和清理處於正常工作狀態。
take
方法定義於 Iterator
trait,這裡限制循環最多頭 2 次。ThreadPool
會在 main
的結尾離開作用域,而且還會看到 drop
實現的運行。
使用 cargo run
啟動 server,並發起三個 request。第三個request 應該會失敗
這個特定的運行過程中一個有趣的地方在於:注意我們向通道中發出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結束。與此同時,每一個線程都收到了終止消息。一旦 worker 0 結束,主線程就等待其他 worker 結束,此時他們都已經收到終止消息並能夠停止了。
以下是完整的程式
main.rs
extern crate hello;
use hello::ThreadPool;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let mut file = File::open(filename).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
lib.rs
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<dyn FnBox + Send + 'static>;
impl ThreadPool {
/// 創建線程池。
///
/// 線程池中線程的數量。
///
/// # Panics
///
/// `new` 函數在 size 為 0 時會 panic。
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
References
The Rust Programming Language
中文版
中文版 2