文章目录
前言
提示:RaabitMQ消息队列的学习。
一、MQ是什么?
- MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统
之间进行通信。 -
RabbitMQ 是一个消息中间件:
它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
- 工作原理
1.1 AMQP
-
AMQP
,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用
层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵
循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。
二、在Linux安装RabbitMQ
2.1 安装
1. 我们把erlang环境与rabbitMQ 安装包解压到Linux
2. rpm -ivh erlang安装包
3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps
4. rpm -ivh rabbitmq安装包
2.2 RabbitMQ启动命令
1. 开启服务 /sbin/service rabbitmq-server start / service rabbitmq-server start
2. 停止服务 service rabbitmq-server stop
3. 重启服务 service rabbitmq-server restart
2.3 开启RabbitMQ 后台管理界面
1. rabbitmq-plugins enable rabbitmq_management
- 添加一个新的用户
1. 创建rabbitMQ账号
rabbitmqctl add_user 用户名 密码
2. 设置用户角色
rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员
3. 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
4. 查看rabbitmq的用户和角色
rabbitmqctl list_users
5. 登录rabbitMQ 界面: Linux虚拟机ip:15672 即可
2.3.1 登录rabbitMQ UI界面
记得开放15672端口
访问 Linux虚拟机ip:15672 即可
输入账户密码后看到这个界面代表成功
2.3 Docker启动RabbitMQ
Docker安装
1. docker pull rabbitmq:3-management
2. 开启rabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
2.4 常见消息模型
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
2.5 生产者(Producer) / 消费者(Consumer)
- 所需依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
</dependencies>
1234567891011121314
- 生产者代码
/**
* 生产者:发消息
*/
public class Producer {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来发消息
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的信息是否持久化 默认false 信息存储在内存中
* 3.该列队是否只供一个消费者进行消费,是否进行消息共享
* true:可以多个消费者消费
* false:只能一个消费者消费
* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
* true:自动删除
* false:不自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message="hello rabbitMQ";
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的KEY值是哪个? 指的是本次队列的名称
* 3.其他参数信息
* 4.发送的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
channel.close();
connection.close();
}
}
- 消费者
/**
* 消费者:接收消息
*/
public class Consumer {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
2.6 工作队列模式(Work Queues)
- 模式说明
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消费,采用的是 轮询机制
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
- 工作模式:生产者
public class ProducerWorkQueue {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来发消息
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的信息是否持久化 默认false 信息存储在内存中
* 3.该列队是否只供一个消费者进行消费,是否进行消息共享
* true:可以多个消费者消费
* false:只能一个消费者消费
* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
* true:自动删除
* false:不自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 1; i <= 10; i++) {
//发消息
String message=i+"hello rabbitMQ";
/**
* 发送一个消息
* 1.发送到哪个交换机
* 2.路由的KEY值是哪个? 指的是本次队列的名称
* 3.其他参数信息
* 4.发送的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
channel.close();
connection.close();
}
}
- 工作模式:两个消费者
/**
* 消费者:接收消息
*/
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
/**
* 消费者:接收消息
*/
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//工厂IP连接rabbitMQ队列
factory.setHost("ip地址");
//设置用户名密码
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
- 结果:各执行五次,也验证了 我们上面所说的 轮询机制
- 小结:
一个消息只能有一个接收者,但是可以有多个接收者
2.7 参数细节
- durable:是否进行持久化,当前队列如果进行持久化,我们重启rabbitMQ后当前队列依旧存在
//消费者生成的队列
channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
- props :队列中的信息是否持久化,若消息持久化,我们重启rabbitMQ后当前队列依旧存在
//MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
- autoDelete:是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);
- autoAck:自动应答
若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。
若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失
channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);
2.8 实现能者多劳
-
业务场景:
当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢?
- 开启不公平分发,能者多劳 channel.basicQos(1); 0:轮询机制 1:能者多劳
- 开启手动确认
-
消费者a
/**
* 消费者:接收消息
*/
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//开启不公平分发,能者多劳
channel.basicQos(1);
DeliverCallback deliverCallback=(consumerTag, message)-> {
String data = new String(message.getBody());
System.out.println(new String(message.getBody()));
//参数1:确认队列中那个具体的消息:
// 可以获取消息的id
// 消息routingkey
// 交换机 exchange
// 消息和重传标志
//参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
- 消费者b:消费消息时然消费者b休眠100毫秒
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//开启不公平分发,能者多劳
channel.basicQos(1);
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(message.getBody()));
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
- 执行结果:
消费者a执行
消费者b执行
2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送
- 应用场景:两个消费者每次都从队列中来获取消息,若消费者a正常执行,消费者b在执行过程中出现了宕机,挂掉了那么我们未被消费的消息会被重新放回到队列中,防止消息丢失。
- 生产者
public class ProducerWorkQueue {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
System.out.println("消息发送完毕");
}
}
}
- 消费者a
public class ConsumerWorkQueues1 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
String data = new String(message.getBody());
System.out.println("消费者1===>"+new String(message.getBody()));
try {
int i=3/0;//模拟业务发生异常
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("拒收消息发生了异常");
//拒收消息
//参数一:表示投递的消息标签
//参数二:是否开启多个消息同时确认
//参数三:是否重新给队列发送
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
}
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
- 消费者b
public class ConsumerWorkQueues2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
System.out.println("睡10秒");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(message.getBody()));
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
- 当消费者b在消费消息时,我们让消费者b睡眠10秒模拟业务流程,在这10秒内我们手动关掉消费者b
- 发送 aa 消费者a接收
- 发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果
此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
2.8.2 预取值
channel.basicQos(1); 0:轮询机制 1:能者多劳 若值>1代表当前队列的预取值,代表当前队列大概会拿到多少值
2.9 Publish/Subscribe 发布/订阅
- 也可以叫
广播模式
,当我们的P消费者发送了消息,交给了X(交换机),所有绑定了这个X(交换机)的队列都可以接收到P消费者发送的消息 - 代码实现生产者
public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机, 参数一:交换机名称 参数二:交换机类型 fanout广播类型
//参数2:交换机类型也可使用 BuiltinExchangeType. 的方式来查看选择
channel.exchangeDeclare("order", "fanout");
channel.basicPublish("order","",null,"fanout type message".getBytes());
channel.close();
connection.close();
}
}
- 代码实现消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("order","fanout");
//获取临时队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"order","");
channel.basicConsume(queueName,true,(consumerTag,message)->{
System.out.println("消费者1===>"+new String(message.getBody()));
},consumerTag -> System.out.println("取消消费消息"));
}
}
2.10 Routing(路由) - Direct
routing值订阅模型-Direct(直连)
-
在上面广播模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致,才会接受到消息
-
生产者
public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通过信道声明交换机, 参数一:交换机名称 参数二:direct 路由模式
channel.exchangeDeclare("logsExchange","direct");
//发送消息 参数一:发送信息到的交换机名称
// 参数二:绑定路由 发送给队列的那个路由key,
//只有当队列的路由key与交换机的路由key相对应时,队列才会接受到消息
channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());
channel.close();
connection.close();
}
}
- 消费者
public class Consumer1 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","direct");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:路由key,若消费者的路由key与生产者的路由key相同则可以收到消息
channel.queueBind(queueName,"logsExchange","infoRouting");
channel.queueBind(queueName,"logsExchange","msgRouting");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
- 消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","direct");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs","error");
channel.queueBind(queueName,"logs","msg");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
2.11 Routing(路由)- Topic
- Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
- 只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符!
#通配符
* (star) can substitute for exactly one word :匹配一个词
# (hash) can substitute for zero or more words :匹配一个或多个词
- 生产者
public class Provider {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通过信道声明交换机, 参数一:交换机名称 参数二:topic 动态路由
channel.exchangeDeclare("order","topic");
String routingKey="user.order";
//发送消息 参数一:发送信息到的交换机名称 参数二:绑定路由 发送给队列的那个路由key
channel.basicPublish("order",routingKey,null,("routing logs topic发送了消息"+routingKey).getBytes());
channel.close();
connection.close();
}
}
- 消费者1
public class Consumer1 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("order","topic");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
channel.queueBind(queueName,"order","user.*");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
- 消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("order","topic");
//获取临时队列名
String queueName = channel.queueDeclare().getQueue();
//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
channel.queueBind(queueName,"order","user.#");
channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
}
}
三、进阶篇 高级特性
3.1 死信队列
死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
- 应用场景
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
-
生产者
:给正产的消息队列发送消息,并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列
public class TTLProvider {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//发送死信 设置TTL过期时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
String msg=""+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());
}
System.out.println("结束发送");
}
}
正常队列消费者
public class TTLConsumer1 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//当消息被拒绝接受/未被消费 会将消息转发到死信队列
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
死信队列消费者
public class TTLConsumer2 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("账户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
-
结果:
当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列
死信队列产生三大原因 - 消息被拒接
- 消息TTL过期
- 队列达到最大长度
3.1.1 死信队列实战:消息TTL过期
配置类
@Configuration
public class RabbitMQConfiguration {
//普通交换机
public static final String X_EXCHANGE="X";
//死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE="Y";
//普通队列
public static final String QUEUE_A="QA";
public static final String QUEUE_B="QB";
//死信队列
public static final String DEAD_QUEUE_D="QD";
//声明普通x交换机
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明死信交换机
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明普通队列A TTL:10S
@Bean
public Queue queueA(){
Map<String,Object> arg=new HashMap<>();
//设置死信交换机
arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routingKey
arg.put("x-dead-letter-routing-key","YD");
//设置TTL过期时间
arg.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();
}
//声明普通队列B TTL:40S
@Bean
public Queue queueB(){
Map<String,Object> arg=new HashMap<>();
//设置死信交换机
arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routingKey
arg.put("x-dead-letter-routing-key","YD");
//设置TTL过期时间
arg.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();
}
//死信队列
@Bean
public Queue queueD(){
return QueueBuilder.durable(DEAD_QUEUE_D).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
TTL生产者
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/{msg}")
public void sendMsg(@PathVariable("msg") String msg){
log.info("当前发送时间:{}发送了一条消息",new Date().toString());
rabbitTemplate.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg);
rabbitTemplate.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg);
}
}
死信队列消费者
@Component
@Slf4j
public class DeadLetterConsumer {
@RabbitListener(queues = "QD")
public void t1(Message message, Channel channel)throws Exception{
log.info("收到死信队列的消息{},时间为{}",new String(message.getBody(),"UTF-8"),new Date().toString());
}
}
-
死信队列-TTL过期时间测试结果
3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度
- 生产者
public class Producer {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i = 1; i <= 10; i++) {
String msg=""+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
}
}
}
- 消费者a
//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
map.put(“x-max-length”,6);
public class Consumer01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
map.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
- 消费者b
public class Consumer02 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
3.1.3 死信队列实战:消息被拒
- 生产者
public class Producer {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i = 1; i <= 10; i++) {
String msg="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
}
}
}
- 消费者a
- 此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
- 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
- 2.并且开启手动应答
public class Consumer01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("登录账户");
factory.setPassword("登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列
HashMap<String, Object> map = new HashMap<>();
//正常队列设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信队列的routingKey
map.put("x-dead-letter-routing-key","dead");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
DeliverCallback deliverCallback=( consumerTag, message)->{
String msg=new String(message.getBody());
if("info5".equals(msg)){
System.out.println("Consumer1接收消息===>"+msg+"此消息被拒绝");
//此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
//开启手动应答
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
}
}
- 消费者b
public class Consumer02 {
//死信队列名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=( consumerTag, message)->{
System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
};
CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
3.2 基于SpringBoot实现延迟队列
- 配置队列交换机
@Configuration
public class QueueConfig {
@Bean("exchange")
public DirectExchange exchange(){
return new DirectExchange("msg");
}
@Bean("simpleQue")
public Queue simpleQue(){
HashMap<String, Object> map = new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange","dead");
//设置死信路由
map.put("x-dead-letter-routing-key","deadKey");
//消息失效时间
map.put("x-message-ttl",10000);
return new Queue("simple",false,false,false,map);
}
@Bean
public Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{
return BindingBuilder.bind(simple).to(msg).with("info");
}
@Bean("deadExchange")
public DirectExchange exchange1(){
return new DirectExchange("dead");
}
@Bean("deadQueue")
public Queue deadQ(){
return new Queue("deadQue",false,false,false,null);
}
@Bean
public Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){
//绑定死信队列到死信交换机通过路由
return BindingBuilder.bind(queue).to(dead).with("deadKey");
}
}
- 生产者
@RestController
public class Provider {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl/{message}")
public void t1(@PathVariable String message){
String queueName="simple";
Date date = new Date();
System.out.println(date);
rabbitTemplate.convertAndSend("msg","info",message);
}
}
- 消费者
@Component
public class Consumer {
@RabbitListener(queues = "deadQue")
public void hello(Message msg, Channel channel)throws Exception{
System.out.println("接收到消息"+new String(msg.getBody()));
Date date1 = new Date();
System.out.println(date1);
}
}
- 我们看到消息每隔十秒更新一次
3.3 发布确认 高级特性
3.3.1 可靠性投递confirm模式
- 场景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间的生产者消息投递失败,导致消息丢失,需要手动处理和恢复。-可靠性投递confirm模式
- 需要在application核心配置文件中设置发布确认类型
- spring-rabbitmq-publisher-confirm-type: correlated
- 类型1:none:禁用发布确认模式,是默认值
- 类型2:correlated:发布消息成功到交换机后出发回调方法
- 类型3:simple:和correlated效果一样,但是如果回调返回的是false,会关闭信道,接下来无法发送消息
- 配置类
@Component
public class confirmConfig {
public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";
public static final String CONFIRM_QUEUE="confirm.queue";
public static final String CONFIRM_ROUTING_KEY="confirm";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return new Queue(CONFIRM_QUEUE);
}
@Bean
public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
- 当生产者发送给交换机消息时,交换机的名字错了,或者交换机挂掉了,会导致消息的丢失,那么我们需要实现回调接口,当交换机收到消息后会给生产者发送回调消息
- 实现回调接口:实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct //当所有注解执行完后,再执行这个注解
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调方法
* 发消息,交换机接收到了,回调
* 参数
* 1. correlationData:保存消息的ID及相关信息,这个消息是我们生产者手动传入的
* 2. 交换机收到消息 true
* 3. null
*/
/**
* 交换机确认回调方法
* 发消息,交换机接收失败,回调
* 参数
* 1. correlationData:保存消息的ID及相关信息
* 2. 交换机收到消息 false
* 3. cause:失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到了ID为{}的消息",id);
}else {
log.info("交换机为收到了ID为{}的消息,原因是:{}",id,s);
}
}
}
- 生产者
@RestController
public class ConfirmProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}")
public void t1(@PathVariable String msg){
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);
}
}
- 消费者
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
public void consumer(Message message){
System.out.println("高级特性确认发布消费者收到了消息===>"+new String(message.getBody()));
}
}
- 测试:当我们正常发送消息
- 测试:当我们把交换机名字换掉
3.3.2 可靠性投递return模式
- 场景:若交换机收到消息,队列没有收到消息,应该如何解决?
- 需要在application核心配置文件中设置是否回退消息,当消息路由不到消费者
- spring-rabbitmq-publisher-returns=true 开启回退消息
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct //当所有注解执行完后,再执行这个注解
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确认回调方法
* 发消息,交换机接收到了,回调
* 参数
* 1. correlationData:保存消息的ID及相关信息
* 2. 交换机收到消息 true
* 3. null
*/
/**
* 交换机确认回调方法
* 发消息,交换机接收失败,回调
* 参数
* 1. correlationData:保存消息的ID及相关信息
* 2. 交换机收到消息 false
* 3. cause:失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到了ID为{}的消息",id);
}else {
log.info("交换机未收到了ID为{}的消息,原因是:{}",id,s);
}
}
/**
* 消息传递过程中 不可达 消费者的队列时将消息返回给生产者
* 只有当消息 不可达 目的地的时候 才进行回调
* 参数1:消息体
* 参数2:回复代码
* 参数3:回复原因
* 参数4:交换机
* 参数5:路由key
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);
}
}
3.4 优先级队列
- 优先级越高,消息先被消费者消费
- 官方设置最大优先级 0-255 超出优先级则报错 自己使用时数字不必设置很大,会浪费CPU效率
- 生产者
public class PriorityProducer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置优先级参数
AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();
for (int i = 1; i <= 10; i++) {
String msg="info"+i;
if(i==5){
channel.basicPublish("","hi",build,msg.getBytes());
}else {
channel.basicPublish("","hi",null,msg.getBytes());
}
}
}
}
- 消费者
public class PriorityConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("RabbitMQ登录用户名");
factory.setPassword("RabbitMQ登录密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> map = new HashMap<>();
//设置当前队列为优先级队列
map.put("x-max-priority",10);
channel.queueDeclare("hi",false,false,false,map);
channel.basicConsume("hi",true,(consumerTag,message)->{
System.out.println("优先级队列接收消息顺序===>"+new String(message.getBody()));
},(consumerTag) -> System.out.println("取消回调"));
}
}
- 测试结果:我们定义的是消息5优先级最高,其他消息为默认优先级
3.5 消费端限流
-
参数一:
prefetchSize:预先载入的大小 0表示不限制大小 -
参数二:
prefetchCount:预先载入的消息条数 -
参数三:
global:false 注意:autoAck手动应答一定要为false
//设置每次确定一个消息
channel.basicQos(0,1,false);
12
生产者
public class AckProvider {
//队列名称
public static final String QUEUE_NAME="hello_Ack";
//发消息
public static void main(String[] args) throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("用户");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
System.out.println("消息发送完毕");
}
}
}
消费者
public class AckConsumer2 {
//队列名称,接收此队列的消息
public static final String QUEUE_NAME="hello_Ack";
public static void main(String[] args) throws Exception{
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setUsername("用户");
factory.setPassword("密码");
//创建连接
Connection connection = factory.newConnection();
//通过连接来获取 信道来收消息
Channel channel = connection.createChannel();
//声明 接收消息的回调
DeliverCallback deliverCallback=(consumerTag, message)-> {
//message:包含消息头和消息体,我们只想拿到消息体
//若不进行转换,直接输出message我们拿到的则是地址
System.out.println(new String(message.getBody()));
try {
Thread.sleep(1000*5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动确认消息:
//参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//声明 取消消费的回调
CancelCallback cancelCallback=consumerTag->{
System.out.println("消费消息被中断");
};
//每次只消费一个
channel.basicQos(0,1,false);
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动答应
* true:代表自动应答
* false:手动应答
* 3.消费成者成功消费的回调
* 4.消费者取消消费的回调
*/
//关闭自动应答 false
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}