Java阻塞队列BlockingQueue(生产者消费者模型)

文章目录


阻塞队列

阻塞队列是一种特殊的队列,JDK中提供了很多种阻塞队列,不过我们常见的就是 LinkedBlockingDeque 和 PriorityBlockingQueue,它们都实现那了BlockingQueue接口,这里主要是使用LinkedBlockingDeque 来实现生产者消费者模型。

Java阻塞队列BlockingQueue(生产者消费者模型)
Java阻塞队列BlockingQueue(生产者消费者模型)
LinkedBlockingDeque是一个用链表实现的有界阻塞队列,此队列的默认和最大长度为 Integer.MAX_VALUE。此队列遵循先进先出的原则。
阻塞队列中提供了 put和tack方法,这两个方法是带有阻塞功能的。

  • put 方法用来入队列,如果队列满了,将阻塞直到有空间可用为止才继续入队列
  • take 方法用来出队列,如果队列为空,就会处于阻塞到队列中有元素才会开始出队列
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性

生产者消费者模型

通过阻塞队列就可以实现生产者-消费者这种设计模式,该模式将"找出需要完成的工作"与"执行工作"这个两个过程分离开来,并把工作项放入一个“待完成”列表中以便在随后处理,而不是找出后立即处理。生产者消费者模型能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用过程解耦开来以简化工作负载管理。

在基于阻塞队列构建的生产者-消费者模型中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据,生产者不需要知道消费者的标识或数量,也不需要知道它们是否是唯一生产者,只需要将数据放入队列即可。同样,消费者也不需要知道生产者是谁,只管从队列里拿数据就好了。

举个例子:
以两个人洗盘子为例,两者的的劳动分工也是一种生产者-消费者模型;其中一个人把洗好的盘子放在架子上,而另一个人从盘架取出盘子并它烘干。
在这个例子中,盘架相当于阻塞队列,如果盘架上没有盘子,那么消费者会一直等待,直到盘架有盘子需要烘干。如果盘架上放满了,那么生产者会停止清洗知道盘架上有更多的空间,。我们可以将这种类比扩展为多个生产者和多个消费者,每个工人直需要和盘架打交道。人们不需要知道究竟有多少生产者或消费者。

再举个列子:
我们国家的三峡大坝,在汛期的时候就会关闸存上游的水,在旱季的时候就会适当开闸放水,从而保证下游不会发生水灾和干旱。
这个列子中,上游就是生产者,下游就是消费者,大坝就是阻塞队列。

生产者消费者模型是再服务器开发中非常常用的一直编程手段。
它有两个最大的用途

  1. 解耦合

就像上面说的例子,生产者和消费者都不需要知道对方是谁只需要做自己的事情,如果有多个生产者和消费者也并不影响,以后如果要拓展也比较容易。

  1. 削峰填谷

如果有一大波请求来冲击我们的服务器,我们通过阻塞队列也能保证后续的服务器仍然是以固定的速率来消费数据
Java阻塞队列BlockingQueue(生产者消费者模型)

实现生产者消费者模型

  1. 先创建一个阻塞队列
  2. 再创建两个线程,一个生产者线程,一个消费者线程
  3. 接着启动线程就好了

直接用自带的 LinkedBlockingQueue 类实现生产者消费者模型,比较简单

public static void main(String[] args) throws InterruptedException {
        //指定阻塞队列容量为 100
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
        //put 带有阻塞功能, offer不带有阻塞功能
        queue.put(10);
        //take带有阻塞功能,poll不带有阻塞功能
        queue.take();
        //生产者线程
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    try {
                        //Thread.sleep(1000);
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("生产了 "+i);
                }
            }
        });
        producer.start();
        //消费者线程
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(1000);
                        Integer value = queue.take();
                        System.out.println("消费了 "+value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

模拟实现阻塞队列

通过一个数组实现一个循环队列,再加锁就能实现一个简单的阻塞队列

static class LinkedBlockingQueue {
        private int start;
        private int end;
        private int[] array;
        //阻塞队列元素个数
        private int size;
        //锁对象
        Object lock;

        public LinkedBlockingQueue() {
            this.array = new int[Integer.MAX_VALUE];
            this.lock = new Object();
        }
        public LinkedBlockingQueue(int capacity) {
            this.array = new int[capacity];
            this.lock = new Object();
        }

        public void put(int value) throws InterruptedException {
            synchronized (lock) {
                //判断队列是否已满
                while (this.size == this.array.length) {
                    lock.wait();
                }
                this.array[this.end++] = value;
                this.size++;
                //判断end是否已经到达数组末尾
                if (this.end == this.array.length) {
                    this.end = 0;
                }
                //每次插入都唤醒一个线程
                lock.notify();
            }
        }
        public int tack() throws InterruptedException {
            synchronized (lock) {
                //判断队列是否为空
                while (this.size == 0) {
                    lock.wait();
                }
                int value = this.array[this.start++];
                this.size--;
                //判断start是否走到数组末尾
                if (this.start == this.array.length) {
                    this.start = 0;
                }
                //唤醒一个线程
                lock.notify();
                return value;
            }
        }
上一篇:Java并发队列BlockingQueue实现之LinkedBlockingQueue源码分析


下一篇:阻塞队列示例