消息队列MQ总结

消息队列MQ

优点

1. 解耦

使用之前…
消息队列MQ总结
使用之后…
消息队列MQ总结

2. 异步

使用之前…
消息队列MQ总结
使用之后…
消息队列MQ总结

3. 削峰

使用之前…
消息队列MQ总结

使用之后…
消息队列MQ总结

使用时遇到的问题及解决方法

1. 重复消息问题

有哪些场景会出现重复的消息呢?

  • 消息生产者产生了重复的消息
  • kafka和rocketmq的offset被回调了
  • 消息消费者确认失败
  • 消息消费者确认时超时了
  • 业务系统主动发起重试

解决方法

消费者在做业务处理时,要做幂等设计
具体如何做幂等设计,查看这里

在这里推荐增加一张消费消息表,来解决mq的这类问题。消费消息表中,使用messageId做唯一索引,在处理业务逻辑之前,先根据messageId查询一下该消息有没有处理过,如果已经处理过了则直接返回成功,如果没有处理过,则继续做业务处理。

消息队列MQ总结

2. 数据一致性问题

在一个处理流程中,有些服务调用成功,有些服务调用失败,导致数据不一致

解决方法

我们都知道数据一致性分为:

  • 强一致性
  • 弱一致性
  • 最终一致性

mq为了性能考虑使用的是最终一致性,那么必定会出现数据不一致的问题。这类问题大概率是因为消费者读取消息后,业务逻辑处理失败导致的,这时候可以增加重试机制

重试分为:同步重试异步重试

有些消息量比较小的业务场景,可以采用同步重试,在消费消息时如果处理失败,立刻重试3-5次,如何还是失败,则写入到记录表中。但如果消息量比较大,则不建议使用这种方式,因为如果出现网络异常,可能会导致大量的消息不断重试,影响消息读取速度,造成消息堆积。
消息队列MQ总结
消息量比较大的业务场景,建议采用异步重试,在消费者处理失败之后,立刻写入重试表,有个job专门定时重试。

还有一种做法是,如果消费失败,自己给同一个topic发一条消息,在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式

3. 消息丢失问题

有哪些场景会出现消息丢失问题呢?

  • 消息生产者发生消息时,由于网络原因,发生到mq失败了。
  • mq服务器持久化时,磁盘出现异常
  • kafka和rocketmq的offset被回调时,略过了很多消息。
  • 消息消费者刚读取消息,已经ack确认了,但业务还没处理完,服务就被重启了。

导致消息丢失问题的原因挺多的,生产者、mq服务器、消费者 都有可能产生问题

解决方法

可以增加一张消息发送表,当生产者发完消息之后,会往该表中写入一条数据,状态status标记为待确认。如果消费者读取消息之后,调用生产者的api更新该消息的status为已确认。有个job,每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息

消息队列MQ总结
这样不管是由于生产者、mq服务器、还是消费者导致的消息丢失问题,job都会重新发消息。

4. 消息顺序问题

有些业务数据是有状态的,比如订单有:下单、支付、完成、退货等状态,如果订单数据作为消息体,就会涉及顺序问题了。如果消费者收到同一个订单的两条消息,第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了,没有下单就先支付了

解决方法

解决这类问题之前,我们先确认一下,消费者是否真的需要知道中间状态,只知道最终状态行不行?
消息队列MQ总结
其实很多时候,我真的需要知道的是最终状态,这时可以把流程优化一下:

消息队列MQ总结
这种方式可以解决大部分的消息顺序问题。

但如果真的有需要保证消息顺序的需求。订单号路由到不同的partition,同一个订单号的消息,每次到发到同一个partition

消息队列MQ总结

5. 消息堆积

如果消息消费者读取消息的速度,能够跟上消息生产者的节奏,那么整套mq机制就能发挥最大作用。但是很多时候,由于某些批处理,或者其他原因,导致消息消费的速度小于生产的速度。这样会直接导致消息堆积问题,从而影响业务功能。

解决方法

这个要看消息是否需要保证顺序。

如果不需要保证顺序,可以读取消息之后用多线程处理业务逻辑

消息队列MQ总结
这样就能增加业务逻辑处理速度,解决消息堆积问题。但是线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。

如果需要保证顺序,可以读取消息之后,将消息按照一定的规则分发到多个队列中,然后在队列中用单线程处理
消息队列MQ总结

6. 系统复杂度提升

mq的机制需要:生产者、mq服务器、消费者。

有一定的学习成本,需要额外部署mq服务器,而且有些mq比如:rocketmq,功能非常强大,用法有点复杂,如果使用不好,会出现很多问题。有些问题,不像接口调用那么容易排查,从而导致系统的复杂度提升了。

这个就需要使用者深入了解使用的MQ中间件了。

MQ还有其他一些问题,比如 定时发送、延迟发送、私信队列、事务问题等等,本文只是讲了其中一部分。





本文来自 [公众号](https://mp.weixin.qq.com/s/V2I3kN43pgQiQPlygc2ObQ):
上一篇:kafka相关问题


下一篇:为啥要用消息队列(MQ),几大主流MQ框架优劣势,你会怎么选择?