JUC
java.util.concurrent :concrrent包
java.util.concurrent.atomic :原子包
java.util.concurrent.;locks :锁lock包
lock 锁
package com.bin.concurrent;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Resources {
private int num = 30;
private Lock lock = new ReentrantLock();//可重入锁
public void subtractNum() {
lock.lock();
try {
if (num > 0) {
System.out.println(Thread.currentThread().getName() + "售卖前:" + (num--) + "售卖后:num=" + num);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
/**
* 1.高内聚低耦合的情况下: 线程 操作(对外暴露的调用方法) 资源类
* 2.判断 / 干活 / 通知
* 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if
*
* @Author: 邱成兵
* @Date: Created in 19:30 2021/4/27
*/
public class SaleTicket {
public static void main(String[] args) {
Resources resources = new Resources();
new Thread(() -> {
for (int i = 0; i < 40; i++) resources.subtractNum();
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) resources.subtractNum();
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) resources.subtractNum();
}, "C").start();
/*new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
resources.subtractNum();
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
resources.subtractNum();
}
}
},"B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
resources.subtractNum();
}
}
},"C").start();*/
}
}
package com.bin.concurrent;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource {
private int number=1;//A:1 B:2 C:3
private Lock lock=new ReentrantLock();//可重入锁
private Condition condition1=lock.newCondition();
private Condition condition2=lock.newCondition();
private Condition condition3=lock.newCondition();
public void print5(){
lock.lock();
try {
//判断
if(number!=1){
condition1.await();
}
//干活
for (int i = 1; i <=5 ; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i);
}
number=2;
//通知
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print10(){
lock.lock();
try {
//判断
if(number!=2){
condition2.await();
}
//干活
for (int i = 1; i <=10 ; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i);
}
number=3;
//通知
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void print15(){
lock.lock();
try {
//判断
if(number!=3){
condition3.await();
}
//干活
for (int i = 1; i <=15 ; i++) {
System.out.println(Thread.currentThread().getName()+"\t"+i);
}
number=1;
//通知
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
/**
* 精准唤醒
* 1.高内聚低耦合的情况下: 线程 操作(对外暴露的调用方法) 资源类
* 2.判断 / 干活 / 通知
* 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if
* 4.标志位
*
* @Author: 邱成兵
* @Date: Created in 11:03 2021/4/30
*/
public class ThreadOrderAccess {
public static void main(String[] args) {
ShareResource shareResource=new ShareResource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareResource.print15();
}
}, "C").start();
}
}
package com.bin.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class AirConditioner{
private int num=0;
private Lock lock=new ReentrantLock();//可重入锁
//代替 synchronized 里面的 wait notify notifyall 方法 awati signal signalAll
private Condition condition=lock.newCondition();
public void addNum() throws InterruptedException {
lock.lock();
try {
while (num!=0){
// wait();
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() +"生产" +num);
// notify();
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void subtractNum() throws InterruptedException {
lock.lock();
try {
//判断
while(num==0){
// wait();
condition.await();
}
//干活
num--;
System.out.println(Thread.currentThread().getName() +"消费"+num);
// notify();
//通知
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
/**
* 老版本
*
**/
/*public synchronized void addNum() throws InterruptedException {
while (num>0){
wait();
}
num++;
System.out.println(Thread.currentThread().getName() +"生产" +num);
notify();
}
public synchronized void subtractNum() throws InterruptedException {
while(num<=0){
wait();
}
num--;
System.out.println(Thread.currentThread().getName() +"消费"+num);
notify();
}*/
}
/**
* 交替打印出消费数字
* 2.判断 / 干活 / 通知
* 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while 不能用if
*
* @Author: 邱成兵
* @Date: Created in 11:18 2021/4/28
*/
public class ThreadWaitNotifyDemo {
public static void main(String[] args) {
AirConditioner airConditioner=new AirConditioner();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
airConditioner.addNum();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
airConditioner.subtractNum();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "B").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
airConditioner.addNum();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "C").start();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
airConditioner.subtractNum();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "D").start();
}
}
多线程8锁
同步方法,锁是当前this
静态同步方法,锁是当前对象的.class
同步方法块,锁是()里传的对象,结束或异常必须释放锁
普通方法,互不影响
线程异常
java.util.concurrentModificationException 并发修改异常
Arraylist 如何线程安全
- 用vector集合
- Collections.synchronizedList(new ArrayList<>()) Collections工具类
- CopyOnWriteArrayList-写时复制(读写分离思想) concrrent并发包下
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
每次添加进来先拿到锁, 先拿到以前的集合,再获取长度,在扩容+1,把添加的数据放入扩容过后的集合,再放入以前的集合,并通知下一个-标志位
集合扩容是一半 map扩容是一倍2*n次方 每次加1
并发经验:如果能确认map的大小直接给确定值,避免后续再扩容操作
安全的集合和map
package com.bin.concurrent;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 线程安全
* 1.故障现象: java.util.ConCurrentModificationException
* 2.导致原因: 线程不安全导致的
* 3.解决方案: 线程安全的集合或map 集合工具类 collections 并发包:concurrent
* 4.优化建议: (同样的错误 不出现第二次)
*
* @Author: qcb
* @Date: Created in 11:47 2021/5/6
*/
public class NotSafeDemo {
public static void mapNotSafe() {
// 线程安全的map
Map<Object, Object> collections = new ConcurrentHashMap<>();//Collections.synchronizedMap()//new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
collections.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0, 8));
System.out.println(collections);
}, String.valueOf(i)).start();
}
}
public static void setNotSafe() {
// 线程安全的set
Set<String> collections = new CopyOnWriteArraySet(); //Collections.synchronizedSet(new HashSet<>())//new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
collections.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(collections);
}, String.valueOf(i)).start();
}
}
public static void listNotSafe(){
// 线程安全的list
List<String> collections= new CopyOnWriteArrayList();//Collections.synchronizedList()//new Vector<>();//new ArrayList<>();
for (int i = 0; i <30 ; i++) {
new Thread(() -> {
collections.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(collections);
}, String.valueOf(i)).start();
}
}
}
多线程
- 创建多线程的方式:
- 传统有2种,java5之后增加了2种 总共4种
callable 细节
package com.bin.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("come in here");
return 1024;
}
}
/**
* @Author: 邱成兵
* @Date: Created in 15:15 2021/5/6
* 获取多线程的方式
* 1.继承Thread
* 2.实现runnable
* 3.实现callable<>
* 4.线程池
*/
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread());
//实现原理 java 多态
new Thread(futureTask, "A").start();
//多次调用 走的是缓存 只会调用一次
//new Thread(futureTask, "B").start();
//获取结果一定要放到最后 不然会阻塞拿到结果在执行
System.out.println(futureTask.get());
}
}
辅助类
CountDownLatch 辅助类-减少计数
作用: 是一种通用的同步工具 等其他线程执行完了 主线程再关闭 关门走人
原理:
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
package com.bin.concurrent;
import java.util.concurrent.CountDownLatch;
/**
* JUC辅助类
* @Author: 邱成兵
* @Date: Created in 16:09 2021/5/6
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//减少计数 开始初始 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;
CountDownLatch countDownLatch=new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println("第"+Thread.currentThread().getName()+"个同学出教室 /t");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
//第二个是完成信号,允许司机等到所有的工作人员完成。
countDownLatch.await();
System.out.println("关门");
}
}
CyclicBarrier 辅助类-循环栅栏 到达共同屏障点的同步辅助
允许一组线程全部等待彼此达到共同屏障点的同步辅助 集齐7龙珠
原理:
- CyclicBarrier * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是, * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞, * 直到最后一个线程到达屏障时,屏障才会开门,所有 * 被屏障拦截的线程才会继续干活。 * 线程进入屏障通过CyclicBarrier的await()方法。
package com.bin.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 允许一组线程全部等待彼此达到共同屏障点的同步辅助
*
* @Author: 邱成兵
* @Date: Created in 16:53 2021/5/6
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
//参数1:多少个线程跳闸 参数2:跳闸之后执行的线程
CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
System.out.println("集齐7颗龙珠");
});
for (int i = 0; i < 7; i++) {
final int temInt=i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" /t 第" +temInt+"颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
semaphore -辅助类-信号灯
可做秒杀线程数量控制 不管前端请求多少个 只接受固定能接受的数量
主要用于并发的控制可资源的互斥 限流
如果设置semaphore 为1 等同于 synchronized (场景 一个线程持有一个资源多久 可以用这个实现)
原理:
在信号量上我们定义两种操作: * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), * 要么一直等下去,直到有线程释放信号量,或超时。 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 * * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.bin.concurrent;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 信号等
* 用户并发的控制和资源的互斥 限流
* @Author: 邱成兵
* @Date: Created in 17:15 2021/5/6
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//模拟3个车位
Semaphore semaphore=new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
//资源减一
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"/t 抢到了车位");
try {
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
}
System.out.println(Thread.currentThread().getName()+"/t 退出停车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
ReadWriteLock 读写锁 (场景:缓存的读写)
读读能共存 读写不能共存 写写不能共存
package com.bin.concurrent;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
private volatile Map<String,Object> map=new HashMap<>();
//读写锁
private ReadWriteLock lock=new ReentrantReadWriteLock();
public void put(String key,Object value){
try {
lock.writeLock().lock();
System.out.println(key+" /t 开始写入数据");
TimeUnit.MILLISECONDS.sleep(3);
map.put(key,value);
System.out.println(key+" /t 写入完成"+value);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.writeLock().unlock();
}
}
public void get(String key) {
try {
lock.readLock().lock();
System.out.println(key+" /t 开始读数据");
TimeUnit.MILLISECONDS.sleep(3);
Object o = map.get(key);
System.out.println(key+" /t 读取读数据"+o);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
}
}
/**
* 读写锁
* @Author: 邱成兵
* @Date: Created in 10:32 2021/5/7
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache=new MyCache();
for (int i = 0; i < 5; i++) {
myCache.put(i+"",i+"");
}
for (int i = 0; i < 5; i++) {
myCache.get(i+"");
}
}
}
BlockingQueue - 阻塞队列
不用手动阻塞、唤醒线程了 (wait/await notify/signal notifyall/signalAll) BlockingQueue全部自动实现了
核心方法
package com.bin.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 阻塞队列
* @Author: 邱成兵
* @Date: Created in 14:35 2021/5/7
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(3);
blockingQueue.add("a");//添加 超过界限会报异常 queue full :队列已满
blockingQueue.remove();//移除先进的第一位 没有会报异常
blockingQueue.element();//检查 没有会报异常
blockingQueue.offer("a"); //超出返回false
blockingQueue.poll(); //没有移除的返回 null
blockingQueue.peek();//检查 没有返回null
blockingQueue.put("a");//添加 超出会阻塞
blockingQueue.take();//移除 没有会阻塞
blockingQueue.offer("a",3l, TimeUnit.SECONDS);//超出多少时间后直接退出
blockingQueue.poll(3l, TimeUnit.SECONDS);//超出多少时间后直接退出
}
}
线程池
例子:10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用;控制最大并发数;管理线程。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
package com.bin.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池三大主流使用方法
* @Author: 邱成兵
* @Date: Created in 15:59 2021/5/7
*/
public class ThreadDemo {
public static void main(String[] args) {
//执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
// ExecutorService executorService= Executors.newFixedThreadPool(5);
//一个任务一个任务的执行,一池一线程
// ExecutorService executorService= Executors.newSingleThreadExecutor();
//执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
ExecutorService executorService= Executors.newCachedThreadPool();
try {
for (int i = 1; i <= 10; i++) {
final int teamInt=i;
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+" /t 办理业务"+teamInt);
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
}
7大参数详解
1、corePoolSize:线程池中的常驻核心线程数
2、maximumPoolSize:线程池中能够容纳同时
执行的最大线程数,此值必须大于等于1
3、keepAliveTime:多余的空闲线程的存活时间
当前池中线程数量超过corePoolSize时,当空闲时间
达到keepAliveTime时,多余线程会被销毁直到
只剩下corePoolSize个线程为止
4、unit:keepAliveTime的单位
5、workQueue:任务队列,被提交但尚未被执行的任务
6、threadFactory:表示生成线程池中工作线程的线程工厂,
用于创建线程,一般默认的即可
7、handler:拒绝策略,表示当队列满了,并且工作线程大于
等于线程池的最大线程数(maximumPoolSize)时如何来拒绝
请求执行的runnable的策略
线程池底层工作原理
工作底层原理步骤
- 提交任务
- 判断核心线程是否满 满:进入队列
- 判断阻塞队列是否满 满:创建主线程 (一个一个的扩)
- 判断主线程是否满 满:拒绝策略
1、在创建了线程池后,开始等待请求。
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1、如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2、如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3、如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4、如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
工作中常用创建的线程池方法
在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?
一个都不用,我们工作中只能使用自定义的
OOM(OutOfMemoryError? -JAVA内存溢出 虚拟机暴露故障
ExecutorService executorService=new ThreadPoolExecutor(2,//核心
5,//主线程
2L,//过期时间
TimeUnit.SECONDS,//过期单位
new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小
Executors.defaultThreadFactory(), //线程工厂 一般都是用默认的
new ThreadPoolExecutor.AbortPolicy());//拒绝策略
4大拒绝策略
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
最大线程数该如何设置?
public static void main(String[] args) {
//cpu的数量
int cpuNum = Runtime.getRuntime().availableProcessors();
//cpu密集型 内核数+1或+2
//IO密集型 1/总核数/阻塞系数
ExecutorService executorService=new ThreadPoolExecutor(2,//核心
5,//主线程
2L,//过期时间
TimeUnit.SECONDS,//过期单位
new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小
Executors.defaultThreadFactory(), //线程工厂 一般都是用默认的
new ThreadPoolExecutor.AbortPolicy());//拒绝策略 默认抛出异常
}
一:CPU密集型:
定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。
这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。(或者加1到2)
特点:
01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用
02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了
03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。
package pool;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo02 {
public static void main(String[] args) {
//自定义线程池! 工作中只会使用 ThreadPoolExecutor
/**
* 最大线程该如何定义(线程池的最大的大小如何设置!)
* 1、CPU 密集型,几核,就是几,可以保持CPU的效率最高!
*/
//获取电脑CPU核数
System.out.println(Runtime.getRuntime().availableProcessors()); //8核
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, //核心线程池大小
Runtime.getRuntime().availableProcessors(), //最大核心线程池大小(CPU密集型,根据CPU核数设置)
3, //超时了没有人调用就会释放
TimeUnit.SECONDS, //超时单位
new LinkedBlockingDeque<>(3), //阻塞队列
Executors.defaultThreadFactory(), //线程工厂,创建线程的,一般不用动
new ThreadPoolExecutor.AbortPolicy()); //银行满了,还有人进来,不处理这个人的,抛出异常
try {
//最大承载数,Deque + Max (队列线程数+最大线程数)
//超出 抛出 RejectedExecutionException 异常
for (int i = 1; i <= 9; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown(); //(为确保关闭,将关闭方法放入到finally中)
}
}
}
二:IO密集型:
定义:IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。
我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。
01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。
IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU
时间片让出去,直到缓冲区写满。既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):
CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU
空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU
时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中
package pool;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo02 {
public static void main(String[] args) {
//自定义线程池! 工作中只会使用 ThreadPoolExecutor
/**
* 最大线程该如何定义(线程池的最大的大小如何设置!)
* 2、IO 密集型 >判断你程序中十分耗IO的线程
* 程序 15个大型任务 io十分占用资源! (最大线程数设置为30)
* 设置最大线程数为十分耗io资源线程个数的2倍
*/
//获取电脑CPU核数
System.out.println(Runtime.getRuntime().availableProcessors()); //8核
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, //核心线程池大小
16, //若一个IO密集型程序有15个大型任务且其io十分占用资源!(最大线程数设置为 2*CPU 数目)
3, //超时了没有人调用就会释放
TimeUnit.SECONDS, //超时单位
new LinkedBlockingDeque<>(3), //阻塞队列
Executors.defaultThreadFactory(), //线程工厂,创建线程的,一般不用动
new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试和最早的竞争,也不会抛出异常
try {
//最大承载数,Deque + Max (队列线程数+最大线程数)
//超出 抛出 RejectedExecutionException 异常
for (int i = 1; i <= 9; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown(); //(为确保关闭,将关闭方法放入到finally中)
}
}
}
总结:
1:高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
2:并发不高、任务执行时间长的业务这就需要区分开看了:
a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,线程池中的线程数设置得少一些,减少线程上下文的切换
(其实从一二可以看出无论并发高不高,对于业务中是否是cpu密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)
3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,我们的项目使用的时redis作为缓存(这类非关系型数据库还是挺好的)。增加服务器是第二步(一般*项目的首先,因为不用对项目技术做大改动,求一个稳,但前提是资金充足),至于线程池的设置,设置参考
2
。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦。三.:总结:
01:一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8
个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU
核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。02:如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候
CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起
CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。
java.util.function 函数式接口
4大函数式接口
- Function<T,R> 函数型接口 特点:传入T 有一个返回R
/*Function<String,Integer> function=new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return 1024;
}
};*/
//新写法 lambda 表达式写法
// Function<String,Integer> function=(s) -> {return 1024;};
//一个参数可以省略() 返回可以省略{return }
Function<String,Integer> function=s -> 1024;
System.out.println(function.apply("a"));
- Predicate 断定型接口 特点:返回boolean
/*Predicate<String> predicate=new Predicate<String>() {
@Override
public boolean test(String s) {
return false;
}
};*/
//lambda 表达式
// Predicate<String> predicate=s -> s.isEmpty();
//lambada 表达式+方法的引用
Predicate<String> predicate=String::isEmpty;
System.out.println(predicate.test("qcb"));
- Consumer 消费型接口 特点:没有返回值 有参数
/*Consumer<String> consumer=new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};*/
//lambda 表达式
// Consumer consumer= s -> System.out.println(s);
Consumer consumer= System.out::print;
consumer.accept("a");
- Supplier 宫给型接口 特点:无参数 有返回值
/*Supplier<String> supplier=new Supplier<String>() {
@Override
public String get() {
return "aa";
}
};*/
//lambda 表达式
// Supplier<String> supplier=() -> {return "aa";};
Supplier<String> supplier=() -> "aa";
System.out.println(supplier.get());
java.util.stream
流(Stream) 到底是什么呢?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。“集合讲的是数据,流讲的是计算!”
特点:
- Stream 自己不会存储元素
- Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
- Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
阶段:
- 创建一个Stream:一个数据源(数组、集合)
- 中间操作:一个中间操作,处理数据源数据
- 终止操作:一个终止操作,执行中间操作链,产生结果
//题目:请按照给出数据,找出同时满足 *
//偶数ID且年龄大于24且用户名转为大写且用户名字母倒排序 *
// 最后只输出一个用户名字
User u1 = new User(11, "a", 23);
User u2 = new User(12, "b", 24);
User u3 = new User(13, "c", 22);
User u4 = new User(14, "d", 28);
User u5 = new User(16, "e", 26);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
list.stream().filter(user -> {
return user.getId() % 2 == 0 && user.getAge() > 24; //id偶数 并且 大于24的
})./*filter(user -> {
return user.getAge() > 24;
//Function<T,R> mapper)
}).*/map(user -> {
return user.getUserName().toUpperCase(); //把名称转为大写
}).sorted(/*(user1, user2) -> {
return user2.compareTo(user1); //倒叙
}*/Collections.reverseOrder()).limit(1).forEach(System.out::println);//只查一条
ForkJoinPool 分支合并框架
原理:Fork:把一个复杂任务进行分拆,大事化小Join:把分拆任务的结果进行合并
ForkJoinPool:分支合并池 类比=> 线程池
ForkJoinTask:ForkJoinTask 类比=> FutureTask
RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务
@Override
protected Integer compute() {
if((end-begin)<=ADJUST_VALUE){
for (int i = begin; i <= end; i++) {
result=result+i;
}
}else {
int middle = (end + begin) / 2;
MyTask myTask1=new MyTask(begin,middle); //分支1
MyTask myTask2=new MyTask(middle+1,end);//分支2
myTask1.fork();//开启分支1
myTask2.fork();//开启分支2
result=myTask1.join()+myTask2.join();//合并
}
return result;
}
package com.bin.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
//RecursiveTask 继承ForkJoinTask 继承 Future 可以调用callable接口 返回计算结果
class MyTask extends RecursiveTask<Integer> {
private static final Integer ADJUST_VALUE=10;
private int begin;
private int end;
private int result;
public MyTask(Integer begin, Integer end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if((end-begin)<=ADJUST_VALUE){
for (int i = begin; i <= end; i++) {
result=result+i;
}
}else {
int middle = (end + begin) / 2;
MyTask myTask1=new MyTask(begin,middle);
MyTask myTask2=new MyTask(middle+1,end);
myTask1.fork();
myTask2.fork();
result=myTask1.join()+myTask2.join();
}
return result;
}
}
/**
* 分支合并框架
* @Author: 邱成兵
* @Date: Created in 18:04 2021/5/8
*/
public class ForkJoinPollDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask=new MyTask(0,100);
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);
System.out.println(submit.get());
forkJoinPool.shutdown();
}
}
CompletableFutrue - 异步调用
package com.bin.concurrent;
import java.util.concurrent.CompletableFuture;
/**
* 异步调用
*
* @Author: 邱成兵
* @Date: Created in 14:37 2021/5/11
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
/*CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("a");
}
});*/
//执行异步调用没有返回参数的
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("没有返回值cccccc");
});
completableFuture.get();
/*CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 1024;
}
});*/
//执行异步调用有返回参数的
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
// int a=1024/0;
return 1024;
});
System.out.println(completableFuture1.whenComplete((t, d) -> {
System.out.println("t:" + t);
System.out.println("d:" + d);
}).exceptionally(f -> {
System.out.println(f.getMessage());
return 4444;
}).get());
}
}
原理:https://blog.csdn.net/finalheart/article/details/87615546