复习一下Kafka

一、前言

前几天看了一篇《打造全球最大规模Kafka集群,UBER的多区域灾备实践》的一篇博客,Uber用kafka实践,每天处理数亿级消息和几个PB的数据,kafka现在成了Uber的技术栈的基石,并基于构建了一复杂的生态系统,它实现了kafka的多区部署、多区消费消息和容灾,今天我不重点说kafka在UBer的具体实践,而是借用这个主题全面梳理一下kafka,重新复习一下。说到消息队列,耳熟能详有rabbitMQ、activeMq、rocketMq、redis的MQ等等。在工程化越来越复杂的今天,消息队列已越来越重要也越来越常用,先来全局了解一下kafka:
复习一下Kafka

二、kafka简介

2.1 什么是kafka

Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。最早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控。所以kafka一开始设计的目标就是作为一个分布式、高吞吐量的消息系统,所以适合运用在大数据传输场景。

2.2 Kafka的应用场景

由于kafka具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka每秒可以处理几十万消息),让kafka成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用长,主要会应用于如下几个方面:

  • 行为跟踪:kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的topic中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控;
  • 日志收集:日志收集方面,有很多比较优秀的产品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的log都会输出到本地的磁盘上,排查问题的话通过linux命令来搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的事情了。所以一般都会做一个日志统一收集平台管理log日志用来快速查询重要应用的问题。所以很多公司的套路都是把应用日志集中到kafka上,然后分别导入到es和hdfs上,用来做实时检索分析和离线统计数据备份等。而另一方面,kafka本身又提供了很好的api来集成日志并且做日志收集。

2.3 架构

一个典型的kafka集群包含若干Producer(可以是应用节点产生的消息,也可以是通过Flume收集日志产生的事件),若干个Broker(kafka支持水平扩展)、若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同。Producer使用push模式将消息发布到broker,consumer通过监听使用pull模式从broker订阅并消费消息。多个broker协同工作,producer和consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。
复习一下Kafka

三、基础知识

3.1 名词解释

  • 1)Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
  • 2)Producer
    负责发布消息到Kafka broker
  • 3)Consumer
    消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。
  • 4)Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • 5)Partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.
  • 6)Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
  • 7)Topic & Partition
    Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

3.2 基础参数说明

  • batch.size
    生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb,意味着当一批消息大小达到指定的batch.size的时候会统一发送
  • linger.ms
    Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle算法,也就是基于小包的等-停协议。
    batch.size和linger.ms这两个参数是kafka性能优化的关键参数,batch.size和linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到broker上
  • group.id
    consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费.如下图所示,分别有三个消费者,属于两个不同的group,那么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此时的架构来说,这个firstTopic就类似于ActiveMQ中的topic概念。如右图所示,如果3个消费者都属于同一个group,那么此时firstTopic就是一个Queue的概念
    复习一下Kafka
    复习一下Kafka
  • enable.auto.commit
    消费者消费位移的提交方式,true为自动提交,即consumer poll消息后自动提交上次之前poll的所有消息位移,若为false则需要手动提交,即consumer poll出的消息需要手动提交消息位移,提交消息位移的方式有同步提交和异步提交。
  • auto.commit.interval.ms
    在enable.auto.commit 为true的情况下, 自动提交消费位移的间隔,默认值5000ms。那么消费者会在poll方法调用后每隔5000ms(由auto.commit.interval.ms指定)提交一次位移。和很多其 他操作一样,自动提交消费位移也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间(auto.commit.interval.ms指定的值),如果是则提交上一次poll返回的最大位移。
  • auto.offset.reset
    这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义。
    auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息。
    auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费。
    auto.offset.reset=none情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
  • max.poll.records
    consumer是通过轮训的方式使用poll()方法不断获取消息的,max.poll.records参数可以限制每次调用poll返回的消息数,默认是500条。
  • max.poll.interval.ms
    默认值5分钟,表示若5分钟之内consumer没有消费完上一次poll的消息,也就是在5分钟之内没有调用下次的poll()函数,那么kafka会认为consumer已经宕机,所以会将该consumer踢出consumer group,紧接着就会发生rebalance,发生rebalance可能会发生重复消费的情况。

四、关于Topic和Partition

4.1 Topic

在kafka中,topic是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到kafka集群的消息都有一个类别。物理上来说,不同的topic的消息是分开存储的,每个topic可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
复习一下Kafka

4.2 Partition(分区)

  每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的,那么为什么要设置多partition呢?第一分区存储可以存储更多的消息,其次是为了提高吞吐量,如果只有一个partition,则所有消息只能存储在该partition内,消费时不管有多少个消费者也只能顺序读取该partition内的消息,如果是多个partition,那么消费者就可以同时从多个partition内并发读取消息,正是这个原因才提高了吞吐量。
  每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的。
  在多partition和多consumer的情况下,生产的消息是具有顺序性的,且根据partition的分发策略依次插入到相应的partition中,但是由于kafak只保证同一个partition内的消息输出有序性,所以多partition依次输出的消息顺序并不能保证和生产消息写入的顺序是一样的。
  下图中,对于名字为test的topic,做了3个分区,分别是p0、p1、p2,每一条消息发送到broker时,会根据partition的规则选择存储到哪一个partition。如果partition规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分片处理。
复习一下Kafka

4.3 消息分发消费&分区策略

  • 4.3.1 生产者消息分发策略

   消息是kafka中最基本的数据单元,在kafka中一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到哪个partition中,我们可以根据需要进行扩展producer的partition机制。
   默认情况下,kafka采用的是hash取模的分区算法,如果Key为null,则会随机分配一个分区,这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个,对于这个时间段内,如果key为null,则只会发送到唯一的分区。当然也可以自己定义分区略。

  • 4.3.2 消费者消费原理

在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,一方面能够对broker上的数据进行分片有效减少了消息的容量从而提升io性能。另外一方面,提高了消费端的消费能力,如果只有一个partitions,那么多consumer也只能顺序读取该partitions内的消息,如果是多个partitions的话,那么多consumer就可以从多partitions并发生读取topic消息,这样就提高了消息断的消费能力,所以一般会设置多个consumer去消费同一个topic的多个partitions, 也就是消费端的负载均衡机制。
  在多个partition以及多个consumer的情况下,消费者是如何消费消息的。kafka存在consumer group的概念,也就是group.id一样的consumer,这些consumer属于一个consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的consumer来消费,那么同一个consumergroup里面的consumer是怎么去分配该消费哪个分区里的数据的呢?如下图所示,3个分区,3个消费者,那么哪个消费者消分哪个分区?
复习一下Kafka
对于上面这个图来说,这3个消费者会分别消费test这个topic 的3个分区,也就是每个consumer消费一个partition。

  • 演示1(3个partiton对应3个consumer)
    Ø 创建一个带3个分区的topic
    Ø 启动3个消费者消费同一个topic,并且这3个consumer属于同一个组
    Ø 启动发送者进行消息发送
    演示结果:consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费partition2分区,如果是2个consumer消费3个partition呢?会是怎么样的结果?
  • 演示2(3个partiton对应2个consumer)
    Ø 基于上面演示的案例的topic不变
    Ø 启动2个消费这消费该topic
    Ø 启动发送者进行消息发送
    演示结果:consumer1会消费partition0/partition1分区、consumer2会消费partition2分区
  • 演示3(3个partition对应4个或以上consumer)
    演示结果:仍然只有3个consumer对应3个partition,其他的consumer无法消费消息,通过这个演示的过程,引出接下来需要了解的kafka的分区分配策略(Partition Assignment Strategy)
  • 4.3.3 consumer和partition的数量建议

如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数,如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀,最好partiton数目是consumer数目的整数倍,所以partition数目很重要,如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

  • 4.3.4 分区分配策略

同一个group中的消费者对于一个topic中的多个partition,存在一定的分区分配策略,每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,在kafka中,存在三种分区分配策略,一种是Range(默认)、 另一种是RoundRobin(轮询)、StickyAssignor(粘性)。

  • 1)、RangeAssignor(范围分区)
    Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
    假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区.
    结果看起来是这样的:
    C1-0 将消费 0, 1, 2, 3 分区
    C2-0 将消费 4, 5, 6 分区
    C3-0 将消费 7, 8, 9 分区
    假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
    C1-0 将消费 0, 1, 2, 3 分区
    C2-0 将消费 4, 5, 6, 7 分区
    C3-0 将消费 8, 9, 10 分区
    假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
    C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
    C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
    C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
    可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,所以最好的情况就是partiton数目是consumer数目的整数倍,可以有效避免这个弊端。
  • 2)、RoundRobin(轮询)
    轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序,注意上一种range分区是针对每一个topic而言的,而轮训分区是相对于所有的partition和consumer而言的,最后通过轮询算法分配partition给消费线程。如果消费组内,所有消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
    在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1(c1和c2 consumer group都订阅了t1),最后分区分配的结果为:
    C1-0 将消费 T1-5, T1-2, T1-6 分区;
    C1-1 将消费 T1-3, T1-1, T1-9 分区;
    C2-0 将消费 T1-0, T1-4 分区;
    C2-1 将消费 T1-8, T1-7 分区;
    相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
    对于订阅组内消费者订阅Topic不一致的情况:假设有三个消费者分别为C1-0、C2-0、C3-0,有3个Topic T1、T2、T3,分别拥有1、2、3个分区,并且C1-0订阅T1,C2-0订阅T1和T2,C3-0订阅T1、T2、T3,那么RoundRobinAssignor的分配结果如下:
    复习一下Kafka
    看上去分配已经尽量的保证均衡了,不过可以发现C3-0承担了4个分区的消费而C2-0和C1-0都是承担一个分区,如果T2-1分配给c2-0,均衡性是不是更好呢?带个这个问题,继续下面的这次策略。
  • 3) StickyAssignor(粘性)
    尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。更核心的问题是无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。它主要有两个目的:
  • 1、分区的分配尽可能的均匀
  • 2、分区的分配尽可能和上次分配保持相同,也就是rebalance之后分区的分配尽量和之前的分区分配相同。
    当两者发生冲突时, 第 一 个目标优先于第二个目标。 第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。

五、分区副本

5.1.1 副本机制

  我们已经知道Kafka的每个topic都可以分为多个Partition,并且同一topic的多个partition会均匀分布在集群的各个节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。
   每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志,和redis cluster中的节点概念相同,leader副本为redis cluster中的主节点,follower副本为redis cluster中的备节点。
  一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高kafka集群的可用性。
复习一下Kafka
通常follower副本和leader副本不会在同一个broker上,这种是为了保证当leader副本所在broker宕机后,follower副本可继续提供服务。

5.1.2 副本选举机制

Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,这个时候怎么处理呢?kafka必须要保证从follower副本中选择一个新的leader副本,那么kafka是如何实现选举的呢?要了解leader选举,我们需要了解几个概念,Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:

  • leader副本:响应clients端读写请求的副本
  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
  • ISR副本:Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能,ISR副本包含了leader副本和所有与leader副本保持同步的follower副本,注意是和保持同步,不包含和leader副本没保持同步的follower副本。
5.1.3 副本协同机制

刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从ISR副本中的follower副本中选取新的leader。
写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,leader就会把它踢出去,kafka通过ISR集合来维护一个分区副本信息
复习一下Kafka
一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

5.1.4 ISR

ISR表示目前可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集。怎么去理解可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足两个条件:

  • 1、副本所在节点必须维持着与zookeeper的连接
  • 2、副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值。(replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除isr列表
  • 3、ISR数据保存在Zookeeper的 /brokers/topics//partitions//state 节点中
    follower副本把leader副本前的日志全部同步完成时,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合
    复习一下Kafka

5.1.5 所有副本不工作如何处理?

在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
1、等待ISR中的任一个Replica“活”过来,并且选它作为Leader
2、选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader,默认配置。
如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。
选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源。默认情况下Kafka采用第二种策略,即unclean.leader.election.enable=true,也可以将此参数设置为false来启用第一种策略。

5.1.6 副本数据同步原理

了解了副本的协同过程以后,还有一个最重要的机制,就是数据的同步过程。下图中,深红色部分表示test_replica分区的leader副本,另外两个节点上浅色部分表示follower副本
复习一下Kafka
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader get /brokers/topics//partitions/2/state ,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据,这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK,一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(HighWatermark)并且向Producer发送ACK。

  • LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的,可以看出leader副本和follower副本都有LEO。
  • HW:即所有follower副本中相对于leader副本最小的LEO值。HW是相对leader副本而言的,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的
    通过下面这幅图来表达LEO、HW的含义,随着follower副本不断和leader副本进行数据同步,follower副本的LEO主键会后移并且追赶到leader副本,这个追赶上的判断标准是当前副本的LEO是否大于或者等于leader副本的HW,如果follower在replica.lag.time.max.ms时间范围内追赶上了leader副本,该follower副本则加入到ISR副本内,也可以使得之前被踢出的follower副本重新加入到ISR集合中;如果在replica.lag.time.max.ms时间范围内follower副本没追赶上leader副本,该follower副本会被从ISR副本范围内踢出,可以看出ISR副本是一个由zookerper动态监控的变化的副本。

六、数据可靠性和持久性保证

6.1 生产者不丢数

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • request.required.acks=0
    producer写入的一条消息会立即返回ack确认消息,不管leader副本是否同步完或者ISR中的follower副本是否同步完,此配置丢失数据风险很大,生产环境很少使用。
  • request.required.acks=1(默认配置)
    producer写入的一条消息后会等到leader副本同步完成(不需要等到ISR内的follower副本同步完成)后立即返回给客户端ack消息。该配置的风险是如果ISR内的follower副本还没有完成信息同步时,leader节点宕机了,然后通过选举一个follower副本做为新的节点,此时就会有数据丢失的问题,相当于mysql的主从同步,优点就是可用性强,缺点就是弱一致性,可能造成数据丢失。
  • request.required.acks=-1
    producer写入的一 条消息需要等到分区的leader 副本完成同步,且需要等待ISR集合中的所有follower副本都同步完之后才能返回producer确认的ack,这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失,类似于强一致性,追求强一致性也就意味着可用性(响应时间)会降低。设置成-1就可以保证写入的数据不丢失了吗?不一定,`比如当ISR中只有leader副本时(前面ISR那一节讲到,ISR副本中的成员由于某些情况会增加也会减少,最少就只剩一个leader),当leader副本宕机后,所有数据丢失。
    为了避免数据的丢失,提高可靠性,避免ISR副本中只有一个leader副本情况的发生,可以使用参数min.insync.replicas来约束,该参数的意思是设定ISR中的最小副本数是多少,总数包含leader副本和follower副本之和,如果ISR中的副本数不够参数min.insync.replicas所设定的值,客户端会返回异常。
    如果由于网络原因导致producer push数据失败了,我们可以设置retries参数来进行重试,总结:producer消息不丢失需要下面3中措施:
  • request.required.acks=-1
  • 设置min.insync.replicas参数
  • 设置retries参数

6.2 broker数据不丢失

上面已经介绍过unclean.leader.election.enable=false参数。
这里设置unclean.leader.election.enable=false,表示:如果ISR副本全部宕机后,等到ISR副本中的里一个副本启动之后,并将他做为leader副本.

6.3 consumer数据不丢失

enable.auto.commit该参数默认为true,表明consumer在下次poll消息时自动提交上次poll出的所有消息的消费位移,如果设置为false,则需要用户手动提交手动提交所有消息的消费位移。

6.4 消息重复消费和消息丢失的场景

当 enable.auto.commit设置为true的时候会有消息重复消费和消息丢失的场景。
当应用端消费消息时,还没有提交消费位移的时候,此时kafka出现宕机,那么在kafka恢复之后,这些消息将会重新被消费一遍,这就造成了重复消费。
比如consumer第一次poll出n条消息进行消费,达到auto.commit.interval.ms时间后,cosumer会进行下一次poll并提交上次poll出的n条消息的消费位移。如果第一次poll出的n条消息客户端还没有消费完,此时客户端宕机了,当客户端重启后,将会从第二次poll的位置开始拉取消息,从而丢失第一次未提交消费位移的消息,这就造成了数据丢失。

6.5 只能避免数据丢失而不能解决数据重复

当设置enable.auto.commit为false时,所有的消息位移提交都为手动提交了,所有可以避免上面提到的数据丢失问题,可以保证consumer消息时数据不会丢失。
手动提交有同步提交和异步提交,我们可以选择在应用端处理完消息后手动提交消费位移。如果在消费完消息准备提交消息位移的时候,应用端发生了宕机,那么重启之后这些消息还是会被重新消费一遍,所以通过配置enable.auto.commit参数为false只能避免消费端丢失消息而不能避免消费端重复消费消息.

七、消息存储持久化

消息发送端发送消息到broker上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储首先我们需要了解的是,kafka是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。Kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,Log并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>

7.1 消息的文件存储机制

一个topic的多个partition在物理磁盘上的保存路径,路径保存在 /tmp/kafka-logs/topic_partition,包含日志文件、索引文件和时间索引文件
复习一下Kafka
kafka是通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的,索引文件是用来保存消息的索引。

7.2 LogSegment

假设kafka以partition为最小存储单位,那么我们可以想象当kafka producer不断发送消息,必然会引起partition文件的无线扩张,这样对于消息文件的维护以及被消费的消息的清理带来非常大的挑战,所以kafka 以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。
segment file由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值进行递增。数值最大为64位long大小,20位数字字符长度,没有数字用0填充
复习一下Kafka

7.3 segment中index和log的对应关系

从所有分段中,找一个分段进行分析
为了提高查找消息的性能,为每一个日志文件添加2个索引索引文件:OffsetIndex 和 TimeIndex,分别对应.index以及.timeindex, TimeIndex索引文件格式:它是映射时间戳和相对offset
复习一下Kafka
如图所示,index中存储了索引以及物理偏移量。 log存储了消息的内容。索引文件的元数据执行对应数据文件中message的物理偏移地址。举个简单的案例来说,以[4053,80899]为例,在log文件中,对应的是第4053条记录,物理偏移量(position)为80899. position是ByteBuffer的指针位置

7.4 如何通过offset查找message

查找的算法是:

  • 1、根据offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一个文件的最后一个offset进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
  • 2、找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)
  • 3、得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息
    比如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是否大于等于2490。最后查找到对应的消息以后返回

7.5 日志的清除策略以及压缩策略

1、日志清除策略

前面提到过,日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个:

  • 1、根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程
  • 2、根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息
    通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7天
2、日志压缩策略

Kafka还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。因此,我们可以开启kafka的日志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进行合并,只保留最新的value值。日志的压缩原理是
复习一下Kafka

八、小结

本文仅仅是自己复习一下,涉及并不太深入也并不太全面,如果想更深入了解,可以学习一下《深入理解Kafka- 核心设计与实践原理》,作者是之前唯品会的同事朱忠华,他对rabbitMQ、Kafka了解非常深入,其实我也是拜读他的几篇博客,理解非常到位。另外新一代消息中间件已问世,名字是Pulsar,下一代云原生消息平台、Apache *项目 ,有空大家了解学习一下,他既拥有 Kafka 的优势,又规避它的缺陷,同时还融合了 MQ 的一系列特性。

上一篇:Java高级进阶学习资料!java创建list集合对象


下一篇:关于hive核心