用Rust编写web server,实现线程池的清除

在之前用线程池实现的web server中,每个工作线程中通过loop进行循环,从channel的接收端等待任务,然后执行。但是代码中没有提供一种机制,来通知这些工作线程结束。本节就是在之前的基础上,来实现线程池对象的正确清除。

通过为ThreadPool实现Drop trait来实现线程池对象清除

修改Worker如下:

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

为ThreadPool实现Drop:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {//知识点:Option的take方法
                thread.join().unwrap();
            }
        }
    }
}

通知worker线程结束

修改发送内容的格式:

enum Message {
    NewJob(Job),
    Terminate,
}

修改ThreadPool:

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

修改Worker:

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
        Worker {

        let thread = thread::spawn(move ||{
            loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

再在ThreadPool的Drop实现中添加发送结束信息:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

修改主函数:

//src/main.rs
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {//take 方法定义于 Iterator trait,这里限制循环最多头 2 次
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

总结

web server这个例子来自于The Rust这本教材,通过这个例子,我们能基本对如何用Rust写一个简单的线程池有一个了解,用到的知识点主要是channel、thread、Arc等知识点。

后续我们还将为大家写一个简单的区块链的例子。

关注令狐一冲,关注区块链和Rust。

上一篇:java-遗传算法的线程池内存不足-为什么?


下一篇:ThreadPool(线程池)介绍