1.概述
1.1.定义
开源消息引擎系统,支持两种消息传输模型:
- 点对点
- 发布/订阅模型
Kafka同时支持这两种模型,采用消费组实现(后续介绍)。
1.2.作用
削峰填谷:缓冲上下游瞬时突发的流量。
松耦合:减少发送方和接收方的耦合性
2.架构
2.1.架构图
2.2.术语
- Producer:生产者,发送消息。
- Consumer:消费者,接收消息,进行业务逻辑处理。
- Broker:服务代理节点。
- 理解为:独立的Kafka节点 or Kafka实例
- 一个或者多个Broker组成Kafka集群
- Zookeeper:分布式系统,管理Kafka集群数据。
2.3.流程
- 生产者发送消息至集群
- 消费者从集群拉取数据
2.4.细节
2.4.1.Topic(主题)
发送的每一条消息需指定一个topic,消息以topic进行归类。
2.4.2.Partition(分区)
topic可分为多个分区;同主题下不同分区的消息不同。
2.4.3.Offset(偏移量)
offset是消息在分区中的唯一标志,分区内新增消息,偏移量递增,从而保证消息在分区内的顺序。
offset不跨越分区,所以kafka只保证分区内有序而不是主题有序。
2.4.4.图示
说明:
- 如图:4个分区,消息在分区末尾写入。
- 分区可分布在不同的brker上,即主题可横跨多个broker。
- 每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。规则合理,消息可均匀分配。
- 不考虑多副本的话,一个分区对应一个日志。消息不断写入,分区变大,日志变大,为防止日志变大,kafka将日志(Log)切分为日志段(LogSegment),也就是文件夹与文件的关系。
3.多副本机制
3.1.定义
相同的数据拷贝到多台机器,形成副本,提高容灾能力。
3.2.一主多从
主(leader):处理读写请求,对外提供服务
从(follower):与leader消息同步,不对外提供服务
当leader故障,从follower选举新leader。
为什么Kafka不像MySQL和Redis那样允许follwer副本对外提供读服务呢?
1.方便实现“Read-your-writes”
顾名思义,就是当你使用生产者api向kafka成功写入消息后,就马上使用消费者api去读取刚才的消息。
举个例子,就是你刚发完一条微博,肯定是希望立马能够看到的。这就是Read-your-writes场景了。如果追随者副本对外提供服务的话,由于副本同步是异步的,因此有可能发生追随者副本还没有及时从领导者副本中拉取最新消息,从而使客户端看不到最新的消息。
2.方便实现单调读
什么是单调读。单调读就是消费者在多次读消息时候,不会看到一条消息一会儿存在一会儿不存在。
例如:如果允许追随者副本提供读服务,那么假设当前有两个追随者副本F1,F2。生产者往领导者中发送了消息后,F1,F2开始异步拉取消息。若F1拉取成功了,而F2还未拉取成功。此时消费者第一次消费F1副本获取最新消息,第二次消费的时候消费到了F2副本。就获取不到该条消息了。这就不是单调读一致性。所以都由Leader副本来处理请求的话,就能实现单调读。
3.同样可以达到负载均衡效果
主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度。
3.3.AR、ISR、OSR
3.3.1.相关概念
AR(Assigned Replicas):分区内的所有副本。
ISR(In-Sync Replicas):消息与leader副本保持一定程度同步的所有副本,是AR的子集。因为Kafka为异步拉取消息,所以会有部分副本未与leader同步,其不属于ISR集合。(ISR不只包括follower,还包括leader)
OSR(Out-of-Sync Replicas):与 leader 副本同步滞后过多的副本(不包括leader副本),AR=ISR+OSR
一定程度同步:可忍受的滞后范围,可通过参数配置(replica.lag.time.max.ms)配置。
replica.lag.time.max.ms:允许follower副本能够落后leader副本的最长时间间隔,默认值为10s。
HW(High Watermark):高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。
LEO(Log End Offset):标识当前日志文件中下一条待写入消息的offset。ISR中的每个副本都维护自身的LEO,其中LEO最小的即为分区的HW,消费者只能消费HW之前的消息。
3.3.2.规矩
- leader副本负责维护和跟踪ISR中所有副本的滞后状态
- 若滞后太多,从ISR中移入OSR;若追上leader,从OSR中移入ISR。
- leader发生故障时需重新选举,只有在ISR集合中的副本才有资格被选举为新leader(此规矩可修改配置来改变)。
- ISR与HW、LEO有紧密的联系,ISR中的每个副本都维护自身的LEO,其中LEO最小的即为分区的HW,消费者只能消费HW之前的消息。
3.4.分区机制
**定义:**副本积累太多数据,单台broker无法容纳。将数据分割成多份保存在不同的Broker上,即所谓的分区。分区以实现系统的高伸缩性。
再次回顾kafka消息架构:
- 主题(topic)层:每个主题可配置A个分区。
- 分区(partition)层:每个分区可配置B个副本,其中一个为leader。
- 消息(message)层:每个分区中包含C个消息,offset从0开始,消息+1则offset+1。
3.5.图示
从图中可以看出:
- 有3个分区,每个分区1个leader副本和2个follower副本。
- leader与生产者或消费者交互,follower只负责同步消息。
- follower的消息相对于leader会有一定的滞后。
4.生产者
4.1.分区的原因
- 实现系统的高伸缩性,提供负载均衡的能力。
- 不同的分区放置在不同节点的机器上,而消息的读写以分区为粒度,所以每个机器能够独立执行各自分区的读写请求。
- 可以通过增加新的机器来增加整个系统的吞吐量。
4.2.分区策略
分区是决定系统负载均衡与吞吐量的关键,所以生产者一端需要决策合适的分区策略,避免造成消息的“倾斜”,某些分区达到性能瓶颈,导致下游消费能力下降。
Kafka提供默认的分区策略,也支持自定义分区策略,常见的分区策略如下:
- 轮询策略:按顺序分配,能够保证消息被平均分配到所有分区。默认是最合理、最常用的分区策略。
- 随机策略:随意将消息放置到任意一个分区上。
- 按消息键保存策略:kafka允许每条消息定义一个key(可为业务ID等),相同key的消息进入同一个分区。
5.消费者
5.1.消费模型
Kafka采用拉取模型(pull),有消费者自主控制消费状态:
- 是否消费
- 消费到哪儿了(offset)
这样做的好处:
- 消费者可控制offset按任意顺序消费消息,可消费已消费数据 or 消费最新数据。
- 不同消费者可以安装自己最大的处理能力来拉取数据
5.2.消费组
多个消费者实例共同组成一个组来消费一组主题,这组主题中的每个分区都只会被组内的一个消费者实例消费,从而提高消费者端的吞吐量。
5.2.1.结构
既然为组,那么组内所有消费者实例共享一个GroupId,组内所有消费者协调在一起消费主题下的所有分区,每个消费者实例处理一个分区。
那么一个Group下该有多少个Consumer实例呢
理想情况下,Consumer实例的数量应该等Group订阅主题的分区总数。
将分区分配给消费者有以下几种场景。
- 线程数量多于分区的数量
- 线程数量少于分区的数量
- 线程数量等于分区的数量
针对消费组,kafka如何管理位移?
针对 Consumer Group,Kafka 是怎么管理位移的呢?你还记得吧,消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中,这个位置信息有个专门的术语:位移(Offset)。老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以*地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
但是,ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。于是,在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是__consumer_offsets,现在新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。
5.2.2.作用
避开传统模型(点对点模型/发布与订阅模型)的缺陷,兼顾其优点。组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制,Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。
5.2.3.重平衡
5.2.3.1.定义
重平衡(Rebalance):所有 Consumer 实例共同参与,在协调者组件(Coordinator)的帮助下,完成订阅主题分区的分配,也就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。
5.2.3.2.触发条件
-
组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,或是有 Consumer 实例崩溃被“踢出”组。
-
订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
-
订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要被“踢出”组呢?
心跳检测。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Rebalance 发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与。
5.2.3.3.缺点
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。万物静止,所有应用线程都会停止工作,著名的 stop the world,简称 STW。
- 太慢
- 目前 Rebalance 的设计是**所有 Consumer 实例共同参与,全部重新分配所有分区。**其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
So,尽量避免一些非必要的Rebalance:
- 第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。需仔细地设置 session.timeout.ms(决定了 Consumer 存活性的时间间隔)和 heartbeat.interval.ms(控制发送心跳请求频率的参数) 。
- 第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。max.poll.interval.ms 参数限定消费者消费消息的最大时间。
5.2.3.4.重平衡流程
- 触发与通知
重平衡过程通过消息者端的心跳线程(Heartbeat Thread)通知到其他消费者实例。
消费者端的参数heartbeat.interval.ms,从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。
- 消费组状态机
Kafka设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。
状态如下:
(1)Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
(2)Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。
(3)PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组
(4)CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。
(5)stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka定期自动删除过期位移的条件就是,组要处于Empty状态。如果消费者组停了很长时间(超过7天),那么Kafka很可能就把该组的位移数据删除了。
- 消费者端重平衡视角
完成重平衡需要消费者与协调者共同参与完成。在消费者端,两个步骤:
(1)加入组:JoinGroup请求
新成员加入组时,发送JoinGroup请求,新成员上报自己所订阅主题。协调者得到所有成员的订阅信息。
通常,第一个发送JoinGroup请求的成员自动成为领导者。
这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
(2)等待领导者消费者分配方案:SyncGroup请求
领导者向协调者发送SyncGroup请求,将分配方案发给协调者。目的是让协调者接收分配方案,然后统一以SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。
值得注意的是,其他成员也会向协调者发送SyncGroup请求,只是请求体中并没有实际内容。
(3)当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
- Broker端(协调者端)重平衡视角
(1)新成员加入组
新成员加入,协调者收到JoinGroup请求,其通过心跳请求响应的方式通知组内现有所有成员,强制开启新一轮重平衡。
(2)组成员主动离组
**主动离组:**消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。
协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
(3)组成员崩溃离组
**崩溃离组:**指消费者实例出现严重故障,突然宕机导致的离组。
协调者无法立马感知,通过检测消费者心跳频率来感知。通过session.timeout.ms控制。
(4) 对组内成员提交位移的处理
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后在开启正常JoinGroup/SyncGroup请求发送。
5.3.位移主题
offset topic(位移主题)指的是kafka的内部主题__consumer_offsets。
旧版本如何保存位移?
旧版本Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。但是,ZooKeeper 其实并不适用于这种高频的写操作。
新版本如何保存位移?
Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,__consumer_offsets topic 的主要作用是保存 Kafka 消费者的位移信息。
消息格式
一个kv对。
key:<Group ID,主题名,分区号>
value:有三种格式:
- 位移值
- 用于保存 Consumer Group 信息的消息
- 用于删除 Group 过期位移甚至是删除 Group 的消息。专属名字:tombstone,即墓碑消息。
位移主题是如何被创建的?
通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。
那 Consumer 是怎么提交位移的呢?
两种:自动提交位移和手动提交位移。
Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 10,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 10。由于是自动提交位移,位移主题中会不停地写入位移 =10 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。
Kafka 是怎么删除位移主题中的过期消息的呢?
Compaction。Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
5.4.Java Consumer设计原理
从 Kafka 0.10.1.0 版本开始,KafkaConsumer就变为了双线程的设计:
- 用户主线程:启动Consumer应用程序main方法的那个线程
- 心跳线程:负责定期给对应的Broker机器发送心跳请求,检验消费者的存活性。目的是将检测心跳频率与KafkaConsumer.poll 方法的频率分开,解耦解耦真实的消息处理逻辑与消费者组成员存活性管理。
KafkaConsumer类不是线程安全的
所有的网络 I/O 处理都是发生在用户主线程中,使用过程中必须保证线程安全:不能在多个线程*享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
两套多线程方案
- 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。
- 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。好处是可以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影响,伸缩性很高。
两种方案优缺点比较:
6.服务端
6.1.处理请求方式
无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。
很容易想到想到两个方案:
-
顺序处理请求
实现简单,但是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。
-
完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。这个方法的好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。但缺陷也同样明显。为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。
Kafka 使用的是 Reactor 模式
从图中可以看出:
- 多个客户端发送请求到Reactor
- Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。所以Acceptor的职责是只分发请求,不含逻辑处理。
- 线程池中的每个线程接收到Acceptor分发过来的请求。
对于Kafka而言
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nmAGSczu-1640679848558)(/Users/zhenghaoyun/Library/Application Support/typora-user-images/image-20211225213504299.png)]
- Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,用于接收多个请求。
- SocketServer 中也有Acceptor 线程和一个工作线程池,在Kafka中,此工作线程池被称作网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。
- Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。
那么当网络线程池收到请求后,它是如何处理的呢?
- 网络线程拿到请求后,将请求放入共享请求队列中。
- IO线程池从队列中取出请求,执行真正的处理。IO 线程池处中的线程才是执行请求逻辑的线程。
- 如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
注意:所有网络线程共享IO请求队列,但响应队列是每个网络线程专属的。Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以响应没必要公共共享。
6.2.组件
6.2.1.协调者
消费组如何确定为它服务的Coordinator在哪台Broker上?
- 确定由位移主题的哪个分区来保存Group数据
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
- 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator
6.2.2.控制器
控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器,行使其管理和协调的职责。
控制器的功能
-
主题的创建
-
分区重分配
Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能
-
Preferred领导者选举
Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案。
-
集群成员管理
新增Broker、Broker主动关闭、Broker宕机
-
保存集群元数据信息
控制器中保存的这些数据在Zookeeper中也保存了一份,每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。比较重要的数据包括:
-
所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
-
所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
-
所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。
-
控制器故障转移
在 Kafka 集群运行过程中,只能有一台 Broker 充当控制器的角色,那么这就存在单点失效(Single Point of Failure)的风险,Kafka 是如何应对单点失效的呢?答案就是,为控制器提供故障转移功能,也就是说所谓的 Failover。故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。
6.2.3.定时器
7.存储层
8.常见问题讨论
8.1.Kafka的无消息丢失配置
Kafka的无消息丢失配置
分布式系统处理故障容错时,需要明确地定义节点的存活状态。Kafka对节点的存活定义有两个条件:
-
节点必须和ZK保持会话;
-
如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多。
满足这两个条件,叫作“正在同步中”( in-sync)。 每个分区的主副本会跟踪正在同步中的备份副本节点( In Sync Replicas ,即ISR)。如果一个备份副本挂掉、没有响应或者落后太多,主副本就会将其从同步副本集合中移除。反之,如果备份副本重新赶上主副本,它就会加入到主副本的同步集合中。Kafka 中, 一条消息只有被ISR集合的所有副本都运用到本地的日志文件,才会认为消息被成功提交了。任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失“。只有已经提交的消息才能被消费者消费,因此消费者不用担心会看到因为主副本失败而丢失的消息,下面我们举例分析Kafka的消息提交机制如何保证消费者看到的数据是一致的。
-
生产者发布了10条消息,但都还没有提交(没有完全复制到ISR中的所有副本) 如果没有提交机制,消息写到主副本的节点就对消费者立即可见,即消费者可以立即看到这10条消息。但之后主副本挂掉了,这10条消息实际上就丢失了,而消费者之前能看到这 10 条丢失的数据,在主副本挂掉后就看不到了,导致消费者看到的数据出现了不一致。
-
如果有提交机制的保证,并且生产者发布的 10条消息还没有提交,则对消费者不可见。即使10条消息都已经写入主副本,但是它们在还没有来得及复制到其他备份副本之前,主副本就挂掉了。那么,这 10条消息就不算写入成功,生产者会重新发送这 10条消息。当这10条消息成功地复制到ISR 的所有副本后,它们才会认为是提交的,即对消费者才是可见的。在这之后,即使主副本挂掉了也没有关系,因为原先消费者能看到主副本的10条消息,在新的主副本上也能看到这10条消息,不会出现不一致的情况。
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。
第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。
常见消息丢失情况
-
生产者丢失数据
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这种情况下,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。
解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer 端而非 Broker 端。你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。
-
消费者程序丢失数据
我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。
同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。当然,这种处理方式可能带来的问题是消息的重复处理,这个问题后续继续探讨。
还有一类消息丢失场景是,Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。这里的关键在于 Consumer 自动提交位移,没有真正地确认消息是否真的被消费就盲目地更新了位移。这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。
8.2.消息堆积
消息堆积的可能原因如下:
- 生产速度大于消费速度。可以适当增加分区,增加consumer数量,提升消费TPS;
- 消费者性能低。可查询是否消费逻辑很重,是否可优化消费者TPS;
- 确保消费者端没有异常而导致hang
- 如果使用的是消费者组,确保没有频繁地发生rebalance
8.3.消息交付可靠性保障以及精确处理一次语义的实现
可靠性保障指Kafka对生产者及消费者的承诺:
- 最多一次:消息可能丢失,但绝不重复发送。
- 至少一次:消息不会丢失,但可能重复发送。
- 精确一次:消息不会丢失,也不会重复发送。
Kafka默认提供的的是“至少一次”。只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。如果消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。
幂等性
幂等指某些操作或函数执行多次,但每次结果不变。幂等性可保证安全重试任何幂等性操作。如果非幂等,需担心某些操作执行多次对状态的影响。
Kafka中,生产者默认为非幂等性,但我们可创建幂等性生产者。当生产者自动升级成幂等性后,Kafka自动会消息去重。
在底层,kafka为实现幂等性,引入了ProducerID和SequenceNumber。
- ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,用来标识本次会话。
- SequenceNumber:对于每个ProductId,Product发送数据的每个Topic和Partition都对应一个从0开始底层的SequenceNumber。
Broker维护(pid,seq)的映射关系,收到消息后检查seq。生产者在收到明确的消息丢失ack或超时后未收到ack,要进行重试,
new_seq = old_seq+1: 正常消息;
new_seq <= old_seq : 重复消息;
new_seq > old_seq+1: 消息丢失;
因为SequenceNumber对应的是partition与topic,所以Kafka的幂等性的作用范围:
- 只能保证某个主题的一个分区上不出现重复消息,无法实现多个分区的幂等性。
- 只能实现单会话的幂等性,不能实现跨会话的幂等性。可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
若需实现多分区以及会话上的消息无重复,需引入事务(transaction)或者依赖事务性Producer。
8.3.1.事务性Producer
事务性Producer能保证将消息原子性的写入到多个分区中,这批消息要么全部写入成功,要么全部写入失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
事务消息是由producer、事务协调器、broker、组协调器、consumer共同参与实现的
-
producer
为producer指定固定的TransactionalId,可以穿越producer的多次会话(producer重启/断线重连)中,持续标识producer的身份。
使用epoch标识producer的每一次"重生",防止同一producer存在多个会话。
producer遵从幂等消息的行为,并在发送的BatchRecord中增加事务id和epoch。
-
事务协调器(Transaction Coordinator)
引入事务协调器,以两阶段提交的方式,实现消息的事务提交。
事务协调器使用一个特殊的topic:transaction,来做事务提交日志。
事务控制器通过RPC调用,协调 broker 和 consumer coordinator 实现事务的两阶段提交。
每一个broker都会启动一个事务协调器,使用hash(TransactionalId)确定producer对应的事务协调器,使得整个集群的负载均衡。
-
broker
broker处理事务协调器的commit/abort控制消息,把控制消息向正常消息一样写入topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移hw。
-
组协调器
如果在事务过程中,提交了消费偏移,组协调器在offset log中写入事务消费偏移。当事务提交时,在offset log中写入事务offset确认消息。
-
consumer
consumer过滤未提交消息和事务控制消息,使这些消息对用户不可见。
有两种实现方式,
- consumer缓存方式
设置isolation.level=read_uncommitted,此时topic的所有消息对consumer都可见。consumer缓存这些消息,直到收到事务控制消息。若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息。
- broker过滤方式
设置isolation.level=read_committed,此时topic中未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。broker给consumer的BatchRecord消息中,会包含以列表,指明哪些是"abort"事务,consumer丢弃abort事务的消息即可。
8.4.高水位和Leader Epoch
在 Kafka 的世界中,水位是和位置信息绑定的,具体来说,它是用消息位移来表征的。高水位在界定 Kafka 消息对外可见性以及实现副本机制等方面起到了非常重要的作用,但其设计上的缺陷给 Kafka 留下了很多数据丢失或数据不一致的潜在风险。为此,社区引入了 Leader Epoch 机制,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation),尝试规避掉这类风险。
8.4.1.高水位的作用
高水位的作用主要有2个:
-
定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
-
帮助 Kafka 完成副本同步。
需要注意的是:位移值等于高水位的消息也属于未提交消息。
8.4.2.高水位更新机制
这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
未完待续…
8.5.Kafka控制器的选举
Kafka利用了ZK的领导选举机制,每个代理节点都会参与竞选主控制器,但只有一个代理节点可以成为主控制器,其他代理节点只有在主控制器出现故障或者会话失效时参与领导选举。Kafka实现领导选举的做法是:每个代理节点都会作为ZK的客户端,向 ZK服务端尝试创建/controller临时节点,但最终只有一个代理节点可以成功创建/controller节点。由于主控制器创建的ZK节点是临时节点,因此当主控制器出现故障,或者会话失效时,临时节点会被删除。这时候所有的代理节点都会尝试重新创建/controller节点,并选举出新的主控制器。
主节点选举,首先需要面对的就是集群节点达成某种一致(共识)的问题。对于主从复制的数据库,所有节点需求就谁来充当主节点达成一致。如果由于网络故障原因出现节点之间无法通信,就很容易出现争议。此时,共识对于避免错误的故障切换十分重要,后者会导致两个节点都自认为是主节点即脑裂。如果集群中存在两个这样的节点,每个都在接受写请求,最终会导致数据产生分歧、不一致甚至数据丢失。
Zookeeper里采用的是Zab共识算法/协议。广义上说,共识算法必须满足以下的性质:
1)协商一致性
- 所有的节点都接受相同的协议。
2)诚实性
- 所有节点不能反悔,即对一项提议不能有两种决定。
3)合法性
- 如果决定了值v,则v一定是由某个节点所提议的。
4)可终止性
- 节点如果不崩溃则最终一定可以达成决议。
协商一致性和诚实性属性定义了共识的核心思想:决定一致的结果,一旦决定,就不能改变。如果不关心容错,那么满足前三个属性很容易:可以强行指定某个节点为“*者”,由它做出所有的决定。但是,如果该节点失败,系统就无法继续做出任何决定。可终止性则引入了容错的思想,它重点强调一个共识算法不能原地空转,永远不做事情。换句话说,它必须取得实质性进展,即使某些节点出现了故障,其他节点也必须做出最终决定。可终止性属于一种活性,另外三种则属于安全性方面的属性。
当然,如果所有的节点都崩溃了,那么无论何种算法都不可能继续做出决定。算法所能容忍的失败次数和规模都有一定的限制。事实上,可以证明任何共识算法都需要至少大部分节点正确运行才能确保终止性,而这个大多数就可以安全地构成quorum。因此,可终止性的前提是,发生崩溃或者不可用的节点必须小于半数节点。这里,我们暂时假定系统不存在拜占庭式错误。
最著名的容错式共识算法有Paxos,Raft和Zab。这些算法大部分其实并不是直接使用上述的形式化模型(提议并决定某个值,同时满足上面4个属性)。相反,他们是决定了一系列值,然后采用全序关系广播算法。全序关系广播的要点是,消息按照相同的顺序发送到所有的节点,有且只有一次。这其实相当于进行了多轮的共识过程:在每一轮,节点提出他们接下来想要发送的消息,然后决定下一个消息的全局顺序。所以,全序关系广播相当于持续的多轮共识(每一轮共识的决定对应于一条消息):
- 由于协商一致性,所有节点决定以相同的顺序发送相同的消息。
- 由于诚实性,消息不能重复。
- 由于合法性,消息不回被破坏。也不是凭空捏造的。
- 由于可终止性,消息不会丢失。
Raft和Zab都直接采取了全序关系广播,这比重复性的一轮共识只解决一个提议更加高效。
ZooKeeper主要针对保存少量、完全可以放在内存中的数据(虽然最终仍然会写入磁盘以保证持久性),所以不要用它保存大量的数据。这些少量数据会通过容错的全序广播算法复制到所有节点上从而实现高可靠。ZooKeeper模仿了Google的Chubby锁服务,不仅实现了全序广播(因此也实现了共识),而且还构建了一组有趣的其他特性,这些特性在构建分布式系统时格外重要:
- 线性一致性的原子操作
使用原子CAS操作可以实现锁:如果多个节点同时尝试执行相同的操作,只有一个节点会成功。共识协议保证了操作的原子性和线性一致性,即使节点发生故障或网络在任意时刻中断。分布式锁通常以租约(lease)的形式实现,租约有一个到期时间,以便在客户端失效的情况下最终能被释放。
- 操作的全序排序
当某个资源受到锁或租约的保护时,你需要一个fencing令牌来防止客户端在进程暂停的情况下彼此冲突。fencing令牌是每次锁被获取时单调增加的数字。ZooKeeper通过全局排序操作来提供这个功能,它为每个操作提供一个单调递增的事务ID( zxid )和版本号( cversion )。
- 故障检测
客户端在ZooKeeper服务器上维护一个长期会话,客户端和服务器周期性地交换心跳包来检查节点是否存活。即使连接暂时中断,或者某个ZooKeeper节点发生失效,会话仍保持在活跃状态。但如果心跳停止的持续时间超出会话超时,ZooKeeper会声明会话失败。此时,所有该会话持有的锁资源可以配置为自动全部释放(ZooKeeper称之为ephemeral nodes即临时节点)。
- 变更通知
客户端不仅可以读取其他客户端创建的锁和键值,还可以监听它们的变更。因此,客户端可以知道其他客户端何时加入集群(基于它写入ZooKeeper的值),以及客户端是否发生了故障(会话超时导致临时节点消失)。通过订阅通知机制,客户端不用再通过频繁轮询的方式来找出变更。
8.6.Kafka 2.0版本后新特性
-
2.4版本提出follower副本读取数据(consumer fetch from closest replica)
比较常见的场景,kafka存在多个数据中心,不同数据中心存在于不同的机房,当其中一个数据中心需要向另一个数据中心同步数据的时候,由于只能从leader replica消费数据,那么它不得不进行跨机房获取数据,而这些流量带宽通常是比较昂贵的(尤其是云服务器)。即无法利用本地性来减少昂贵的跨机房流量。所以kafka推出这一个功能,就是帮助类似这种场景,节约流量资源。
未完待续…