**假设此时读者已经对于线程池的的七大参数
配置,核心线程数
、最大线程数
、阻塞队列拒绝策略已经有了一定的了解,那摩再看看这个代码可能会对你理解线程池的底层实现原理有一定的帮助
- 自定义线程池+工作线程实现
class ThreadPool{
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void execute(Runnable task){
// 当任务数没有超过coreSize时,直接交给worker对象执行
// 如果任务数量超过coreSize时,加入任务队列暂存
synchronized (workers){
if (workers.size() < coreSize){
Worker worker = new Worker(task);
System.out.println("新增worker{},{}"+worker+"任务对象{}"+task);
workers.add(worker);
worker.start();
}else {
System.out.println("加入任务队列{}"+task);
// 说明此时线程池中的核心线程都在被使用,因此新进来的任务需要被加入到任务队列中
// taskQueue.put(task);
/** 等待又有一些等待策略
* 1) 死等
* 2) 带超时等待
* 3) 放弃任务执行
* 4) 抛出异常
* 5) 调用者自己执行任务
* 还有更多的选择
*/
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当task不为空,执行任务
// 2) 当task执行完毕,再接着从任务队列获取任务并执行(需要复用该线程)
// while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try {
System.out.println("正在执行..{}"+task);
task.run();
} catch (Exception e){
e.printStackTrace();
}finally {
// 代表当前的这一个任务执行完毕
task = null;
}
}
synchronized (workers){
System.out.println("work 被移除{}"+this);
// 任务执行完毕,从队列中移除
workers.remove(this);
}
}
}
}
2.阻塞队列配置
class BlockingQueue<T>{
//1. 创建任务队列(Deque底层是一个双向链表,有两种实现,linkedlist和arrayDeque)
// arrayDequeue效率要比linkedlist效率好一些
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁,保证一个线程一次只能从一个队列中获取一个任务
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capity;
// 设置容量
public BlockingQueue(int capity) {
this.capity = capity;
}
/**
* 带超时的阻塞获取
* @param timeout
* @param unit
* @return
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
// 将timeout 统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
// 返回的是剩余时间
if (nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取到该队列的队首元素,获取到之后会移除该任务
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 阻塞获取,意思就是如果任务队列空了,就不能再继续往队列中获取任务了
* 需要阻塞住
* @return
*/
public T take(){
lock.lock();
try{
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取到该队列的队首元素,获取到之后会移除该任务
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 阻涉添加
public void put(T task){
lock.lock();
try{
// 当任务队列满了之后,新进来的任务就进入等待
while (queue.size() == capity){
try {
System.out.println("等待加入任务队列"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
// 唤醒
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
*带超时的阻塞添加
* @return
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit){
lock.lock();
try{
// 将时间统一转换为纳秒
long nacos = timeUnit.toNanos(timeout);
// 当任务队列满了之后,新进来的任务就进入等待
while (queue.size() == capity){
try {
System.out.println("等待加入任务队列"+task);
if (nacos <=0){
return false;
}
// 返回的是剩余需要等待的时间
nacos = fullWaitSet.awaitNanos(nacos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
// 唤醒
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 获取队列大小
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满了
if (queue.size() == capity){
rejectPolicy.reject(this,task);
}else { // 工作队列有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
3.自定义聚聚策略
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
4.主线程测试
public class TestPool1 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,((queue, task) -> {
// queue.put(task); // 1.死等策略
// 2.超时等待
// queue.offer(task,1500,TimeUnit.MILLISECONDS);
// 3.让调用者放弃任务执行
//System.out.println("放弃{}"+task);
// 4.让调用者抛出异常
// throw new RuntimeException("任务执行失败"+task);
// 5. 让调用者自己执行任务
task.run(); // 这其实就是主线程自己执行的任务
}));
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(()->{
try {
// 此时休眠是模拟线程处理任务需要花费一些时间,这时候又有新任务进来
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
5.最后完整线程池结构代码,这里是在一些内部类的基础上编写的,重要理解设计和思想
package Thread.diyThreadPool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestPool1 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,((queue, task) -> {
// queue.put(task); // 1.死等策略
// 2.超时等待
// queue.offer(task,1500,TimeUnit.MILLISECONDS);
// 3.让调用者放弃任务执行
//System.out.println("放弃{}"+task);
// 4.让调用者抛出异常
// throw new RuntimeException("任务执行失败"+task);
// 5. 让调用者自己执行任务
task.run(); // 这其实就是主线程自己执行的任务
}));
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(()->{
try {
// 此时休眠是模拟线程处理任务需要花费一些时间,这时候又有新任务进来
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
class ThreadPool{
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void execute(Runnable task){
// 当任务数没有超过coreSize时,直接交给worker对象执行
// 如果任务数量超过coreSize时,加入任务队列暂存
synchronized (workers){
if (workers.size() < coreSize){
Worker worker = new Worker(task);
System.out.println("新增worker{},{}"+worker+"任务对象{}"+task);
workers.add(worker);
worker.start();
}else {
System.out.println("加入任务队列{}"+task);
// 说明此时线程池中的核心线程都在被使用,因此新进来的任务需要被加入到任务队列中
// taskQueue.put(task);
/** 等待又有一些等待策略
* 1) 死等
* 2) 带超时等待
* 3) 放弃任务执行
* 4) 抛出异常
* 5) 调用者自己执行任务
* 还有更多的选择
*/
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当task不为空,执行任务
// 2) 当task执行完毕,再接着从任务队列获取任务并执行(需要复用该线程)
// while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try {
System.out.println("正在执行..{}"+task);
task.run();
} catch (Exception e){
e.printStackTrace();
}finally {
// 代表当前的这一个任务执行完毕
task = null;
}
}
synchronized (workers){
System.out.println("work 被移除{}"+this);
// 任务执行完毕,从队列中移除
workers.remove(this);
}
}
}
}
class BlockingQueue<T>{
//1. 创建任务队列(Deque底层是一个双向链表,有两种实现,linkedlist和arrayDeque)
// arrayDequeue效率要比linkedlist效率好一些
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁,保证一个线程一次只能从一个队列中获取一个任务
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capity;
// 设置容量
public BlockingQueue(int capity) {
this.capity = capity;
}
/**
* 带超时的阻塞获取
* @param timeout
* @param unit
* @return
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
// 将timeout 统一转换为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
// 返回的是剩余时间
if (nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取到该队列的队首元素,获取到之后会移除该任务
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 阻塞获取,意思就是如果任务队列空了,就不能再继续往队列中获取任务了
* 需要阻塞住
* @return
*/
public T take(){
lock.lock();
try{
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取到该队列的队首元素,获取到之后会移除该任务
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 阻涉添加
public void put(T task){
lock.lock();
try{
// 当任务队列满了之后,新进来的任务就进入等待
while (queue.size() == capity){
try {
System.out.println("等待加入任务队列"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
// 唤醒
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
*带超时的阻塞添加
* @return
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit){
lock.lock();
try{
// 将时间统一转换为纳秒
long nacos = timeUnit.toNanos(timeout);
// 当任务队列满了之后,新进来的任务就进入等待
while (queue.size() == capity){
try {
System.out.println("等待加入任务队列"+task);
if (nacos <=0){
return false;
}
// 返回的是剩余需要等待的时间
nacos = fullWaitSet.awaitNanos(nacos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
// 唤醒
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 获取队列大小
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满了
if (queue.size() == capity){
rejectPolicy.reject(this,task);
}else { // 工作队列有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}