JUC - 学习笔记

JUC

java.util.concurrent :concrrent包
java.util.concurrent.atomic :原子包
java.util.concurrent.;locks :锁lock包

lock 锁

package com.bin.concurrent;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Resources {
    private int num = 30;
    private Lock lock = new ReentrantLock();//可重入锁

    public void subtractNum() {
        lock.lock();
        try {
            if (num > 0) {
                System.out.println(Thread.currentThread().getName() + "售卖前:" + (num--) + "售卖后:num=" + num);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 1.高内聚低耦合的情况下: 线程  操作(对外暴露的调用方法)  资源类
 * 2.判断 / 干活  / 通知
 * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if
 *
 * @Author: 邱成兵
 * @Date: Created in 19:30 2021/4/27
 */
public class SaleTicket {
    public static void main(String[] args) {
        Resources resources = new Resources();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) resources.subtractNum();
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) resources.subtractNum();
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) resources.subtractNum();
        }, "C").start();

        /*new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 40; i++) {
                    resources.subtractNum();
                }
            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 40; i++) {
                    resources.subtractNum();
                }
            }
        },"B").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 40; i++) {
                    resources.subtractNum();
                }
            }
        },"C").start();*/
    }
}

package com.bin.concurrent;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareResource {
    private int number=1;//A:1 B:2 C:3
    private Lock lock=new ReentrantLock();//可重入锁
    private Condition condition1=lock.newCondition();
    private Condition condition2=lock.newCondition();
    private Condition condition3=lock.newCondition();
    public void print5(){
        lock.lock();
        try {
            //判断
            if(number!=1){
                condition1.await();
            }
            //干活
            for (int i = 1; i <=5 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=2;
            //通知
            condition2.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print10(){
        lock.lock();
        try {
            //判断
            if(number!=2){
                condition2.await();
            }
            //干活
            for (int i = 1; i <=10 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=3;
            //通知
            condition3.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print15(){
        lock.lock();
        try {
            //判断
            if(number!=3){
                condition3.await();
            }
            //干活
            for (int i = 1; i <=15 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=1;
            //通知
            condition1.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

/**
 *  精准唤醒
 * 1.高内聚低耦合的情况下: 线程  操作(对外暴露的调用方法)  资源类
 * 2.判断 / 干活  / 通知
 * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if
 * 4.标志位
 *
 * @Author: 邱成兵
 * @Date: Created in 11:03 2021/4/30
 */
public class ThreadOrderAccess {
    public static void main(String[] args) {
        ShareResource shareResource=new ShareResource();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print5();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print10();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print15();
            }
        }, "C").start();
    }
}

package com.bin.concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class AirConditioner{
    private int num=0;
    private Lock lock=new ReentrantLock();//可重入锁
    //代替 synchronized 里面的  wait notify notifyall 方法   awati  signal signalAll
    private Condition condition=lock.newCondition();

    public void addNum() throws InterruptedException {
        lock.lock();
        try {
            while (num!=0){
//                wait();
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() +"生产" +num);
//            notify();
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }


    public void subtractNum() throws InterruptedException {
        lock.lock();
        try {
            //判断
            while(num==0){
//                wait();
                condition.await();
            }
            //干活
            num--;
            System.out.println(Thread.currentThread().getName() +"消费"+num);
//            notify();
            //通知
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }


    /**
     *  老版本
     *
     **/
    /*public synchronized void addNum() throws InterruptedException {
        while (num>0){
            wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() +"生产" +num);
        notify();
    }

    public synchronized void subtractNum() throws InterruptedException {
        while(num<=0){
            wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName() +"消费"+num);
        notify();
    }*/
}
/**
 * 交替打印出消费数字
 *   2.判断 / 干活  / 通知
 *   3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if
 *
 * @Author: 邱成兵
 * @Date: Created in 11:18 2021/4/28
 */
public class ThreadWaitNotifyDemo {
    public static void main(String[] args) {
        AirConditioner airConditioner=new AirConditioner();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    airConditioner.addNum();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    airConditioner.subtractNum();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    airConditioner.addNum();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "C").start();

        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    airConditioner.subtractNum();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "D").start();
    }
}

多线程8锁

同步方法,锁是当前this
静态同步方法,锁是当前对象的.class
同步方法块,锁是()里传的对象,结束或异常必须释放锁
普通方法,互不影响

线程异常

java.util.concurrentModificationException 并发修改异常

Arraylist 如何线程安全

  1. 用vector集合
  2. Collections.synchronizedList(new ArrayList<>()) Collections工具类
  3. CopyOnWriteArrayList-写时复制(读写分离思想) concrrent并发包下

public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
每次添加进来先拿到锁, 先拿到以前的集合,再获取长度,在扩容+1,把添加的数据放入扩容过后的集合,再放入以前的集合,并通知下一个-标志位

JUC - 学习笔记

集合扩容是一半 map扩容是一倍2*n次方 每次加1
并发经验:如果能确认map的大小直接给确定值,避免后续再扩容操作

安全的集合和map

package com.bin.concurrent;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 线程安全
 * 1.故障现象: java.util.ConCurrentModificationException
 * 2.导致原因: 线程不安全导致的
 * 3.解决方案: 线程安全的集合或map  集合工具类 collections  并发包:concurrent 
 * 4.优化建议: (同样的错误 不出现第二次)
 * 
 * @Author: qcb
 * @Date: Created in 11:47 2021/5/6
 */
public class NotSafeDemo {
    public static void mapNotSafe() {
        // 线程安全的map
        Map<Object, Object> collections = new ConcurrentHashMap<>();//Collections.synchronizedMap()//new HashMap<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                collections.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0, 8));
                System.out.println(collections);
            }, String.valueOf(i)).start();
        }
    }

    public static void setNotSafe() {
        // 线程安全的set
        Set<String> collections = new CopyOnWriteArraySet(); //Collections.synchronizedSet(new HashSet<>())//new HashSet<>();

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                collections.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(collections);
            }, String.valueOf(i)).start();
        }
    }

    public static void listNotSafe(){
        // 线程安全的list
       List<String> collections= new CopyOnWriteArrayList();//Collections.synchronizedList()//new Vector<>();//new ArrayList<>();
        for (int i = 0; i <30 ; i++) {
            new Thread(() -> {
                collections.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(collections);
            }, String.valueOf(i)).start();
        }
    }
}

多线程

  • 创建多线程的方式:
  1. 传统有2种,java5之后增加了2种 总共4种

JUC - 学习笔记

callable 细节

package com.bin.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("come in here");
        return 1024;
    }
}

/**
 * @Author: 邱成兵
 * @Date: Created in 15:15 2021/5/6
 *  获取多线程的方式
 * 1.继承Thread
 * 2.实现runnable
 * 3.实现callable<>
 * 4.线程池
 */
public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask(new MyThread());
        //实现原理   java 多态
        new Thread(futureTask, "A").start();
        //多次调用 走的是缓存  只会调用一次
       //new Thread(futureTask, "B").start();

        //获取结果一定要放到最后  不然会阻塞拿到结果在执行
        System.out.println(futureTask.get());
    }

}

辅助类

JUC - 学习笔记

CountDownLatch 辅助类-减少计数

作用: 是一种通用的同步工具 等其他线程执行完了 主线程再关闭 关门走人
原理:

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
package com.bin.concurrent;

import java.util.concurrent.CountDownLatch;

/**
 *  JUC辅助类
 * @Author: 邱成兵
 * @Date: Created in 16:09 2021/5/6
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //减少计数  开始初始 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;
        CountDownLatch countDownLatch=new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println("第"+Thread.currentThread().getName()+"个同学出教室 /t");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
        //第二个是完成信号,允许司机等到所有的工作人员完成。
        countDownLatch.await();
        System.out.println("关门");
    }
}

CyclicBarrier 辅助类-循环栅栏 到达共同屏障点的同步辅助

允许一组线程全部等待彼此达到共同屏障点的同步辅助 集齐7龙珠
原理:

  • CyclicBarrier * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是, * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞, * 直到最后一个线程到达屏障时,屏障才会开门,所有 * 被屏障拦截的线程才会继续干活。 * 线程进入屏障通过CyclicBarrier的await()方法。
package com.bin.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 *   允许一组线程全部等待彼此达到共同屏障点的同步辅助
 *
 * @Author: 邱成兵
 * @Date: Created in 16:53 2021/5/6
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //参数1:多少个线程跳闸   参数2:跳闸之后执行的线程
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
            System.out.println("集齐7颗龙珠");
        });
        for (int i = 0; i < 7; i++) {
            final int temInt=i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+" /t 第" +temInt+"颗龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }

    }
}

semaphore -辅助类-信号灯

可做秒杀线程数量控制 不管前端请求多少个 只接受固定能接受的数量
主要用于并发的控制可资源的互斥 限流
如果设置semaphore 为1 等同于 synchronized (场景 一个线程持有一个资源多久 可以用这个实现)
原理:
在信号量上我们定义两种操作: * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), * 要么一直等下去,直到有线程释放信号量,或超时。 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 * * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

package com.bin.concurrent;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 *  信号等
 *  用户并发的控制和资源的互斥   限流
 * @Author: 邱成兵
 * @Date: Created in 17:15 2021/5/6
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //模拟3个车位
        Semaphore semaphore=new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                //资源减一
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"/t 抢到了车位");
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    }catch (Exception e){

                    }
                    System.out.println(Thread.currentThread().getName()+"/t 退出停车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //释放资源
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

ReadWriteLock 读写锁 (场景:缓存的读写)

读读能共存 读写不能共存 写写不能共存

package com.bin.concurrent;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyCache{
    private volatile Map<String,Object> map=new HashMap<>();
    //读写锁
    private ReadWriteLock lock=new ReentrantReadWriteLock();

    public void put(String key,Object value){
        try {
            lock.writeLock().lock();
            System.out.println(key+" /t 开始写入数据");
            TimeUnit.MILLISECONDS.sleep(3);
            map.put(key,value);
            System.out.println(key+" /t 写入完成"+value);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.writeLock().unlock();
        }
    }

    public void get(String key) {
        try {
            lock.readLock().lock();
            System.out.println(key+" /t 开始读数据");
            TimeUnit.MILLISECONDS.sleep(3);
            Object o = map.get(key);
            System.out.println(key+" /t 读取读数据"+o);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.readLock().unlock();
        }

    }
}

/**
 *  读写锁
 * @Author: 邱成兵
 * @Date: Created in 10:32 2021/5/7
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache=new MyCache();
        for (int i = 0; i < 5; i++) {
            myCache.put(i+"",i+"");
        }
        for (int i = 0; i < 5; i++) {
            myCache.get(i+"");
        }
    }
}

BlockingQueue - 阻塞队列

不用手动阻塞、唤醒线程了 (wait/await notify/signal notifyall/signalAll) BlockingQueue全部自动实现了

JUC - 学习笔记
JUC - 学习笔记

核心方法

JUC - 学习笔记

package com.bin.concurrent;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 *  阻塞队列
 * @Author: 邱成兵
 * @Date: Created in 14:35 2021/5/7
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(3);
        blockingQueue.add("a");//添加  超过界限会报异常  queue full :队列已满
        blockingQueue.remove();//移除先进的第一位  没有会报异常
        blockingQueue.element();//检查 没有会报异常

        blockingQueue.offer("a"); //超出返回false
        blockingQueue.poll(); //没有移除的返回 null
        blockingQueue.peek();//检查 没有返回null

        blockingQueue.put("a");//添加  超出会阻塞
        blockingQueue.take();//移除  没有会阻塞

        blockingQueue.offer("a",3l, TimeUnit.SECONDS);//超出多少时间后直接退出
        blockingQueue.poll(3l, TimeUnit.SECONDS);//超出多少时间后直接退出
    }
}

线程池

例子:10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。

线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

JUC - 学习笔记

package com.bin.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *  线程池三大主流使用方法
 * @Author: 邱成兵
 * @Date: Created in 15:59 2021/5/7
 */
public class ThreadDemo {
    public static void main(String[] args) {
        //执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
//        ExecutorService executorService= Executors.newFixedThreadPool(5);
        //一个任务一个任务的执行,一池一线程
//        ExecutorService executorService= Executors.newSingleThreadExecutor();
        //执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
        ExecutorService executorService= Executors.newCachedThreadPool();
        try {
            for (int i = 1; i <= 10; i++) {
                final int teamInt=i;
                executorService.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" /t 办理业务"+teamInt);
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

7大参数详解

1、corePoolSize:线程池中的常驻核心线程数

2、maximumPoolSize:线程池中能够容纳同时
执行的最大线程数,此值必须大于等于1

3、keepAliveTime:多余的空闲线程的存活时间
当前池中线程数量超过corePoolSize时,当空闲时间
达到keepAliveTime时,多余线程会被销毁直到
只剩下corePoolSize个线程为止

4、unit:keepAliveTime的单位

5、workQueue:任务队列,被提交但尚未被执行的任务

6、threadFactory:表示生成线程池中工作线程的线程工厂,
用于创建线程,一般默认的即可

7、handler:拒绝策略,表示当队列满了,并且工作线程大于
等于线程池的最大线程数(maximumPoolSize)时如何来拒绝
请求执行的runnable的策略

JUC - 学习笔记

线程池底层工作原理

JUC - 学习笔记
JUC - 学习笔记

工作底层原理步骤

  1. 提交任务
  2. 判断核心线程是否满 满:进入队列
  3. 判断阻塞队列是否满 满:创建主线程 (一个一个的扩)
  4. 判断主线程是否满 满:拒绝策略

1、在创建了线程池后,开始等待请求。
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1、如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2、如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3、如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4、如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

工作中常用创建的线程池方法

在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?
一个都不用,我们工作中只能使用自定义的
JUC - 学习笔记
OOM(OutOfMemoryError? -JAVA内存溢出 虚拟机暴露故障

ExecutorService executorService=new ThreadPoolExecutor(2,//核心
                5,//主线程
                2L,//过期时间
                TimeUnit.SECONDS,//过期单位
                new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小
                Executors.defaultThreadFactory(), //线程工厂  一般都是用默认的
                new ThreadPoolExecutor.AbortPolicy());//拒绝策略

4大拒绝策略

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

最大线程数该如何设置?

public static void main(String[] args) {
        //cpu的数量
        int cpuNum = Runtime.getRuntime().availableProcessors();
        //cpu密集型   内核数+1或+2
        //IO密集型    1/总核数/阻塞系数

        ExecutorService executorService=new ThreadPoolExecutor(2,//核心
                5,//主线程
                2L,//过期时间
                TimeUnit.SECONDS,//过期单位
                new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小
                Executors.defaultThreadFactory(), //线程工厂  一般都是用默认的
                new ThreadPoolExecutor.AbortPolicy());//拒绝策略  默认抛出异常
    }

一:CPU密集型:

定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。
这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。(或者加1到2)

特点:

01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用

02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了

03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。

package pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo02 {
    public static void main(String[] args) {
        //自定义线程池! 工作中只会使用 ThreadPoolExecutor

        /**
         * 最大线程该如何定义(线程池的最大的大小如何设置!)
         * 1、CPU  密集型,几核,就是几,可以保持CPU的效率最高!
         */

        //获取电脑CPU核数
        System.out.println(Runtime.getRuntime().availableProcessors());    //8核

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,                                        //核心线程池大小
                Runtime.getRuntime().availableProcessors(),   //最大核心线程池大小(CPU密集型,根据CPU核数设置)
                3,                                       //超时了没有人调用就会释放
                TimeUnit.SECONDS,                             //超时单位
                new LinkedBlockingDeque<>(3),                 //阻塞队列
                Executors.defaultThreadFactory(),             //线程工厂,创建线程的,一般不用动
                new ThreadPoolExecutor.AbortPolicy());        //银行满了,还有人进来,不处理这个人的,抛出异常

        try {
            //最大承载数,Deque + Max    (队列线程数+最大线程数)
            //超出 抛出 RejectedExecutionException 异常
            for (int i = 1; i <= 9; i++) {
                //使用了线程池之后,使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池用完,程序结束,关闭线程池
            threadPool.shutdown();      //(为确保关闭,将关闭方法放入到finally中)
        }
    }
}

二:IO密集型:

定义:IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。

我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。

01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。

02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU
时间片让出去,直到缓冲区写满。

既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):

CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU
空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU
时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中

package pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo02 {
    public static void main(String[] args) {
        //自定义线程池! 工作中只会使用 ThreadPoolExecutor

        /**
         * 最大线程该如何定义(线程池的最大的大小如何设置!)
         * 2、IO   密集型  >判断你程序中十分耗IO的线程
         *      程序    15个大型任务   io十分占用资源!  (最大线程数设置为30)
         *      设置最大线程数为十分耗io资源线程个数的2倍
         */

        //获取电脑CPU核数
        System.out.println(Runtime.getRuntime().availableProcessors());   //8核

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,                               //核心线程池大小
                16,                     //若一个IO密集型程序有15个大型任务且其io十分占用资源!(最大线程数设置为 2*CPU 数目)
                3,                                //超时了没有人调用就会释放
                TimeUnit.SECONDS,                 //超时单位
                new LinkedBlockingDeque<>(3),     //阻塞队列
                Executors.defaultThreadFactory(),               //线程工厂,创建线程的,一般不用动
                new ThreadPoolExecutor.DiscardOldestPolicy());  //队列满了,尝试和最早的竞争,也不会抛出异常

        try {
            //最大承载数,Deque + Max    (队列线程数+最大线程数)
            //超出 抛出 RejectedExecutionException 异常
            for (int i = 1; i <= 9; i++) {
                //使用了线程池之后,使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池用完,程序结束,关闭线程池
            threadPool.shutdown();      //(为确保关闭,将关闭方法放入到finally中)
        }
    }
}


总结:

1:高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换

2:并发不高、任务执行时间长的业务这就需要区分开看了:

a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务

b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,线程池中的线程数设置得少一些,减少线程上下文的切换

(其实从一二可以看出无论并发高不高,对于业务中是否是cpu密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)

3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,我们的项目使用的时redis作为缓存(这类非关系型数据库还是挺好的)。增加服务器是第二步(一般*项目的首先,因为不用对项目技术做大改动,求一个稳,但前提是资金充足),至于线程池的设置,设置参考
2
。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦。

三.:总结:

01:一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8
个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU
核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。

02:如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候
CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起
CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。

最大线程数量 引用地址

java.util.function 函数式接口

4大函数式接口

JUC - 学习笔记

  1. Function<T,R> 函数型接口 特点:传入T 有一个返回R
/*Function<String,Integer> function=new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return 1024;
            }
        };*/
        //新写法 lambda 表达式写法
//        Function<String,Integer> function=(s) -> {return 1024;};
		//一个参数可以省略()  返回可以省略{return }
        Function<String,Integer> function=s -> 1024;
        System.out.println(function.apply("a"));
  1. Predicate 断定型接口 特点:返回boolean
/*Predicate<String> predicate=new Predicate<String>() {
            @Override
            public boolean test(String s) {
                return false;
            }
        };*/
        //lambda 表达式
//        Predicate<String> predicate=s -> s.isEmpty();
//lambada 表达式+方法的引用
        Predicate<String> predicate=String::isEmpty;
        System.out.println(predicate.test("qcb"));
  1. Consumer 消费型接口 特点:没有返回值 有参数
/*Consumer<String> consumer=new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };*/
        //lambda 表达式
//        Consumer consumer= s -> System.out.println(s);
        Consumer consumer= System.out::print;
        consumer.accept("a");
  1. Supplier 宫给型接口 特点:无参数 有返回值
/*Supplier<String> supplier=new Supplier<String>() {
            @Override
            public String get() {
                return "aa";
            }
        };*/
        //lambda 表达式
//        Supplier<String> supplier=() -> {return "aa";};
        Supplier<String> supplier=() -> "aa";
        System.out.println(supplier.get());

java.util.stream

流(Stream) 到底是什么呢?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。“集合讲的是数据,流讲的是计算!”

特点:
  1. Stream 自己不会存储元素
  2. Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
  3. Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
阶段:
  1. 创建一个Stream:一个数据源(数组、集合)
  2. 中间操作:一个中间操作,处理数据源数据
  3. 终止操作:一个终止操作,执行中间操作链,产生结果
//题目:请按照给出数据,找出同时满足 *
//偶数ID且年龄大于24且用户名转为大写且用户名字母倒排序 *
// 最后只输出一个用户名字
User u1 = new User(11, "a", 23);
        User u2 = new User(12, "b", 24);
        User u3 = new User(13, "c", 22);
        User u4 = new User(14, "d", 28);
        User u5 = new User(16, "e", 26);
        List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
        list.stream().filter(user -> {
            return user.getId() % 2 == 0 && user.getAge() > 24;  //id偶数 并且 大于24的
        })./*filter(user -> {
            return user.getAge() > 24;
            //Function<T,R> mapper)
        }).*/map(user -> {
            return user.getUserName().toUpperCase(); //把名称转为大写
        }).sorted(/*(user1, user2) -> {
            return user2.compareTo(user1);      //倒叙
        }*/Collections.reverseOrder()).limit(1).forEach(System.out::println);//只查一条

ForkJoinPool 分支合并框架

原理:Fork:把一个复杂任务进行分拆,大事化小Join:把分拆任务的结果进行合并

ForkJoinPool:分支合并池 类比=> 线程池

ForkJoinTask:ForkJoinTask 类比=> FutureTask

RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务

@Override
    protected Integer compute() {
        if((end-begin)<=ADJUST_VALUE){
            for (int i = begin; i <= end; i++) {
                result=result+i;
            }
        }else {
            int middle = (end + begin) / 2;
            MyTask myTask1=new MyTask(begin,middle); //分支1
            MyTask myTask2=new MyTask(middle+1,end);//分支2
            myTask1.fork();//开启分支1
            myTask2.fork();//开启分支2
            result=myTask1.join()+myTask2.join();//合并
        }
        return result;
    }

JUC - 学习笔记

package com.bin.concurrent;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

//RecursiveTask 继承ForkJoinTask 继承 Future 可以调用callable接口 返回计算结果
class MyTask extends RecursiveTask<Integer> {
    private static final Integer ADJUST_VALUE=10;
    private int begin;
    private int end;
    private int result;

    public MyTask(Integer begin, Integer end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if((end-begin)<=ADJUST_VALUE){
            for (int i = begin; i <= end; i++) {
                result=result+i;
            }
        }else {
            int middle = (end + begin) / 2;
            MyTask myTask1=new MyTask(begin,middle);
            MyTask myTask2=new MyTask(middle+1,end);
            myTask1.fork();
            myTask2.fork();
            result=myTask1.join()+myTask2.join();
        }
        return result;
    }
}


/**
 *  分支合并框架
 * @Author: 邱成兵
 * @Date: Created in 18:04 2021/5/8
 */
public class ForkJoinPollDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask=new MyTask(0,100);
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);
        System.out.println(submit.get());
        forkJoinPool.shutdown();
    }
}

CompletableFutrue - 异步调用

package com.bin.concurrent;

import java.util.concurrent.CompletableFuture;

/**
 * 异步调用
 *
 * @Author: 邱成兵
 * @Date: Created in 14:37 2021/5/11
 */
public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        /*CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("a");
            }
        });*/
        //执行异步调用没有返回参数的
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("没有返回值cccccc");
        });
        completableFuture.get();

        /*CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                return 1024;
            }
        });*/
        //执行异步调用有返回参数的
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
//            int a=1024/0;
            return 1024;
        });
        System.out.println(completableFuture1.whenComplete((t, d) -> {
            System.out.println("t:" + t);
            System.out.println("d:" + d);
        }).exceptionally(f -> {
            System.out.println(f.getMessage());
            return 4444;
        }).get());
    }
}

原理:https://blog.csdn.net/finalheart/article/details/87615546

上一篇:JUC三大常用辅助类


下一篇:厚积薄发打卡Day50:JUC并发编程(从Synchronized 到 AQS)