在消息系统中,消息的生成和消费通常支持三种模式: at-most-once,at-least-once 和 exactly-once。 这几种模式的区别主要在于当系统发生错误的时候,系统表现何种行为。前两种模式,非常容易理解,出错后不处理或者不停重试直到成功为止,对于exactly-once的模式,系统在出错的时候,则需要进行特殊处理来保证一条消息只被处理一次。
发生错误的场景
上图是常见的消息系统(如Kafka/Plusar/sls loghub等)的架构,消息(message)由生产者(Producer)发送至消息系统的接收者(Broker),Broker将Message持久化到特定分区(Partition)后,供消费者(Consumer)进行消费。在这个过程中,任意阶段都可能出错。
- Broker 错误:Broker作为系统的主要组成部分,负责数据的接收和读取,当单个broker fail的时候,不同的系统表现行为也不同,可能会重新选举Leader或进行broker的failover,在这个过程中,消息的读写可能发生失败
- Producer到Broker RPC错误 : Producer将数据发送至Broker后,需要收到Broker的ack才能确定消息写入成功,在Broker出错,或Producer到Broker网络异常等情况下,Producer可能无法收到ack信息,这时,Producer无法确认消息是否已经正确持久化,如果Producer忽略错误,这表现为at-most-once模式,进行重试直到成功,则可能是at-least-once
- Producer :消息的生产者本身也可能会因为程序异常、机器宕机等行为导致异常,在Producer内存中尚未发送的消息,如果不做特殊处理,则会丢失
- Consumer :作为消息的消费者,在发生错误的时候如何恢复消费的状态以及从哪个位置再次开始消费数据,则决定消费情况系统表现那种模式
写入模式下Exactly-Once
从上面的错误场景可以看到,错误在任意阶段都可能发生,要做到任意情况下完全的exactly-once写入代价极其昂贵,在线上大规模生产系统中,很难承受:
- 每条消息有一个唯一的ID
- Broker在接收到一条消息后,进行全局校验ID是否重复
- Broker对于消息ID的校验和写入原子操作
在分布式场景下,做全局全量数据的原子排他性操作,成本无法接受的。那退而求其次,在一定限定条件下,则可以更高效达到exactly-once的效果。可以从以下两方面进行限定:
- Producer 发送的一条消息,在各种错误情况下,限定只到某个Broker下的确定Partition。这样消息无需做全局的校验,只需要在Partition级别即可;
- 单个Producer 到单个Partition的消息ID(Sequence ID)单调递增,这样虽然降低了单个Producer到单个Partition的吞吐(串行),但是每个Partition对于Producer消息的ID校验大大简化,只需要效验一个ID即可,无需维护历史所有ID
通过以上两个简化,在Partition级的exactly-once的写入操作,只需要额外一次HashMap的查询即可,而持久化的数据,也只会增加极少量的字段(ProducerID, Sequence ID)。
Broker Failover 处理
在Broker Failover的时候,必须将Broker内存中各个Producer当前写入的SequenceID完全恢复出来,才能保证数据exactly-once写入。在持久化的数据中,有每条消息的ProducerID和SequenceID信息,可以通过扫描持久化信息进行恢复,同时为了加快恢复速度,可以定期将Broker内存中的HashMap作为snapshot保存下来,在恢复的时候,首先恢复snapshot,然后只需要读取少量的信息完成ProducerID和SequenceID映射重建。
Producer Failover 处理
当Producer Failover的时候,对于部分数据源和SequenceID可映射的场景,可以根据ProducerID从Broker中获取最新的SequenceID,根据该ID对数据源进行重置(如Producer的数据源是文件,SequenceID 和文件行号能进行映射),Producer可以从上次最后写入成功的位置继续写入。
消费模式下 Exactly-Once
在完成exactly-once写入后,为了支持端到端的exactly-once处理, 同样需要消费端的配合,这里主要指在消费端failover时,如何确保每条消息只被处理一次。 虽然不少消息系统提供ack机制,消费者只需关系数据的处理即可,如Plusar提供的consumer会自动拉取消息供应用消费,应用只需要关心消息的处理,在处理完毕后,对消息进行ack即可。
Consumer consumer = client.subscribe(...);
while (true) {
Message msg = consumer.receive();
// Process the message...
consumer.acknowledge(msg);
}
但是,这种简单的消费模式,无法支持exactly-once,核心在于消息的处理和ack是非原子操作。当消息处理完毕尚未进行ack时,consumer可能crash,重启后,这条消息将被重复消费(broker尚未收到这条消息的ack信息)。
因此,为了支持exactly-once消费,需要将消息处理的结果(通常使用状态表示)和消息的ID持久化到其他外部系统中,以确保failover能恢复到某个完全精确的状态继续消费,如flink使用的checkpoint机制,将flink内部某时刻状态以及消费的位置信息进行持久化。
即使如此,如果在消费过程中,会额外产生结果并写入其他下游系统时,如果这些系统不支持幂等操作,那么在failover时,consumer重复消费数据时,下游系统还可能看到at-least-once的结果(如消费信息进行短信报警的场景)。
以上是对于消息系统的端对端支持exactly-once的简单探讨,通过一定的条件限定,写入端支持相对容易,而消费端除了较复杂的checkpoint机制外,还依赖消费产出下游系统的支持。