mq使用场景、不丢不重、时序性

mq使用场景、不丢不重、时序性、削峰

参考:

http://zhuanlan.51cto.com/art/201704/536407.htm

http://zhuanlan.51cto.com/art/201703/535090.htm

http://zhuanlan.51cto.com/art/201704/536306.htm

http://zhuanlan.51cto.com/art/201611/521602.htm

http://zhuanlan.51cto.com/art/201611/521602.htm

http://zhuanlan.51cto.com/art/201703/534752.htm

http://zhuanlan.51cto.com/art/201703/534475.htm

微信公众号:架构师之路

到底什么时候该使用MQ?

一、缘起

一切脱离业务的架构设计与新技术引入都是耍流氓。

引入一个技术之前,首先应该解答的问题是,这个技术解决什么问题。

就像微服务分层架构之前,应该首先回答,为什么要引入微服务,微服务究竟解决什么问题(详见《互联网架构为什么要做微服务?》)。

最近分享了几篇MQ相关的文章:

MQ如何实现延时消息

MQ如何实现消息必达

MQ如何实现幂等性

不少网友询问,究竟什么时候使用MQ,MQ究竟适合什么场景,故有了此文。

二、MQ是干嘛的

消息总线(Message Queue),后文称MQ,是一种跨进程的通信机制,用于上下游传递消息。

mq使用场景、不丢不重、时序性

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

使用了MQ之后,消息发送上游只需要依赖MQ,逻辑上和物理上都不用依赖其他服务。

三、什么时候不使用消息总线

mq使用场景、不丢不重、时序性

既然MQ是互联网分层架构中的解耦利器,那所有通讯都使用MQ岂不是很好?这是一个严重的误区,调用与被调用的关系,是无法被MQ取代的。

MQ的不足是:

1)系统更复杂,多了一个MQ组件

2)消息传递路径更长,延时会增加

3)消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证

4)上游无法知道下游的执行结果,这一点是很致命的

举个栗子:用户登录场景,登录页面调用passport服务,passport服务的执行结果直接影响登录结果,此处的“登录页面”与“passport服务”就必须使用调用关系,而不能使用MQ通信。

无论如何,记住这个结论调用方实时依赖执行结果的业务场景,请使用调用,而不是MQ

四、什么时候使用MQ

【典型场景一:数据驱动的任务依赖】

什么是任务依赖,举个栗子,互联网公司经常在凌晨进行一些数据统计任务,这些任务之间有一定的依赖关系,比如:

1)task3需要使用task2的输出作为输入

2)task2需要使用task1的输出作为输入

这样的话,tast1, task2, task3之间就有任务依赖关系,必须task1先执行,再task2执行,载task3执行。

mq使用场景、不丢不重、时序性

对于这类需求,常见的实现方式是,使用cron人工排执行时间表:

1)task1,0:00执行,经验执行时间为50分钟

2)task2,1:00执行(为task1预留10分钟buffer),经验执行时间也是50分钟

3)task3,2:00执行(为task2预留10分钟buffer)

这种方法的坏处是:

1)如果有一个任务执行时间超过了预留buffer的时间,将会得到错误的结果,因为后置任务不清楚前置任务是否执行成功,此时要手动重跑任务,还有可能要调整排班表

2)总任务的执行时间很长,总是要预留很多buffer,如果前置任务提前完成,后置任务不会提前开始

3)如果一个任务被多个任务依赖,这个任务将会称为关键路径,排班表很难体现依赖关系,容易出错

4)如果有一个任务的执行时间要调整,将会有多个任务的执行时间要调整

无论如何,采用“cron排班表”的方法,各任务耦合,谁用过谁痛谁知道(采用此法的请评论留言)

mq使用场景、不丢不重、时序性

优化方案是,采用MQ解耦:

1)task1准时开始,结束后发一个“task1 done”的消息

2)task2订阅“task1 done”的消息,收到消息后第一时间启动执行,结束后发一个“task2 done”的消息

3)task3同理

采用MQ的优点是:

1)不需要预留buffer,上游任务执行完,下游任务总会在第一时间被执行

2)依赖多个任务,被多个任务依赖都很好处理,只需要订阅相关消息即可

3)有任务执行时间变化,下游任务都不需要调整执行时间

需要特别说明的是,MQ只用来传递上游任务执行完成的消息,并不用于传递真正的输入输出数据。

【典型场景二:上游不关心执行结果】

上游需要关注执行结果时要用“调用”,上游不关注执行结果时,就可以使用MQ了。

举个栗子,58同城的很多下游需要关注“用户发布帖子”这个事件,比如招聘用户发布帖子后,招聘业务要奖励58豆,房产用户发布帖子后,房产业务要送2个置顶,二手用户发布帖子后,二手业务要修改用户统计数据。

mq使用场景、不丢不重、时序性

对于这类需求,常见的实现方式是,使用调用关系:

帖子发布服务执行完成之后,调用下游招聘业务、房产业务、二手业务,来完成消息的通知,但事实上,这个通知是否正常正确的执行,帖子发布服务根本不关注。

这种方法的坏处是:

1)帖子发布流程的执行时间增加了

2)下游服务当机,可能导致帖子发布服务受影响,上下游逻辑+物理依赖严重

3)每当增加一个需要知道“帖子发布成功”信息的下游,修改代码的是帖子发布服务,这一点是最恶心的,属于架构设计中典型的依赖倒转,谁用过谁痛谁知道(采用此法的请评论留言)

mq使用场景、不丢不重、时序性

优化方案是,采用MQ解耦:

1)帖子发布成功后,向MQ发一个消息

2)哪个下游关注“帖子发布成功”的消息,主动去MQ订阅

采用MQ的优点是:

1)上游执行时间短

2)上下游逻辑+物理解耦,除了与MQ有物理连接,模块之间都不相互依赖

3)新增一个下游消息关注方,上游不需要修改任何代码

典型场景三:上游关注执行结果,但执行时间很长

有时候上游需要关注执行结果,但执行结果时间很长(典型的是调用离线处理,或者跨公网调用),也经常使用回调网关+MQ来解耦。

举个栗子,微信支付,跨公网调用微信的接口,执行时间会比较长,但调用方又非常关注执行结果,此时一般怎么玩呢?

mq使用场景、不丢不重、时序性

一般采用“回调网关+MQ”方案来解耦:

1)调用方直接跨公网调用微信接口

2)微信返回调用成功,此时并不代表返回成功

3)微信执行完成后,回调统一网关

4)网关将返回结果通知MQ

5)请求方收到结果通知

这里需要注意的是,不应该由回调网关来调用上游来通知结果,如果是这样的话,每次新增调用方,回调网关都需要修改代码,仍然会反向依赖,使用回调网关+MQ的方案,新增任何对微信支付的调用,都不需要修改代码啦。

五、总结

MQ是一个互联网架构中常见的解耦利器。

 

什么时候不使用MQ?

上游实时关注执行结果

什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

==【完】==

相关阅读:

MQ如何实现延时消息

MQ如何实现消息必达

MQ如何实现幂等性

消息总线能否实现消息必达?

一、缘起

上周讨论了两期环形队列的业务应用:

《高效定时任务的触发》

《延迟消息的快速实现》

两期的均有大量读者提问:

  • 任务、延迟消息都放在内存里,万一重启了怎么办?
  • 能否保证消息必达?

今天就简单聊聊消息队列(MsgQueue)的消息必达性架构与流程。

二、架构方向

MQ要想尽量消息必达,架构上有两个核心设计点:

(1)消息落地

(2)消息超时、重传、确认

三、MQ核心架构

mq使用场景、不丢不重、时序性

上图是一个MQ的核心架构图,基本可以分为三大块:

(1)发送方 -> 左侧粉色部分

(2)MQ核心集群 -> 中间蓝色部分

(3)接收方 -> 右侧黄色部分

粉色发送方又由两部分构成:业务调用方与MQ-client-sender

其中后者向前者提供了两个核心API:

  • SendMsg(bytes[] msg)
  • SendCallback()

蓝色MQ核心集群又分为四个部分:MQ-server,zk,db,管理后台web

黄色接收方也由两部分构成:业务接收方与MQ-client-receiver

其中后者向前者提供了两个核心API:

  • RecvCallback(bytes[] msg)
  • SendAck()

MQ是一个系统间解耦的利器,它能够很好的解除发布订阅者之间的耦合,它将上下游的消息投递解耦成两个部分,如上述架构图中的1箭头和2箭头:

(1)发送方将消息投递给MQ,上半场

(2)MQ将消息投递给接收方,下半场

四、MQ消息可靠投递核心流程

MQ既然将消息投递拆成了上下半场,为了保证消息的可靠投递,上下半场都必须尽量保证消息必达。

mq使用场景、不丢不重、时序性

MQ消息投递上半场,MQ-client-sender到MQ-server流程见上图1-3:

  • MQ-client将消息发送给MQ-server(此时业务方调用的是API:SendMsg)
  • MQ-server将消息落地,落地后即为发送成功
  • MQ-server将应答发送给MQ-client(此时回调业务方是API:SendCallback)

MQ消息投递下半场,MQ-server到MQ-client-receiver流程见上图4-6:

  • MQ-server将消息发送给MQ-client(此时回调业务方是API:RecvCallback)
  • MQ-client回复应答给MQ-server(此时业务方主动调用API:SendAck)
  • MQ-server收到ack,将之前已经落地的消息删除,完成消息的可靠投递

1. 如果消息丢了怎么办?

MQ消息投递的上下半场,都可以出现消息丢失,为了降低消息丢失的概率,MQ需要进行超时和重传。

2. 上半场的超时与重传

MQ上半场的1或者2或者3如果丢失或者超时,MQ-client-sender内的timer会重发消息,直到期望收到3,如果重传N次后还未收到,则SendCallback回调发送失败,需要注意的是,这个过程中MQ-server可能会收到同一条消息的多次重发。

3. 下半场的超时与重传

MQ下半场的4或者5或者6如果丢失或者超时,MQ-server内的timer会重发消息,直到收到5并且成功执行6,这个过程可能会重发很多次消息,一般采用指数退避的策略,先隔x秒重发,2x秒重发,4x秒重发,以此类推,需要注意的是,这个过程中MQ-client-receiver也可能会收到同一条消息的多次重发。

MQ-client与MQ-server如何进行消息去重,如何进行架构幂等性设计,下一次撰文另述,此处暂且认为为了保证消息必达,可能收到重复的消息。

五、总结

消息总线是系统之间的解耦利器,但切勿滥用,未来也会撰文细究MQ的使用场景,消息总线为了尽量保证消息必达,架构设计方向为:

  • 消息收到先落地
  • 消息超时、重传、确认保证消息必达

消息总线真的能保证幂等?

一、缘起

如《消息总线消息必达》所述,MQ消息必达,架构上有两个核心设计点:

(1)消息落地

(2)消息超时、重传、确认

mq使用场景、不丢不重、时序性

再次回顾消息总线核心架构,它由发送端、服务端、固化存储、接收端四大部分组成。

为保证消息的可达性,超时、重传、确认机制可能导致消息总线、或者业务方收到重复的消息,从而对业务产生影响。

举个栗子:

购买会员卡,上游支付系统负责给用户扣款,下游系统负责给用户发卡,通过MQ异步通知。不管是上半场的ACK丢失,导致MQ收到重复的消息,还是下半场ACK丢失,导致购卡系统收到重复的购卡通知,都可能出现,上游扣了一次钱,下游发了多张卡。

消息总线的幂等性设计至关重要,是本文将要讨论的重点。

二、上半场的幂等性设计

mq使用场景、不丢不重、时序性

MQ消息发送上半场,即上图中的1-3

  • 1,发送端MQ-client将消息发给服务端MQ-server
  • 2,服务端MQ-server将消息落地
  • 3,服务端MQ-server回ACK给发送端MQ-client

如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息。

此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:

(1)全局唯一

(2)MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽

有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。

三、下半场的幂等性设计

mq使用场景、不丢不重、时序性

MQ消息发送下半场,即上图中的4-6

  • 4,服务端MQ-server将消息发给接收端MQ-client
  • 5,接收端MQ-client回ACK给服务端
  • 6,服务端MQ-server将落地消息删除

需要强调的是,接收端MQ-client回ACK给服务端MQ-server,是消息消费业务方的主动调用行为,不能由MQ-client自动发起,因为MQ系统不知道消费方什么时候真正消费成功。

如果5丢失,服务端MQ-server超时后会重发消息,可能导致MQ-client收到重复的消息。

此时重发是MQ-server发起的,消息的处理是消息消费业务方,消息重发势必导致业务方重复消费(上例中的一次付款,重复发卡),为了保证业务幂等性,业务消息体中,必须有一个biz-id,作为去重和幂等的依据,这个业务ID的特性是:

(1)对于同一个业务场景,全局唯一

(2)由业务消息发送方生成,业务相关,对MQ透明

(3)由业务消息消费方负责判重,以保证幂等

最常见的业务ID有:支付ID,订单ID,帖子ID等。

具体到支付购卡场景,发送方必须将支付ID放到消息体中,消费方必须对同一个支付ID进行判重,保证购卡的幂等。

有了这个业务ID,才能够保证下半场消息消费业务方即使收到重复消息,也只有1条消息被消费,保证了幂等。

三、总结

MQ为了保证消息必达,消息上下半场均可能发送重复消息,如何保证消息的幂等性呢?

上半场

  • MQ-client生成inner-msg-id,保证上半场幂等。
  • 这个ID全局唯一,业务无关,由MQ保证。

下半场

  • 业务发送方带入biz-id,业务接收方去重保证幂等。
  • 这个ID对单业务唯一,业务相关,对MQ透明。

结论:幂等性,不仅对MQ有要求,对业务上下游也有要求。

消息“时序”与“一致性”为何这么难?

分布式系统中,很多业务场景都需要考虑消息投递的时序,例如:

(1)单聊消息投递,保证发送方发送顺序与接收方展现顺序一致

(2)群聊消息投递,保证所有接收方展现顺序一致

(3)充值支付消息,保证同一个用户发起的请求在服务端执行序列一致

消息时序是分布式系统架构设计中非常难的问题,ta为什么难,有什么常见优化实践,是本文要讨论的问题。

一、为什么时序难以保证,消息一致性难?

为什么分布式环境下,消息的时序难以保证,这边简要分析了几点原因:

【时钟不一致】

mq使用场景、不丢不重、时序性

分布式环境下,有多个客户端、有web集群、service集群、db集群,他们都分布在不同的机器上,机器之间都是使用的本地时钟,而没有一个所谓的“全局时钟”,所以不能用“本地时间”来完全决定消息的时序。

【多客户端(发送方)】

mq使用场景、不丢不重、时序性

多服务器不能用“本地时间”进行比较,假设只有一个接收方,能否用接收方本地时间表示时序呢?遗憾的是,由于多个客户端的存在,即使是一台服务器的本地时间,也无法表示“绝对时序”。

如上图,绝对时序上,APP1先发出msg1,APP2后发出msg2,都发往服务器web1,网络传输是不能保证msg1一定先于msg2到达的,所以即使以一台服务器web1的时间为准,也不能精准描述msg1与msg2的绝对时序。

【服务集群(多接收方)】

mq使用场景、不丢不重、时序性

多发送方不能保证时序,假设只有一个发送方,能否用发送方的本地时间表示时序呢?遗憾的是,由于多个接收方的存在,无法用发送方的本地时间,表示“绝对时序”。

如上图,绝对时序上,web1先发出msg1,后发出msg2,由于网络传输及多接收方的存在,无法保证msg1先被接收到先被处理,故也无法保证msg1与msg2的处理时序。

【网络传输与多线程】

mq使用场景、不丢不重、时序性

多发送方与多接收方都难以保证绝对时序,假设只有单一的发送方与单一的接收方,能否保证消息的绝对时序呢?结论是悲观的,由于网络传输与多线程的存在,仍然不行。

如上图,web1先发出msg1,后发出msg2,即使msg1先到达(网络传输其实还不能保证msg1先到达),由于多线程的存在,也不能保证msg1先被处理完。

【怎么保证绝对时序】

通过上面的分析,假设只有一个发送方,一个接收方,上下游连接只有一条连接池,通过阻塞的方式通讯,难道不能保证先发出的消息msg1先处理么?

回答:可以,但吞吐量会非常低,而且单发送方单接收方单连接池的假设不太成立,高并发高可用的架构不会允许这样的设计出现。

二、优化实践

【以客户端或者服务端的时序为准】

多客户端、多服务端导致“时序”的标准难以界定,需要一个标尺来衡量时序的先后顺序,可以根据业务场景,以客户端或者服务端的时间为准,例如:

(1)邮件展示顺序,其实是以客户端发送时间为准的,潜台词是,发送方只要将邮件协议里的时间调整为1970年或者2970年,就可以在接收方收到邮件后一直“置顶”或者“置底”

(2)秒杀活动时间判断,肯定得以服务器的时间为准,不可能让客户端修改本地时间,就能够提前秒杀

【服务端能够生成单调递增的id】

这个是毋庸置疑的,不展开讨论,例如利用单点写db的seq/auto_inc_id肯定能生成单调递增的id,只是说性能及扩展性会成为潜在瓶颈。对于严格时序的业务场景,可以利用服务器的单调递增id来保证时序。

【大部分业务能接受误差不大的趋势递增id】

消息发送、帖子发布时间、甚至秒杀时间都没有这么精准时序的要求:

(1)同1s内发布的聊天消息时序乱了

(2)同1s内发布的帖子排序不对

(3)用1s内发起的秒杀,由于服务器多台之间时间有误差,落到A服务器的秒杀成功了,落到B服务器的秒杀还没开始,业务上也是可以接受的(用户感知不到)

所以,大部分业务,长时间趋势递增的时序就能够满足业务需求,非常短时间的时序误差一定程度上能够接受。

关于绝对递增id,趋势递增id的生成架构,详见文章《细聊分布式ID生成方法》,此处不展开。

【利用单点序列化,可以保证多机相同时序】

数据为了保证高可用,需要做到进行数据冗余,同一份数据存储在多个地方,怎么保证这些数据的修改消息是一致的呢?利用的就是“单点序列化”:

(1)先在一台机器上序列化操作

(2)再将操作序列分发到所有的机器,以保证多机的操作序列是一致的,最终数据是一致的

典型场景一:数据库主从同步

mq使用场景、不丢不重、时序性

数据库的主从架构,上游分别发起了op1,op2,op3三个操作,主库master来序列化所有的SQL写操作op3,op1,op2,然后把相同的序列发送给从库slave执行,以保证所有数据库数据的一致性,就是利用“单点序列化”这个思路。

典型场景二:GFS中文件的一致性

mq使用场景、不丢不重、时序性

GFS(Google File System)为了保证文件的可用性,一份文件要存储多份,在多个上游对同一个文件进行写操作时,也是由一个主chunk-server先序列化写操作,再将序列化后的操作发送给其他chunk-server,来保证冗余文件的数据一致性的。

【单对单聊天,怎么保证发送顺序与接收顺序一致】

单人聊天的需求,发送方A依次发出了msg1,msg2,msg3三个消息给接收方B,这三条消息能否保证显示时序的一致性(发送与显示的顺序一致)?

回答:

(1)如果利用服务器单点序列化时序,可能出现服务端收到消息的时序为msg3,msg1,msg2,与发出序列不一致

(2)业务上不需要全局消息一致,只需要对于同一个发送方A,ta发给B的消息时序一致就行,常见优化方案,在A往B发出的消息中,加上发送方A本地的一个绝对时序,来表示接收方B的展现时序

msg1{seq:10, receiver:B,msg:content1 }

msg2{seq:20, receiver:B,msg:content2 }

msg3{seq:30, receiver:B,msg:content3 }

mq使用场景、不丢不重、时序性

潜在问题:如果接收方B先收到msg3,msg3会先展现,后收到msg1和msg2后,会展现在msg3的前面。

无论如何,是按照接收方收到时序展现,还是按照服务端收到的时序展现,还是按照发送方发送时序展现,是pm需要思考的点,技术上都能够实现(接收方按照发送时序展现是更合理的)。

总之,需要一杆标尺来衡量这个时序。

【群聊消息,怎么保证各接收方收到顺序一致】

群聊消息的需求,N个群友在一个群里聊,怎么保证所有群友收到的消息显示时序一致?

回答:

(1)不能再利用发送方的seq来保证时序,因为发送方不单点,时间也不一致

(2)可以利用服务器的单点做序列化

mq使用场景、不丢不重、时序性

此时群聊的发送流程为:

(1)sender1发出msg1,sender2发出msg2

(2)msg1和msg2经过接入集群,服务集群

(3)service层到底层拿一个唯一seq,来确定接收方展示时序

(4)service拿到msg2的seq是20,msg1的seq是30

(5)通过投递服务讲消息给多个群友,群友即使接收到msg1和msg2的时间不同,但可以统一按照seq来展现

这个方法能实现,所有群友的消息展示时序相同。

缺点是,这个生成全局递增序列号的服务很容易成为系统瓶颈,还有没有进一步的优化方法呢?

思路:群消息其实也不用保证全局消息序列有序,而只要保证一个群内的消息有序即可,这样的话,“id串行化”就成了一个很好的思路。

mq使用场景、不丢不重、时序性

这个方案中,service层不再需要去一个统一的后端拿全局seq,而是在service连接池层面做细小的改造,保证一个群的消息落在同一个service上,这个service就可以用本地seq来序列化同一个群的所有消息,保证所有群友看到消息的时序是相同的。

关于id串行化的细节,可详见《利用id串行化解决缓存与数据库一致性问题》,此处不展开。

三、总结

(1)分布式环境下,消息的有序性是很难的,原因多种多样:时钟不一致,多发送方,多接收方,多线程,网络传输不确定性等

(2)要“有序”,先得有衡量“有序”的标尺,可以是客户端标尺,可以是服务端标尺

(3)大部分业务能够接受大范围趋势有序,小范围误差;绝对有序的业务,可以借助服务器绝对时序的能力

(4)单点序列化,是一种常见的保证多机时序统一的方法,典型场景有db主从一致,gfs多文件一致

(5)单对单聊天,只需保证发出的时序与接收的时序一致,可以利用客户端seq

(6)群聊,只需保证所有接收方消息时序一致,需要利用服务端seq,方法有两种,一种单点绝对时序,另一种id串行化

1分钟实现“延迟消息”功能

一、缘起

很多时候,业务有“在一段时间之后,完成一个工作任务”的需求。

例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

一般来说怎么实现这类“48小时后自动评价为5星”需求呢?

1. 常见方案:

启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价。

假设订单表的结构为:t_order(oid, finish_time, stars, status, …),更具体的,定时任务每隔一个小时会这么做一次:

  1. select oid from t_order where finish_time > 48hours and status=0;
  2. update t_order set stars=5 and status=1 where oid in[…];

如果数据量很大,需要分页查询,分页update,这将会是一个for循环。

2. 方案的不足:

(1)轮询效率比较低

(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑

(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时

(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显

如何利用“延时消息”,对于每个任务只触发一次,保证效率的同时保证实时性,是今天要讨论的问题。

二、高效延时消息设计与实现

高效延时消息,包含两个重要的数据结构:

  • 环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
  • 任务集合,环上每一个slot是一个Set

同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:

  • Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
  • Task-Function:需要执行的任务指针

mq使用场景、不丢不重、时序性

假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:

  • 计算这个Task应该放在哪一个slot,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个slot的Set中
  • 计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1

Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set,每个Task看Cycle-Num是不是0:

  • 如果不是0,说明还需要多移动几圈,将Cycle-Num减1
  • 如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从Set中删除

使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:

  • 无需再轮询全部订单,效率高
  • 一个订单,任务只执行一次
  • 时效性好,精确到秒(控制timer移动频率可以控制精度)

三、总结

环形队列是一个实现“延时消息”的好方法,开源的MQ好像都不支持延迟消息,不妨自己实现一个简易的“延时消息队列”,能解决很多业务问题,并减少很多低效扫库的cron任务。

一、缘起

很多时候,业务有定时任务或者定时超时的需求,当任务量很大时,可能需要维护大量的timer,或者进行低效的扫描。

例如:58到家APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。

其中,单机TCP同时在线量约在10w级别,keepalive请求包大概30s一次,吞吐量约在3000qps。

一般来说怎么实现这类需求呢?

1. “轮询扫描法”

1)用一个Map

2)当某个用户uid有请求包来到,实时更新这个Map

3)启动一个timer,当Map中不为空时,轮询扫描这个Map,看每个uid的last_packet_time是否超过30s,如果超过则进行超时处理

2. “多timer触发法”

1)用一个Map

2)当某个用户uid有请求包来到,实时更新这个Map,并同时对这个uid请求包启动一个timer,30s之后触发

3)每个uid请求包对应的timer触发后,看Map中,查看这个uid的last_packet_time是否超过30s,如果超过则进行超时处理

  • 方案一:只启动一个timer,但需要轮询,效率较低
  • 方案二:不需要轮询,但每个请求包要启动一个timer,比较耗资源

特别在同时在线量很大时,很容易CPU100%,如何高效维护和触发大量的定时/超时任务,是本文要讨论的问题。

二、环形队列法

废话不多说,三个重要的数据结构:

1)30s超时,就创建一个index从0到30的环形队列(本质是个数组)

2)环上每一个slot是一个Set,任务集合

3)同时还有一个Map

mq使用场景、不丢不重、时序性

同时:

1)启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…

2)有一个Current Index指针来标识刚检测过的slot

1. 当有某用户uid有请求包到达时:

1)从Map结构中,查找出这个uid存储在哪一个slot里

2)从这个slot的Set结构中,删除这个uid

3)将uid重新加入到新的slot中,具体是哪一个slot呢 =>Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到

(4)更新Map,这个uid对应slot的index值

2. 哪些元素会被超时掉呢?

Current Index每秒种移动一个slot,这个slot对应的Set中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。

所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。

3. 优势:

(1)只需要1个timer

(2)timer每1s只需要一次触发,消耗CPU很低

(3)批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉

三、总结

这个环形队列法是一个通用的方法,Set和Map中可以是任何task,本文的uid是一个最简单的举例。

问:为什么会有本文?

答:上一篇文章《到底什么时候该使用MQ?》引起了广泛的讨论,有朋友回复说,MQ的还有一个典型应用场景是缓冲流量,削峰填谷,本文将简单介绍下,MQ要实现什么细节,才能缓冲流量,削峰填谷。

问:站点与服务,服务与服务上下游之间,一般如何通讯?

答:有两种常见的方式

mq使用场景、不丢不重、时序性

一种是“直接调用”,通过RPC框架,上游直接调用下游。

mq使用场景、不丢不重、时序性

在某些业务场景之下(具体哪些业务场景,见《到底什么时候该使用MQ?》),可以采用“MQ推送”,上游将消息发给MQ,MQ将消息推送给下游。

问:为什么会有流量冲击?

答:不管采用“直接调用”还是“MQ推送”,都有一个缺点,下游消息接收方无法控制到达自己的流量,如果调用方不限速,很有可能把下游压垮。

举个栗子,秒杀业务:

上游发起下单操作

下游完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)

上游下单业务简单,每秒发起了10000个请求,下游秒杀业务复杂,每秒只能处理2000个请求,很有可能上游不限速的下单,导致下游系统被压垮,引发雪崩。

为了避免雪崩,常见的优化方案有两种:

1)业务上游队列缓冲,限速发送

2)业务下游队列缓冲,限速执行

不管哪种方案,都会引入业务的复杂性,有“缓冲流量”需求的系统都需要加入类似的机制(具体怎么保证消息可达,见《消息总线能否实现消息必达?》),正所谓“通用痛点统一解决”,需要一个通用的机制解决这个问题。

问:如何缓冲流量?

答:明明中间有了MQ,并且MQ有消息落地的机制,为何不能利用MQ来做缓冲呢?显然是可以的。

问:MQ怎么改能缓冲流量?

答:由MQ-server推模式,升级为MQ-client拉模式。

mq使用场景、不丢不重、时序性

MQ-client根据自己的处理能力,每隔一定时间,或者每次拉取若干条消息,实施流控,达到保护自身的效果。并且这是MQ提供的通用功能,无需上下游修改代码。

问:如果上游发送流量过大,MQ提供拉模式确实可以起到下游自我保护的作用,会不会导致消息在MQ中堆积?

答:下游MQ-client拉取消息,消息接收方能够批量获取消息,需要下游消息接收方进行优化,方能够提升整体吞吐量,例如:批量写。

结论

1)MQ-client提供拉模式,定时或者批量拉取,可以起到削平流量,下游自我保护的作用(MQ需要做的)

2)要想提升整体吞吐量,需要下游优化,例如批量处理等方式(消息接收方需要做的)

58到家架构优化具备整体性,需要通用服务和业务方一起优化升级。

一、消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

二、消息队列应用场景
以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。

2.1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
mq使用场景、不丢不重、时序性

b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
mq使用场景、不丢不重、时序性

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
mq使用场景、不丢不重、时序性
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
mq使用场景、不丢不重、时序性
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:
mq使用场景、不丢不重、时序性
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.3流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
mq使用场景、不丢不重、时序性
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理

2.4日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
mq使用场景、不丢不重、时序性
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据

2.5消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
点对点通讯:
mq使用场景、不丢不重、时序性
客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:
mq使用场景、不丢不重、时序性
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

三、消息中间件示例 
3.1电商系统
mq使用场景、不丢不重、时序性
消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

3.2日志收集系统
mq使用场景、不丢不重、时序性
分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
Zookeeper注册中心,提出负载均衡和地址查找服务
日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
Kafka集群:接收,路由,存储,转发等消息处理
Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

MQ选型对比文档

mq使用场景、不丢不重、时序性 
综合选择RabbitMq

Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务。

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

上一篇:.Net Core with 微服务 - 分布式事务 - 可靠消息最终一致性


下一篇:【分布式事务】使用atomikos+jta解决分布式事务问题