空学Kafka之一

搞了好多年的RPC框架有点疲了,最近在换到中间件的另一个方向MQ,准备做些kafka上阿里云的商业化工作,趁着热乎劲整理下kafka的知识,主要内容是阅读Kafka的作者之一Neha的书:The Definitive Guide的记录,加入一些其他参考。

先说下Kafka名称来历, Kafka作者之一Jay是这么解释的:“我想既然 Kafka 是为了写数据而产生的,那么用作家的名字来命名会显得更有意义。我在大学时期上过很多文学课程,很喜欢 Franz Kafka。况且,对于开源项目来说,这个名字听起来很酷。因此,名字和应用本身基本没有太多联系。”

基本概念

  • 消息与批次:消息相当于数据的一条记录;“在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。”
  • 模式 (schema):像 JSON 和 XML 这些简单的系统,不仅易用,而且可读性好。不过,它们缺乏强类型处理能力,不同版本之间的兼容性也不是很好。Kafka推荐Avro,因为当Avro schema发生变化时,不需要重新生成代码;它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。数据格式的一致性对于 Kafka 来说很重要,来消除了消息的生产者与消费者操作之间的耦合性,通过定义良好的模式,并把它们存放在公共仓库,可以方便我们理解 Kafka 的消息结构。
  • 主题(Topic)与Partition:Kafka 的消息通过主题 进行分类。主题就好比数据库的DataSource数据源,或者文件系统里的文件夹。主题可以被分为若干个分区 ,一个分区就是一个commitlog。跟数据库的分库分表来做比较,一个10库10表的数据源就相当于一个topic分布到10个broker而每个broker由10个partition。生产者在不停地put/insert数据,通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的partition上。消费者在不停地get数据,通过负载均衡策略服务来选择partition进行消费。与大家熟悉的分布分表组件组件做个类比,就能更好更容易的理解在单元化/LDC/自适应多活架构中消息的设计实现;甚至于更容易接受Confluent(Kafka背后的商业化公司,估值挺高)近来一直宣导的“turning-the-database-inside-out”;说到这里,想到了OceanBase对于使用者已经屏蔽了类似TDDL分库分表的组件逻辑,只需要MySQL的客户端就能访问OceanBase,Kafka也是一样,OSS一样,所有的分布式系统都有可类比的地方,只是所针对的场景是不一样的,kafka侧重在queue数据管道,OceanBase侧重在ACID的k/v,OSS侧重在海量文件(大对象)的持久。
  • Broker与集群:根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。Kafka 的消息复制机制只能在单个集群里进行,不能在多个集群(集群建议是数据中心内收敛独立部署)之间进行。Kafka 提供了一个叫作 MirrorMaker 的工具,可以用它来实现集群间的消息复制。
  • Controller:每个 Kafka 集群都有一个控制器controller,负责管理topic分区和副本的状态的变化,以及执行重分配分区之类的管理任务。它是运行在集群某个 broker 上的一个线程。控制器负责看管集群的操作,有时候需要将控制器从一个 broker 迁移到另一个 broker 上。当前控制器将自己注册到 Zookeeper 的一个节点上,这个节点处于集群路径的最顶层,名字叫作 /controller 。手动删除这个节点会释放当前控制器,集群将会进行新的控制器选举。Controller提供的服务功能基本上都是结合ZK的变化通知来一起完成,比如topic的创建或删除。
  • Coordinator:有两个主要用途,分别是协调consumer之间的partition路由、offset等,协调kafka所定义跨分区跨topic跨消费生产的transaction语义。这块应该是kafka中最复杂的一块。

技术点记录

部署运维方面

如何选择partation的数量

  • 主题需要达到多大的写入吞吐量?例如,是希望每秒钟写入 100KB 还是 1GB ?
  • 从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒 50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒 50MB。
  • 可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。
  • 每个 broker 包含的分区个数、可用的磁盘空间和网络带宽。
  • 如果消息是按照不同的键来写入分区的,那么为已有的主题新增分区就会很困难
  • 单个 broker 对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间也越长。

根据以上的因素,如果你估算出topic的生产者吞吐量和单个消费者吞吐量,可以用主题吞吐量除以单个消费者吞吐量算出分区的个数。也就是说,如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。如果不知道这些信息,那么根据经验,把分区的大小限制在 25GB 以内可以得到比较理想的效果。

消息文件的若干配置

  • log.retention.ms:通常根据时间来决定数据可以被保留多久,是相对于最后的修改时间,一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里最后一个消息的时间戳,移动或者通过Linux命令改动日志文件都可能会导致滚动删除的失效。
  • log.retention.bytes:另一种方式是通过保留的消息字节数来判断消息是否过期,是作用在每一个分区上。如果同时指定了 log.retention.bytes 和 log.retention.ms (或者另一个时间单位),只要任意一个条件得到满足,消息就会被删除,即是“或”的关系。
  • log.segment.bytes:当日志片段大小达到 log.segment.bytes 指定的上限(默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。
  • log.segment.ms:指定了多长时间之后日志片段会被关闭。log.segment.bytes 和 log.retention.ms 这两个参数之间也不存在互斥问题,日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms 没有设定值,所以只根据大小来关闭日志片段。需要考虑并行关闭多个日志segment的问题。
  • message.max.bytes:来限制单个消息的大小,默认值是 1 000 000,也就是 1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误信息。这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。另外,这个值与消费者客户端设置fetch.message.max.bytes 必须与服务器端设置的消息大小进行协调,如果前者比后者大,就会消费阻塞——这一点我不是很明白,为什么不拉取下一条?

硬件的选择

如果关注整体性能,就需要在预算范围内选择最优化的硬件配置:磁盘吞吐量和容量、内存、网络和 CPU。

  • 硬盘:HDD还是SSD:成本和存储容量的权衡,最终会影响到生产者的RT。
  • 内存:pagecache的大小问题,最终会影响消费者的RT。
  • 网络:因为消费与生产的比重一般大于1,网络流出大于流入的不平衡,同时考虑复制和mirrormake等也会加重不平衡。
  • CPU:要求较低。主要消耗在安全,解压缩等。

要根据 Kafka 的性能优先级来选择合适的实例。可以先从要保留数据的大小开始考虑,然后考虑生产者方面的性能。如果要求低延迟,那么就需要专门为 I/O 优化过的使用固态硬盘的实例,否则,使用配备了临时存储的实例就可以了。选好存储类型之后,再选择 CPU 和内存就容易得多。实际上,如果使用 AWS,一般会选择 m4 实例或 r3 实例。m4 实例允许较长时间地保留数据,不过磁盘吞吐量会小一些,因为它使用的是弹性块存储。r3 实例使用固态硬盘,具有较高的吞吐量,但保留的数据量会有所限制。如果想两者兼顾,那么需要升级成 i2 实例或 d2 实例,不过它们的成本要高得多。 —— 阿里云对于Kafka的用户也应该有对应的建议配置;从Confluent的Kafka云服务来看,是集成了AWS,Azure,GoogleCloud的部署入口,当前是没有aliyun的选项。

通常来说,在做kafka集群的容量规划时,消息的总容量(包含备份复制)和网络流量情况两个因素就能估算出Broker集群的大小。

OS+JVM的配置

  • 虚拟内存:尽量避免内存交换。内存页和磁盘之间的交换对 Kafka 各方面的性能都有重大影响。建议把 vm.swappiness 参数的值设置得小一点,比如 1。该参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减小页面缓存,而不是进行内存交换。通过设置 vm.dirty_ratio 参数可以增加被内核进程刷新到磁盘之前的脏页数量,可以将它设为大于 20 的值(这也是系统内存的百分比)。这个值可设置的范围很广,60~80 是个比较合理的区间。不过调整这个参数会带来一些风险,包括未刷新磁盘操作的数量和同步刷新引起的长时间 I/O 等待。如果该参数设置了较高的值,建议启用 Kafka 的复制功能,避免因系统崩溃造成数据丢失。
  • 文件系统:XFS相比较EXT4 为 Kafka 提供了更好的性能,除了由文件系统提供的自动调优之外,无需额外的调优。批量磁盘写入具有更高的效率,可以提升整体的 I/O 吞吐量。不管使用哪一种文件系统来存储日志片段,最好要对挂载点的 noatime 参数进行合理的设置。文件元数据包含 3 个时间戳:创建时间(ctime)、最后修改时间(mtime)以及最后访问时间(atime)。默认情况下,每次文件被读取后都会更新 atime,这会导致大量的磁盘写操作,而且 atime 属性对于Kafka用处不大,可以禁用。
  • 网络:设置 TCP socket 的读写缓冲区,它们的参数分别是 net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem 。把 net.ipv4.tcp_window_scaling 设为 1,启用 TCP 时间窗扩展,可以提升客户端传输数据的效率,传输的数据可以在服务器端进行缓冲。把 net.ipv4.tcp_max_syn_backlog 设为比默认值 1024 更大的值,可以接受更多的并发连接。把 net.core.netdev_max_backlog 设为比默认值 1000 更大的值,有助于应对网络流量的爆发,特别是在使用千兆网络的情况下,允许更多的数据包排队等待内核处理。(熟悉Java网络编程的同学也知道,这些配置在Java应用中也可以配置)
  • GC:优先选择G1;如果一台服务器有 64GB 内存,并且使用 5GB 堆内存来运行 Kafka,那么可以参考以下的配置:MaxGCPauseMillis 可以设为 20ms;InitiatingHeapOccupancyPercent 可以设为 35,这样可以让垃圾回收比默认的要早一些启动。
  • 其他的生产建议:部署上建议跨机架(这是调度服务需要考虑的);同时建议在0.9版本之后把offset保存到broker来减少对zk的压力和依赖,不建议多集群复用zk服务。

基本的设计点

消费者的Rebalance

在0.9以前的client api中,consumer是要依赖Zookeeper的,来对整个集群的数据进行监听获取,并各自做rebalance的计算,会有惊群(herd effect)和脑力(brain split)的问题,同时会加重ZK本身的负载会有性能稳定性的问题。herd effect:任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance;split brain:每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。在0.9中,不再用zookeeper,而是Kafka集群本身来进行consumer之间的同步,也就是下面说的机制。

类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。

  1. 看offset保存在那个partition
  2. 该partition leader所在的broker就是被选定的coordinator

Rebalance的过程

  1. consumer给coordinator发送JoinGroupRequest请求。
  2. 这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
  3. 其他consumer发送JoinGroupRequest请求。
  4. 所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
  5. consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
  6. coordinator回包,把分配的情况告诉consumer,包括leader。

注意以上partition的分配还是在Consumer端完成,这一设计的原因是根据业务的不同会有很多灵活的需求,从逻辑上看,这种灵活的分配需求来自消费端,而并非broker端,当消费端有新的需求时,如果需要调整的却是Broker端,显然有些怪异,也不够灵活。是包含了一种责任边界清晰化的设计考量。

当partition或者消费者的数量发生变化时,都得进行reblance。列举一下会reblance的情况:

  1. 增加partition
  2. 增加消费者
  3. 消费者主动关闭
  4. 消费者宕机了
  5. coordinator自己也宕机了

消费者的offset状态维护

一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。确定consumer group位移信息写入__consumers_offsets 的哪个partition,具体计算是Consumer_Group与partition数量的哈希值。

生产者

分区器的实现,Schema管理

服务端文件组织

kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:
空学Kafka之一

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。

  • baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
  • position:在segment中的绝对位置。
  • 查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。

可靠性保证

可靠性的定义因人而异,比如Rocketmq的侧重在通过二次异步提交的方式来达到生产者的低且稳定的RT,虽然有加大丢失数据的风险,即CAP中的A(可用性);而Kafka是侧重在多副本复制而来的数据一致性,Kafka 的复制机制和分区的多副本架构是 Kafka 可靠性保证的核心,即CAP中的C(一致性)和ACID中的D(持久性)。可靠性这一系统行为是在一定条件下的权衡,需要针对具体的业务场景而定,这种权衡一般是指消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本的重要程度之间的权衡。

  • Broker的可靠:每个partition分区可以有多个副本,其中一个副本是Leader。所有的事件都直接发送给Leader,或者直接从Leader读取事件。其他副本只需要与Leader保持同步,并及时复制最新的事件。当Leader不可用时,其中一个同步副本将成为新Leader。与 Zookeeper 的心跳6s内失败,或者不再获取新消息,或者获取消息滞后了 10s 以上,那么此副本就被认为是不同步副本。

    •  replication.factor :topic级别的副本个数。如果复制系数为 N ,那么在 N -1 个 broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的复制系数会带来更高的可用性、可靠性和更少的故障。另一方面,复制系数 N 需要至少 N 个 broker,而且会有 N 个数据副本,也就是说它们会占用 N 倍的磁盘空间。比如金融级别的数据建议为5,日志监控类的数据为1。同时,可以通过broker.rack 参数来为每个 broker 配置所在机架的名字来提升高可用性。
    • unclean.leader.election: 在 broker 级别(实际上是在集群范围内)进行配置,它的默认值是 true,就是允许不同步的副本成为首领(也就是“不完全的选举”),那么我们将面临丢失消息的风险。如果把这个参数设为 false ,就要等待原先的首领重新上线,从而降低了可用性。在金融级的场景下,需要设置为false。
    • min.insync.replicas:主体级别的配置,最小同步的副本数。
  • Producer的可靠:即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。

    • acks=n:0意味着oneway,最快但没有任何保证;1意味着Leader在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应;all 意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。
    • 异常处理:分为可重试(比如网络抖动带来的网络异常)和不可重试(比如无法序列化,消息不符合规则等)。如果发生正常的首领选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,只要重试就能保证消息发送OK(会带来潜在的消息重复问题,消息在append时做幂等控制还是比较困难)。
  • Consumer的可靠:消费者唯一要做的是跟踪哪些消息是已经读取过的,哪些是还没有读取过的。这是在读取消息时不丢失消息的关键。已提交消息是指已经被写入所有同步副本并且对消费者可见的消息——这里试想下如何在多副本间同步已提交的进度?

    • 消费进度的提交:自动或显式。前者简单,但可能会导致没消费成功而推进offset,或者重复消费。显式提交需要考虑频度(是每次消费完还是异步延迟),是性能和重复消息数量之间的权衡。
  • 监控可靠性

    • 对于生产者来说,最重要的两个可靠性指标是消息的 error-rate 和 retry-rate(聚合过的)。如果这两个指标上升,说明系统出现了问题。
    • 对于消费者来说,最重要的指标是 consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想情况下,该指标总是为 0,消费者总能读到最新的消息。—— 在最Kafka2.0版本中,有另一个lag指标:当分区 renteion 时间很短而导致消费者跌出可消费范围时(out-of-range),consumer-lag指标不能完全针对潜在的危险为用户报警。因此加入了另一个“领先”指标(lead metrics),定义为分区首端(log-start-offset)与消费者在分区上的位置距离,当此指标趋近于零时,代表消费者有跌出可消费范围因而丢失数据的危险。
    • 监控数据流及时地读取:为了确保数据能够被及时读取,你需要知道数据是什么时候生成的。这方面Notify,RocketMQ定义的就比较好,有borntime(客户端生成时间),gmtCreate(到达服务端时间)等;同时加上有eagleeye tracing这样的链路分析会更加的精细。PS,这里面需要提一下,Kafka的“自产自销”的好用处:增加一个“监控消费者”的Topic,这个消费者订阅一个特别的主题,它只进行消息的计数操作,并把数值与生成的消息数量进行对比,这样我们就可以在没有消费者的情况下仍然能够准确地监控生产者。


上一篇:空学Kafka之二


下一篇:NERSC扩展深度学习计算能力达15千万亿次FLOPS