在之前用线程池实现的web server中,每个工作线程中通过loop进行循环,从channel的接收端等待任务,然后执行。但是代码中没有提供一种机制,来通知这些工作线程结束。本节就是在之前的基础上,来实现线程池对象的正确清除。
通过为ThreadPool实现Drop trait来实现线程池对象清除
修改Worker如下:
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
Worker {
id,
thread: Some(thread),
}
}
}
为ThreadPool实现Drop:
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {//知识点:Option的take方法
thread.join().unwrap();
}
}
}
}
通知worker线程结束
修改发送内容的格式:
enum Message {
NewJob(Job),
Terminate,
}
修改ThreadPool:
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
// --snip--
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
修改Worker:
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
再在ThreadPool的Drop实现中添加发送结束信息:
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
修改主函数:
//src/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {//take 方法定义于 Iterator trait,这里限制循环最多头 2 次
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
总结
web server这个例子来自于The Rust这本教材,通过这个例子,我们能基本对如何用Rust写一个简单的线程池有一个了解,用到的知识点主要是channel、thread、Arc等知识点。
后续我们还将为大家写一个简单的区块链的例子。
关注令狐一冲,关注区块链和Rust。