/**
 * Demo driver: submits ten one-second tasks to a {@code Pools} instance configured with
 * two workers, a queue of capacity two and a "run in the caller" rejection policy.
 */
public class TestPool {

    /**
     * Builds the pool and submits the demo tasks.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Pools pools = new Pools(2, 1000, TimeUnit.MILLISECONDS, 2,
                (queue, task) -> {
                    // Alternative strategies for a task that does not fit into the queue:
                    //   wait indefinitely:    queue.put(task);
                    //   wait with a timeout:  queue.offer(task, 500, TimeUnit.MILLISECONDS);
                    //   drop the task:        System.out.println("放弃");
                    //   fail the submission:  throw new RuntimeException("运行抛出异常");
                    // Strategy in use: run the task in the submitting thread.
                    task.run();
                });

        for (int i = 0; i < 10; i++) {
            // Lambdas may only capture effectively final locals, hence the copy.
            int j = i;
            pools.excutor(() -> {
                System.out.println("执行任务" + j);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever runs this task instead of
                    // swallowing it.
                    Thread.currentThread().interrupt();
                }
            });
        }
    }
}
// Thread pool
/**
 * A minimal thread pool.
 *
 * <p>Each submission starts a new {@link Worker} until {@code coreSize} workers are alive;
 * after that, tasks are handed to a bounded {@code TaskQueue}, and the configured
 * {@code RejectPolicy} decides what happens when that queue is full. A worker that finds no
 * task within the configured timeout removes itself from the pool and terminates.
 */
class Pools {
    /** Tasks waiting for a free worker. */
    private final TaskQueue<Runnable> taskQueue;
    /** Live workers; every access synchronizes on this set. */
    private final HashSet<Worker> workers = new HashSet<>();
    /** Maximum number of concurrently alive workers. */
    private final int coreSize;
    /** How long an idle worker waits for a queued task before it exits. */
    private final long time;
    private final TimeUnit unit;
    /** Invoked by the queue when a task does not fit. */
    private final RejectPolicy<Runnable> rejectPolicy;

    /**
     * @param coresize     maximum number of worker threads
     * @param time         idle timeout of a worker waiting for a queued task
     * @param unit         unit of {@code time}
     * @param QueueSize    capacity of the task queue
     * @param rejectPolicy strategy applied when the task queue is full
     */
    public Pools(int coresize, long time, TimeUnit unit, int QueueSize, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coresize;
        this.time = time;
        this.unit = unit;
        this.taskQueue = new TaskQueue<>(QueueSize);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * Submits a task: starts a new worker for it while fewer than {@code coreSize} workers
     * are alive, otherwise offers it to the task queue (applying the rejection policy if the
     * queue is full).
     *
     * @param runnable the task to run
     */
    public void excutor(Runnable runnable) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(runnable, "" + (workers.size() + 1));
                workers.add(worker);
                System.out.println("线程执行:" + runnable);
                worker.start();
                return;
            }
        }
        // Hand-off happens outside the workers monitor: the rejection policy is user code
        // and may block or run the task itself, which must not stall other submitters or
        // workers that are trying to deregister.
        // NOTE(review): a worker whose poll has just timed out may still be counted above;
        // a task queued in that window stays in the queue until a later submission starts
        // a new worker.
        taskQueue.tryput(rejectPolicy, runnable);
    }

    /** Pool thread that runs its initial task and then drains the task queue. */
    class Worker extends Thread {
        /** Task currently assigned to this worker; {@code null} while fetching the next one. */
        private Runnable task;

        /**
         * @param task first task to run
         * @param str  name given to the underlying thread
         */
        public Worker(Runnable task, String str) {
            super(str);
            this.task = task;
        }

        @Override
        public void run() {
            try {
                // Run the initial task, then keep polling until the queue stays empty for
                // the configured timeout (poll returns null).
                while (task != null || (task = taskQueue.poll(time, unit)) != null) {
                    System.out.println("正在执行..." + task);
                    task.run();
                    task = null;
                }
            } finally {
                // Always deregister, even when a task throws; otherwise the dead thread
                // would keep occupying a worker slot forever.
                synchronized (workers) {
                    System.out.println("移除线程:" + this);
                    workers.remove(this);
                }
            }
        }
    }
}
// Blocking queue
/**
 * A bounded FIFO queue offering blocking, timed and policy-driven insertion and removal.
 *
 * <p>All state is guarded by one {@link ReentrantLock}. Consumers wait on {@code empty}
 * while the queue has no elements; producers wait on {@code full} while it is at capacity.
 *
 * @param <T> element type
 */
class TaskQueue<T> {
    /** Queued elements, oldest first. */
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    /** Maximum number of queued elements. */
    private final int capacity;
    /** Guards {@code queue}. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the queue is full; signalled after each removal. */
    private final Condition full = lock.newCondition();
    /** Consumers wait here while the queue is empty; signalled after each insertion. */
    private final Condition empty = lock.newCondition();

    /**
     * @param capcity maximum number of elements the queue holds
     */
    public TaskQueue(int capcity) {
        this.capacity = capcity;
    }

    /**
     * @return the number of elements currently queued
     */
    public int getSize() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the oldest element, waiting up to the given time for one to arrive.
     *
     * @param time maximum time to wait
     * @param unit unit of {@code time}
     * @return the removed element, or {@code null} if the wait timed out or the calling
     *         thread was interrupted (its interrupt status is then left set)
     */
    public T poll(long time, TimeUnit unit) {
        long remaining = unit.toNanos(time);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (remaining <= 0) {
                    return null;
                }
                try {
                    // awaitNanos returns the time still left, so wake-ups without an
                    // element do not extend the total wait.
                    remaining = empty.awaitNanos(remaining);
                } catch (InterruptedException e) {
                    // Stop waiting and keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return removeHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the oldest element, waiting as long as necessary for one to arrive.
     * An interrupt does not end the wait; the interrupt status is preserved for the caller.
     *
     * @return the removed element
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                empty.awaitUninterruptibly();
            }
            return removeHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for free space.
     * An interrupt does not end the wait; the interrupt status is preserved for the caller.
     *
     * @param task element to append
     */
    public void put(T task) {
        lock.lock();
        try {
            while (isFull()) {
                System.out.println("等待加入任务队列:" + task);
                full.awaitUninterruptibly();
            }
            System.out.println("加入任务队列:" + task);
            append(task);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting up to the given time for free space.
     *
     * @param task element to append
     * @param time maximum time to wait
     * @param unit unit of {@code time}
     * @return {@code true} if the element was appended; {@code false} if the wait timed out
     *         or the calling thread was interrupted (its interrupt status is then left set)
     */
    public boolean offer(T task, long time, TimeUnit unit) {
        long remaining = unit.toNanos(time);
        lock.lock();
        try {
            while (isFull()) {
                if (remaining <= 0) {
                    return false;
                }
                System.out.println("等待加入任务队列:" + task);
                try {
                    remaining = full.awaitNanos(remaining);
                } catch (InterruptedException e) {
                    // The element was not added, so report failure rather than success.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            System.out.println("加入任务队列:" + task);
            append(task);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element if there is free space, otherwise delegates to the rejection policy.
     *
     * <p>The policy is invoked after the lock has been released, so a policy that blocks or
     * runs the task itself does not stall consumers and other producers.
     *
     * @param rejectPolicy strategy applied when the queue is full
     * @param runnable     element to append
     */
    public void tryput(RejectPolicy<T> rejectPolicy, T runnable) {
        lock.lock();
        try {
            if (!isFull()) {
                System.out.println("加入队列:" + runnable);
                append(runnable);
                return;
            }
        } finally {
            lock.unlock();
        }
        rejectPolicy.reject(this, runnable);
    }

    /** Reports whether the queue is at capacity; the caller must hold {@code lock}. */
    private boolean isFull() {
        return queue.size() == capacity;
    }

    /** Appends an element and wakes one waiting consumer; the caller must hold {@code lock}. */
    private void append(T element) {
        queue.addLast(element);
        empty.signal();
    }

    /** Removes the oldest element and wakes one waiting producer; the caller must hold {@code lock}. */
    private T removeHead() {
        T head = queue.removeFirst();
        full.signal();
        return head;
    }
}
/**
 * Strategy applied to a task that could not be added to a {@code TaskQueue} because the
 * queue was full.
 *
 * @param <T> type of the queued tasks
 */
@FunctionalInterface
interface RejectPolicy<T> {

    /**
     * Handles a task that did not fit into the queue.
     *
     * @param queue the queue that was full
     * @param task  the task that was not enqueued
     */
    void reject(TaskQueue<T> queue, T task);
}