前言:
在上一篇rust入门实践单线程http服务器时,我们实现的单线程版本满足web第一阶段的理论实现,即通过实现掌握rust编写web服务器的基础(引用net包,链接TCP链接),同时掌握http协议进行请求解析。走出一小步,才能大步快步迈向那个Hello World。
接下来,将通过实现线程池模型来提高服务器的并发能力,线程池模型也是生产环境常用的一种方式,其他的高并发方式还有fork/join(如:jdk7 基于work-strealing思想实现的并行计算)、单线程异步io(如:nginx、redis)。
这里我们主要讲解线程池,也即是常说的threaPool。
内容
1、掌握线程池的设计思路
2、掌握rust的闭包
3、rust的类设计
从发现问题到解决问题
问题
一、单线程会阻塞并发请求
在生产环境中,我们的web服务器面向的是互联网群体,用户量之大,请求之多,没个用户的请求应该是互不影响的,而第一版本的web服务器存在一个问题是当有多个请求并发时,每一个请求必须等待前一个请求处理完才能响应,用户体验极差。那单线程无法处理,我们可以开多个线程呀,每个请求都由一个线程处理不就完美解决了嘛,让我们看下如何实现多线程吧:
fn main() {
let listener = TcpListener::bind("10.86.168.45:9999").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
std::thread::spawn(move || {
handle_connection(stream);
});
}
}
当服务器接收到请求流时,使用rust的thread库,通过调用std::thread::spawn创建线程,其源码如下:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}
其参数接收的是FnOnce类型闭包,闭包还有其他两种类型:Fn、FnMut区别在于闭包获取环境变量的所有权。
1、FnOnce参数类型是 self,所以,这种类型的闭包会获取变量的所有权,生命周期只能是当前作用域,之后就会被释放了,不能运行多次。
2、FnMut参数类型是 &mut self,所以,这种类型的闭包是可变借用,会改变变量,但不会释放该变量。可以运行多次。
3、Fn参数类型是 &self,所以,这种类型的闭包是不可变借用,不会改变变量,也不会释放该变量。可以运行多次。
而在参数列表前使用 move 关键字是强制闭包获取其使用的环境值的所有权。这个技巧在创建新线程将值的所有权从一个线程移动到另一个线程时最为实用,而不需要由rust编译器去推导所有权,但当使用move关键字将stream转移给闭包线程后,在主线程便不能再进行操作,这是move语义需要注意的,在这里不展开。
二、多线程改进方式会引发DoS攻击
通过为每个请求创建线程的方式虽然解决了多并发阻塞的问题,但是当请求量无限上涨时,比如常见的DoS攻击(洪水攻击),即不停的创建请求导致线程无上限创建时会消耗服务器资源。
那可以限定线程的数量,而线程的执行效率也是跟cpu挂钩,系统线程数=cpu个数 * 核数,当运行线程数大于系统线程数时,操作系统会采用cpu时间片控制,采用线程调度算法进行线程切换,线程数越大切换越频繁,所以经常说线程数不可过大,一般设置线程数有前辈给出过实践公式,可以搜一下。我一般不分io密集还是cpu密集,会设置标准数量为核数的大小,可最大创建两倍核数大小。
限制线程数可以防止线程无限创建导致的资源问题,那么当请求大于工作线程数量时,又该如何处理不能及时处理的请求呢,想象一下我们平时生活中的场景,去银行办理业务时,银行柜台工作有限,那么排队的人就需要获取排队号码进行等待,同理的,我们可以引入一个FIFO队列,即先请求的任务势必先处理。
限定线程池 + 请求队列,这便是我们线程池的两大核心:
1、预创建固定数量的线程数
2、将请求放入请求队列等待处理
总结:
在rust实践中,我们将使用channel来进行任务的分发,ThreadPool在接受到任务时写入channel,预创建的线程工作者可以同channel获取到任务并执行。
设计
线程池不仅仅是可以用在web请求处理上,实际上它应该被作为一个基础组件运用在其他需要进行并行计算的位置,我们可以单独把线程池作为一个独立的模块。
首先,我们先拆分之前的项目结构,在src目录下创建bin目录,用于放置我们的主工程,而在bin外层,也即src下创建lib.rs,用于封装ThreadPool作为主工程的外部库,拆分后的目录结构如下:
封装ThreadPool
lib.rs 代码如下:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
main.rs 使用 ThreadPool
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use webBean::ThreadPool;
fn main() {
let listener = TcpListener::bind("10.86.168.45:9999").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
// std::thread::spawn(move || {
// handle_connection(stream);
// });
pool.execute(|| {
handle_connection(stream);
});
}
}
当我们在设计ThreadPool的Api的时候, ThreadPool的使用者即开发人员是我们要服务的对象,那Api必须让使用者尽量少的学习成本以及使用负担,ThreadPool的Api如下说明:
-
new函数:表示线程池里的预创建线程数量,这里还未采用弹性创建方式
-
execute函数:将函数处理交由线程池托管,函数api跟std::thread::spawn一致
executre在模仿spawn实现时,核心在于函数参数:闭包。
在这里,咱们先在脑里回顾前文讲到的rust闭包有三种特性:Fn,FnMut,FnOnce。在线程池的这个使用场景下,我们该使用哪个特性呢?
处理请求的线程只执行请求的闭包函数一次,之后闭包的生命周期便结束了,不可再运行多次,所以在这里可以使用FnOnce
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
实现
Api设计结束,虽然代码已经可以运行,但是只是个架子,接下来需要为其丰富其内涵了。
- 回顾银行柜台处理业务的场景,线程池的机制便是先预创建柜台员工(Worker),每个Workder持续(线程)工作(处理每一个请求)。
在ThreadPool内部,我们定义Worker并实现
struct Worker {
//每个员工有编号
id: usize,
//绑定线程持续处理请求
thread: thread::JoinHandle<()>,
}
impl Worker {
//构造Workder
fn new (id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {id, thread}
}
}
- 现在员工(Worker)有了, 那他们应该怎么知道有请求要进行处理呢。接下来,我们将引入Rust的线程通信机制Channel。
应用层调用ThreadPool的Execute方法时,ThreadPool可以通过chancel发送一个任务(Job)给到Workder绑定的channel端。
Job的定义如下:
type Job = Box<dyn FnOnce() + Send + 'static>;
这句定义可能比较隐晦,我们先一步步拆解 Rust的语法糖
FnOnce() + Send + 'static 是一个特征对象:
-
FnOnce也即是闭包特性
-
Send 表示该类型的值可以通过跨线程发送,用于Channel
-
'static 表示对特征对象里包含的引用变量生命周期的约束,要保证的是在执行闭包时,闭包的对象生命周期不能小于闭包,而实现安全检测。
-
Channel通信机制
ThreadPool在初始化时创建Channel的作为Job的发送端,每个Worker获取Channel作为Job的接收端,当每个Worker在获取Channel中的Job任务时,涉及到并发共享锁的问题,所以在消费Job时,每个Worker需要对Channel使用Arc<Mutex>,Arc 如C++中的智能指针shared_ptr,只是增加引用计数,最后一个指针销毁后,内存才被释放,而这里加了Mutex表示内存是mut可修改的。 -
ThreadPool创建Channel
-
Worker构造时接收Channel
-
Worker处理Channel任务Job时,进行锁
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
//短别名
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// 创建Channel,作为任务发送端
let (sender, receiver) = mpsc::channel();
//创建Channel接收端指针
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
//每个work共享一个receiver
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
//threadPool接收任务时通过Channel发送Job任务
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
//构造Worker时接收Receiver作为线程的闭包参数捕获
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
//调用recv阻塞当前线程,直到任务接收后进行执行,执行结束再进行loop循环从Channel接收Job
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker {
id,
thread,
}
}
}