线程之线程池

目标

  1. 线程的等待和通知

  2. 生产者消费者模式

  3. 线程池

线程的等待和通知

Object类中的方法

  • wait() 让当前线程进入等待状态,直到被通知为止

  • wait(long) 让当前线程进入等待状态,同时设置时间;直到被通知为止或时间结束

  • notify() 随机通知一个等待线程

  • notifyAll() 通知所有的等待线程

注意:等待和通知方法必须是锁对象,否则会抛出IllegalMonitorStateException

/**
 * 通过锁对象将线程等待,经过5秒通知该线程来执行
 */
public class WaitDemo {

    public synchronized void print() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + "--->" +i);
            if(i == 50){
                //让当前线程等待
                this.wait();
            }
        }
    }

    public synchronized void notifyTest(){
        //让等待的线程执行
        this.notifyAll();
    }

    public static void main(String[] args) {
        WaitDemo waitDemo = new WaitDemo();
        new Thread(()->{
            try {
                waitDemo.print();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitDemo.notifyTest();
    }
}

wait()和sleep()的区别

1.wait()方法时object类的方法 sleep()是Thread类的方法
2.sleep()让线程暂停一段时间,时间一到自动恢复执行,不设计线程间的通信 调用sleep()方法不会释放锁。
Wait() 调用后线程会释放占用的锁,用于线程间的通信,只有其他线程调用notify()方法或者notifyall()才醒来
3.使用域不同 wait()方法必须放在同步代码块和同步控制方法中使用,sleep()方法则可以放在任何地方使用
4.sleep()方法必须捕获异常 而wait() notify() notifyall() 不需要捕获异常
在sleep过程中 可能被其他对象调用它的interrupt() 产生interruptedException 由于sleep不会释放锁标志 容易导致死锁问题的发生 因此一般情况下 推荐使用wait() 方法.
 

生产者消费者模式

在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。结构图如下:

线程之线程池

实现过程:

  1. 通过添加缓冲区,设置上限

  2. 生产者生产数据,向缓冲区存放,如果满了,生产者进入等待,直到缓冲区有空的位置通知生产者生产;

  3. 消费者从缓冲区取数据进行消费,如果空了,消费者进入等待,直到缓冲区有数据再通知消费者消费。

解决问题:

  • 解耦

  • 提高并发性能

  • 解决忙闲不均    

案例实现

包子铺卖包子

import java.util.ArrayList;
import java.util.List;

/**
 * 包子铺
 */
public class BaoziShop {
    /**
     * 包子
     */
    class Baozi{
        private int id;
        public Baozi(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "包子--" + id;
        }
    }
    //上限
    public static final int MAX_COUNT = 100;
    //缓冲区 存放数据
    private List<Baozi> baozis = new ArrayList<>();

    /**
     * 做包子
     */
    public synchronized void makeBaozi() throws InterruptedException {
        //判断缓冲区是否满了
        if(baozis.size() == MAX_COUNT){
            System.out.printf("缓冲区满了,%s等待%n",Thread.currentThread().getName());
            //让生产者线程等待
            this.wait();
        }else{
            //通知生产者线程生产
            this.notifyAll();
        }
        //创建包子
        Baozi baozi = new Baozi(baozis.size() + 1);
        System.out.println(Thread.currentThread().getName()+"做了"+baozi);
        //保存到缓冲区
        baozis.add(baozi);
    }

    /**
     * 拿包子
     */
    public synchronized void takeBaozi() throws InterruptedException {
        //判断缓冲区是否空了
        if(baozis.size() == 0){
            System.out.printf("缓冲区空了,%s等待%n", Thread.currentThread().getName());
            //让消费者等待
            this.wait();
        }else{
            //通知消费者消费
            this.notifyAll();
        }
        //获得第一个包子,并删除
        if(baozis.size() > 0){
            Baozi baozi = baozis.remove(0);
            System.out.println(Thread.currentThread().getName()+"吃了"+baozi);
        }
    }

    public static void main(String[] args) {
        BaoziShop baoziShop = new BaoziShop();
        //一个生产者
        Thread productor = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    baoziShop.makeBaozi();
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        productor.start();
        //10个消费者吃10个包子
        for (int i = 0; i < 10; i++) {
            Thread consumer = new Thread(() ->{
                try {
                    for (int j = 0; j < 10; j++) {
                        baoziShop.takeBaozi();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            consumer.start();
        }
    }
}

可应用场景:

  • 12306售票

  • 消息队列

  • 线程池

  • 等等。。。。

阻塞队列

应用了生产者消费者模式的集合,能够根据数据满或空的情况,自动对线程执行等待和通知

BlockingQueue 接口

  • put 添加数据,达到上限会自动让线程等待

  • take 取并删除数据,数据空了会自动让线程等待

实现类

ArrayBlockingQueue 类 数据结构为数组

LinkedBlockingQueue类 链表结构

/**
 * 包子铺
 */
public class BaoziShop2 {
    /**
     * 包子
     */
    static class Baozi{
        private int id;
        public Baozi(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "包子--" + id;
        }
    }


    public static void main(String[] args) {
        //阻塞队列
        BlockingQueue<Baozi> baozis = new ArrayBlockingQueue<>(100);
        //生产者线程
        new Thread(() -> {
            for (int i = 0; i < 200; i++) {
                //创建包子,添加到阻塞队列,满了就自动阻塞线程
                Baozi baozi = new Baozi(baozis.size() + 1);
                try {
                    baozis.put(baozi);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"生产了"+baozi);
            }
        }).start();
        //消费者线程
        for(int i = 0;i < 5;i++){
            new Thread(() -> {
                //取包子,空了会自动阻塞
                for (int j = 0; j < 40; j++) {
                    try {
                        Baozi baozi = baozis.take();
                        System.out.println(Thread.currentThread().getName()+"消费了"+baozi);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

线程池

作用:回收利用线程资源

线程是一种宝贵的系统资源,执行完任务后会死亡,如果有大量任务需要处理,需要频繁的创建和销毁线程,造成系统性能降低。

线程池会保存一定量的线程,线程执行完任务后,会回到线程池中,等待下一个任务,节省系统资源,提升性能。

线程池的使用

顶层接口:Executor

  • execute(Runnable) 启动线程执行一个任务

ExecutorService

继承 Executor

添加线程池管理方法,如:shutdown()、shutdownNow()

Executors

用于创建线程池的工具类

主要的方法

方法名 说明
newCachedThreadPool() 创建长度不限的线程池
newFixedThreadPool(int ) 创建固定长度的线程池
newSingleThreadExecutor() 创建单一个数的线程池
newScheduledThreadPool(int) 创建可以调度的线程池
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

    public static void useCachedThreadPool(){
        //创建长度不限的线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            int n = i;
            //使用线程池启动线程
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"执行了任务" + n);
            });
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }

    public static void useFixedThreadPool(){
        //创建长度固定的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 100; i++) {
            int n = i;
            //使用线程池启动线程
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"执行了任务"+n);
            });
        }
        executorService.shutdown();
    }

    public static void useSingleThreadPool(){
        //创建单一长度的线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            int n = i;
            //使用线程池启动线程
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+"执行了任务"+n);
            });
        }
        executorService.shutdown();
    }

    public static void useScheduledThreadPool(){
        //获得可调度的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //执行可调度任务
        System.out.println("------------");
        scheduledExecutorService.scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName()+"---->"+ LocalDateTime.now());
        },1,3, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
//        useCachedThreadPool();
//        useFixedThreadPool();
//        useSingleThreadPool();
        useScheduledThreadPool();
    }
}

线程池的优化配置

线程池的实现类

ThreadPoolExecutor

线程池的构造方法参数:

  • corePoolSize 核心线程数,创建线程池后自带线程,不会进行销毁

  • maximumPoolSize 最大线程数

  • keepAliveTime 存活时间,非核心线程能够闲置的时间,超过后被销毁

  • timeUnit 时间单位

  • blockingQueue 阻塞队列 存放任务(Runnable)的集合

优化配置

  1. 核心线程数 应该和CPU内核数量相关 CPU内核数 * N (N和任务执行需要时间和并发量相关)

    Runtime.getRuntime().availableProcessors()
  2. 最大线程数可以和核心线程数一样,避免频繁创建和销毁线程

  3. 如果存在非核心线程,设置大一点,避免频繁创建和销毁线程

  4. 阻塞队列使用LinkedBlockingQueue,插入和删除任务效率更高

线程池的实现原理

问题:线程池是如何对线程进行回收利用的?

所有线程保存在HashSet中

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

线程之线程池

总结

  1. 线程的等待和通知(是干什么,谁来调用)

  2. wait和sleep的区别

  3. 生产者和消费者模式(是什么,解决什么问题,如何实现的,项目中有哪些应用)

  4. 阻塞队列(了解)

  5. 线程池(作用,有哪几种线程池,线程池的创建有哪些参数,如何优化,实现原理(看源码、画图、归纳))

上一篇:【高德地图API】从零开始学高德JS API(一)地图展现——仙剑地图,麻点图,街景,室内图


下一篇:ufs 2.2 协议扫盲(六)