Set不安全
package com.kuang.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 同理可证:ConcurrentModificationException并发修改异常
* 解决方法:
* //1.Set<String> set = Collections.synchronizedSet(new HashSet<>());工具类的写法
* //2.Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashSet底层是什么??
public HashSet() {
map = new HashMap<>();
}
//add Set本质是map key是无法重复的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//PRESENT是常量固定的值 不会变
private static final Object PRESENT = new Object();
Map不安全
回顾map的基本操作
package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//ConcurrentModificationException 并发修改异常
public class MapTest {
public static void main(String[] args) {
//map是这样用的吗? 不是 工作中不用HashMap
//默认等价于什么?? new HashMap<>(16,0.75);
//HashMap<String, String> map = new HashMap<>();
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 1; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
//加载因子 初始化容量
}
}
7.Callable
1.可以有返回值
2.可以抛出异常
3.方法不同 run()/ call()
代码测试
package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();//Runnable()) 和FutureTask<V>()两者等价
//new Thread(new FutureTask<V>(Callable)).start();//FutureTask可以调用(Callable)
//new Thread().start()是可以启动Callable
new Thread().start();//怎么启动Callable
MyThread thread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(thread);//适配类
new Thread(futureTask,"A").start();
Integer o = (Integer) futureTask.get();//获取Callable返回结果
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call(){
System.out.println("call()");
return 1024;
}
}
package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();//Runnable()) 和FutureTask<V>()两者等价
//new Thread(new FutureTask<V>(Callable)).start();//FutureTask可以调用(Callable)
//new Thread().start()是可以启动Callable
new Thread().start();//怎么启动Callable
MyThread thread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(thread);//适配类
new Thread(futureTask,"A").start();//A为线程
new Thread(futureTask,"B").start();//B为线程 打印的结果只有一个call 因为结果会被缓存,效率高
Integer o = (Integer) futureTask.get();//获取Callable返回结果
//这个get方法可能会产生阻塞,把它放在最后
//或者使用异步通信来处理
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call(){
System.out.println("call()");
return 1024;
}
}
细节:
1.缓存
2.结果可能需要等待,会阻塞!
8 常用的辅助类(必会)
8.1 CountDownLatch
package com.kuang.add;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6的倒计时, 必须要执行任务的时候,再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go out");
countDownLatch.countDown();//数量减一
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
减法计数器
原理:
countDownLatch.countDown();//数量减一
-
countDownLatch.await();//等待计数器归零,然后再向下执行
每次有线程调用countDown()数量减一。假设计数器变为0,countDownLatch.await();就会被唤醒,继续执行!
8.2 CyclicBarrier
加法计数器
package com.kuang.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7; i++) {
final int temp = i;
//lambda能操作到i吗?
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"搜集"+temp+"个龙珠");
try {
cyclicBarrier.await();//等待7个线程结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3 Semaphore
package com.kuang.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量: 类比于停车位 3个停车位 限流的时候可以用
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
//acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//release() 释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
原理:
semaphore.acquire();//获取
,假设已经满了,等待,等待释放为止!
semaphore.release();//释放
,会将当前的信号量释放+1,然后唤醒等待的线程!
作用:多个共享资源互斥使用!并发限流,控制最大的线程数。
9读写锁
ReadWriteLock
package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 独占锁(写锁)一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* ReadWriteLockDemo
* 读-读 可以共存
* 读-写 不能共存
* 写-写 不能共存
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
//写入 lambda表达式无法访问外部变量,只有通过fianl变量去进行中间转换
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,写的过程 只希望同时只有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//取,读的过程 所有人都可以去读
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
/**
* 自定义缓存
*/
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
//存,写的过程
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}
//取,读的过程
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}
}
10 阻塞队列
队列 :FIFO
写入:如果队列满了,就必须阻塞等待
取:如果队列是空的,必须阻塞等待生产
不得不阻塞
阻塞队列:
BlockingQueue: 不是新的东西
什么情况下我们会使用阻塞队列:多线程并发处理,线程池!
学会使用队列
添加 移除
四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer()空参 | put() | offer(,,) |
移除 | remove | pool() | take() | poll(,) |
检测队首元素 | element | peek() | - | - |
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
test1();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
System.out.println(blockingQueue.remove());
}
}
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
test2();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2() {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
//取
System.out.println("===============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
}
}
输出结果:
true true true
false
a b c null
Process finished with exit code 0
查看队首元素 System.out.println(blockingQueue.element())放在哪个remove 前面,哪个就是队首 其余的元素就被移除了
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
test1();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.element());//查看队首元素
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
//System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2() {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
//取
System.out.println("===============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
}
}
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
test2();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.element());//查看队首元素
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
//System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2() {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
//取
System.out.println("===============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
}
}
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) throws InterruptedException {
test3();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.element());//查看队首元素
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
//System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2() {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
//取
System.out.println("===============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞
//存
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");//队列没有位置了会一直等待
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());//没有这个元素,会一直阻塞
}
/**
* 等待,阻塞(等待超时)
*/
}
package com.kuang.bq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
test4();
}
/**
* 抛出异常
*/
public static void test1(){
//<>()放队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println("++++++++++++++++++++++++++++");
//IllegalStateException: Queue full抛出异常
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.element());//查看队首元素
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException没有元素错误 抛出异常
//System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2() {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
//取
System.out.println("===============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞
//存
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");//队列没有位置了会一直等待
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());//没有这个元素,会一直阻塞
}
/**
* 等待,阻塞(等待超时)
*/
public static void test4() throws InterruptedException {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//存
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
//blockingQueue.offer("d",2, TimeUnit.SECONDS);//延迟两秒,超时退出
System.out.println("===========================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2, TimeUnit.SECONDS);//等待超过2秒就退出
}
}
SynchronousQueue同步队列
没有容量
进去一个元素,必须等待取出来之后,才能往里面放一个元素
put take
package com.kuang.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
* 和其他的BlockingQueue不一样,SynchronousQueue不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();//同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
11、线程池(重点)
线程池:三大方法,七大参数,四种拒绝策略
池化技术
程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术
线程池、连接池、内存池、对象池///.... 创建、销毁,十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。
线程池的好处:
1.降低资源的消耗
2.提高响应的速度
3.方便管理。
线程可以复用 ,可以控制最大并发数,管理线程
线程池:三大方法
package com.kuang.pool;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Executors工具类 三大方法
//使用了线程池之后,使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小
//ExecutorService threadPool = Executors.newCachedThreadPool();//缓存的 可以伸缩,遇强则强,遇弱则弱
try {
for (int i = 0; i < 100 ; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
7大参数
源码分析:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//约为21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质: ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大的线程池大小
long keepAliveTime,//超时了,没有人调用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler//拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
四种拒绝策略
package com.kuang.pool;
import java.util.concurrent.*;
// Executors工具类 三大方法
/**
* new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人的,抛出异常
* new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里
* new ThreadPoolExecutor.DiscardPolicy());//队列满了,丢掉任务,不会抛出异常
* new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常
*/
//使用了线程池之后,使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
//自定义线程池!工作中使用ThreadPoolExecutor
//假设核心线程池有2个 就是2个银行业务员,最大同时有5个人去办理 超过3秒就不等了 LinkedBlockingDeque<>()侯客区只能放3人
ExecutorService threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常
try {
//最大承载:Deque + max
//RejectedExecutionException超出最大承载抛出的异常
for (int i = 1; i <= 9 ; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}