RabbitMQ学习笔记

RabbitMQ学习笔记

  • 简介
    • RabbitMQ基础架构
    • RabbitMQ的6种工作模式:
  • RabbitMQ基本使用
    • 简单模式(一个生产者对应一个消费者)
      • 消息生产端
      • 消息消费端
    • work queues模式(一个生产者对应多个消费者)
    • Publish/Subscribe 发布与订阅模式(通过exchange分发消息)
      • 消息生产端
      • 消息消费端
    • Routing路由模式(Exchange根据routingKey分发消息)
      • 消息生产端
      • 消息消费端
    • Topics主题模式(Exchange根据routingKey模糊定向分发消息)
      • 消息生产端
      • 消息消费端
    • RPC远程调用模式
  • SpringBoot整合RabbitMQ
    • 消息生产端
    • 消息消费端
      • 只通过@RabbitListener实现
      • 通过@RabbitListener和@RabbitHandler实现
    • Message 内容对象序列化与反序列化
      • 生产端序列化
        • 使用默认SimpleMessageConverter
        • 使用Jackson2JsonMessageConverter
          • 一个小坑
      • 消费端反序列化
    • @Payload 与 @Headers注解
  • 消息的可靠投递(保证生产端发送数据给MQ的可靠性)
    • confirm确认模式
    • return退回模式
  • Consumer ACK(保证MQ发送数据给消费端的可靠性)
    • channel.basicReject()与channel.basicNack()的区别
      • channel.basicReject()
      • channel.basicNack()
  • 消费端限流
  • TTL(生产端设置)
    • 设置队列TTL
    • 设置消息TTL
    • 注意点(关注)
  • 死信队列
  • 延迟队列

简介

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品

AMQP消息队列角色解析:
Publisher:消息生产者
Exchange:交换机,负责分发消息
Queue:存储消息的容器
Consumer:消息消费者

上图中矩形框起来的部分就是消息队列中间件
整体运作流程为:
Publisher生产出消息发布给Exchange,Exchange根据不同的规则将消息以Routes(路由)的形式分发给不同的Queue,Queue中保存消息,Consumer会监听Queue中保存的消息,当有自己需要的消息出现时就从Queue中取出并进行消费。

RabbitMQ基础架构

Broker为RabbitMQ的服务端,两侧的Producer和Consumer称为客户端
客户端通过Connection(TCP链接)与服务端进行通信,但如果每次通信都新建TCP链接则会造成资源浪费。所以在每个Connection中都存在多个channel(可以理解为轻量级的Connection),最终通过channel进行通信

接下来看Broker。其中存在多个Virtual Host(虚拟机),出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分多
个Virtual Host,每个用户在自己的Virtual Host创建exchange / queue等。

每个Virtual Host中都存在很多个Exchange和Queue,每个Exchange都可以与一个或多个Queue绑定(Binding)。

  • Exchange是message到达broker的第一站, 根据分发规则,匹配查询表中的routing key,根据匹配到的routing key分发消息到对应的Queue中去。Exchange的常用类型有: direct (point to- point), topic (publish subscribe) and fanout (multicast)
  • Binding是Exchange和Queue之间的虚拟连接, Binding中可以包含routing key。Binding信息被保存到Exchange中的查询表中,作为message的分发依据

RabbitMQ的6种工作模式:

简单模式、work queues模式、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式

RabbitMQ基本使用

引入依赖:

dependencies {
    implementation 'com.rabbitmq:amqp-client:5.7.2'
}

简单模式(一个生产者对应一个消费者)

架构图:

注意:不存在自己创建的交换机,只有默认的交换机

消息生产端

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProductMsg {
    public static void main(String[] args) {
        // 1.创建链接(Connection)工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
        connectionFactory.setHost("服务器IP");
        // 端口默认为5672
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("rabbitmq登录用户名");
        connectionFactory.setPassword("rabbitmq登录密码");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3.创建链接
            connection = connectionFactory.newConnection();
            // 4.创建Channel
            channel = connection.createChannel();
            // 5.创建队列Queue
            /**
             * 参数解析
             * String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
             * queue:设置发送消息队列的名称
             * durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
             * exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
             * autoDelete:设置当没有消费者时是否自动删除该队列
             * arguments:设置删除队列时的参数
             */
            // 如果该名称队列不存在则会创建否则不会
            channel.queueDeclare("lc_test",true,false,false,null);
            // 6.发送消息
            /**
             * 参数解析
             * String exchange, String routingKey, BasicProperties props, byte[] body
             * exchange:设置交换机名称。简单模式下会使用默认的""
             * routingKey:路由名称,当队列名称与路由名称相同时才能绑定
             * props:配置信息
             * body:具体发送的消息数据
             */
            String msg = "hello_rabbitmq";
            channel.basicPublish("","lc_test",null,msg.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            // 7. 释放资源
        	try {
                if (!Objects.isNull(connection) && !Objects.isNull(channel)) {
                    channel.close();
                    connection.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

这里生产了一条信息,内容是hello_rabbitmq字符数组

消息消费端

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumeMsg {
    public static void main(String[] args) {
        // 1.创建链接(Connection)工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
        connectionFactory.setHost("服务器IP");
        // 端口默认为5672
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("rabbitmq登录用户名");
        connectionFactory.setPassword("rabbitmq登录密码");
        Connection connection = null;
        try {
            // 3.创建链接
            connection = connectionFactory.newConnection();
            // 4.创建Channel
            Channel channel = connection.createChannel();
            // 5.创建队列Queue
            /**
             * 参数解析
             * String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
             * queue:设置发送消息队列的名称
             * durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
             * exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
             * autoDelete:设置当没有消费者时是否自动删除该队列
             * arguments:设置删除队列时的参数
             */
            // 如果该名称队列不存在则会创建否则不会
            channel.queueDeclare("lc_test",true,false,false,null);

            // 6.接收消息
            /**
             * 参数解析
             * String queue, boolean autoAck, Consumer callback
             * queue:设置获取消息队列的名称
             * autoAck:设置是否自动确认,确认是指消费者收到消息会告诉mq收到消息了
             * callback:回调对象
             */
            // 这里的队列名称要与发送消息所使用的队列名称一致
            Consumer Consumer = new DefaultConsumer(channel){
                // 回调方法,当收到消息后会自动执行该方法
                /**
                 * 参数解析
                 * @param consumerTag 消息标识
                 * @param envelope 获取一些信息,比如交换机、routingKey等相关信息
                 * @param properties 配置信息
                 * @param body 具体数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag: "+consumerTag);
                    System.out.println("envelope: exchangeMsg: "+envelope.getExchange()+"routingKeyMsg: "+envelope.getRoutingKey());
                    System.out.println("properties: "+properties);
                    System.out.println("body: "+new String(body));
                }
            };
            channel.basicConsume("lc_test",true,Consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }
}

这里获取信息后的处理关键在Consumer回调对象的重写回调方法(handleDelivery)中,通过该回调方法可以获取到具体的消息。
注意:消息消费端不要立即关闭Connection,Channel资源,我们需要其处在一个持续监听状态。如果关闭了资源就无法监听了。
此外消费端可以不用创建队列Queue,因为已经在生产端创建了,只需监听对应队列即可

work queues模式(一个生产者对应多个消费者)

架构图:

虽然是多个消费者但它们彼此之间是竞争关系,一条消息只能被一个消费者取到

作用:在任务过重的情况下该模式能提高任务处理的速度
比如队列中有1000条消息,有两个消费者,每个消费者只能消费500条消息。如果采用简单模式那么消息是来不及消费的,如果采用work queues模式则可以加入两个消费者来消费消息,此时能达到要求(采用轮询的方式,消费者1拿完消费者2拿再1拿再2拿…)

两端代码与简单模式一致,区别在于可以创建多个消费者端来监听同一条队列

Publish/Subscribe 发布与订阅模式(通过exchange分发消息)

架构图:

图中的X即为交换机

Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定的交换机队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

通过交换机可以实现一条消息被多个消费者消费
注意:Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

消息生产端

public class ProductPubSub {
    public static void main(String[] args) {
        // 1.创建链接(Connection)工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
         connectionFactory.setHost("服务器IP");
        // 端口默认为5672
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("rabbitmq登录用户名");
        connectionFactory.setPassword("rabbitmq登录密码");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3.创建链接
            connection = connectionFactory.newConnection();
            // 4.创建Channel
            channel = connection.createChannel();
            // 5.创建交换机
            /**
             * 参数解析
             * String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments
             * exchange:设置交换机名称
             * type:设置交换机类型(4种,枚举)
             *      DIRECT("direct"):定向
             *      FANOUT("fanout"):广播,发送消息给每个与该交换机绑定的队列
             *      TOPIC("topic"):通配符方式
             *      HEADERS("headers"):参数匹配(不常用)
             * durable:是否持久化
             * autoDelete:自动删除
             * internal:是否mq内部使用(一般设为false)
             * arguments:参数列表
             */
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            // 6.创建队列Queue
            /**
             * 参数解析
             * String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
             * queue:设置队列名称
             * durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
             * exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
             * autoDelete:设置当没有消费者时是否自动删除该队列
             * arguments:设置删除队列时的参数
             */
            // 如果该名称队列不存在则会创建否则不会
            String queue1Name = "lc_test1";
            String queue2Name = "lc_test2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            // 7.绑定队列和交换机
            /**
             * 参数解析
             * String queue, String exchange, String routingKey
             * queue:被绑定队列名称
             * exchange:被绑定交换机名称
             * routingKey:路由键,即绑定规则
             */
            // 当交换机类型为fanout时,需要将routingKey设置为""以实现广播效果
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
            // 8.发送消息
            /**
             * 参数解析
             * String exchange, String routingKey, BasicProperties props, byte[] body
             * exchange:设置交换机名称。简单模式下会使用默认的""
             * routingKey:路由名称,当队列名称与路由名称相同时才能绑定
             * props:配置信息
             * body:具体发送的消息数据
             */
            String msg = "hello_rabbitmq";
            channel.basicPublish(exchangeName,"",null,msg.getBytes());
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 9. 释放资源
            try {
                if (!Objects.isNull(connection) && !Objects.isNull(channel)) {
                    channel.close();
                    connection.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

这里创建了一个交换机(test_fanout)和两个队列(lc_test1和lc_test2),并且将这两个队列与交换机绑定。交换机类型设置为fanout
广播类型,这样同一条消息会由交换机分发给这两个队列进行消费。注意设置交换机的routingKey为""(空字符串)

消息消费端

与之前无异,注意修改监听的队列名称即可。可以创建多个消费者来监听不同的队列

Routing路由模式(Exchange根据routingKey分发消息)

架构图:

队列与交换机的绑定,不再是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey

Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routing key完全—致,才会接收到消息

消息生产端

package com.lc.example;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

/**
 * @Title ProductMsg
 * @Author LC
 * @Description //TODO $
 * @Date $ $
 **/
public class ProductRouting {
    public static void main(String[] args) {
        // 1.创建链接(Connection)工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
       	// 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
        connectionFactory.setHost("服务器IP");
        // 端口默认为5672
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("rabbitmq登录用户名");
        connectionFactory.setPassword("rabbitmq登录密码");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3.创建链接
            connection = connectionFactory.newConnection();
            // 4.创建Channel
            channel = connection.createChannel();
            // 5.创建交换机
            /**
             * 参数解析
             * String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments
             * exchange:设置交换机名称
             * type:设置交换机类型(4种,枚举)
             *      DIRECT("direct"):定向
             *      FANOUT("fanout"):广播,发送消息给每个与该交换机绑定的队列
             *      TOPIC("topic"):通配符方式
             *      HEADERS("headers"):参数匹配(不常用)
             * durable:是否持久化
             * autoDelete:自动删除
             * internal:是否mq内部使用(一般设为false)
             * arguments:参数列表
             */
            String exchangeName = "test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
            // 6.创建队列Queue
            /**
             * 参数解析
             * String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
             * queue:设置队列名称
             * durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
             * exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
             * autoDelete:设置当没有消费者时是否自动删除该队列
             * arguments:设置删除队列时的参数
             */
            // 如果该名称队列不存在则会创建否则不会
            String queue1Name = "lc_test1_direct";
            String queue2Name = "lc_test2_direct";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            // 7.绑定队列和交换机
            /**
             * 参数解析
             * String queue, String exchange, String routingKey
             * queue:被绑定队列名称
             * exchange:被绑定交换机名称
             * routingKey:路由键,即绑定规则
             */
            // 当交换机类型为direct时,需要将routingKey设置为不同的string以实现定向分发效果
            // queue1的routingKey为error,所有routingKey也为error的消息都会分发给queue1
            channel.queueBind(queue1Name,exchangeName,"error");
            // queue2的routingKey为info和warning,所有routingKey也为info或warning的消息都会分发给queue2
            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"warning");
            // 8.发送消息
            /**
             * 参数解析
             * String exchange, String routingKey, BasicProperties props, byte[] body
             * exchange:设置交换机名称。简单模式下会使用默认的""
             * routingKey:路由名称,当队列名称与路由名称相同时才能绑定
             * props:配置信息
             * body:具体发送的消息数据
             */
            for (int i=0 ; i<10 ; i++){
                String msg = "hello_rabbitmq"+i;
                if (0 == i%2) {
                    channel.basicPublish(exchangeName,"error",null,(msg+"error").getBytes());
                }else {
                    channel.basicPublish(exchangeName,"info",null,(msg+"info").getBytes());
                    channel.basicPublish(exchangeName,"warning",null,(msg+"warning").getBytes());
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 9. 释放资源
            try {
                if (!Objects.isNull(connection) && 
上一篇:【北京迅为】《STM32MP157开发板嵌入式开发指南》- 第十五章 Linux 文件系统概念


下一篇:IPv6常见问题解答