自定义线程池代码

/**
 * Demo driver for {@link Pools}: submits ten tasks, each of which prints its
 * number and sleeps one second, to a pool built with core size 2 and queue
 * capacity 2. Once both workers exist and the queue is full, further
 * submissions are handed to the reject policy.
 */
public class TestPool {
    public static void main(String[] args) {
        // Reject policy in use: the submitting thread runs the task itself.
        // Other strategies that fit the same hook:
        //   queue.put(task);                                - wait indefinitely for a free slot
        //   queue.offer(task, 500, TimeUnit.MILLISECONDS);  - wait for a slot, with a timeout
        //   (empty body)                                    - drop the task
        //   throw new RuntimeException(...);                - fail the submission
        RejectPolicy<Runnable> callerRuns = (queue, task) -> task.run();
        Pools pools = new Pools(2, 1000, TimeUnit.MILLISECONDS, 2, callerRuns);

        int submitted = 0;
        while (submitted < 10) {
            pools.excutor(sleepingTask(submitted));
            submitted++;
        }
    }

    /** Builds a task that announces its id and then sleeps for one second. */
    private static Runnable sleepingTask(int id) {
        return () -> {
            System.out.println("执行任务" + id);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
//线程池
/**
 * A minimal thread pool: up to {@code coresize} worker threads are created on
 * demand, further tasks are placed in a bounded {@link TaskQueue}, and a
 * {@link RejectPolicy} decides what happens when that queue is full.
 *
 * <p>Workers retire after waiting {@code time}/{@code unit} without receiving
 * a task, and new workers are created by later submissions.
 */
class Pools{
    /** Bounded queue holding tasks that were not handed directly to a new worker. */
    private final TaskQueue<Runnable> taskQueue;
    /** Live workers; this set is also the monitor guarding worker creation and removal. */
    private final HashSet<Worker> workers=new HashSet<>();
    /** Maximum number of worker threads. */
    private final int coreSize;
    /** How long an idle worker waits for a task before retiring. */
    private final long time;
    private final TimeUnit unit;
    /** Strategy invoked by the queue when a task arrives and the queue is full. */
    private final RejectPolicy<Runnable> rejectPolicy;

    /**
     * Submits a task. If fewer than the core number of workers exist, a new
     * worker is started with this task; otherwise the task is offered to the
     * queue, which falls back to the reject policy when full.
     *
     * @param runnable the task to run; must not be null
     * @throws NullPointerException if {@code runnable} is null
     */
    public void excutor(Runnable runnable){
        if (runnable == null) {
            // Fail fast: a null task would otherwise spawn an idle worker or
            // be rejected later by the underlying deque.
            throw new NullPointerException("runnable");
        }
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(runnable,""+(workers.size()+1));
                workers.add(worker);
                System.out.println("线程执行:"+runnable);
                worker.start();
            } else {
                // Enqueued while holding the workers monitor so that a retiring
                // worker (see Worker.run) cannot miss this task.
                taskQueue.tryput(rejectPolicy,runnable);
            }
        }
    }

    /**
     * @param coresize     maximum number of worker threads
     * @param time         idle timeout for workers waiting on the queue
     * @param unit         unit of {@code time}
     * @param QueueSize    capacity of the task queue
     * @param rejectPolicy strategy applied when the queue is full
     */
    public Pools(int coresize, long time, TimeUnit unit,int QueueSize,RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coresize;
        this.time = time;
        this.unit = unit;
        taskQueue=new TaskQueue<>(QueueSize);
        this.rejectPolicy=rejectPolicy;
    }

    /** Worker thread: runs its initial task, then keeps draining the queue until idle. */
    class Worker extends Thread{
        /** Label assigned by the pool at creation; it is not applied as the Thread name. */
        private final String name;
        /** Task currently assigned to this worker, or null when it must fetch one. */
        private Runnable task;

        public Worker(Runnable task,String str) {
            this.task = task;
            name=str;
        }

        @Override
        public void run() {
            while (true) {
                // Timed wait: poll returns null once the idle timeout elapses.
                while(task!=null||(task=taskQueue.poll(time,unit))!=null){
                    System.out.println("正在执行..."+task);
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker; otherwise it
                        // would stay in the workers set forever and the pool
                        // would permanently lose a slot.
                        e.printStackTrace();
                    } finally {
                        task=null;
                    }
                }
                synchronized(workers){
                    // excutor enqueues while holding this monitor, so a task may
                    // have arrived between the poll timeout and this point.
                    // Retire only if the queue is still empty.
                    if (taskQueue.getSize() > 0) {
                        continue;
                    }
                    System.out.println("移除线程:"+this);
                    workers.remove(this);
                    return;
                }
            }
        }
    }

}
//阻塞队列
/**
 * Bounded FIFO blocking queue protected by a single {@link ReentrantLock}
 * with separate conditions for producers and consumers.
 *
 * @param <T> element type
 */
class TaskQueue <T>{
    /** Backing storage; accessed only while holding {@link #lock}. */
    private final ArrayDeque<T> queue=new ArrayDeque<>();
    /** Maximum number of queued elements. */
    private final int capacity;
    private final ReentrantLock lock=new ReentrantLock();
    /** Producers wait on this condition while the queue is full. */
    private final Condition full=lock.newCondition();
    /** Consumers wait on this condition while the queue is empty. */
    private final Condition empty=lock.newCondition();

    /** @param capcity maximum number of elements the queue may hold */
    public TaskQueue(int capcity) {
        this.capacity = capcity;
    }

    /** @return the number of elements currently queued */
    public int getSize(){
        lock.lock();
        try
        {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given time
     * for an element to become available.
     *
     * @return the head element, or {@code null} if the timeout elapsed or the
     *         waiting thread was interrupted (its interrupt status is then
     *         left set)
     */
    public T poll(long time, TimeUnit unit){
        long timeout=unit.toNanos(time);
        lock.lock();
        try{
            while (queue.isEmpty()){
                if(timeout<=0){
                    return null;
                }
                try {
                    // awaitNanos returns the remaining wait time, so spurious
                    // wake-ups do not extend the overall timeout.
                    timeout=empty.awaitNanos(timeout);
                } catch (InterruptedException e) {
                    // Give up and keep the interrupt visible to the caller
                    // instead of swallowing it and waiting again.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            T t=queue.removeFirst();
            full.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary.
     * Interrupts do not abort the wait; the thread's interrupt status is
     * preserved for the caller.
     */
    public T take(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                empty.awaitUninterruptibly();
            }
            T t=queue.removeFirst();
            full.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    /**
     * Appends a task, waiting as long as necessary for free space.
     * Interrupts do not abort the wait; the thread's interrupt status is
     * preserved for the caller.
     */
    public void put(T task){
        lock.lock();
        try{
            while (queue.size()==capacity){
                System.out.println("等待加入任务队列:"+task);
                full.awaitUninterruptibly();
            }
            System.out.println("加入任务队列:"+task);
            queue.addLast(task);
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * Appends a task, waiting up to the given time for free space.
     *
     * @return {@code true} if the task was enqueued; {@code false} if the
     *         timeout elapsed or the thread was interrupted while waiting
     *         (its interrupt status is then left set)
     */
    public boolean offer(T task,long time,TimeUnit unit){
        lock.lock();
        try{
            long timeout=unit.toNanos(time);
            while (queue.size()==capacity){
                if(timeout<=0) return false;
                System.out.println("等待加入任务队列:"+task);
                timeout=full.awaitNanos(timeout);
            }
            System.out.println("加入任务队列:"+task);
            queue.addLast(task);
            empty.signal();
            return true;
        } catch (InterruptedException e) {
            // The task was NOT enqueued: report failure and restore the flag.
            Thread.currentThread().interrupt();
            return false;
        }finally {
            lock.unlock();
        }
    }

    /**
     * Enqueues the task if there is room; otherwise delegates to the reject
     * policy. The policy is invoked after the queue lock has been released,
     * so a slow policy (for example one that runs the task on the calling
     * thread) does not block consumers of the queue.
     */
    public void tryput(RejectPolicy<T> rejectPolicy, T runnable) {
        lock.lock();
        try {
            if(queue.size()!=capacity){
                System.out.println("加入队列:"+runnable);
                queue.addLast(runnable);
                empty.signal();
                return;
            }
        }finally {
            lock.unlock();
        }
        rejectPolicy.reject(this,runnable);
    }
}

/**
 * Strategy applied by {@link TaskQueue#tryput} when the queue is full.
 *
 * @param <T> element type of the queue
 */
@FunctionalInterface
interface RejectPolicy<T>{
    /**
     * @param queue the full queue that rejected the task
     * @param task  the task that could not be enqueued
     */
    void reject(TaskQueue<T> queue,T task);
}

上一篇:Java多线程详解


下一篇:Java多线程:实现Runnable接口