JUC常用类
指的是import java.util.concurrent
包下的类,通常用于解决多线程协调问题
- lock和衍生的ReentrantLock
- 各种容器的安全类:CopyOnWriteArrayList,ConcurrentHashMap…
- 不安全集合转安全集合:Collections.synchronizedLis()
- …
生产者和消费者的问题(面试)
- 如果生产者不为空,通知消费者消费
- 当商品被消费完了,通知消费者消费
- 生产者和消费者的拉锯
使用wait与notify
//生产者
class Producer {
final Produce produce;
public Producer(Produce produce) {
this.produce = produce;
}
//生产
public void produce() {
for (int i = 0; i < 100; i++) {
produce.add();
}
}
}
//消费者
class Consumer {
final Produce produce;
public Consumer(Produce produce) {
this.produce = produce;
}
//消费
public void consume() {
for (int i = 0; i < 1000; i++) {
produce.dec();
}
}
}
//产品
class Produce {
Queue<Integer> data = new LinkedList<>();
int count = 0;
public synchronized void add(){
while (data.size()>=10){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data.offer(count);
System.out.println("生产了产品:" + count++);
notifyAll();
}
public synchronized void dec(){
while (data.size()==0){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费了产品:" + data.poll());
notifyAll();
}
}
Condition
- 通常是通过
lock.newCondition
创建
//产品
class Produce {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Queue<Integer> data = new LinkedList<>();
int count = 0;
public void add() {
lock.lock();
try {
while (data.size() >= 10) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data.offer(count);
System.out.println("生产了产品:" + count++);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void dec() {
lock.lock();
try {
while (data.size() == 0) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费了产品:" + data.poll());
condition.signalAll();
} finally {
lock.unlock();
}
}
}
- 多个Condition可以实现精准唤醒
package com.hzy.juc.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class C {
public static void main(String[] args) {
Data3 data3=new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printC();
}
},"C").start();
}
}
class Data3{
private Lock lock=new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int number=1; //1a 2b 3c
public void printA(){
lock.lock();
try {
while (number!=1)
condition1.await();
{
}
System.out.println(Thread.currentThread().getName()+"=>AAAAAA");
number=2;
condition2.signal();//唤醒指定的人干活
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
while (number!=2)
condition2.await();
{
}
System.out.println(Thread.currentThread().getName()+"=>BBBBBB");
number=3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while (number!=3)
condition3.await();
{
}
System.out.println(Thread.currentThread().getName()+"=>CCCCCC");
number=1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
计数器
CountDownLatch(减法计数器)
- 线程安全
- 多个线程
countDown
,为0await
向下执行
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch=new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"go out");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归0然后再向下执行
System.out.println("Close Door");
}
}
CyclicBarrier(加法计数器)
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
* 召唤龙珠的线程
* */
CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7 ; i++) {
final int temp=i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore(信号量)
多个信号量可以让多个线程交替使用
多个共享资源互斥的使用!并发限流,控制最大的线程数!
acquire();
获取,假设已经满了,等待被释放为止!release();
释放,会将当前的信号量释放+1,然后唤醒等待的线程
//最多同时三个访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
//拿到许可证,若没有则等待
semaphore.acquire();
//一秒后释放许可证
System.out.println(Thread.currentThread().getName()+"执行中");
TimeUnit.SECONDS.sleep(1);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
读写锁
- 读是共享锁,写是排它锁
- 分为
writeLock().lock()
和readLock().lock();
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
队列
阻塞队列
- FIFO(先入先出)
- 满了阻塞,空了阻塞
- 场景:并发处理,线程池
- add&remove
- offer&poll
ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(int num);
同步队列
- 只能放一个,存进去必须等待取出来(交替打印)
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue= new SynchronousQueue();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"=>"+"put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+"=>"+"put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+"=>"+"put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
线程池(重点)
池化技术:事先准备好一些资源,用完之后还给我(不回收)
- 节省开辟线程的消耗
- 提高想要速度
- 并与管理
看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,通过源码分析禁用的原因。
阅读Executors
源码,发现只不过是写好了参数。
-
newFixedThreadPool
作用:该方法返回一个固定线程数量的线程池,线程数量自定义。该方法创建的线程池最大线程数量等于核心线程数量。如果新提交的任务没有空闲的线程去处理,就会被放入阻塞队列中。
缺点:该线程池使用的阻塞队列是LinkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)
-
newSingleThreadExecutor
作用:该方法创建了只有一个线程的线程池,如果提交的任务没有空闲的线程去处理,就会被放入阻塞队列中
缺点:该线程池使用的阻塞队列是LinkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)
-
newCachedThreadPool
作用:该方法返回一个可根据实际需求调整线程数量的线程池。如果提交的任务没有空闲的线程处理,就会创建新的线程去处理该任务。如果有线程空闲时间超过60秒,就会被销毁
缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)
-
newScheduleThreadPool
作用:该方法可以创建自定义核心线程容量的线程池,而且该线程池支持定时以及周期性的任务执行。
缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)
FixedThreadPool
FixedThreadPool是复用固定数量的线程处理一个共享的无边界队列
- 它是一种固定大小的线程池;
- corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
- keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
- 阻塞队列采用了LinkedBlockingQueue,它是一个*队列;
- 由于阻塞队列是一个*队列,因此永远不可能拒绝任务;
- 由于采用了*队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
SingleThreadExecutor
- 它只会创建一条工作线程处理任务;
- 采用的阻塞队列为LinkedBlockingQueue;
CachedThreadPool
- 它是一个可以无限扩大的线程池;
- 它比较适合处理执行时间比较小的任务;
- corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
- keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
- 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
ScheduledThreadPool(延时任务)
scheduledThreadPool.schedule(() -> System.out.println("hello"), int time, TimeUnit.MINUTES);
:time分钟后打印hello
- 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
- scheduledAtFixedRate
- scheduledWithFixedDelay
- SchduledFutureTask接收的参数:
- time:任务开始的时间
- sequenceNumber:任务的序号
- period:任务执行的时间间隔
- 它采用DelayQueue存储等待的任务
- DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
- DelayQueue也是一个*队列;
- 工作线程的执行过程:
- 工作线程会从DelayQueue取已经到期的任务去执行;
- 执行结束后重新设置任务的到期时间,再次放回DelayQueue
ForkJoin
ForkJoin在JDK1.7,并行执行任务!提高效率,大数据!(把大任务拆分成小任务)
public class Main {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
Future
- 类似Ajax
- 对将来的某个事件的结果进行建模
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync"+":Integer");
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
//t:正常的返回结果
//u:错误信息
System.out.println(t + "\t" + u);
}).exceptionally((e) -> {
e.getMessage();//打印错误信息
return 233;// 可以获取到错误的返回结果
}).get());
CompletableFuture
Java8引入的,针对Future做了优化,可以传入回调对象,当任务结束时,会自动回调某个对象的方法,并且可以串行执行,使用静态方法即可创建一个异步任务:
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello");
return "hello";
});
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("world");
return "world";
});
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2);
completableFuture.thenAccept(c -> {
System.out.println("任务都完成了");
});