手把手教你实现简单而又安全的线程池
微信公众号:大黄奔跑
关注我,可了解更多有趣的面试相关问题。
写在之前
前两篇文档分别从面试角度、源码角度分析了线程池使用及原理,抛开Jdk
实现的线程池,如果让你自己写一个线程池,该当如何快速实现呢?
本篇是之前面试快手时面试官给的题目,后续自己加了一些自己的思考并且手动实现了一个简单的线程池
线程池的好处之前已经赘述,可以避免频繁的创建和销毁线程,管理多个线程。Jdk
本身也对线程池做了很好的实现,特别是AQS用于同步队列,简直是妙哉,但是如果想让我们手动去实现一个线程池,该如何做呢?
本文就来实现一个简单的线程池,肯定不能够保证性能媲美 AQS,但是安全性还是要保证的。
首先是定义线程池的接口:
线程池的核心是:用户不需要自己去new线程,而是将new的过程交由线程池处理。
并且线程池可以用于管理线程
/**
* @time 2020/5/17 11:03
* @Description 手动来实现一个简单的线程池
*/
public interface ThreadPool<Job extends Runnable> {
/**
* 执行一个工作任务,将任务提交给线程池
* @param job
*/
void execute(Job job);
/**
* 关闭线程池
*/
void shutdown();
/**
* 增加工作线程
* @param num 增加的个数
*/
void addWorkers(int num);
/**
* 减少工作者线程
* @param num 减少的个数
*/
void removeWorkers(int num);
/**
* 获取正在等待执行的任务数量
* @return
*/
int getJobSize();
}
定义具体的实现
/**
* @time 2020/5/17 11:08
* @Description
*/
public class MySelfThreadPool<Job extends Runnable> implements ThreadPool<Job>{
/**
* 最大的线程数 默认10
*/
private static final int MAX_WORKER_NUMBERS = 10;
/**
* 默认创建的线程数
*/
private static final int DEFAULT_WORKER_NUMBERS = 5;
/**
* 最小的线程数
*/
private static final int MIN_WORKER_NUMBERS = 1;
/**
* 工作列表,需要向里面插入工作任务
*/
private final LinkedList<Job> jobs = new LinkedList <>();
/**
* 工作者列表 利用自带的容器同步工具,也就是有多少个线程可以处理任务咯
*/
private final List<MyWorker> workers = Collections.synchronizedList(new ArrayList <>());
/**
* 工作线程的数量
*/
private int workerNum = DEFAULT_WORKER_NUMBERS;
/**
* 生成线程编号
*/
private AtomicLong threadNum = new AtomicLong();
/**
* 构造方法,如果不传参数的时候,初始化线程数
*/
public MySelfThreadPool(){
initWorkers(DEFAULT_WORKER_NUMBERS);
}
/**
* 构造方法 初始化特定的线程数
* @param num 线程数 不可以超过最大的线程数
*/
public MySelfThreadPool(int num){
// 如果传入的参数非法,则直接转化为范围内的值
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initWorkers(workerNum);
}
/**
* 初始化线程数,并且这些线程已经启动的
* @param workerNum
*/
private void initWorkers(int workerNum) {
for(int i = 0;i < workerNum;i++){
MyWorker worker = new MyWorker();
// 因为workers队列本身是线程安全的
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-worker-" + threadNum.incrementAndGet());
thread.start();
}
}
/**
* 执行一个工作任务,将任务提交给线程池,这里直接用synchronized锁定同步容器
* @param job
*/
@Override
public void execute(Job job) {
if(job != null){
synchronized (jobs){
jobs.addLast(job);
jobs.notify();
}
}
}
/**
* 关闭线程池,那肯定需要关闭一个个线程咯
*/
@Override
public void shutdown() {
for(MyWorker worker : workers){
worker.shutdown();
}
}
/**
* 向工作队列中增加线程
* @param num 增加的个数
*/
@Override
public void addWorkers(int num) {
synchronized (jobs){
// 防止线程数移除
if(num + this.workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initWorkers(num);
this.workerNum += num;
}
}
/**
* 从工作队列中移除线程
* @param num 减少的个数
*/
@Override
public void removeWorkers(int num) {
synchronized (jobs){
if(num >= this.workerNum){
throw new IllegalArgumentException("参数有误哦");
}
// 一个一个移除线程
int count = 0 ;
while (count < num){
MyWorker worker = workers.get(count);
// 如果顺利移除,则关闭线程
if(workers.remove(worker)){
worker.shutdown();
count++;
}
}
// 一定要修改全局的变量
this.workerNum -= count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
/**
* 定义自己的工作线程
*/
class MyWorker implements Runnable{
/**
* 定义一个boolean类型的数据表示是否运行中……
*/
private volatile boolean running = true;
@Override
public void run() {
// 判断线程是否处于运行状态,如果运行则说明可以做后续工作
while (running){
Job job = null;
// 直接锁住整个工作队列
synchronized (jobs){
// 如果工作队列为空,则需要等待
while (jobs.isEmpty()){
try {
jobs.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// 如果工作队列不为空,则取出任务
job = jobs.removeFirst();
}
if(job != null){
job.run();
}
}
}
/**
* 暂停工作
*/
public void shutdown(){
running = false;
}
}
}
总结
本意想借着文章的机会给大家分享所学所得,无奈知识浅薄,文章难免很有很多纰漏,如果你发现了错误的地方,可以加我的微信交流。(微信公众号没有留言。)
番外
另外,关注大黄奔跑公众号,第一时间收获独家整理的面试实战记录及面试知识点总结。
我是大黄,一个只会写HelloWorld
的程序员,咱们下期见。