作者:阿里云智能IoT事业部 高级技术专家 吕建文
随着接触客户越来越多,也越来越颠覆了我对“传统队列”(kafka、rocketmq、rabbitmq...)的看法。 当然本文不是说“传统队列”做得不好, 这些队列系统经过多年打磨,在高性能、海量堆积、消息可靠性等诸多方面都已经做得非常极致了,都做得非常的优秀。 但今天我觉得大家在设计方案时动不动任何一个异步、系统解耦等就来选用队列,然后线上又频繁出一些问题,这些问题的背后我们得看看到底什么场景适合、什么场景过渡使用了、有没有更好解法, 尤其今天IoT领域场景复杂,既有面向自身SAAS业务又要承担多租户PAAS平台化模式,面临更多的队列方面问题,拿来和大家讨论分享。
本文讨论的产品:阿里云物联网平台
IoT使用队列场景
队列在IoT领域是个极其重要部件,上到服务器,下到一个月发一次消息的嵌入式芯片,都需要传递事件消息;比如共享充电宝的开柜子、开灯指令从服务器发到设备、工业网关高频消息流等, 队列最大意义在于让整个消息事件在不可控的环境因素变成一个平稳运行的系统。 比如设备时不时会网络抖动,某些设备故障、某些抖动导致大量消息飙涨。
我们把队列用于了2个场景,一个是上行,一个是下行;我们在队列网关上即复用了kafka、rocketmq,又加入了自研的实时队列,尽量做到了即使在某一个队列大范围堆积时候,即时的业务也还是相对可用的。比如充电宝,哪怕用户的队列消息大量堆积,新用户过来控制,也开能打开柜子,即时在有大量堆积情况也能快速恢复业务,而不是一定要运维同学清理掉堆积才能快速让业务可用。单纯使用kafka或者rocketmq在大量堆积时很可能是个可怕故障。
设备上行消息,总量qps非常高,通常需要汇集到一组消费者,这个场景适合使用市面上队列kafka、mq等选型,但在IoT控制设备场景对实时性要求较高,直接使用kafka等 在堆积时候 那和这一个队列挂钩的入口设备都会有业务影响,并非最佳方式。
设备下行消息,总量qps不高,一般是下行控制设备开柜子等人为触发操作,这个场景要求到达率很高,而且设备级别隔离要求也高,不能因为一个设备消息拥塞干扰其它设备控制,这个情况比较适合IM 比如钉钉消息类似的情况,也是我们自研的IoT设备级队列。
我们做了哪些优化
1.上下行隔离拆分
下行尤其重要。 比如控制一个设备,比如一个开锁、支付成功要下发打开柜子等等,上行出任何问题,千万不能影响到下行业务。
2.IoT队列-设备级/海量topic
用于下行推送设备消息情况的队列。
图1和图2对比较明显, 一个队列拥塞尽量减少对其它设备影响。
3.IoT队列-服务器订阅
用于把用户设备消息发送给用户服务器的队列。相当于一个队列网关,我们即支持用户规则流转到kafka、rocketmq,也能使用IoT自研的AMQP实时队列网关。即使在消息有堆积情况,把堆积态的消息降级处理,实时生成消息依然优先发送。
大家用队列初衷是什么?
提高业务吞吐、异步化
消峰填谷
系统间解耦
高息可靠、事务、有序消息、最终一致性 .....
大家都用得非常多 都很了解了,队列基本上是设计一个分布式系统里面必备的组件,可能还有更多场景就不多举了。
线上出现堆积你的反应是什么?
假如没有堆积,队列就失去意义。
大部分使用队列的系统,可能大家都遭遇过“堆积”的案例。 如果你还没碰到堆积,那说明比较幸运,你的生产和消费都还比较稳定,也还没遭受过异常抖动,但是你依然需要为线上系统的稳定性考虑堆积了你的预案是什么,有什么样的影响。
不论是我们自己,还是用户使用,都面临过“堆积”案例。我们IoT这边光内部系统之间流转,使用了RocketMq(阿里内部是metaq)有上W级别数量topic,还有个场景是我们要把用户设备消息流转到用户自己购买的队列里面去。
短暂抖动这种堆积一般自我消化。奇怪的现象是,绝大部分出现堆积的场景,比如有个几十w条堆积的,用户第一反应是什么呢?
很多结论是: 有堆积几乎已经是线上故障了, 需要先清理堆积!
堆积能力强是不是件好事情呢?
1.为什么需要清理堆积?
a.先说IoT “实时性让队列始料不及”:
例子:一个快递柜业务的队列堆积,然后“此时此刻”在柜子旁边的用户死命的在旁边用手机点开柜子 怎么也打不开,(后端系统都恢复了)可是他还是打不开,问题就是队列里面还有几十w条的消息,新来的消息不好意思,要排队哦。 之前等着的那些消息源源不断的消费,鬼才知道到底这些消息还有没有用。 然后这个“此时此刻”的人 又气急败坏走了, 再过来下一个“此时此刻”的人。。。 。 所以运维的人就要手工清理大量堆积态消息,先让等在设备旁边的人能打卡他的快递。
b.即便不是IoT场景, 我相信肯定有同学碰到过线上清理堆积、重置消费点位案例,可能发生于:业务雪崩、快一点恢复业务
丢失一些数据和快速恢复交易 到底哪个重要点... 等了一会,不行啊 客户都投诉了,压力山大 还是清理堆积吧, 然后后面再补一个设计数据一致性方案。
前断时间活生生碰到个用户反馈情况,kafka在消费端堆积时候写入延时很高,最后发现由于堆积消息数据需要从硬盘加载冷数据导致io打高。
2.不需要清理堆积有多少情况?
a.短暂抖动,能自我恢复 (大部分能分钟内消费完可能都不是问题) -- 这种情况确实是比较多的
b.时间不重要的数据,比如日志
所以, “需要先清理线上堆积” 就是把双刃剑,没堆积不是队列,堆积多了又影响你的业务;不能过渡拥抱堆积。
有办法优化吗?
先总结下问题
1.市面上所有队列承诺的消息实时性,前提都是消费端消费能力足够
也有同学说把实时的消息topic和其它topic分开不同优先不就行了吗? 不好意思,这个跑题了,继续看上面例子把。
2.IoT是一个PAAS平台,内部我可以要求其它团队消费能力上去; 外部ISV用户,客户自己说了算,很难让每个开发者按照你希望优化的方法去做。
虽然我们也可以做很多租户隔离措施,甚至跟用户说,你自己队列问题,是你们业务处理能力不够 或者直接说 你们自己有问题。 可是,用户还是委屈呀,业务出问题就是他得背锅的。
3.队列的存储和内核机制,几乎都是FIFO (就是先进先出,后面进来的一定要等前面的出去)
3.1 这样才能有序,要么是分区有序,要么是全局有序。
3.2 不论如何 一旦发生堆积,新来的消息一定是要前面堆积的出去了才能被消费。这种情况 一定会导致后来的消息被动延时。
3.3 注意,有的队列说的无序消息只是发送方是同步还是异步无序,每个子消费队列单元底层还是这个结构运行,底层存储和分发上如果堆积状态有新进来消息是追加在堆积后面的;
4.全局保序能降级吗
在某些场景,保序真的很重要。 但我觉得肯定存在无需保序场景,或者业务上设计时间戳而不是依赖消息的序列也能解决很多场景问题,这2个加起来 至少目前我们了解到的业务80%以上都能满足。
补充下,现实系统中一个业务流程通常链路比较多,要经过ABCD 多个系统->队列0 ->E/F/G ->队列1->XYZ。。。 这条完整链路才是一笔完整的业务交易,仅仅在一个队列0这里做个有序,是不够的,要从A到Z整个都变成有序,代价是非常高的 已经不单是一个队列有序就可以解决的问题。
为了保序,队列产品花了很大开销,就像上面提到活生生例子 kafka在堆积时候突然消费端恢复导致kafka需要加载硬盘冷数据把io打崩,原本可以高效的把生产成功的数据直接发送出去,但因为堆积就得先加载硬盘数据,导致新进去数据没机会。
目前的队列产品即使有无序队列类型,每个消费队列结构上依然是FIFO的, 并不是设计上的问题,是有史以来“队列” 的基准。
一个大胆的尝试-IoT队列诞生
我们需要一个针对IoT场景,实时优先的“队列”,而且思路和rocketMq、kafka都不一样,得开发:
1.实时生成消息优先发送,堆积的消息进入降级模式
保序降级,不保障整体消息序列,仅实时消息相对有序
堆积消息和实时消息是并行任务发送,堆积速率降级
实时消息发送失败马上降级为堆积消息
2.海量topic
传统队列核心点是 不论堆积多少不影响它的性能;kafka topic一多 原本消息顺序写文件优势 就会导致一个broker要退化到随机写,失去优势,另外要zk来协调这么多topic也是有局限,所以这些队列本身有提供一个外挂代理桥接器 对外入口是多个设备topic,再桥接映射到少量的实际kafka topic,这方案有一定可行性,但做不到我们希望的隔离效果 只是治标。
我们需要的是“海量topic 尽量相互隔离 并且不影响整体性能”,尽量做到设备A的消息堆积topic,不影响设备B,上面做法基本上一个实际topic问题 这一面的设备就影响。 阿里云物联网平台今天首先面临的是多租户, 10w条消息有影响,你是影响一个厂商的10w设备,还是影响1w个厂商 每个厂商10个设备 完全是不一样的故障面。 我们要先保租户隔离,再保设备隔离,不能完全按设备Id去分散多个队列。理想情况是设备级别的topic数量,那么就是 亿级别topic 了!
面向设备端海量topic和IM聊天系统稍微类似,所以我们也参考了IM产品碰到过的问题,比如阿里内部手淘、钉钉,但有个最大区别是已有产品基于自有协议和客户端,IoT领域做硬件同学可能更体感,端不可控;需要支持开源,也需要IoT的SDK。
最后topic数量也要考虑成本,我们希望是topic数量是几乎无成本的!
我暂时也没办法定义这个产物到底还是不是队列,但我们得弥补目前队列一些问题。
总体思路
目前IoT队列还没有好名字,对外叫服务端订阅,意思就是用户用服务器订阅他们设备消息,欢迎拍砖。 为了降低接入成本,用户可以使用AMQP1.0协议接入 符合开源生态。
注意,AMQP本身是个消息协议,队列可以用AMQP做协议,但协议本身不代表队列。就像Kafka也可以用http消费,但不代表 http是个kafka队列。AMQP1.0是最新的标准,不止于队列本身,它更多是一个消息通信的统一消息标准,而内核是否是队列还是仅仅作为管道由Provider来实现,AMQP1.0标准被逐步用于消息网关,而队列模型只是网关里的一个可选实现。
兼容传统队列和新队列,交给用户按场景来推荐选择,用户即可选择使用kafka、mq(ONS、MNS) 也可以选用iot队列(物联网平台的服务端订阅),甚至组合模式,比如按消息特征规则来配置流转队列。
IoT队列的设计思路
设计目标 :实时优先、使用便捷、海量topic支持的 队列网关、Follow开源客户端
这套东西其实已经不再是一个大家理解中的“队列”,不在一个维度。 但是目前队列产品也在做设备接入,所以还是给大家一个对比把:
IoT队列-服务端订阅部分 | 流转传统队列RocketMQ、KAFKA、AMQP for MQ | |
---|---|---|
堆积能力 | 海量堆积的消息会直接降级,堆积消息不会影响实时消息,优先恢复设备即时可用性 | 海量堆积不影响性能,突出在性能点。 当同一个队列有堆积时,实时生成的消息一般会排在队末尾,直到堆积的逻辑处理完,或者进入了死信消息 |
实时处理能力 | 实时和堆积处理分离; 即使有堆积,实时消息的也会先推送,因此实时性会相对更优 | 队列在堆积时,实时消息一生成就会变成堆积排队 ,消息实时性要求“消费端必须快速恢复消费能力” |
功能对比 | 云监控、消费者状态、轨迹、多消费组、海量topic、不保序 -- (功能简单) | 消息监控、消费者状态、轨迹、多消费组、有限topic、可保序(或分区有序)、事务消息、广播消息、死信等 -- (更丰富)Q |
为什么更适合IoT实时优先场景 | 1.出现海量堆积时,设备实时控制依然能保持一定可用性,无需特别惧怕堆积;2.海量topic,部分设备消息拥塞不要影响整个队列 | 1. 出现海量堆积时,用户第一反应通常是需要人工运维去清理堆积,才能让系统恢复可用;堆积是个“可怕”的线上问题 ;2.同一个业务场景要用多个topic去拆分来模拟实现消息优先级,一个topic堆积还是会出现多个设备受到影响;3.外挂逻辑设备多个topic映射到内部有限topic,治标不治本 |
性能差异 | 依赖分布式存储OTS/HBASE,broker自己不做存储 单机性能较弱 ,需要利用集群发挥性能;堆积态消息性能有限,取决底层存储单分区扫描能力*分区数; | broker本地异步/同步刷盘,IO性能非常高, 单机即可达到非常高QPS ;单机性能非常优秀; |
成本对比 | 1.topic无开销,海量topic时成本较低、隔离性较好;2.消息QPS较高时成本比kafka高;3.不利于小型化部署 依赖多 | 1.topic因为要预分配资源,有成本开销,有数量限制;“海量topic”通过 合用一个topic实现 底层无法较好隔离;2.Kafka按带宽收费成本最低;3.小型化部署成本低,依赖少 |
消息模式
这个图只是最基本片段,从这个模式可以看出来机制差别,大家都没有错,出发点不同。
消息策略-推拉结合
这个应该是队列的核心难点之一,和传统队列区分在于,我们考虑为平台化模式,独享资源过于昂贵。 但带来问题是消费端不可控,所以使用结合模式,只有在消费者在线时会拉取堆积消息,而拉取是由AMQP队列网关来做,给到用户接口始终是推送过去的onMessage回调。
- broker不是直接让consumer来连接,而是把队列网关剥离出来, 这样会更灵活,甚至对于部分用户 我们的queue可以切换到ons、kafka等实现。 kafka、rocketmq做法是在连接时会分配给客户端一个broker接入地址。
2.消息高可用:broker实时消息优先推送给consumer,失败才会落到queue ;这是一个完整事件,如果没有完成 则不给producer commit。如果宕机,需要发送方超时来决定重试。
3.异步ACK
线性扩展-离线消息部分
实时部分消息采用推方式,基本上不会成为瓶颈,消费不过来消息进入堆积模式。
底层使用KV存储(阿里云自研表格存储产品ots、hbase)已经解决存储的扩展,剩余主要问题如何消除写入热点和消费热点,这样broker几乎可以做到无状态水平扩展。
1.为了更快速方便实现cleansession、清理堆积、消费点位等特性,消息id借鉴了snowflake 使用非自然序自增ID ,这么做好处是 如果需要清理离线消息,那么生成一个当前时间的id即可。
2.消费记录按租户的消费者ID (cid和sharding分区值决定),即同一个租户消费组,可以调整sharding数量,一个sharding分区 是一个单线程任务处理,所以离线消息总消费速率取决于sharding数量,为了谨慎防止热点,目前一个sharding跑1000QPS。 这里碰到过一些坑,比如写前读,尤其消费状态更新问题。阿里云表格存储在自增ID上做了不少优化, Hbase版本目前正在验证。
3.实时消息“看起来没落盘 会丢吗” ,不是不落,是先推后看场景落。 每个发送给消费者的消息,都需要对方ack,如果异常或者超时没有答复 ,一个是落到堆积消息; 另外一个是这个过程如果都失败,将直接告诉发送方失败。
如何解决海量topic问题
首先面对“大量”的问题 一般都是考虑分区,单元化,分组等隔离和拆分,这里海量topic我们讨论针对一个单实例模式下如何尽可能做到更多topic,完全任意数量都能100%没问题肯定是不现实的。
海量的标准
这个标准起码一个实例(4c8g ecs虚拟机)支撑10W设备规格,每个设备最多50topic,那么就是500w topic/起步。
由于broker和存储已经隔离,broker和topic已经没有什么关系,或者说任何topic数据生成,broker做的事情就是写入和分发。
1.海量topic,每个topic有限数量订阅: topic和订阅者关系使用ots存储加载redis或本地缓存,针对mqtt topic匹配有个topic tree的树算法,hivemq有实现版本, 详情参考 https://github.com/hivemq TopicTreeImpl.java
2.单个topic 海量订阅: 这个场景其实是组播和广播, 我们不会考虑在队列本身上面去做这个事情,而是在上层封装广播组件来协调任务和批量发送。 简单说就是每个消费者永远订阅自己topic,或者每个人都是自己的收件箱,你不能直接拿别人的;但是我可以一份消息发到多个人的收件箱(topic)里面去。如果是100w topic数量级别,一般使用上面的topic tree能在本地内存构造快速匹配。但平台化这个量级是不够的,所以全局的海量广播需要在上层构建任务系统来做。这个广播目前还在建设中。
回压
实时消息推送fail,将会进入堆积,如果超时或操作失败,由于producer没有收到commit可以决定重试。假如设备是Mqtt协议可以利用qos1,如果borker未推送成功或者写入堆积失败 不给producer ack,那么producer会重试。但是解决不了其它协议或者qos0消息, 这里还有个优化点是在发送方网关来做推送消息的failover ack机制。 由于producer的客户端是开源sdk,不受管控 这个地方是一个局限点。
尤其面向碎片化的设备端协议,回压不能完全依赖设备端机制,有了发送方网关后,我们可以在这里干些活。篇幅有限这里先不展开了。
监控运维
这块云上集成云监控、专有云使用云生态Prometheus等构造
未来探索点
队列网关除了支持AMQP,也支持KAFKA协议,面向开发者编程接口更贴切。 (注意只是kafka的消息protocol,我们绝对没有能力去做一个一样的kafka队列)
总结
IoT场景的队列实践,在现有mq队列、kafka队列融合互补之外,加了种自有的实时优先队列实现;同时加入了队列网关代理,即能让用户选择市面上队列,也可以选择轻便的IoT队列。可能我把这个也叫“队列”不是很贴切,目前只是实现了基本功能,还需要进一步完善和打磨。 再次强调,本文没有说目前队列做得不好,我们系统内部环节也在享受已有队列产品的服务,比如我们系统内部自己某些分支的FO就可能利用了rocketmq或者kafka,只是在某些场景化方向 我们做了些微小的探索,希望能解决更多用户问题。