消息队列(mq)是什么?
对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。下面我们以这个本质作为根,一起由浅入深地聊聊 MQ。
01 从 MQ 的本质说起将
MQ 掰开了揉碎了来看,都是「一发一存一消费」,再直白点就是一个「转发器」。
生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。
上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。
1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
02 原始模型的进化
2.1 队列模型
最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个消息。
这便是队列模型:它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。
2.2 发布-订阅模型
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。
一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。
为了解决这个问题,就演化出了另外一种消息模型:发布-订阅模型。
在发布-订阅模型中,存放消息的容器变成了 “主题”,订阅者在接收消息之前需要先 “订阅主题”。最终,每个订阅者都可以收到同一个主题的全量消息。
仔细对比下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。
2.3 小结
最后做个小结,上面两种模型说白了就是:单播和广播的区别。而且,当发布-订阅模型中只有 1 个订阅者时,它和队列模型就一样了,因此在功能上是完全兼容队列模型的。
这也解释了为什么现代主流的 RocketMQ、Kafka 都是直接基于发布-订阅模型实现的?此外,RabbitMQ 中之所以有一个 Exchange 模块?其实也是为了解决消息的投递问题,可以变相实现发布-订阅模型。
包括大家接触到的 “消费组”、“集群消费”、“广播消费” 这些概念,都和上面这两种模型相关,以及在应用层面大家最常见的情形:组间广播、组内单播,也属于此范畴。
所以,先掌握一些共性的理论,对于大家再去学习各个消息中间件的具体实现原理时,其实能更好地抓住本质,分清概念。
03 透过模型看 MQ 的应用场景
目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。
那到底是先有消息模型,还是先有应用场景呢?答案肯定是:先有应用场景(也就是先有问题),再有消息模型,因为消息模型只是解决方案的抽象而已。
MQ 经过 30 多年的发展,能从最原始的队列模型发展到今天百花齐放的各种消息中间件(平台级的解决方案),我觉得万变不离其宗,还是得益于:消息模型的适配性很广。
我们试着重新理解下消息队列的模型。它其实解决的是:生产者和消费者的通信问题。那它对比 RPC 有什么联系和区别呢?
通过对比,能很明显地看出两点差异:
1、引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。
2、多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。
再返过来思考 MQ 的所有应用场景,就不难理解 MQ 为什么适用了?因为这些应用场景无外乎都利用了上面两个特性。
举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等
引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。
改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本
。
这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。
除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰。
我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……
MQ 其他的应用场景基本类似,都能回归到消息模型的特性上,找到它适用的原因,这里就不一一分析了。
总之,就是建议大家多从复杂多变的实践场景再回归到理论层面进行思考和抽象,这样能吃得更透。
04 如何设计一个 MQ?
了解了上面这些理论知识以及应用场景后,下面我们再一起看下:到底如何设计一个 MQ?
4.1 MQ 的雏形
我们还是先从简单版的 MQ 入手,如果只是实现一个很粗糙的 MQ,完全不考虑生产环境的要求,该如何设计呢?
文章开头说过,任何 MQ 无外乎:一发一存一消费,这是 MQ 最核心的功能需求。另外,从技术维度来看 MQ 的通信模型,可以理解成:两次 RPC + 消息转储。
有了这些理解,我相信只要有一定的编程基础,不用 1 个小时就能写出一个 MQ 雏形:
1、直接利用成熟的 RPC 框架(Dubbo 或者 Thrift),实现两个接口:发消息和读消息。
2、消息放在本地内存中即可,数据结构可以用 JDK 自带的 ArrayBlockingQueue 。
4.2 写一个适用于生产环境的 MQ
当然,我们的目标绝不止于一个 MQ 雏形,而是希望实现一个可用于生产环境的消息中间件,那难度肯定就不是一个量级了,具体我们该如何下手呢?
1、先把握这个问题的关键点
假如我们还是只考虑最基础的功能:发消息、存消息、消费消息(支持发布-订阅模式)。
那在生产环境中,这些基础功能将面临哪些挑战呢?我们能很快想到下面这些:
1、高并发场景下,如何保证收发消息的性能?
2、如何保证消息服务的高可用和高可靠?
3、如何保证服务是可以水平任意扩展的?
4、如何保证消息存储也是水平可扩展的?
5、各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?
可见,高并发场景下的三高问题在你设计一个 MQ 时都会遇到,「如何满足高性能、高可靠等非功能性需求」才是这个问题的关键所在。
2、整体设计思路
先来看下整体架构,会涉及三类角色:
另外,将「一发一存一消费」这个核心流程进一步细化后,比较完整的数据流如下:
基于上面两个图,我们可以很快明确出 3 类角色的作用,分别如下:
1、Broker(服务端):MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护等。
2、Producer(生产者):MQ 的客户端之一,调用 Broker 提供的 RPC 接口发送消息。
3、Consumer(消费者):MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消费确认。
3、详细设计
下面,再展开讨论下一些具体的技术难点和可行的解决方案。
难点1:RPC 通信
解决的是 Broker 与 Producer 以及 Consumer 之间的通信问题。如果不重复造*,直接利用成熟的 RPC 框架 Dubbo 或者 Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题了。
当然,你也可以基于 Netty 来做底层通信,用 Zookeeper、Euraka 等来做注册中心,然后自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大。
难点2:高可用设计
高可用主要涉及两方面:Broker 服务的高可用、存储方案的高可用。可以拆开讨论。
Broker 服务的高可用,只需要保证 Broker 可水平扩展进行集群部署即可,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。
存储方案的高可用有两个思路:1)参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;2)还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,它们都有自己的高可用方案。
难点3:存储设计
消息的存储方案是 MQ 的核心部分,可靠性保证已经在高可用设计中谈过了,可靠性要求不高的话直接用内存或者分布式缓存也可以。这里重点说一下存储的高性能如何保证?这个问题的决定因素在于存储结构的设计。
目前主流的方案是:追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二份查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。
如果不追求很高的性能,也可以考虑现成的分布式文件系统、KV 存储或者数据库方案。
难点4:消费关系管理
为了支持发布-订阅的广播模式,Broker 需要知道每个主题都有哪些 Consumer 订阅了,基于这个关系进行消息投递。
由于 Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。
难点5:高性能设计
存储的高性能前面已经谈过了,当然还可以从其他方面进一步优化性能。
比如 Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取等等。
4.3 小结
再总结下,要回答好:如何设计一个 MQ?
1、需要从功能性需求(收发消息)和非功能性需求(高性能、高可用、高扩展等)两方面入手。
2、功能性需求不是重点,能覆盖 MQ 最基础的功能即可,至于延时消息、事务消息、重试队列等高级特性只是锦上添花的东西。
3、最核心的是:能结合功能性需求,理清楚整体的数据流,然后顺着这个思路去考虑非功能性的诉求如何满足,这才是技术难点所在。
05 写在最后
上面这些内容从 MQ 一发一存一消费这个本质出发,讲解了消息模型的演进过程,这是 MQ 最核心的理论基础。基于此,大家也能更容易理解 MQ 的各种新名词以及应用场景。
最后通过回答:如何设计一个 MQ?目的是让大家对 MQ 的核心组件和技术难点有一个清晰的认识。另外,带着这个问题的答案再去学习 Kafka、RocketMQ 等具体的消息中间件时,也会更有侧重点。
面试开始
一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向你走来,看着铮亮的头,心想着肯定是尼玛*架构师吧!
但是我们看过暖男敖丙的系列,腹有诗书气自华,虚都不虚。
你为啥用消息队列?
噗此,这也叫问题?别人用了我能不用么?别人用了我就用了呗,我就是为了用而用。
你心里嘀咕就好了,千万别说出来哈,说出来了没拿到Offer别到时候就在那说,元芳那个渣男教我说的!
面试官你好:我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想,分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件。
哦?你说到业务场景越来越复杂,你那说一下你都在什么场景用到了消息队列?
嗯,我从三个方面去说一下我使用的场景吧
Tip:这三个场景也是消息队列的经典场景,大家基本上要烂熟于心那种,就是一说到消息队列你脑子就要想到**异步、削峰、解耦,**条件反射那种。
异步:
我们之前的场景里面有很多步骤都是在一个流程里面需要做完的,就比如说我的下单系统吧,本来我们业务简单,下单了付了钱就好了,流程就走完了。
但是后面来了个产品经理,搞了个优惠券系统,OK问题不大,流程里面多100ms去扣减优惠券。
后来产品经理灵光一闪说我们可以搞个积分系统啊,也行吧,流程里面多了200ms去增减积分。
再后来后来隔壁的产品老王说:下单成功后我们要给用户发短信,也将就吧,100ms去发个短信。
再后来。。。(元芳你有完没完!!!)
反正就流程有点像这样 ↓
你们可以看到这才加了三个,我可以斩钉截铁的告诉你真正的下单流程涉及的系统绝对在10个以上(主流电商),越大的越多。
这个链路这样下去,时间长得一批,用户发现我买个东西你特么要花几十秒,垃圾电商我不在你这里买了,不过要是都像并夕夕这么便宜,真香!
但是我们公司没有夕夕的那个经济实力啊,那只能优化系统了。
Tip:我之前在的电商老东家要求所有接口的Rt(ResponseTime响应时间)在200ms内,超出的全部优化,我现在所负责的系统QPS也是9W+就是抖动一下网络集群都可能炸锅那种,RT基本上都要求在50ms以内。
大家感受一下这个QPS。
嗯不错,链路长了就慢了,那你怎么解决的?
那链路长了就慢了,但是我们发现上面的流程其实可以同时做的呀,你支付成功后,我去校验优惠券的同时我可以去增减积分啊,还可以同时发个短信啊。
那正常的流程我们是没办法实现的呀,怎么办,异步。
你对比一下是不是发现,这样子最多只用100毫秒用户知道下单成功了,至于短信你迟几秒发给他他根本不在意是吧。
小伙子我打断你一下,你说了异步,那我用线程,线程池去做不是一样的么?
既然面试官这么问了,我就说一下为啥我们不能用线程去做,因为用线程去做,你是不是要写代码?
你一个订单流程,你扣积分,扣优惠券,发短信,扣库存。。。等等这么多业务要调用这么多的接口,每次加一个你要调用一个接口然后还要重新发布系统,写一次两次还好,写多了你就说:老子不干了!
而且真的全部都写在一起的话,不单单是耦合这一个问题,你出问题排查也麻烦,流程里面随便一个地方出问题搞不好会影响到其他的点,小伙伴说我每个流程都try catch不就行了,相信我别这么做,这样的代码就像个定时炸弹 ,你不知道什么时候爆炸,平时不炸偏偏在你做活动的时候炸,你就领个P0故障收拾书包提前回家过年吧。
Tip:P0—PN 是互联网大厂经常用来判定事故等级的机制,P0是最高等级了。
但是你用了消息队列,耦合这个问题就迎刃而解了呀。
哦,元芳怎么说?
且听我娓娓道来:
你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了。
那你的流程走完了,你不用管别人是否成功么?比如你下单了积分没加,优惠券没扣怎么办?
问题是个好问题,但是没必要考虑,业务系统本身就是自己的开发人员维护的,你积分扣失败关我下单的什么事情?你管好自己下单系统的就好了。
Tip:话是这么说,但是这其实是用了消息队列的一个缺点,涉及到分布式事务的知识点,我下面会提到。
削峰:
就拿我上一期写的秒杀来说(暗示新同学看我上一期),你平时流量很低,但是你要做秒杀活动00 :00的时候流量疯狂怼进来,你的服务器,Redis,MySQL各自的承受能力都不一样,你直接全部流量照单全收肯定有问题啊,直接就打挂了。
那怎么办?
简单,把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器处理能力,你能处理5000QPS你就消费这么多,可能会比正常的慢一点,但是不至于打挂服务器,等流量高峰下去了,你的服务也就没压力了。
你看阿里双十一12:00的时候这么多流量瞬间涌进去,他有时候是不是会慢一点,但是人家没挂啊,或者降级给你个友好的提示页面,等高峰过去了又是一条好汉了。
听你说了辣么多,怎么都是好处,那我问你使用了消息队列有啥问题么?
诶,看过前面我写的文章的人才都知道,我经常说的就是,技术是把双刃剑!
没错面试官,我使用他是因为他带给我们很多好处,但是使用之后问题也是接踵而至。
同样的暖男我呀,也从三个点介绍他主要的缺点:
系统复杂性
本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费、消息丢失、消息的顺序消费等等,反正用了之后就是贼烦。
我插一句嘴,上面的问题(重复消费、消息丢失、顺序消费)你能分别介绍一下,并且说一下分别是怎么解决的么?
不要!元芳都说了下一章写啥?
其实不是暖男我不想在这里写,这三个问题我想了下,统统都是MQ的重点问题,单独拿一个出来就是一篇文章了,篇幅实在太长了,我会在下一章挨个介绍一遍的。
-------->>>消息队列的坑(重复消费、顺序消费、消息丢失)
数据一致性
这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。
就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?
我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法,这样就像个渣男,没有格局,这样呀你的路会越走越窄的。
所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?
**分布式事务:**把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。
**Tip:**分布式事务在互联网公司里面实在常见,我也不在这里大篇幅介绍了,后面都会专门说的。
可用性
你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是杀一个程序员能搞定的吧,感觉得杀一片。
至于怎么保证高可用,还是那句话也不在这里展开讨论了,我后面一样会写,像写Redis那样写出来的。
放心元芳不是渣男来的,肯定会对你们负责的。点赞!
看不出来啊,你有点东西呀,那我问一下你,你们是怎么做技术选型的?
目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。
不过元芳想说的是,ActiveMQ和RabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。
Kafka和RocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源。
就像我们公司就是是基于Kafka和RocketMQ两者的优点自研的消息队列中间件,吞吐量、可靠性、时效性等都很可观。
我们回归正题,我这里用网上找的对比图让大家看看差距到底在哪里:
大家其实一下子就能看到差距了,就拿吞吐量来说,早期比较活跃的ActiveMQ 和RabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要。
比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?
再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继。
就部署方式而言前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。
我们再聊一下RabbitMQ这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。
至于RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。
Tip:Dubbo等我写到RPC我会详细介绍的。
Kafka我放到最后说,你们也应该知道了,压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。
以上这些都只是一些我自己的个人意见,真正的选型还是要去深入研究的,不然那你公司一天UV就1000你告诉我你要去用Kafka我只能说你吃饱撑的。
记住,没有最好的技术,只有最适合的技术,不要为了用而用。
面试结束
一>在接着下来讲讲什么是消息队列?
消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。
消息队列,一般我们会简称它为MQ(Message Queue),嗯,就是很直白的简写。
我们先不管消息(Message)这个词,来看看队列(Queue)。这一看,队列大家应该都熟悉吧。
队列是一种先进先出的数据结构。
在Java里边,已经实现了不少的队列了:
那为什么还需要消息队列(MQ)这种中间件呢???其实这个问题,跟之前我学Redis的时候很像。Redis是一个以key-value形式存储的内存数据库,明明我们可以使用类似HashMap这种实现类就可以达到类似的效果了,那还为什么要Redis?《Redis合集》
到这里,大家可以先猜猜为什么要用消息队列(MQ)这种中间件,下面会继续补充。
消息队列可以简单理解为:把要传输的数据放在队列中。
科普:
*把数据放到消息队列叫做**生产者**
*从消息队列里边取数据叫做**消费者**
二>为什么要用消息队列?
为什么要用消息队列,也就是在问:用了消息队列有什么好处。我们看看以下的场景
2.1 解耦
现在我有一个系统A,系统A可以产生一个userId
然后,现在有系统B和系统C都需要这个userId去做相关的操作
写成伪代码可能是这样的:
public class SystemA {
// 系统B和系统C的依赖
SystemB systemB = new SystemB();
SystemC systemC = new SystemC();
// 系统A独有的数据userId
private String userId = "Java3y";
public void doSomething() {
// 系统B和系统C都需要拿着系统A的userId去操作其他的事
systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
}
结构图如下:
ok,一切平安无事度过了几个天。
某一天,系统B的负责人告诉系统A的负责人,现在系统B的SystemBNeed2do(String userId)这个接口不再使用了,让系统A别去调它了。
于是,系统A的负责人说"好的,那我就不调用你了。",于是就把调用系统B接口的代码给删掉了:
public void doSomething() {
// 系统A不再调用系统B的接口了
//systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
又过了几天,系统D的负责人接了个需求,也需要用到系统A的userId,于是就跑去跟系统A的负责人说:“老哥,我要用到你的userId,你调一下我的接口吧”
于是系统A说:“没问题的,这就搞”
然后,系统A的代码如下:
public class SystemA {
// 已经不再需要系统B的依赖了
// SystemB systemB = new SystemB();
// 系统C和系统D的依赖
SystemC systemC = new SystemC();
SystemD systemD = new SystemD();
// 系统A独有的数据
private String userId = "Java3y";
public void doSomething() {
// 已经不再需要系统B的依赖了
//systemB.SystemBNeed2do(userId);
// 系统C和系统D都需要拿着系统A的userId去操作其他的事
systemC.SystemCNeed2do(userId);
systemD.SystemDNeed2do(userId);
}
}
时间飞逝:
*又过了几天,系统E的负责人过来了,告诉系统A,需要userId。
*又过了几天,系统B的负责人过来了,告诉系统A,还是重新掉那个接口吧。
*又过了几天,系统F的负责人过来了,告诉系统A,需要userId。
*……
于是系统A的负责人,每天都被这给骚扰着,改来改去,改来改去…
还有另外一个问题,调用系统C的时候,如果系统C挂了,系统A还得想办法处理。如果调用系统D时,由于网络延迟,请求超时了,那系统A是反馈fail还是重试??
最后,系统A的负责人,觉得隔一段时间就改来改去,没意思,于是就跑路了。
然后,公司招来一个大佬,大佬经过几天熟悉,上来就说:将系统A的userId写到消息队列中,这样系统A就不用经常改动了。为什么呢?下面我们来一起看看:
系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据。这样有什么好处?
- 系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心。
- 即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。
- 系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,只跟消息队列有关。
这样一来,系统A与系统B、C、D都解耦了。
2.2 异步
我们再来看看下面这种情况:系统A还是直接调用系统B、C、D
代码如下:
public class SystemA {
SystemB systemB = new SystemB();
SystemC systemC = new SystemC();
SystemD systemD = new SystemD();
// 系统A独有的数据
private String userId ;
public void doOrder() {
// 下订单
userId = this.order();
// 如果下单成功,则安排其他系统做一些事
systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
systemD.SystemDNeed2do(userId);
}
}
假设系统A运算出userId具体的值需要50ms,调用系统B的接口需要300ms,调用系统C的接口需要300ms,调用系统D的接口需要300ms。那么这次请求就需要50+300+300+300=950ms
并且我们得知,系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处理一些小事而已。
那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。所以,我们可以弄成是这样的:
系统A执行完了以后,将userId写到消息队列中,然后就直接返回了(至于其他的操作,则异步处理)。
- 本来整个请求需要用950ms(同步)
- 现在将调用其他系统接口异步化,只需要100ms(异步)
(例子可能举得不太好,但我觉得说明到点子上就行了,见谅。)
2.3削峰/限流
我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。
那多出来的1000个请求,可能就把我们整个系统给搞崩了…所以,有一种办法,我们可以写到消息队列中:
系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。
三>使用消息队列有什么问题?
经过我们上面的场景,我们已经可以发现,消息队列能做的事其实还是蛮多的。
说到这里,我们先回到文章的开头,"明明JDK已经有不少的队列实现了,我们还需要消息队列中间件呢?"其实很简单,JDK实现的队列种类虽然有很多种,但是都是简单的内存队列。为什么我说JDK是简单的内存队列呢?下面我们来看看要实现消息队列(中间件)可能要考虑什么问题。
3.1高可用
无论是我们使用消息队列来做解耦、异步还是削峰,消息队列肯定不能是单机的。试着想一下,如果是单机的消息队列,万一这台机器挂了,那我们整个系统几乎就是不可用了。
所以,当我们项目中使用消息队列,都是得集群/分布式的。要做集群/分布式就必然希望该消息队列能够提供现成的支持,而不是自己写代码手动去实现。
3.2 数据丢失问题
我们将数据写到消息队列上,系统B和C还没来得及取消息队列的数据,就挂掉了。如果没有做任何的措施,我们的数据就丢了.
学过Redis的都知道,Redis可以将数据持久化磁盘上,万一Redis挂了,还能从磁盘从将数据恢复过来。同样地,消息队列中的数据也需要存在别的地方,这样才尽可能减少数据的丢失。
那存在哪呢?
- 磁盘?
- 数据库?
- Redis?
- 分布式文件系统?
同步存储还是异步存储?
3.3消费者怎么得到消息队列的数据?
消费者怎么从消息队列里边得到数据?有两种办法:
- 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
- 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)
3.4其他
除了这些,我们在使用的时候还得考虑各种的问题:
- 消息重复消费了怎么办啊?
- 我想保证消息是绝对有顺序的怎么做?
- ………
虽然消息队列给我们带来了那么多的好处,但同时我们发现引入消息队列也会提高系统的复杂性。市面上现在已经有不少消息队列*了,每种消息队列都有自己的特点,选取哪种MQ还得好好斟酌。
最后
本文主要讲解了什么是消息队列,消息队列可以为我们带来什么好处,以及一个消息队列可能会涉及到哪些问题。希望给大家带来一定的帮助。
参考资料:
Kafka简明教程
消息队列使用的四种场景介绍,有图有解析,一看就懂
消息队列设计精要
消息队列的使用场景是怎样的
什么是消息队列还不懂的接着看这
消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
Producer:消息生产者,负责产生和发送消息到 Broker;
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
二、为什么需要消息队列
1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。
2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。
3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
4、复用:一次发送多次消费。
5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。
6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。
三、消息队列有什么优点和缺点
1、核心优点
解耦、异步、削峰
2、缺点
1)系统可用性降低:系统引入的外部依赖越多,越容易挂掉。
2)系统复杂度提高了
3)一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不一致
四、消息队列对比
各种对比之后,有如下建议:
ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐;
RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐中小型公司使用;推荐
RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景;强烈推荐
kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
五、RabbitMQ介绍
1、RabbitMQ基础概念
通常我们谈到消息队列服务, 会有三个概念: 发消息者、消息队列、收消息者。
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。
消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。
1.Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。
7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
7.Broker:RabbitMQ Server,服务器实体。
2、Exchange消息调度策略
调度策略是指Exchange在收到生产者发送的消息后依据什么规则把消息转发到一个或多个队列中保存。调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)。
Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者在将消息发送给Exchange的时候,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效。
在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里。
1.Fanout (订阅模式|广播模式)
交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式
与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
2、Direct(路由模式)
精确匹配:当消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。
RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。
3、Topic (通配符模式)
按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。
Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ ”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
4、Headers(键值对模式)
Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,交换器的路由规则是通过消息头的Headers属性来进行匹配转发的,类似HTTP请求的Headers。
在绑定Queue与Exchange时指定一组键值对,键值对的Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或all,代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)。
当消息发送到Exchange时,交换器会取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。Headers交换机的优势是匹配的规则不被限定为字符串(String),而是Object类型。
如下图所示,RabbitMQdirect模式通过RoutingKey来精准匹配,RoutingKey为red的投递到Queue1,RoutingKey为black和white的投递到Queue2。
我们可以假设一个场景,我们要做一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化到硬盘。如下示例:
RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。
RoutingKey为“red.critical.high”的日志会只投递到queue2。
RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。
3、RabbitMQ其他概念
1、rpc
生产者根据消费者返回的值,针对不同的消息(有唯一标识 或 类型标识)执行相应的逻辑
2、消息应答
消费者拿到消息并处理完相应逻辑后,必须应答,否则队列中的消息会不断堆积
3、消息持久化
队列可以设置将消息持久化,防止服务器突然宕机,导致消息丢失,当然这样会损耗性能,占用磁盘空间
4、分发机制
①轮询分发:队列给每一个消费者发送数量一样的数据
②公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。
5、事务
对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,但背离了消息队列解耦的本质
6、Confirm机制
对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,是异步的,性能比事务好很多
7、Alternate Exchange(代替交换器)
是Rabbitmq自己扩展的功能,不是AMQP协议定义的,可以将没有正确分发的消息交由该 交换机 再次分发,再次分发失败,则消息丢失
8、TTL(生存时间)
9、Queue Length Limit(队列长度限制)
10、Dead Letter Exchange(死信交换器)
11、priority queue(优先级队列)
12、延迟队列
①延迟消费。比如:
用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
②延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
通过设置TTL(生存时间) 和 Dead Letter Exchange(死信交换器),将消息重新分发。
六、RabbitMQ使用
1、添加maven依赖
org.springframework.boot
spring-boot-starter-amqp
2、添加rabbitmq java config 配置类
3、发送消息
4、消费消息
5、禁用spring boot 自动配置(约定大于配置)
以上就是基于spring boot 的 maven项目 集成rabbitmq 的所有配置。
剩下的就是交换机的灵活运用了。
消息队列的好处
消息队列可用于取消重量级处理的耦合,缓冲或批处理工作以及平滑尖峰工作负载。
1、更好的性能
消息队列启用异步通信,这意味着正在生成和使用消息的端点与队列交互,而不是彼此交互。生产者可以将请求添加到队列中,而无需等待它们被处理。消费者仅在消息可用时对其进行处理。系统中的任何组件都不会停止等待其他组件,从而优化了数据流。
2、可靠性更高
队列使数据持久化,并减少了系统不同部分脱机时发生的错误。通过使用消息队列分隔不同的组件,可以创建更多的容错能力。如果系统的某个部分永远无法访问,则另一部分仍然可以继续与队列进行交互。队列本身也可以进行镜像以提高可用性。
3、可扩展性
消息队列使精确缩放到所需位置成为可能。当工作负载达到高峰时,你的应用程序的多个实例都可以将请求添加到队列中,而不会发生冲突。随着这些传入请求的队列越来越长,可以在一组使用者中分配工作负载。生产者,消费者和队列本身都可以按需增长和收缩。
4、简化去耦
消息队列消除了组件之间的依赖关系,并大大简化了分离应用程序的编码。软件组件无需考虑通信代码,而可以设计成执行离散的业务功能。
无论使用的是单片应用程序,微服务还是无服务器架构,消息队列都是一种使分布式系统脱钩的简便方法。
消息队列的类型
1、点对点
点对点意味着消息通过队列从一个应用程序(生产者/发送者)发送到另一应用程序(消费者/接收者)。队列中可以有不止一个消费者,但是只有其中一个会很乐意接收消息。因此,它是点对点或一对一。
2、发布/订阅
另一方面,发布/订阅是一种消息传递模型,其中消息通过主题发送给多个使用者(或订户)。主题是发布者和订阅者之间的链接。订户可能会也可能不会确认已发布的消息。
更多答案请走这---->>>消息队列(mq)是什么?