消息队列服务 RabbitMQ

消息队列服务 RabbitMQ

消息发布消费

简单模型:
消息队列服务 RabbitMQ

为什么选择RabbitMQ

mq优点

1.解耦合,使系统于系统之前的通信更加灵活
2.异步处理,减少请求响应的时间
3.削峰(错峰),增加系统的流量承受能力

相对缺点

1.系统可用性降低了,依赖mq的高可用
2.系统复杂性提高了,处理消息重复消费,消息丢失,消息阻塞等等问题
3.数据一致性问题,

常用消息队列服务对比

消息队列服务 RabbitMQ
1.微秒级延迟
2.持久性机制和高可用性机制
3.投递确认、发布者证实和灵活的路由策略
4.多协议,可视化管理工具,广泛的客户端和插件
5.多数公司的首选

消息队列RabbitMQ基本对象

对象 介绍
消息message 消息队列中用来传输数据的载体。主要属性 Delivery mode 投递模式(持久化 或 非持久化)Message priority(消息优先权)Expiration period(消息有效期)
交换机exchange 交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。
队列queue 队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。Durable(消息代理重启后,队列依旧存在)Exclusive(只被一个连接(connection)使用,关闭后队列即被删除)Auto-delete(当最后一个消费者退订后即被删除)Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)
绑定Binding 绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
连接connection AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。
通道channels AMQP 0-9-1提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。
虚拟主机vhosts 为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念

交换机类型的路由特点

Direct

消息队列服务 RabbitMQ

fanout

消息队列服务 RabbitMQ

topic

借用一张图
消息队列服务 RabbitMQ
通配符:
#:0个或若干个关键字;
*:一个关键字。

如“usa.*”能与“usa.news”匹配
无法与“usa.news.instant”匹配;
但是“usa.#”能与上述两者匹配。

头交换机(headers exchange)实际中使用最少

"x-match参数
当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。

交换机性能

性能排序:fanout > direct >> topic。比例大约为11:10:6

如何确保消息可靠性

  • 发送端发送可能失败
  • mq broker 可靠性
  • 消费端消费可能失败

消息发送阶段

  • 链接异常,重试
  • 多次重试失败, 再次发送机制
rabbitTemplate.setRetryTemplate(retryTemplate());

retryTemplate.registerListener(retryListener);
rabbitTemplate.setRecoveryCallback(recoveryCallback);

public interface RetryListener {
      <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback);
      <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable);
       <T, E extends Throwable> void one rror(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable);
}
//如果重试次数全部用完,还是异常,触发( close 前 )
public interface RecoveryCallback<T> {
       T recover(RetryContext context) throws Exception;
}
RetryContext 中是没有交换机,routingKey等信息的
  • 没有触达交换机?发送确认机制
    Rabbitmq提供两种确认机制
    1.通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
    2.通过将channel设置成confirm模式来实现;
    普通confirm模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端confirm。
    批量confirm模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端confirm。
    异步confirm模式:提供一个回调方法,服务端confirm了一 条或者多条消息后Client端会回调这个方法。
# 消息发送确认-异步确认机制
spring. rabbitmq .publisher-confirms: true
//发送时连接异常,没有触达交换机等
rabbitTemplate.setConfirmCallback(
new RabbitTemplate.ConfirmCallback() {   @Override   public void confirm(CorrelationData correlationData, boolean ack, String cause) {      log.info("confirm :{} ack:{}, cause:{}", correlationData.getId(), ack, cause);   }});
//发送信息
rabbitTemplate.convertAndSend("exchange", "route_key", data,      new CorrelationData("correlationDataId"));
  • 没有找到队列?
spring. rabbitmq .publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(
new RabbitTemplate.ReturnCallback() {   
@Override   
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {      log.error("message: {}", message);      
log.error("replyCode: {},replyText :{}", replyCode, replyText);      log.error("exchange: {},routingKey :{}", exchange, routingKey);   }
}
);

一张图来说明
消息队列服务 RabbitMQ

mq broker 高可用

基于主从的方式进行实现. 其有三种工作模式: 单机模式、普通集群模式、镜像集群模式

普通集群模式

即在多个服务器上部署多个MQ实例, 每台机器一个实例. 创建的每一个queue,只会存在一个MQ实例上. 但是每一个实例都会同步queue的元数据(即queue的标识信息). 当在进行消费的时候, 就算 连接到了其他的MQ实例上, 其也会根据内部的queue的元数据,从该queue所在实例上拉取数据过来。
只是通过集群部署的方式提高了消息的吞吐量,但是并没有考虑到高可用,并且性能开销巨大.容易造成单实例的性能瓶颈。
消息队列服务 RabbitMQ

镜像集群模式:

高可用模式与普通集群模式的主要区别在于. 无论queue的元数据还是queue中的消息都会同时存在与多个实例上.。
缺点:
1: 性能开销大: 因为需要进行整个集群内部所有实例的数据同步
2:无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 单个服务器节点的容量无法容纳了怎么办?.
消息队列服务 RabbitMQ

消息消费阶段

消费消息异常失败

RabbitMQ确认机制

  • 自动确认模式(automatic acknowledgement model),
    当消息代理(broker)将消息发送给应用后立即删除
  • 显式确认模式(explicit acknowledgement model)
    待应用(application)发送一个确认回执(acknowledgement)后再删除消息

Spring下确认模式

  • 不确认
    spring.rabbitmq.listener.simple.acknowledge-mode=none
  • 自动确认(spring没有异常,进行确认)
    spring.rabbitmq.listener.simple.acknowledge-mode=auto
  • 手动确认
    Spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 成功确认 void basicAck(long deliveryTag, boolean multiple) throws IOException;
  2. 失败确认 void basicNack(long deliveryTag, boolean multiple, boolean requeue);
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

自动确认配置总结
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=1000
spring.rabbitmq.listener.simple.default-requeue-rejected=false

手动确认总结:
1.监听的方法内部必须使用channel进行消息确认,包括消费成功或消费失败
如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接
2.rabbitmq断开,连接后会自动重新推送(不管是网络问题还是宕机重启)
3. 消费端处理消息的时候宕机,消息会自动推给其他的消费者
4.监听消息的方法抛出异常,消息会按照listener.retry的配置进行重试,但是重发次数完了之后还抛出异常的话,消息不会重发(也不会重发到其他消费者),只有应用重启后会重新推送。
5.spring.rabbitmq.listener.retry配置的重试是在消费端应用内处理的,不是rabbitqq重发
6.可以配置MessageRecoverer对异常消息进行处理,此处理会在listener.retry次数尝试完并还是抛出异常的情况下才会调用。
7.通过给队列(Queue)绑定死信队列,使用nack反馈给mq,会将消息转发到死信队列里面,此种方式需要自己在消费消息的方法内部将异常处理掉
8.消息变成死信有以下几种情况:
消息被拒绝(basic.reject/ basic.nack)并且requeue=false
消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
队列达到最大长度

重复消费消息

原因:重试机制;rabbitmq断开导致重发
去重处理:
1.不处理,业务代码兼容或者幂等操作
2.消息接收表,统一处理messageId
2.1先获取消息ID,然后判断是否存在主键为消息ID(加队列名)的记录,如果存在的话,就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成,并且将消息记录进行入库(channel重连,消息重发可能会出现并发问题)
2.2先获取消息ID,开启事务,然后插入主键为消息ID (加队列名)的记录,如果插入不成功的话,就不处理这条消息;如果插入成功的话,则消费者进行消费,消费完成、提交事务(存在大事务问题,业务代码可以建新事务来避免)
消息队列服务 RabbitMQ

消息发生堆积

原因:
1.消费者消费不过来
2.消费者依赖服务异常,无法完成消费

处理:
1.增加消费者数量
2.建消息临时表,直接消费消息入库(带上交换机和路由键值)

监控与告警:

官方推荐生产环境监控
3.8 集成 plugin rabbitmq_prometheus , prometheus/grafana
3.8 之前需要单独的plugin prometheus_rabbitmq_exporter,

上一篇:Listener


下一篇:oracle连接数据库问题(sid和listener)