RocketMQ

一.MQ的相关概念

什么是MQ?

  • Message Queue,消息队列。简单的来说,就是一个先进先出的队列,用来发送消息(信息)。

为什么要用 MQ?

  • 流量消峰:在电商系统中,比如双11下单太多,来不及处理,生产者就把下单的信息先放在MQ中,后来慢慢交给消费者进行消费。
  • 异步任务:有些服务的调用,比如A调用B,A还有别的事做,不能一直阻塞等待B的结果。就可以使用异步,A调用的信息交给MQ,A继续执行自己的业务。
  • 应用解耦:比如分布式系统中,各个系统相互调用。比如下图的库存系统出错,就会导致整个订单系统都出错,这样很不好,就是用MQ进行解耦。比如日志处理。

RocketMQ

MQ的选择?

  • Kafka:大吞吐量,专门为大数据而生。大数据领域的实时计算以及日志采集被大规模使用。只支持简单的MQ功能,大型项目建议使用。
  • RabbitMQ:结合Erlang语言本身的并发优势,并发性强,性能好,社区活跃。适合中小型项目。
  • RocketMQ:阿里巴巴开源框架,天生为金融互联网领域而生,可靠性非常高,尤其对于订单扣款业务。在双11经过多次考验。
二.RabbitMQ工作原理

RocketMQ

  • prodeucer:生产者,用来发送消息
  • Consumer:消费者,用来处理消息
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel:如果每次访问MQ都建立Connection是很浪费性能的,Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
      • 每次连接都会创建一个Channel,通过Channel进行一系列操作(创建queue,发消息)
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker,就是Rabbit的实例。
    • Exchange(交换机):message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。
    • Queue(队列):消息最终被送到这里等待 consumer 取走

Exchange说明:

类型:

  • 直接(direct):通过Binding(exchange 和 queue 之间的虚拟连接进行连接,binding 中可以包含 routing key)进行绑定

RocketMQ

  • 扇出(fanout):相当于广播

RocketMQ

  • 主题(topic):找到匹配的一组,比如abc.orange.def就匹配到Q1

RocketMQ

  • 标题(headers) :忽略
三.队列

死信队列:

RocketMQ

  • 顾名思义就是无法被消费的消息(由于特定的原因导致 queue 中的某些消息无法被消费)。
  • 解释图:设置死信队列和死信交换机,过期的消息发送给死信交换机,死信交换机再发给死信队列,由特定的消费者进行处理。
  • TTL:消息/队列最大的存活时间。
    • 每一个消息可以单独设置TTL,每一个队列也可以设置TTL。

 

延迟队列:

  • 延迟队列是死信队列的一种形式。看上图,如果我们不设置C1,那么所有的消息只能在队列中等待到过期,进入死信队列,马上被消费,相当于延迟消费。
  • 使用场景:
    • 订单在十分钟之内未支付则自动取消
    • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议...
  • 存在问题:不同的延迟时长,就需要增加不同的延迟队列。如下图:如果再增加一个1小时后执行的事务,就需要再加一个1小时的延迟队列。RocketMQ
    • 进阶1:使用一个延迟队列,不要设置队列的TTL,设置每一个消息的TTL。这样不同的消息就会在不同的时间点后进行消费。
      • 新问题:RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
    • 进阶2:使用延迟插件可以解决这个问题。  
优先队列:队列者设置为优先级队列,发消息的时候也设置优先级消息。这样才能保证真正的优先   镜像队列:在Rabbit集群中,如何保证队列同步到其他的节点,就使用镜像队列。实现:随便找一个节点添加一个策略policy就可以了。

四.消息可靠性怎么保证?

RocketMQ要保证消息不丢失,需要三方面都进行保证:生产者(发布确认机制,事务机制),消费者(消息应答机制,死信队列),MQ(持久化,集群)


发布确认:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果超时没有收到消息,或者收到未收到的消息,进行重发或者警报管理员。

  • 单个发布确认:是一种同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布。channel.waitForConfirms()
    • 缺点:效率低
  • 批量发布确认:也是一种同步确认发布的方式,先发布一批消息然后一起确认,可以极大地提高吞吐量
    • 缺点:一旦出现问题但很难推断出是那 消息出现了问题。
  • 异步发布确认:生产者只负责不断的发就行,确认的方面由MQ自己决定,收到未收到都会回复一个消息(回调函数)。不会造成生产者等待。用下图的俩个回调方法实现
    • 每一个Channel中的消息都有自己的编号,MQ只回复对应的编号就行

RocketMQ

事务机制:RabbitMQ 客户端中与事务机制相关的方法有三个

  • channel.txSelect  用于将当前的信道设置成事务模式。
  • channel.txCommit  用于提交事务 。

  • channel.txRollback 用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。

 消息应答:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。如果没收到ack,消息自动重新入队。

  • 自动应答:默认的,消费者一收到消息就马上进行应答。
    • 缺点:可能消费者后续处理出错,就没有办法了。
  • 手动应答:可以在消费者处理完逻辑之后,再进行应答,就保证了消息的可靠。实现:Channel.basicAck()
    • 手动应答还可以设置批量应答:比如发送消息1,2,3,4,5。批量应答是只要收到消息5,只应答5,前面的几个消息就不需要应答了。

RocketMQ


 持久化:需要将队列和消息都进行持久化。

RocketMQ

  •  刷盘的实现:刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘
    • 同步刷盘:在Broker把消息写到CommitLog映射区后,就会等待写入完成。
    • 异步刷盘:只是唤醒对应的线程,不保证执行的时机,流程如图所示。

RocketMQ

五.如何解决消息重复,消息积压,消息有序性?

消息重复:采用幂等性接口。

消息积压:

  • 因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
    • 解决方案:
      • 定位消费慢的原因,如果有bug,调bug。或者进行数据库层面的调优。
      • 多增加几个队列和消费者。

消息有序性

  • 有一些消息需要保证顺序,比如下订单,要先锁库存,才能生成订单信息
    • 解决方案:
      • 使用CompletableFuture工具类进行异步编排。
      • 全局有序:只能由一个生产者往Broker发送消息,并且一个Broker内部只能有一个队列。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!
      • 部分有序:消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。

RocketMQ

.其他小问题

vhost 是什么?起什么作用?

  • vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server
  • 其内部均含有独立的 queueexchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。
  • 从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

消息基于什么传输?

  • 由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。
  • 信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。

消息如何分发?

  • 轮询:默认
  • 不公平分发:比如消费者1的消费能力高,消费者2的消费能力低。就多给消费者1发消息。实现:channel.basicQos(1);

什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?

  • 在非 cluster 模式下,元数据主要分为 Queue 元数据(queue 名字和属性等)、Exchange元数据(exchange 名字、类型和属性等)、Binding 元数据(存放路由关系的查找表)、Vhost元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。
  • 在 cluster 模式下,还包括 cluster 中 node 位置信息和 node 关系信息。
  • 元数据按照 erlang node 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。元数据在 cluster 中是全 node 分布的。

RabbitMQ的集群模式和集群节点类型?

RocketMQ

 

 寄语:年轻人,你的职责是平整土地,而非焦虑时光,你做三四月的事,在七八月自有答案   ---余世存《时间之书》

上一篇:SharePoint 2010:搜索服务当前处于脱机状态


下一篇:PHP MySQL Order By 关键词