RabbitMQ-AMQP模型详解_踩踩踩从踩的博客-CSDN博客
前言
上篇文章介绍了AMQP得流程,以及介绍Vhost Host、连接 、通道 、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布 事务机制,发布确认机制,消费者独占等机制
publisher
路由不可达
当消息发送给交换器或队列,在发送中,出现没有队列。
- 交换没有绑定队列
- 交换没法根据消息的路由key把消息路由到队列。
可以处理的情况 但是别抛异常 只是为找到交换器之类的
- 退回
- 死信队列(备用交换)
退回
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)
mandatory
:
true
强制退回,
false
不需退回,直接丢弃。
在发送数据时设置
设置返回消息的回调处理
channel.addReturnListener(returnMessage -> {
try {
System.out.println("收到退回消息:" + new String(returnMessage.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
在spring中使用
- 设置消息不可以路由退回,设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
@Bean
public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true); // 设置消息不可以路由退回
// 设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
template.setReturnCallback(myReturnCallback());
return template;
}
- replyCode broker的回应码 replyText 回应描述
private ReturnCallback myReturnCallback() {
return new ReturnCallback() {
@Override
// replyCode broker的回应码 replyText 回应描述
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
String routingKey) {
// 在这里写退回处理逻辑
System.out.println("收到回退消息 replyCode=" + replyCode + " replyText=" + replyText + " exchange=" + exchange
+ " routingKey=" + routingKey);
System.out.println(" 消息:" + message);
}
};
}
在spring中,要重写 ReturnCallback,如果在spring中 配置文件中添加属性配置,这个是没有用的。在返回的数据可以知道
备用交换
- policy 设置好策略,
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
#对那些交换器进行匹配 指定备用交换器
- 代码中声明交换时通过参数指定备用交换
//声明参数
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
//备用交换参数指定
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");
加上备用参数进行指定上 myae上通道上去。
事务机制
怎么确认可靠发布,这就是事务机制要做的事情,保证网络传输的可靠发布。保证一个收,要么都收,无论是数据库,还是mq都是一样的,都是通过保证的。
当方法里面发布消息,并且需要做其他事情时,所以开启事务spring 事务管理需要的组件
事务管理器 TransactionManager@Configuration public class TxConfiguration {
@Bean
// 配置事务管理器
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
在
spring
该怎么玩事务就怎么玩
.
RabbitTransactionManager
只能做
Rabbitmq
的消息事务管理 只能是单连接的连接工厂
如果方法中,即要做数据库又要做rabbitmq,它没办法实现的。
没有分布式事务管理器实现。
rabbitmq
中事务机制来保证消息的可靠发布,性能是比较差 这是相对的发布确认机制。
调用 开启
@Transactional
public void send(int i) {
// 一定要设置ChannelTransacted(true) 表示开启通道事务
this.template.setChannelTransacted(true);
String message = "Hello World!-" + i;
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
if (i % 2 == 0)
throw new RuntimeException();
}
发布确认机制
性能是事务机制的 250 倍。 发布者发布消息,一般是走异步。 channel 有三种确认模式- 异步流式确认 事件驱动 优点 :开销低,吞吐量大
- 批量发布确认 批次等待,确认不ok 一批重发
- 单条确认 发一条就等待确认
- ack 接收成功
- nack 接收失败
- 发布者收不到Broker的确认(超时)
这都是确认会出现的情况。
异步流式确认
- 开启发布确认模式 就不能再做事务管理了
- 待确认消息的Map
- 指定流式确认事件回调处理
- 从Map中移除对应的消息
- 重发,或做其他处理
// 1 开启发布确认模式 就不能再做事务管理了
channel.confirmSelect();
// 2 待确认消息的Map
Map<Long, String> messagesMap = new ConcurrentHashMap<>();
// 3 指定流式确认事件回调处理
channel.addConfirmListener((deliveryTag, multiple) -> { // multiple表示是否是多条的确认
System.out.println("收到OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + ",从Map中移除消息");
// 从Map中移除对应的消息
messagesMap.remove(deliveryTag);
}, (deliveryTag, multiple) -> {
System.out.println("收到 NON OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + " 从Map中移除消息,重发或做其他处理");
// 从Map中移除对应的消息
String message = messagesMap.remove(deliveryTag);
// 重发,或做其他处理
System.out.println("失败消息:" + message);
});
for (int i = 1; i < 100; i++) {
// 消息内容
String message = "消息" + i;
// 4 将消息放入到map中
messagesMap.put(channel.getNextPublishSeqNo(), message);
// 5、发送消息
channel.basicPublish("mandatory-ex", "", true, null, message.getBytes());
System.out.println("发布消息:" + message);
Thread.sleep(2000L);
}
在spring中添加 publisher-confirms 开启消息确认
这里做流式确认 设置回调 发送
// 配置RabbitTemplate Bean
@Bean
public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置发布确认回调,一个RabbitTemplate只可设置一个回调。
template.setConfirmCallback(confirmCallback());
return template;
}
Consumer
消费者的两种消息模式、消费者 注册 取消 、独占消费者 消费者 优先级 消息确认 pull 拉模式消费。这几种模式。
两种消费模式
- push 推模式
- pull 拉模式
在这两种模式下面的问题出现
push 模式
broker client 消费者 client 向 broker 注册对某个队列的消费者// 对感兴趣的队列注册消费者,返回Server生成的consumerTag(消费者标识) String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});
取消消费者注册
channel.basicCancel(consumerTag);
独占消费者
独占队列:被创建它的连接独占 这个连接上的 channel 可以共享。连接关闭,独占队列没有了。 独占消费者:消费者独占一个队列进行消息消费,适用场景: 消息一定要严格按序消费处理。 一旦独占消费者挂了的话,消息队列里面的数据就会一直存在着,因此需要备用的 在spring中 如何添加 只需要添加 exclusive设置为true就可以了
采用不断的重试去抢独占,也是防止被挂了。