RabbitMQ——入门

RabbitMQ工作原理

四大核心概念

  • 生产者:产生数据发送消息的程序是生产者
  • 交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  • 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

RabbitMQ——入门

Hello World

在本节中将学习使用RabbitMQ创建一个最简单的例子

引入依赖

<!--rabbitmq 依赖客户端-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>

编写生产者代码

public class Producer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.130");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列中的消息是否支持持久化,false:消息存在内存中,true:消息存在硬盘中
         * 3.该队列是否只供一个消费者消费,false:只提供一个消费者消费
         * 4.最后一个消费者断开连接之后该队列是否自动删除,true为自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";
        /**
         * 发送一个消息
         * 1.发送消息到哪个交换机,简单模式所以不需要交换机,填空字符串即可
         * 2.路由的key是哪个
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

当消息发送完毕后,我们可以查看客户端,发现队列中多了一个名为hello的队列,接下来我们继续编写消费者代码

RabbitMQ——入门

编写消费者代码

public class Consumer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.130");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.队列名称
         * 2.消费成功后是否自动应答,true为自动应答,false不自动
         * 3.消费者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
    }
}

当我们执行消费者代码时,会输出hello world,并且客户端中的消息以及被消费掉了

RabbitMQ——入门

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。我们通过一张图,来理解一下。

RabbitMQ——入门

轮流分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程 是如何工作的。

编写工具类

从helloworld案例中我们发现了创建工厂、获取信道一系列的工作都是重复的,所以将他整合为一个工具类,方便日后的使用

public class RabbitMqUtils {
    public static Channel getChannel(){
        Channel channel = null;
        try {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.200.130");
            factory.setUsername("admin");
            factory.setPassword("admin");
            //创建连接
            Connection connection = factory.newConnection();
            //获取信道
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
}

编写工作线程

public class Work01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("接收到的消息为:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        System.out.println("工作线程B正在等待消息...");
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

idea是支持同一个main方法多次运行的,需要进行一下设置,可以参考这位博主的博客 点击跳转

完成之后我们启动两次,一次为工作线程A,另一次为工作线程B

编写消费者

因为我们需要发送多次消息,可以通过控制台的输入来发送消息

public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("请求发送成功:" + message);
        }
    }
}

测试

我们输入AA,BB,CC,DD来看一下工作线程获取的结果,如果是依次获取,就证明了我们一开始的说法

RabbitMQ——入门

消息应答

作用

举例说明:还是有两个工作线程来处理消息,比如工作线程A处理消息的时间较长,而在处理消息的过程中宕机了,如果不做任何处理,就会出现消息丢失的情况。所以我们需要一个消息应答,告诉RabbitMQ消息已经处理完毕之后,MQ才可以将消息删除,否则回到队列中。

RabbitMQ——入门

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。

消息应答的方法

  • channel.basicAsk()

    用于肯定应答,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

  • channel.basicNack()

    用于否定应答

  • Channel.basicReject()

    用于否定应答,与channel.basicNack()相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

编写生产者代码

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

编写两个工作线程代码

这里我们让一个工作线程完成速度为1秒,另外一个为30秒,模拟工作任务时长的效果

public class Work02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程A正在等待消息,处理较快...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

public class Work03 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程B正在等待消息,处理较慢...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

测试

启动三个方法,正常的我们输入aa,bb,cc分别交由线程A和线程B处理,当我们再次输入dd时,是交给线程B处理的,这时候手动关闭进程,模拟宕机状况,我们发现线程A接替他处理了dd消息,这样就做到了消息不丢失的效果。

RabbitMQ——入门

RabbitMQ持久化操作

队列持久化

之前我们创建的队列都是非持久化的,rabbitmq如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable参数设置为持久化

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        boolean durable = true; //设置队列持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

更改完成重启之后我们发现,该队列的状态被D标志,即持久化

RabbitMQ——入门

消息持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //生成队列
        boolean durable = true; //设置队列持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            // MessageProperties.PERSISTENT_TEXT_PLAIN 设置消息持久化
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("请求发送成功:" + message);
        }
    }
}

不公平分发

在之前的课程中我们说过了,MQ在分发消息时是绝对公平的,从上述例子中我们也可以发现,无论B执行的多慢,所有消息都是A一条,B一条,这个其实照我来说确实不公平的。应该是能者多劳的形式,A做完了应该可以继续做。我们需要修改工作线程代码,如下

public class Work03 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程B正在等待消息,处理较慢...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        //设置不公平分发,1为不公平分发
        channel.basicQos(1);
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

通过测试可以发现,因为B要工作要沉睡30秒,A完成之后可以获取接下来的工作,而不是继续分发给B

RabbitMQ——入门

预取值

预取值的概念相当于我作为领导,我已经分配好了,一共7个任务,我给B分配5个任务,给A分配2个任务,通过代码如下设置

public class Work02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("工作线程A正在等待消息,处理较快...");
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息为:" + new String(message.getBody()));
            /**
             * 手动应答
             * 1.消息标记 tag消息标记 tag
             * 2.是否批量应答消息,建议不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag) ->{
            System.out.println(consumerTag + ":接收消息中断");
        };
        //接收消息
        boolean autoAck = false;
        //设置不公平分发,1为不公平分发,大于1代表预取值
        channel.basicQos(2);
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

这里我们需要通过一张图片说一下这个预取值的概念

RabbitMQ——入门

上一篇:photoshop大揭密:解密红外人像摄影的拍摄和PS后期处理过程


下一篇:windows下安装RabbitMQ