手写Java线程池

练习手写线程池

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * ...
 *
 * @author : liangfen.zhou@huolala.cn
 * Date : 2018/12/18 下午4:48
 */
public class TestPool {

    private int coreSize,
            maxSize;
    private AtomicInteger running = new AtomicInteger(0);
    private BlockingQueue<Runnable> queue;

    public TestPool(int coreSize, int maxSize, BlockingQueue<Runnable> queue) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.queue = queue;
    }

    public void execute(Runnable runnable) {
        if (running.get() < coreSize) {
            if(!addWorker(runnable)){
                reject();
            }
        } else {
            System.out.println("当前队列大小:"+queue.size());
            //添加任务到队列中
            if(!queue.offer(runnable)){
                //如果添加失败则继续创建线程
                System.out.println("offer 失败,当前线程数:"+running.get());
                if(!addWorker(runnable)){
                    reject();
                }
            }
        }
    }

    private void reject() {
        throw new RuntimeException("超出大小,当前线程数:"+running.get()+" 队列大小:"+queue.size());
    }

    private boolean addWorker(Runnable runnable) {
        //如果当前线程数大于最大数则创建失败
        if (running.get() >= maxSize) {
            return false;
        }
        Worker worker = new Worker(runnable);
        worker.start();
        return true;
    }


    private class Worker extends Thread {
        private Runnable runnable;

        public Worker(Runnable runnable) {
            this.runnable = runnable;
            //增加线程运行线程数
            System.out.println("创建线程:当前线程数:"+running.incrementAndGet());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    //运行线程
                    runnable.run();
                    System.out.println("运行结束,当前线程数:"+running.get());
                    //如果当前运行线程数大于核心大小就退出线程
                    if (running.get() > coreSize) {
                        break;
                    }else{
                        //反之从队列里取数据,理论上至少存活coreSize个线程
                        try {
                            System.out.println("000000:队列大小:"+queue.size());
                            runnable = queue.take();
                            System.out.println("11111111:队列大小:"+queue.size());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } finally {
                //线程结束,减少运行数
                running.decrementAndGet();
                System.out.println("结束线程,当前线程数:"+running.get());
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("main");
        TestPool pool = new TestPool(2, 2, new ArrayBlockingQueue<>(3));
        for(int i=0;i<100;i++){
            final  int j = i;
            System.out.println("i="+i+" "+Thread.currentThread().getName());
            pool.execute(()->{
                try {
                    Thread.sleep(100);
                    System.out.println("睡0.1秒 完成:"+j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
上一篇:PostgreSQL pg_basebackup


下一篇:google map根据地址获取经纬度