package com.leetcode.random.difficulty;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Threadpool{
private MyQueue<Runnable> q; //任务队列
private HashSet<Worker> workers = new HashSet<>(); //线程集合,线程不安全
private int coreSize; //核心任务数
private long timeout; //获取任务时的超时连接
private TimeUnit timeUnit;
//执行任务
public void execute(Runnable task){
synchronized (workers){
//任务数没有超过coreSize时,直接交给worker对象执行
//如果任务数超过coreSzie时,加入任务队列暂存
if(workers.size() < coreSize){
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}
else{
q.put(task);
}
}
}
public Threadpool( int coreSize, long timeout, TimeUnit timeUnit, int capacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.q = new MyQueue<>(capacity);
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run(){
//执行任务
//1)当task不为空,执行任务
//2)当task执行完毕,再接着从任务队列获取任务并执行
while(task != null || (task =q.take()) != null){
try{
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
workers.remove(this); //把当前的work对象移除
}
}
}
}
class MyQueue<T> {
private Deque<T> q = new ArrayDeque<>(); //任务队列 双链表 还可以选择LinkedList数据结构
private int capacity ; //定死容量10
private ReentrantLock lock = new ReentrantLock(); //锁
private Condition empty = lock.newCondition(); //条件变量
private Condition full = lock.newCondition(); //条件变量
public MyQueue(int capacity) {
this.capacity = capacity;
}
public T take(){
lock.lock();
try {
while (q.isEmpty()) {
try {
empty.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
T result = q.removeFirst();
full.signal();
return result;
} finally {
lock.unlock();
}
};
public void put(T t){
lock.lock();
try {
while (q.size() == capacity) {
try {
full.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
q.addLast(t);
full.signal();
} finally {
lock.unlock();
}
};
public T take(long timeout, TimeUnit unit){
lock.lock();
try {
//单位统一为ns
long nanos = unit.toNanos(timeout);
while (q.isEmpty()) {
try {
if(nanos <= 0) {
return null;
}
//唤醒后剩余时间,给作为下次阻塞的等待时间
nanos = empty.awaitNanos(nanos);
}catch (InterruptedException e){
e.printStackTrace();
}
}
T result = q.removeFirst();
full.signal();
return result;
} finally {
lock.unlock();
}
}
public int size(){
lock.lock();
try{
return q.size();
}finally {
lock.unlock();
}
}
}