RabbitMQ学习笔记
- 简介
- RabbitMQ基础架构
- RabbitMQ的6种工作模式:
- RabbitMQ基本使用
- 简单模式(一个生产者对应一个消费者)
- 消息生产端
- 消息消费端
- work queues模式(一个生产者对应多个消费者)
- Publish/Subscribe 发布与订阅模式(通过exchange分发消息)
- 消息生产端
- 消息消费端
- Routing路由模式(Exchange根据routingKey分发消息)
- 消息生产端
- 消息消费端
- Topics主题模式(Exchange根据routingKey模糊定向分发消息)
- 消息生产端
- 消息消费端
- RPC远程调用模式
- SpringBoot整合RabbitMQ
- 消息生产端
- 消息消费端
- 只通过@RabbitListener实现
- 通过@RabbitListener和@RabbitHandler实现
- Message 内容对象序列化与反序列化
- 生产端序列化
- 使用默认SimpleMessageConverter
- 使用Jackson2JsonMessageConverter
- 一个小坑
- 消费端反序列化
- @Payload 与 @Headers注解
- 消息的可靠投递(保证生产端发送数据给MQ的可靠性)
- confirm确认模式
- return退回模式
- Consumer ACK(保证MQ发送数据给消费端的可靠性)
- channel.basicReject()与channel.basicNack()的区别
- channel.basicReject()
- channel.basicNack()
- 消费端限流
- TTL(生产端设置)
- 设置队列TTL
- 设置消息TTL
- 注意点(关注)
- 死信队列
- 延迟队列
简介
RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品
AMQP消息队列角色解析:
Publisher:消息生产者
Exchange:交换机,负责分发消息
Queue:存储消息的容器
Consumer:消息消费者
上图中矩形框起来的部分就是消息队列中间件
整体运作流程为:
Publisher生产出消息发布给Exchange,Exchange根据不同的规则将消息以Routes(路由)的形式分发给不同的Queue,Queue中保存消息,Consumer会监听Queue中保存的消息,当有自己需要的消息出现时就从Queue中取出并进行消费。
RabbitMQ基础架构
Broker为RabbitMQ的服务端,两侧的Producer和Consumer称为客户端
客户端通过Connection(TCP链接)与服务端进行通信,但如果每次通信都新建TCP链接则会造成资源浪费。所以在每个Connection中都存在多个channel(可以理解为轻量级的Connection),最终通过channel进行通信
接下来看Broker。其中存在多个Virtual Host(虚拟机),出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分多
个Virtual Host,每个用户在自己的Virtual Host创建exchange / queue等。
每个Virtual Host中都存在很多个Exchange和Queue,每个Exchange都可以与一个或多个Queue绑定(Binding)。
- Exchange是message到达broker的第一站, 根据分发规则,匹配查询表中的routing key,根据匹配到的routing key分发消息到对应的Queue中去。Exchange的常用类型有: direct (point to- point), topic (publish subscribe) and fanout (multicast)
- Binding是Exchange和Queue之间的虚拟连接, Binding中可以包含routing key。Binding信息被保存到Exchange中的查询表中,作为message的分发依据
RabbitMQ的6种工作模式:
简单模式、work queues模式、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式
RabbitMQ基本使用
引入依赖:
dependencies {
implementation 'com.rabbitmq:amqp-client:5.7.2'
}
简单模式(一个生产者对应一个消费者)
架构图:
注意:不存在自己创建的交换机,只有默认的交换机
消息生产端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProductMsg {
public static void main(String[] args) {
// 1.创建链接(Connection)工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
connectionFactory.setHost("服务器IP");
// 端口默认为5672
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("rabbitmq登录用户名");
connectionFactory.setPassword("rabbitmq登录密码");
Connection connection = null;
Channel channel = null;
try {
// 3.创建链接
connection = connectionFactory.newConnection();
// 4.创建Channel
channel = connection.createChannel();
// 5.创建队列Queue
/**
* 参数解析
* String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
* queue:设置发送消息队列的名称
* durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
* exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
* autoDelete:设置当没有消费者时是否自动删除该队列
* arguments:设置删除队列时的参数
*/
// 如果该名称队列不存在则会创建否则不会
channel.queueDeclare("lc_test",true,false,false,null);
// 6.发送消息
/**
* 参数解析
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:设置交换机名称。简单模式下会使用默认的""
* routingKey:路由名称,当队列名称与路由名称相同时才能绑定
* props:配置信息
* body:具体发送的消息数据
*/
String msg = "hello_rabbitmq";
channel.basicPublish("","lc_test",null,msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 7. 释放资源
try {
if (!Objects.isNull(connection) && !Objects.isNull(channel)) {
channel.close();
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
这里生产了一条信息,内容是hello_rabbitmq字符数组
消息消费端
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumeMsg {
public static void main(String[] args) {
// 1.创建链接(Connection)工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
connectionFactory.setHost("服务器IP");
// 端口默认为5672
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("rabbitmq登录用户名");
connectionFactory.setPassword("rabbitmq登录密码");
Connection connection = null;
try {
// 3.创建链接
connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* 参数解析
* String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
* queue:设置发送消息队列的名称
* durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
* exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
* autoDelete:设置当没有消费者时是否自动删除该队列
* arguments:设置删除队列时的参数
*/
// 如果该名称队列不存在则会创建否则不会
channel.queueDeclare("lc_test",true,false,false,null);
// 6.接收消息
/**
* 参数解析
* String queue, boolean autoAck, Consumer callback
* queue:设置获取消息队列的名称
* autoAck:设置是否自动确认,确认是指消费者收到消息会告诉mq收到消息了
* callback:回调对象
*/
// 这里的队列名称要与发送消息所使用的队列名称一致
Consumer Consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后会自动执行该方法
/**
* 参数解析
* @param consumerTag 消息标识
* @param envelope 获取一些信息,比如交换机、routingKey等相关信息
* @param properties 配置信息
* @param body 具体数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: "+consumerTag);
System.out.println("envelope: exchangeMsg: "+envelope.getExchange()+"routingKeyMsg: "+envelope.getRoutingKey());
System.out.println("properties: "+properties);
System.out.println("body: "+new String(body));
}
};
channel.basicConsume("lc_test",true,Consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
}
这里获取信息后的处理关键在Consumer回调对象的重写回调方法(handleDelivery)中,通过该回调方法可以获取到具体的消息。
注意:消息消费端不要立即关闭Connection,Channel资源,我们需要其处在一个持续监听状态。如果关闭了资源就无法监听了。
此外消费端可以不用创建队列Queue,因为已经在生产端创建了,只需监听对应队列即可
work queues模式(一个生产者对应多个消费者)
架构图:
虽然是多个消费者但它们彼此之间是竞争关系,一条消息只能被一个消费者取到
作用:在任务过重的情况下该模式能提高任务处理的速度
比如队列中有1000条消息,有两个消费者,每个消费者只能消费500条消息。如果采用简单模式那么消息是来不及消费的,如果采用work queues模式则可以加入两个消费者来消费消息,此时能达到要求(采用轮询的方式,消费者1拿完消费者2拿再1拿再2拿…)
两端代码与简单模式一致,区别在于可以创建多个消费者端来监听同一条队列
Publish/Subscribe 发布与订阅模式(通过exchange分发消息)
架构图:
图中的X即为交换机
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定的交换机队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
通过交换机可以实现一条消息被多个消费者消费
注意:Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
消息生产端
public class ProductPubSub {
public static void main(String[] args) {
// 1.创建链接(Connection)工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
connectionFactory.setHost("服务器IP");
// 端口默认为5672
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("rabbitmq登录用户名");
connectionFactory.setPassword("rabbitmq登录密码");
Connection connection = null;
Channel channel = null;
try {
// 3.创建链接
connection = connectionFactory.newConnection();
// 4.创建Channel
channel = connection.createChannel();
// 5.创建交换机
/**
* 参数解析
* String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments
* exchange:设置交换机名称
* type:设置交换机类型(4种,枚举)
* DIRECT("direct"):定向
* FANOUT("fanout"):广播,发送消息给每个与该交换机绑定的队列
* TOPIC("topic"):通配符方式
* HEADERS("headers"):参数匹配(不常用)
* durable:是否持久化
* autoDelete:自动删除
* internal:是否mq内部使用(一般设为false)
* arguments:参数列表
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 6.创建队列Queue
/**
* 参数解析
* String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
* queue:设置队列名称
* durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
* exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
* autoDelete:设置当没有消费者时是否自动删除该队列
* arguments:设置删除队列时的参数
*/
// 如果该名称队列不存在则会创建否则不会
String queue1Name = "lc_test1";
String queue2Name = "lc_test2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 7.绑定队列和交换机
/**
* 参数解析
* String queue, String exchange, String routingKey
* queue:被绑定队列名称
* exchange:被绑定交换机名称
* routingKey:路由键,即绑定规则
*/
// 当交换机类型为fanout时,需要将routingKey设置为""以实现广播效果
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
// 8.发送消息
/**
* 参数解析
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:设置交换机名称。简单模式下会使用默认的""
* routingKey:路由名称,当队列名称与路由名称相同时才能绑定
* props:配置信息
* body:具体发送的消息数据
*/
String msg = "hello_rabbitmq";
channel.basicPublish(exchangeName,"",null,msg.getBytes());
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
// 9. 释放资源
try {
if (!Objects.isNull(connection) && !Objects.isNull(channel)) {
channel.close();
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
这里创建了一个交换机(test_fanout)和两个队列(lc_test1和lc_test2),并且将这两个队列与交换机绑定。交换机类型设置为fanout
广播类型,这样同一条消息会由交换机分发给这两个队列进行消费。注意设置交换机的routingKey为""(空字符串)
消息消费端
与之前无异,注意修改监听的队列名称即可。可以创建多个消费者来监听不同的队列
Routing路由模式(Exchange根据routingKey分发消息)
架构图:
队列与交换机的绑定,不再是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routing key完全—致,才会接收到消息
消息生产端
package com.lc.example;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
/**
* @Title ProductMsg
* @Author LC
* @Description //TODO $
* @Date $ $
**/
public class ProductRouting {
public static void main(String[] args) {
// 1.创建链接(Connection)工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数(比如虚拟机、用户名、密码、IP、端口等等)
connectionFactory.setHost("服务器IP");
// 端口默认为5672
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("rabbitmq登录用户名");
connectionFactory.setPassword("rabbitmq登录密码");
Connection connection = null;
Channel channel = null;
try {
// 3.创建链接
connection = connectionFactory.newConnection();
// 4.创建Channel
channel = connection.createChannel();
// 5.创建交换机
/**
* 参数解析
* String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments
* exchange:设置交换机名称
* type:设置交换机类型(4种,枚举)
* DIRECT("direct"):定向
* FANOUT("fanout"):广播,发送消息给每个与该交换机绑定的队列
* TOPIC("topic"):通配符方式
* HEADERS("headers"):参数匹配(不常用)
* durable:是否持久化
* autoDelete:自动删除
* internal:是否mq内部使用(一般设为false)
* arguments:参数列表
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 6.创建队列Queue
/**
* 参数解析
* String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments
* queue:设置队列名称
* durable:设置队列是否持久化,如果为true则队列信息会持久化到硬盘,当mq重启时队列信息不会丢失
* exclusive:两个功能:1.设置是否只能有一个消费者监听这个队列 2.当Connection链接关闭时是否删除队列
* autoDelete:设置当没有消费者时是否自动删除该队列
* arguments:设置删除队列时的参数
*/
// 如果该名称队列不存在则会创建否则不会
String queue1Name = "lc_test1_direct";
String queue2Name = "lc_test2_direct";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 7.绑定队列和交换机
/**
* 参数解析
* String queue, String exchange, String routingKey
* queue:被绑定队列名称
* exchange:被绑定交换机名称
* routingKey:路由键,即绑定规则
*/
// 当交换机类型为direct时,需要将routingKey设置为不同的string以实现定向分发效果
// queue1的routingKey为error,所有routingKey也为error的消息都会分发给queue1
channel.queueBind(queue1Name,exchangeName,"error");
// queue2的routingKey为info和warning,所有routingKey也为info或warning的消息都会分发给queue2
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"warning");
// 8.发送消息
/**
* 参数解析
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:设置交换机名称。简单模式下会使用默认的""
* routingKey:路由名称,当队列名称与路由名称相同时才能绑定
* props:配置信息
* body:具体发送的消息数据
*/
for (int i=0 ; i<10 ; i++){
String msg = "hello_rabbitmq"+i;
if (0 == i%2) {
channel.basicPublish(exchangeName,"error",null,(msg+"error").getBytes());
}else {
channel.basicPublish(exchangeName,"info",null,(msg+"info").getBytes());
channel.basicPublish(exchangeName,"warning",null,(msg+"warning").getBytes());
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
// 9. 释放资源
try {
if (!Objects.isNull(connection) &&