前言
本文為「The Rust Programming Language」語言指南的學習筆記。
做法
使用執行緒池可以來同時回應多重請求,但警告顯示 workers
、id
與 thread
欄位沒有被直接使用,這提醒目前尚未清理所有內容。當使用比較不優雅的 ctrl-c 方式來中斷主執行緒時,所有其他執行緒也會立即停止,不管它們是否正在處理請求。
現在要實作 Drop
特徵來對池中的每個執行緒呼叫 join
方法,讓它們能在關閉前把任務處理完畢。然後要告訴執行緒它們該停止接收新的請求並關閉。為了觀察此程式碼的實際運作,以下會修改伺服器讓它在正常關機(graceful shutdown)前,只接收兩個請求。
實作 Drop 特徵
先對執行緒池實作 Drop
特徵。當池被釋放時,執行緒都該加入(join)回來以確保它們有完成它們的工作。首先,遍歷執行緒池中的每個 workers
。對此使用 &mut
是因為 self
是個可變引用,而且我們也需要能夠改變 worker
。對每個工作者印出訊息來說明此工作者正要關閉,然後對工作者的執行緒呼叫 join
。如果 join
的呼叫失敗的話,我們使用 unwrap
來讓 Rust 恐慌,使其變成較不正常的關機方式。
1 | impl Drop for ThreadPool { |
目前並無法呼叫 join
,因為我們只有每個 worker
的可變借用,而 join
會取走其引數的所有權。要解決此問題,需要將 thread
中的執行緒移出 Worker
實例,讓 join
可以消耗該執行緒。如果 Worker
改持有 Option<thread::JoinHandle<()>>
的話,可以對 Option
呼叫 take
方法來移動 Some
變體中的數值,並在原處留下 None
變體。換句話說,thread
中有 Some
變體的話就代表 Worker
正在執行,而當我們清理 Worker
時,我們會將 Some
換成 None
來讓 Worker
沒有任何執行緒可以執行。
更新 Worker
的定義如以下。
1 | struct Worker { |
當建立新的 Worker
,需要將 thread
的數值封裝到 Some
。
1 | impl Worker { |
對 Option
呼叫 take
來將 thread
移出 worker
。
1 | impl Drop for ThreadPool { |
Option
的 take
方法會取走 Some
變體的數值並在原地留下 None
。我們使用 if let
來解構 Some
並取得執行緒,然後我們對執行緒呼叫 join
。如果工作者的執行緒已經是 None
,就知道該該工作者已經清理其執行緒了,所以沒有必要再處理。
發送停止接收任務的信號
現在雖然有呼叫 join
,但這無法關閉執行緒,因為它們會一直 loop
來尋找任務執行。如果嘗試以目前的 drop
實作釋放 ThreadPool
的話,主執行緒會被阻擋,一直等待第一個執行緒處理完成。
要修正此問題,要修改執行緒,讓它們除了接收 Job
來執行以外,也要能收到告訴它們要停止接收並離開無限迴圈的信號。所以我們的通道將傳送以下兩種枚舉變體,而不再是 Job
實例。
修改 src/lib.rs
檔,Message
枚舉可以是存有該執行緒要執行的 Job
的 NewJob
變體,或是通知執行緒離開其迴圈並停止的 Terminate
變體。
1 | enum Message { |
需要調整通道來使用 Message
型別,而不是 Job
型別。
1 | pub struct ThreadPool { |
為了改用 Message
枚舉,有兩個地方得將 Job
改成 Message
:ThreadPool
的定義與 Worker::new
的簽名。ThreadPool
的 execute
方法需要傳送封裝成 Message::NewJob
的任務。然後在 Worker::new
中,也就是取得 Message
的通道接收端中,如果收到 NewJob
變體的話,其就會處理任務;而如果收到 Terminate
變體的話,執行緒就會打破迴圈。
有了這些改變,程式碼就能編譯並繼續執行。
1 | impl Drop for ThreadPool { |
現在會遍歷工作者們 2 次,一次是傳送 Terminate
訊息給每個工作者,另一次是對每個工作者執行緒呼叫 join
。如果我們嘗試在同個迴圈中傳送訊息並立即呼叫 join
的話,會無法保證在當前疊代中的工作者就是從通道中取得訊息的工作者。
為了更好理解為何我們需要兩個分開的迴圈,想像一個情境中有 2 個工作者。如果用一個迴圈來遍歷每個工作者,在第一次疊代中會有個關機訊息傳至通道,並對第一個工作者執行緒呼叫 join
。如果第一個工作者正在忙於處理請求的話,第二個工作者就會從通道取得關機訊息並關閉。這樣會變成持續等待第一個工作者關閉,但是它永遠不會關閉,因為是第二個執行緒取得關機訊息的。死結(deadlock)就發生了!
為了預防此情形,我們首先在一個迴圈中對通道傳送所有的 Terminate
訊息,然後我們在另一個迴圈才將所有的執行緒加回來。每個工作者一旦收到關機訊息後,就會停止從通道中接收訊息。所以可以確定如果我們發送與執行緒數量相當的關機訊息的話,每個工作者都會在其執行緒被呼叫 join
前收到關機訊息。
要實際看到此程式碼的運作情形,修改 main
來在正常關閉伺服器前,只接收兩個請求。
修改 src/bin/main.rs
檔。
1 | fn main() { |
在真實世界中的網頁伺服器當然不會只處理 2 個請求就關機。此程式碼只是用來說明正常關機與清理的運作流程。
take
方法是由 Iterator
特徵所定義,且限制該疊代最多只會取得前兩項。ThreadPool
會在 main
結束時離開作用域,然後 drop
的實作就會執行。
執行程式。
1 | cargo run |
輸出以下訊息。
1 | $ cargo run |
可能會看到不同順序的工作者與訊息輸出。可以從訊息中看到此程式碼如何執行的,工作者 0 與 3 獲得前兩個請求。然後對於第三個請求,伺服器會停止接受連線。當 ThreadPool
在 main
結尾離開作用域時,它 Drop
的實作就會生效,然後執行緒池告訴所有工作者關閉。當工作者看到關機訊息時,它們就會印出訊息,然後執行緒池會呼叫 join
來關閉每個工作者的執行緒。
此特定執行方式中有個有趣的地方值得注意:ThreadPool
傳送關機訊息至通道,且在任何工作者收到訊息前,我們就已經著將工作者 0 加入回來。工作者 0 此時尚未收到關機訊息,所以主執行緒會被擋住並等待工作者 0 完成。同一時間,每個工作者會開始收到關機訊息。當工作者 0 完成時,主執行緒會等待剩下的工作者完成任務。屆時,它們都會收到關機訊息並能夠關閉。