二, Kafka架构深入

二, Kafka架构深入

2.1 Kafka 工作流程(待完善)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bAk1dhqF-1635778465190)(2021-10-25-14-00-42.png)]

  • Kafka 中消息是以topic进行分类的, 生产者生产消息, 消费者消费消息, 都是面向topic的.
  • topic是逻辑上的概念, 而partition是物理上的概念, 每个partition对应于一个log文件目录(目录里面是分片了的.log文件和), 该log文件中存储的就是producer生产的数据.
  • producer生产的数据会被不断追加到该log文件的末端, 且每条数据都有自己的offset, 消费者组中的每个消费者, 都会实时记录自己消费到了哪个offset, 以便出错恢复时, 从上次的位置继续消费.

2.2 Kafka 文件存储机制

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tUW2RhHZ-1635778465194)(2021-10-25-14-07-21.png)]

  • 由于生产者生产的消息会不断追加到log文件的末尾, 为了防止log文件过大而导致数据定位效率低下, Kafka采取了分片和索引机制, 将每个partition对应的log数据文件分为多个segment,每个segment对应着两个文件–.index文件 + .log文件 , 另外每个.log 的文件是在config/server.properties中log.segment.bytes规定了的, 通常是1GB.
  • 这些文件位于一个topic分区文件夹下, 该文件夹的命名规则为: topic名称+分区序号. 比如: 有个topic叫first, 他有三个partition(分区), 则其对应的文件夹为first-0, first-1, first-2, 每个文件夹中都有着一定数量的.index和.log文件.

看下图, 这是一个topic分区文件夹中的几个分片, 可以看到每个分片的.log和.index的命名相同, 都是以当前分片的最小偏移量命名的.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g5td1Sny-1635778465977)(2021-10-25-14-13-52.png)]


index文件和log文件内的数据详解
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jeEToM9D-1635778465979)(2021-10-25-16-01-17.png)]
.index文件存储大量的索引信息, .log文件存储大量的数据, 索引文件中的元数据指向数据文件中消息的物理存储偏移地址.

kafka定位消息数据位置的步骤:

  1. 首先需要知道offset, 通过二分查找法, 找到存有这个offset信息的.index文件,
  2. 然后读取.log文件, 找到目标offset对应的消息数据在.log物理存储的偏移量(因为每条消息的存储大小都是固定的), 根据偏移量我们可以迅速定位到目标数据.

2.3 Kafka 生产者

2.3.1 生产者对topic分区的原因

  1. 方便在集群中扩展,每个partition可以通过调整以适应它所在的机器, 而一个topic又可以由多个partition组成, 因此整个集群就可以适应任意大小的数据了.
  2. 提高读写并行度,在分布式系统中, 同一个topic 的不同partition可以分布在多台主机中, 这就打破了单主机的IO和处理能力限制, 提高了读写的并行度.
  3. 便于数据复制备份, 提高数据冗余度和容灾能力

2.3.2 生产者消息分区的原则和策略

我们需要将producer发送的数据封装成一个ProducerRecord对象.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZXB5vtSh-1635778465197)(2021-10-26-09-04-45.png)]

  1. 前五种方法, 即指明partition的情况下, 直接将指明的值直接作为partition值;
  2. 没有指明partition但有key的情况下, 将key的hash值%topic的partition值, 取余结果即为partition值;
  3. 既没有指明partition值又没有key值的情况下, 第一次调用时随机生成一个整数(后面每次调用在这个整数上自增), 将这个随机整数值%topic可用的partition总数, 取余结果就是partition值, 这种是常说的round-robin算法.

Q:待解决问题: key是什么玩意?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a6WQ0eoM-1635778465200)(2021-11-01-14-57-02.png)]

2.3.3 数据可靠性保证

  • 为了保证Producer发送的数据, 能够可靠的发送到指定的topic, topic的每个partition收到producer发送的数据后, 都需要向producer发送ack, 如果producer收到ack, 就会进行下一轮的发送, 否则重新发送数据.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sDrGlrVd-1635778465203)(2021-10-26-09-17-27.png)]

follower与leader数据同步的策略(副本同步策略)

方案 优点 缺点
1.半数以上完成同步, 就发送ack 延迟低 选举新的leader时, 容忍n台结点的故障, 需要设置2n+1个副本
2. 全部完成同步,才发送ack 选举新的leader时, 容忍n台节点的故障, 只需要设置n+1个副本 延迟高

kafka选择了第2种策略, 原因是:

1, 同样是为了容忍n台节点的故障, 半数同步的方案需要2n+1个副本, 而Kafka的每个分区都有大量的数据, 毫无疑问这种方案会造成大量数据的冗余.
2, 虽然全部同步完成的方案延迟较高, 但是网络延迟对Kafka 的影响较小.


Q: 没看懂半数跟全同步需要副本数量的问题?
A:

  1. 首先啊, 我们要知道kafka是利用zookeeper的选举机制来确定集群中哪一台主机是leader的, 而zookeeper初次选举时采用的是半数以上选举机制(即只要集群中有半数以上的主机确定出了一个leader, 就算是选举完成啦); 然后呢为了能够确保选举出集群中主机们的leader, kafka集群中必须总是保证集群中正常工作的主机为总主机数量的半数以上, 这也跟zookeeper 的半数存活机制是一致的(仔细品品这句话).
  2. 这个时候, 我们就出现了同步副本和半数存活结合一起考虑的问题, 即当对集群进行同步时, 一定要确保半数以上的副本是正常的, 这样我们才能通过副本去恢复半数以上的主机, 使之能够能够正常的选举出集群的leader.
  3. 那么当采用半数同步策略时, 当n台节点发生故障时, 我们最低限度的需要设置2n+1个副本数(注意是设置, 不是要求一定要有这些副本)才能确保集群正常选举, 为啥呢? 可别忘了这2n+1采用这种了半数以上的同步机制, 这就使得在这些副本中肯定会有至少n+1个副本是正常的(这一点是必须能保证的), 这样才能可以正常恢复n+1个节点,正常选举出leader.
  4. 当采用全同步策略时, 当n台节点发生故障时, 我们最低限度的需要设置n+1个副本数才能确保集群能够正常选举, 这n+1个节点因为采用了全同步策略(这一点也是保证了最低n+1个节点正常同步出副本), 所以就能够确保集群中还有n+1个主机能够正常的选举出leader.

补充:

  1. 半数存活机制;
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Upf2w643-1635778465981)(2021-10-26-09-36-14.png)]
  1. 两种策略需要设置的副本数为什么会有不同呢?
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4GLiVOnM-1635778465983)(2021-10-26-09-47-05.png)]

2.3.3.1 ISR(in-sync replica set)–>与leader保持同步的follower集合

  • 当我们采用第二种方案(全同步才发送ack)后, leader收到数据, 所有的follower都开始同步数据直到全部同步完成,然而若是有个follower因为某种故障, 迟迟不能与leader同步, 该怎么解决呢?

Leader维护了一个动态的ISR(in-sync replica set), 意为和leader保持同步的follower集合.

  • 当ISR中的follower完成数据的同步之后, leader就会给follower发送ack.
  • 如果follower长时间未向leader同步数据, 则该follower就会被踢出ISR, 该事件阈值由replica.lag.time.max.ms参数设定
  • Leader发生故障之后, 就会从ISR中选举新的leader.

2.3.4 ack应答机制(解决数据丢失问题)

  • 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
  • 所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

acks 参数配置:

[acks = 0]:

  • ack=0时, producer不等待broker 的ack, 这一操作提供了最低的延迟, producer在broker数据一接收到还没写入磁盘就已经返回, 所以当broker发生故障时,有可能会丢失数据.

[acks =1 ]

  • ack=1时, producer等待broker的ack, 等待在partition的leader落盘成功后返回的ack, 但是如果在follower同步成功之前leader故障, 那么将会丢失数据.

[acks =-1 ]

  • 当 ack=-1时, producer等待broker 的ack, 而且是等待在partition的leader和follower全部落盘成功后返回的ack, 但是如果在follower同步完成后, broker发送ack之前,leader发生故障, 将会从剩下的follower中重新选举新的leader, 然后producer会重新向新的leader发送数据, 由于此前数据已经同步过了, 此时就会造成数据的重复.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GfxFS1tP-1635778465204)(2021-10-26-11-04-43.png)]

2.3.5 故障处理机制(数据存储一致性的保证)

LEO(log end offset):每个副本最大的Offset偏移量, 标识当前.log文件(数据文件, 别自己混乱了)下一条待写入消息的offset偏移量.
HW(high watermark):消费者能够见到的最大的Offset(即消费者只能拉取到这个偏移量前面的消息), 它是IRO队列中最小的LEO.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZsuDvx7K-1635778465206)(2021-10-26-11-26-12.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3asGfRBD-1635778465207)(2021-10-26-11-52-54.png)]

参考资料: 1. LEO和HW, 及其更新流程

  1. Follower故障

    • follower 发生故障后会被临时踢出ISR, 待该follower恢复后, follower会读取本地磁盘记录的上次的HW(高水位), 并将log文件高于HW的部分截取掉, 从HW开始向leader进行同步, 等该follower的LEO大于等于该partition的HW, 即follower追上leader之后, 就可以重新加入ISR了.
  2. Leader故障

    • leader发生故障之后, 会从ISR中选出一个新的leader, 之后, 为保证多个副本之间的数据一致性, 其余的follower会先将自己的各自的log文件高于HW的部分截掉, 然后重新的leader中同步数据.

2.3.6 Exactly Once(精准一次性) 语义-- Kafka的幂等性

  • 将服务器的ack级别设置为-1(leader,follower均落盘, 不丢数据,但是会造成leader数据重复), 可以保证producer到server 之间不会丢失数据, 即At least once(至少一次)语义,
  • 相对的, 将服务器ack设置为0(leader,follower均未落盘, 数据容易丢失), 可以保证生产者每条消息只会被发送一次, 即At Most Once(最多一次)语义.

At least once, 可以保证数据不丢失, 但是不能保证数据不重复, 相对的, 对于一些比较重要信息, 比如说交易数据, 下游数据消费者要求数据既不重复也不丢失, 即 Exactly Once语义. 在0.11之前的kafka只能保证数据不丢失, 然后在下游消费者处对数据做全局去重. 对于多个下游应用的情况, 每个都需要单独做全局去重, 无疑对性能有巨大的影响.

  • 0.11版本的Kafka引入了一项重大特性: 幂等性, 所谓的幂等性就是指Producer无论向Server发送多少次重复数据, Server端都只会持久化一条, 幂等性结合At Least Once 语义, 就构成了Kafka的Exactly Once 语义, 即
    • At Least Once(ack = -1) + 幂等性 = Exactly Once

如何启用Kafka 的幂等性呢?

要启用幂等性, 只需要将Producer的参数中enable.idompotence 设置为true即可.

幂等性的实现原理和特点?

  • Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游,
  • 开启幂等性的Producer在初始化的时候会被分配一个PID(不是进程号噢, 而是Producer ID), 发往同一Partition 的消息会附带Sequence Number, 而Broker端会对<PID, Partition, SeqNumber>做缓存, 当具有相同主键的消息提交时, Broker只会持久化一条.
  • 但是PID重启就会变化, 同时不同的Partition也具有不同的主键, 所以幂等性无法保证跨分区跨会话的Exactly Once.

2.4 Kafka消费者

2.4.1 消费方式

kafka 的consumer 采用pull(拉)模式从broker中拉取数据.

  • push(推) 很难适应消费速率不同的消费者, 因为在这种方式中, 消费发送速率是由broker决定的, 他的目标是尽可能以最快速度传递消息, 但是这样很容造成consumer来不及处理消息, 典型的表现是拒绝服务以及网络阻塞, 而pull模式则可以根据consumer 的消费能力以适当的速率消费信息.
  • pull(拉) 不足之处是, 如果kafka没有数据, 消费者可能会陷入循环中, 一直返回空数据. 针对这一点, Kafka 的消费者在消费数据时会传入一个市场参数timeout, 如果当前没有数据可供消费, consumer 会等待一段时间之后再返回, 这段时间即为timeout.

2.4.2 消费者分区分配策略(重要)

一个consumer group中有多个consumer, 一个topic有多个partition, 所以必然会涉及到partition的分配问题, 及确定哪个partition由哪个consumer来消费.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nE5vVOuV-1635778465209)(2021-10-28-22-18-37.png)]


  • 对于给consumer分配分区, kafka有三种策略, RoundRobinAssignor(轮询), RangeAssignorStickyAssignor

Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费与订阅主题之间的分区分配策略.
默认情况下, 此参数的值为org.apache.kafka.clients.consumer.RangeAssignor, 即采用RangeAssignor分配策略.
除此之外, Kafka还提供了另外两种分配策略, RoundRobinAssignor 和 StickyAssignor.
消费者客户端参数partition.assignment.strategy可以配置多个分配策略, 彼此之间以逗号分隔.

1. RangeAssignor 配置策略====>(将主题分区按照跨度平均分配给订阅了这些主题的消费者, 跨度=消费者总数/分区总数)

  • RangeAssignor分配策略的原理是按照消费者总数/分区总数来获得一个跨度, 然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀的分配给所有的消费者.

[具体的分配策略]

  1. 对于每一个主题, RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称字典顺序排列, 然后为每个消费者划分固定的分区范围, 如果不够平均分配, 那么字典顺序靠前的消费者会被多分配一个分区.
    • 假如, n=分区数/消费者数量, m=分区数%消费者, 那么前m个消费者每个分配n+1个分区, 后面的(消费者数量-m)个消费者每个分配还是n个分区.
    • (m是均分分区后剩下的余数, 即 分区数==消费者*n + m 个, 前m个消费者分配了n+1个, 剩下的(消费者-m)个只能分配n个)

[举个栗子]

  • eg1. 假设消费者组有2个消费者c0, c1, 它俩订阅了2个主题t0,t1, 这俩topic每个主题都有四个分区, 分别是t0p1, t0p2, t0p3, t0p4 和 t1p1, t1p2, t1p3, t1p4.

    • 我们按照上面的说法进行分配, 一番计算, 2消费者按字典排序(t0, t1), 4个分区, 每个消费者倆分区, 我们按照字典顺序循环, 一次给一个topic分配2个分区.
      • 分配结果为:
      • 消费者c0: t0p1, top2, t1p1, t1p2
      • 消费者c1: t0p3, top4, t1p3, t1p4
    • 对于上面这个分配结果, 我们先上面分配俩分区, 再下面分配俩, 然后上面, 然后下面, 正好平均分配.
  • eg2. 要是不均匀分配呢? 假设2个消费者c0, c1, 订阅了2个主题t0, t1, 每个主题都只有3个分区, 分别是为: t0p0, t0p1, t0p2, 和 t1p0, t1p1, t1p2.

    • 3/2=1余1, 所以n=1, m=1. 两个消费者中有一个会比另外一个多分配
      • 分配结果为
      • 消费者c0: t0p0,t0p1, t1p1, t1p1
      • 消费者c1: t0p2,t1p2

上面例子没看懂, 看看下面这张图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nDLxuMhd-1635778465984)(2021-10-28-20-44-46.png)]

可以看到, 使用RangeAssignor分配策略的话, 会出分配不均匀的现象, 如果将类似的情形扩大, 则有可能出现部分消费者过载的情况.

2. RoundRobinAssignor 配置策略====>(所有主题的所有分区,所有消费者分别字典排序, 轮询方式逐个分配)

  • Kafka默认策略.
  • RoundRobin分配策略的原理是将消费者组内所有消费者和消费者订阅的所有主题的所有分区分别按照字典顺序排序, 然后通过轮询方式逐个将分区一次分配给每个消费者.
  • 轮询策略对应的partition.assignment.strategy参数值为org.apache.kafka.clients.consumer.RoundRobinAssignor

[举个栗子]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N6X0u2MX-1635778465210)(2021-10-28-20-54-49.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SRcSz4jc-1635778465213)(2021-10-28-20-56-42.png)]

3. StickeyAssignor 配置策略====>(初次分配时, 按照订阅关系轮询分配, 订阅关系发生变化时, 只把发生变化的部分按照现有的订阅关系继续轮询分配)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hcrzhmcU-1635778465216)(2021-10-28-21-25-02.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ImdrlVUq-1635778465218)(2021-10-28-21-26-34.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YqzAhwvy-1635778465219)(2021-10-28-21-33-18.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hro0SJdO-1635778465221)(2021-10-28-21-33-52.png)]

4. 自定义配置策略====>()

2.4.3 Offset的维护

  • 由于consumer在消费过程中可能会出现断电宕机等故障, cosumer恢复后, 需要从故障前的位置继续消费, 所以consumer需要实时记录消费到了那个offset, 以便故障恢复后继续消费.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1J0QuzmI-1635778465223)(2021-11-01-17-40-55.png)]

0.9版本kafka之前 Offset的存储(位于zk中), 即上面灰色框图的内容, 在0.9版本之后, 由于offset存储在kafka集群的topic中, 所以灰色框图也就不复存在啦.


2.4.3.1 消费者组消费过程中Offset的记录

  • 在zk中. 消费者组节点的记录:
    • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-psgkDwJH-1635778465224)(2021-11-01-18-28-36.png)]

消费者组消费一个topic 的过程(拿bigdata02消费bigdata01往topic offsetTest生产的消息为例)

  • offsetTest主题有两个分区, 三个副本
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EHYJJCp1-1635778465226)(2021-11-01-18-32-21.png)]
  1. 准备: 创建新的topic, 并打开生产者示例, 准备生产消息, 并打开消费者消费相应的主题,

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bv4hdiVn-1635778465228)(2021-11-01-21-09-22.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e6VJXvYp-1635778465230)(2021-11-01-21-10-28.png)]

  1. 打开zookeeper客户端. 查看'/consumers/topic/newBD/console-consumer-对应消费者组id/newBD/offsets', 可以看到, newBD有两个分区0号,1号, 因为我们还没消费, 所以查看0号分区的值为0.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lS0GMHpn-1635778465232)(2021-11-01-21-16-01.png)]

  1. 第一次生产和消费:
  • 可以看到, 消费者组消费的newBD的0号分区变为了1
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HWbIQHGA-1635778465233)(2021-11-01-21-20-25.png)]
  1. 第二次生产和消费
  • 可以看到. 消费者组消费的newBD的1号分区变为了1
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CmmCheBd-1635778465235)(2021-11-01-21-21-24.png)]
  1. 第三次生产和消费
  • 可以看到. 消费者组消费的newBD的0号分区变为了2
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FDdrwvqo-1635778465236)(2021-11-01-21-24-13.png)]

…如此循环往复. offset不断递增.

why so?
现在只有一个主题bigdata, 两个分区, 0和1号,

  1. 生产的时候没有指定分区号和key,就是轮询着往里生产数据(这里用到生产者的消息分配策略)
  2. 然后消费的时候, 因为是1个主题2个分区, 1个消费者 ,所以消费数据也是轮询的(其实是RangeAssignor退化为了RoundRobinAssignor), 先去0号分区消费一条数据, 此时zk的消费者组–bigdata主题–0号分区的 offset为1, 然后又消费了一条数据, 注意此时消费的应该是这一主题下1号分区下的一条数据(轮询嘛, 雨露均沾), 所以zk的 消费者组–bigdata主题–1号分区-- offset为1, 依次往复下去. (这里用到消费者的消息分配策略)

2.4.4 消费者组案例(待补充)

2.4 为什么Kafka读写数据如此高效?

  1. Kafka集群采用了分布式存储和处理

  2. 顺序读写

  3. 零拷贝技术

2.5 Zookeeper 在kafka集群中的作用?

费一条数据, 此时zk的消费者组–bigdata主题–0号分区的 offset为1, 然后又消费了一条数据, 注意此时消费的应该是这一主题下1号分区下的一条数据(轮询嘛, 雨露均沾), 所以zk的 消费者组–bigdata主题–1号分区-- offset为1, 依次往复下去. (这里用到消费者的消息分配策略)

2.4.4 消费者组案例(待补充)

2.4 为什么Kafka读写数据如此高效?

  1. Kafka集群采用了分布式存储和处理

  2. 顺序读写

  3. 零拷贝技术

2.5 Zookeeper 在kafka集群中的作用?

2.6 Kafka 事务

上一篇:兜底机制——leader到底做了什么?


下一篇:Java并发53:并发集合系列-基于独占锁+PriorityBlockingQueue实现的单向阻塞*延时队列DelayQueue