前言
Kafka 是一种分布式的,基于发布/订阅体系的消息队列系统。
能够以 时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证 常数时间复杂度 的访问性能。
即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
kakfa 是大数据系统架构中不可或缺的一部分,因其高效的性能、高可靠的数据与服务、提供准确一次的语义、优秀的在线水平扩展等特性,常用作为流式计算系统的数据源来处理活动数据流。
本文将从 架构、生产者、消费者、内部机制 等理论部分内容开始逐一讨论 Kakfa 的技术实现。
最后在 生产实践 中通过列举一些 软硬件配置、性能优化、使用规范 方面的建议,希望能为有兴趣学习 Kakfa 或者正在使用 kafka 的开发人员提供帮助。
一、架构
1.1 架构总览
物理角色
Kakfa 集群由以下几种角色组成:Zookeeper、Broker 和 Client。
Kafka 集群通过 Zookeeper 维护集群节点,进行控制器选举以及集群配置管理 等活动,是 Kakfa 的基础依赖。
Broker 是 Kafka 集群中的 服务端节点,负责 存储数据、协调分区副本、管理消费者群组、处理客户端请求 等工作。
一个 Kafka 集群由多个 Broker 组成,一个 Broker 为一台服务端机器,如图中的 Broker1、Broker2、Broker3。
在 Kafka 集群启动时,Broker 会使用 client-id
在 Zookeeper 中注册 /brokers/ids
临时节点。
该临时节点在 Broker 没有提供心跳或者 Broker 退出时将被清理,其他节点获得改及诶单移除出集群的通知,
Client 为 开发人员使用的客户端,不属于 Kafka 集群的节点,但是是组成 Kafka 集群的重要角色,分为 生产者与消费者 两种类型。
- Producer: 生产者是集群中生产数据的地方,通过push的方式将数据推送到集群服务端存储。
- Consumer: 消费者是集群中消费数据的地方,通过pull的方式将数据从集群服务器中拉取并消费。
值得注意的是,Kafka 集群中并没有 Master 和 Slave 节点的概念,只有两种类型的节点:服务端和客户端。
逻辑角色
在 Broker 服务端中,Kafka 以 Topic 的形式来组织数据,每条发送到服务端的消息必须要指定其属于哪个 Topic,不同的 Topic 中的数据在物理上分开存储。
每个 Topic 包含 一个或多个 Partition(在创建 Topic 时指定),同一个 Topic 的不同 Partition 可能存在不同的 Broker 中,从而实现 数据和请求的负载均衡。
每个 Partition 都拥有自己的 副本(副本数可配置),副本之间有 Leader 与 Follower 的关系:
- Leader 副本负责统一接受客户端读写请求,保持数据一致性。
- Follower 副本从 Leader 副本同步数据,并时刻准备成为 Leader 副本提供服务。
如上图中,TopicA 拥有3个分区:part0、part1和part2。
每个分区都一个副本,每个副本都处于 Leader 或者 Follower 的状态,同一个分区的副本会被 Broker 尽量 存储在不同的服务器上,防止某台机器宕机后主备副本都不可用的状态。
其中有趣的概念是,Kafka 中 Leader 和 Follower(主从) 的概念是 针对副本而不是针对节点的,这与其他分布式系统有很大的区别,注意不要混淆 Kafka 的副本主备概念。
1.2 控制器
在集群中,Broker 存在两种类型:普通的 Broker 和 具有分区副本主备选举功能的 Broker。
具有选举功能的 Broker 被称为 Controller,负责分区 Leader 的选举。
第一个 在 Zookeeper 注册的 Broker 即为集群中的 Controller,将会同时注册 /controller
节点。
其余 Broker 也会进行 /controller
节点的竞争,当竞争失败时会在该节点注册一个watch,以便当前控制器宕机之后 继续竞争控制器节点。
当出现集群中有 Broker 离开或者加入时,控制器会对集群中的分区副本做 重新选举或者副本同步的操作。
节点离开
当 Broker 离开集群时,控制器会得到通知(控制器也是一个 Broker,在zk上监听了/brokers/ids
临时节点),其会判断 哪些分区的 Leader 在离开集群的 Broker 之上。
它需要为这些分区重新选举出一个新的 Leader(简单来说就是分区列表的下一个分区),否则这些分区的数据将无法使用。
指定新的分区副本 Leader 后,控制器还需要向 包含这些分区的 Broker 发送请求 说明谁成为了新的 Leader 和 Follower。
随后新的 Leader 开始接受客户端请求,Follower 开始从 Leader 同步数据。
节点加入
当新的 Broker 加入集群时,将根据 Broker Id 确认当前 Broker 是否包含现有分区副本。
如果有,控制器会通知 拥有相关分区的 Broker 有新的 Follower 加入,随后新 Broker 上的 Follower 分区开始从 Leader 同步数据。
Kafka 通过控制器来实现分区副本的 主备选举与主备关系协调机制,需要注意的是,该机制并不直接通过 Zookeeper 进行,而是通过具有控制器功能的 Broker 进行。
1.3 分区副本
前面我们提到过,副本有两种类型:Leader 和 Follower,也可以称之为主备副本。
Leader 副本负责统一接受客户端读写请求,保持数据一致性。并且会通过 定期检查 每个 Follower 最新同步的偏移量 来检查所有 Follower 的 数据同步是否一致。
如果发现 Follower 偏移量不是最新的,或者超过10s未更新偏移量则认为是 「不同步」 的 Follower 副本。
该 Follower 在选举时将不会成为 Leader ,因为其被认为 没有包含所有的数据,是不完整的。
而 Follower 副本本身并不具备什么功能,只是 定期的从 Leader 复制数据,并能够在 Leader 宕机后 重新选举成为分区 Leader,使 Kafka 集群 时刻保持数据高可用状态。
二、生产者
生产者是 Kafka 中的生产客户端,生产者将数据发送给 Kafka 服务端后即结束流程,是 Kafka 中开发人员经常需要接触的组件。
2.1 构建生产者
开发人员可以通过 Kafka 提供的 Client API 来构建自己的生产者对象:
val props = new Properties()
//设置broker地址
props.put("bootstrap.servers","broker1:9092")
props.put("key.serializer","org.apache.kafka.commmon.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.commmon.serialization.StringSerializer")
//创建生产者对象
val producer = new KafkaProducer[String,String])(props)
其中,bootstrap.servers
不需要写所有的 Broker 地址,只需要设置一个 Kafka 会通过 自动发现 来查找到所有可用的 Broker。
构建完生产者对象后,使用 send
方法即可向指定的 Kafka 集群发送数据:
//构建数据对象,包含Topic和数据值
val record = new ProducerRecord("Topic1","test-value")
producer.send(record)
send
方法会返回一个 Feature
对象,当直接忽略该返回值不做处理时,则生产者结束此次发送,但是我们 无法得知数据的发送情况。
如果我们需要等待服务端返回处理的响应只需要调用 Feature
对象的 get
方法即可 同步等待 响应并得到返回结果:
val result = producer.send(record).get
但是这种同步的方式将会 阻塞住当前的线程,可以预见的是使用该方式的生产者 并发效率并不高。
可以看到,无论是忽略还是同步模式,都有缺陷,有没有办法做到高效率的同时还能关心数据是否接受成功?
我们可以使用 异步回调 的方式来实现这个目标:
import org.apache.kafka.clients.producer.Callback
//定义回调函数
private class DemoProducerCallback extends Callback {
@override
def onCompletion(recordMetadata: RecordMetadata, e: Exception) {
if (e != null) {
e.printStackTrace()
}
}
}
//设置回调函数
producer.send(record, new DemoProducerCallback())
在异步回调的模式中,生产者发送数据之后并不会阻塞当前线程,而是提供回调函数让服务端在数据处理完毕之后调用通知客户端。
从以上描述可以看出,对于有高可靠要求的场景来说,同步是最简单、最合适的发送方式,但是同步方式带来的是性能方面的问题,同步线程阻塞后程序并发注定无法提高。
此时可以适当使用提供回调机制的异步方式来提高并发度。
而忽略的发送方式可以带来最高的并发效果,但是客户端无法得知服务端的数据处理结果,是一种比较不可靠的方式。
2.2 生产者的写入流程
如上图所示,在我们使用 Kafka 提供的 API 构建生产者对象时,其内部发生了一系列的对象构建与运转流程。
首先,开发人员需要使用Topic、kv数据对等信息来构建 ProduceRecord 对象,也可以指定分区信息。
调用 send
方法之后,客户端会先将对象通过 序列化器将键值数据序列化为字节数组,这样才能够在网络上进行传输。
接下来数据会进入 分区器,在分区器中将会根据数据携带的key分配一个分区号(如果已经手动指定该数据所属的分区,则分区器不作任何事情)。
通过分区器之后,生产者就知道要将该数据往哪个 Topic 的哪个分区中发送了。
但是此时生产者并不会立即发送数据,而是 将该数据缓存在本地批次中。
生产者的本地缓存了 多个批次 的数据,每个批次的数据全部都 属于同一个 Topic 的同一个分区,生产者将会启动独立的线程发送批次的所有消息到指定 Broker。
利用批量发送的方式提升吞吐、节约网络资源,成功返回 RecordMetaData 对象,失败则返回错误信息。
三、消费者
消费者组件是开发人员在 Kafka 集群的场景中最经常接触的组件,比生产者的频率还高,同时内部机制也比生产者简单的构建生产者对象并发送数据要复杂。
消费组负责从 Kafka 服务器拉取数据并处理。
3.1 消费者组与群组协调器
首先我们要接受的第一个概念是 消费者组。
Kafka 中,消费者按组来划分与组织,每个消费者对象都会有自己的 groupid
,同一个 groupid
的所有消费者对象成为一个 消费者组。
Topic 中的一条消息 在同组的消费者对象中只能被消费一次,组内的各个消费者消费的消息不存在交集,每个消费者有且只对应一个分区的数据。
另一方面,如果有多个消费者组消费同一个 Topic,那么 Kafka 服务端将会把该 Topic 中的消息都分给各个消费者组进行消费,类似广播模式,各个消费者组都能都到一份一模一样的数据,组内再进行数据划分。
假设现有 Topic 分区为4,只有一个消费者组:
- 组内消费者数量为1:那么该消费者将会负责消费4个分区的所有数据。
- 组内消费者数量为2:每个消费者将会均匀地负责消费2个分区的数据。
- 组内消费者数量为4:那么每个消费者都会单独消费一个分区的数据。
- 组内消费者数量为6:那么将会有两个消费者被闲置。
所以在特定场景下,我们有可以通过 设置大量的 Topic 分区,并 增加消费者组内的数量 就可以做到 横向扩容,所以增加消费者数量是消费者端提升并发的重要方式。
那么是谁在决定 Topic 中的哪个分区给哪个消费者处理呢?
这个过程需要两个角色来配合处理:群组协调器与群主。
群组协调器是由 Broker “兼职”的,每个消费者组可以有不同的协调器,每个 Broker 都可能兼职做着协调器的工作。
消费者客户端启动后会做两件事情:
- 向群组协调器 发送加入群组的请求
- 定期向协调器 发送心跳 表示自己是活跃的
第一个被协调器接收到请求的消费者将成为 群主,群主会从协调器中获取 所有活跃的消费者(最近一段时间和协调器通信的消费者),并负责给这些消费者 分配分区(通过PartitionAssignor),最后将分配结果返回给协调器。
协调器收到分配结果后,将会 一一通知消费者他们所负责的分区,消费者收到分区信息之后开始消费数据,并在消费数据的过程中定期给协调者发送心跳。
每个消费者组中都有一个消费者担任着 群主 的角色。
群组协调器在发现消费者组中的 消费者数量增加或者减少 时,将会 触发以上过程重新触发分区分配,这个过程称为 分区再平衡。
分区再平衡过程非常重要,它为消费者组带来了 高可用性和伸缩性。
但是正常情况下开发人员一般都不会希望分区再平衡的发生,即使这个过程是自动的,不需要用户关心。
因为分区再平衡过程中,消费者将无法读取数据,会导致整个消费者组一段时间内的不可用(因为分区正在被重新分配),并且新的消费者可能会丢失老消费者的进度。
3.2 构建消费者
开发人员可以通过 Kafka 提供的 Client API 来构建自己的消费者对象:
val props = new Properties()
//设置broker地址
props.put("bootstrap.servers","broker1:9092")
//设置groupid
props.put("group.id","myGroup")
props.put("key.serializer","org.apache.kafka.commmon.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.commmon.serialization.StringSerializer")
//创建消费者
val consumer = new KafkaCOnsumer[String, String](props)
//订阅一个Topic列表
consumer.subcribe(Collections.singletonList("Topic"))
构建完生产者对象后,使用 poll
方法即可向指定的 Kafka 集群拉取数据:
try {
//无限循环,长期运行的消费者程序
while (true) {
//拉取数据的阻塞等待时间,如果为0则马上获取数据
//返回一个消息列表
val records = consumer.poll(100)
for (record <- records){
println(record)
}
}
} finally {
//退出时应该及时关闭消费者立即进行分区再平衡,否则协调器将等到超时之后才触发,这段时间内有数据将不会被消费到
consumer.close()
}
3.3 消费者的读取流程
消费者客户端通过 poll轮询接受消息 的方式从服务端拉取数据,在这个过程中将会自动处理以下流程:
- 消费者注册到群组协调器
- 接受群组协调器分配的分区和偏移量
- poll中拉取对应分区的数据(包括分配的新分区)
- 持续发送心跳给群组协调器
其中,拉取对应分区数据后,消费者需要 将本次数据消费之后的偏移量更新到服务端。
消费者的偏移量有 自动和手动 两种方式可以选择。
如果开启了 enable.auto.commit
则,偏移量的自动提交在 poll
方法中根据 auto.commit.interval.ms
时间间隔(默认5s) 定期发送poll返回的最新的偏移量,开发人员不需要进行任何操作,但是此方法在消费者组分区再平衡进行时可能造成重复数据处理。
除了自动提交之外,消费者 API 还提供了手动提交的方式给开发人员,用来在自己觉得必要的时刻提交偏移量而不是基于时间间隔。
关闭 enable.auto.commit
并在数据处理完毕后调用 consumer.commitSync
则会将 poll
中返回的最新偏移量提交,在 Broker 做出回应之前 程序将会阻塞,如果失败将会 重复尝试提交。
有同步肯定会有异步,调用 consumer.commitAsync
将会异步提交偏移量而不必阻塞,但是随之而来的缺点是:如果因为一些网络问题,导致之前提交的偏移量没有被服务端接受,在当前新的偏移量提交并被接受之后,旧的偏移量才被服务端收到,此时如果发生在均衡,那么将会有数据被重复消费。
无论是同步还是异步,单独使用时都有一定的限制,但是开发人员可以通过 同步和异步的结合 来实现最优选择。
在正常的业务流程中使用 异步提交提升效率,在 try-catch
代码块中消费者停止时使用 同步来保证偏移量会被正确提交,即可实现最优组合。
除了提交自动生成的偏移量之外,消费者还支持 提交自定义的偏移量。
自定义偏移量不再一次poll提交一次,而是在可以在poll处理批次中任意位置提交任意偏移量。
需要通过构建 Map[TopicPartition, OffsetAndMetadata]
对象来设置 Topic、Partition 与其对应的偏移量,并通过 consumer.commitAsync(map)
提交该设置。
四、内部机制
4.1 Broker 请求处理
Broker 处理的请求类型有两种:
- 数据生产请求:生产者客户端的写入请求
- 数据获取请求:消费者客户端、副本 Follower 从 Broker 读取消息的请求
无论是读请求还是写请求,其都需要经过一个 查找分区 Leader 的过程。
因为集群的元数据缓存在所有 Broker 上,所以客户端(副本 Follower 对于 Broker 来讲也是一个客户端)可以请求任一 Broker。
Broker 返回对应分区 Leader 所在的 Broker 地址,客户端将该地址缓存,下次直接发送请求,并会 定期刷新元数据缓存。
Leader 所在的 Broker 收到数据请求后,所有请求被交给 Processer 线程处理将经过以下过程:
- 请求列队:Processer 将请求放到 顺序的请求队列 中,从而可以保证消息队列顺序的特性
- 响应队列:IO线程 负责顺序处理请求队列中的请求,检索数据并将结果放入 响应队列 中
- 响应请求:Processer 从 响应队列 中获取结果并返回给客户端
对于写请求来说,Processer 会进行 是否有权限、ack值是否合法 等检查,IO线程将请求中的数据写入磁盘(此时是linux系统的文件缓冲区并不是真正的磁盘块)。
- 如果请求中
ack=0 or 1
则 Processer 直接返回。 - 如果请求中
ack=all
,则 Processer 请求放入缓冲区中,等到所有 Follower 同步完成再将该请求返回。
对于读请求来说,Processer 会进行 偏移量是否存在 等安全检查,IO线程根据请求中的信息(Topic、Partition、Offset等)从磁盘读取数据,从文件缓冲区直接读取并发送,这是 Kakfa 使用的 零复制技术,不经过本地内存缓冲区,有更好的性能。
这个过程只能读取 被所有副本同步了的数据,以保证数据一致性。
4.2 数据存储
生产者发送的数据会被 Broker 通过有效的流程和组织存储于服务端,并能够给消费者提供高效的数据检索。
分区分配与存储
在服务端创建Topic时,需要指定分区与副本数,假设现我们在6个节点的集群中创建一个分区数为10、副本数为3的 Topic,那么在集群中将会产生30个分区副本:
- 分区之间的 Leader 顺序分配: 首先随机选择一个 Broker1 存储分区1的 Leader 副本,随后 Broker2 存储分区2的 Leader 副本,以此类推;
- 各个分区中的 Leader 和 Follower 不存在一个 Broker 上: 分区1的 Leader 副本在 Broker1 上,那么该分区剩下的副本将会被依次存储于 Broker2、Broker3 中,以此类推;
- 尽可能的让 Broker 的副本数平均: 尽量保证每个 Broker 上有5个分区副本(10 * 3 / 6)。
确定好分区存储的 Broker 节点后,需要在节点上进一步确认分区数据存储的路径。
Kakfa 单独为每个分区配置目录,分配方式很简单:每次统计所有目录中的分区数量,总是选择分区最少的目录使用。
如果你新配置了一块硬盘作为数据目录(log.dir
),你会发现新创建的分区总是会被分配到新磁盘目录中。
Kafka 存储数据时使用的格式和生产者发送过来的格式一致,同时也和发送给消费者的数据格式一致,所以 Kafka 能够使用零复制技术直接从从文件缓冲区直接读取并发送。
存储格式
传输过程中的数据格式包含 偏移量、时间戳、键大小和键值、值大小和值 等元数据信息。
在介绍生产者的时候我们知道,生产者发送数据时是一批批发送的,如果生产者开启了压缩(强烈推荐)那么每一批的数据都会被打成一个包,存储和发送给消费者的时候也是以压缩包单位,可以大大提升网络传输的性能和磁盘存储的开销。
服务端使用 数据片段 存储数据,默认1G或者一周就滚动开启一个新的数据片段,同一时刻只有一个活跃的数据片段在被写入。
在数据写入时,Kafka 同时 为每个分区维护了一个索引,索引中 映射着偏移量到数据片段和片段中位置 的信息。
客户端在消费时会告诉服务端:我需要这个 Topic 中这个 Partition 的数据,偏移量是xxx到xxx。
根据这个信息,服务端可以很快的确认分区存储的路径,并通过分区索引快速定位到偏移量所在的数据片段和所处位置,读取并返回。
数据过期与清理
Kafka 并不会像数据库一样永久的存储数据,它总是需要通过 数据保留期限或者数据保留大小 的维度来限制各个 Topic 的数据存储。
另外一方面,Kafka 也允许开发人员对队列中的数据进行清理,可以开启 log.cleaner.enable
配置,相同键保留最新值。
4.3 可靠性保证
Kafka 的设计足够灵活,开发人员可以根据自身的应用场景来权衡并设置和使用不同程度的可靠性保证。
这种权衡一般是在 数据存储的可靠性和数据一致性 两者的重要程度,和 可用性、高吞吐、低延迟、硬件成本 的重要程序之间的权衡。
服务端的可靠性配置
Kafka 在数据存储上的可靠性通过 分区副本 机制来保证,前面我们讨论过,只要分区还有任一一个副本存在那么数据就不会丢失(前提是已同步的副本)。
在服务端可以通过下面三个配置来影响数据的可靠性:
-
replication.factor
: 副本数配置,副本数越多,可靠性越高、故障率越低,同时 使用的存储成本也也高、性能越差 -
min.insync.replicas
: 最少同步副本数配置,同步的副本数要求越多生产者的效率可能就越低,但是 整体的数据可靠性就越高。 -
unclean.leader.election
: 不完全选举(默认为true),开配置开启后 允许不同步的副本成为 Leader,这将导致 部分数据丢失 的后果,但是 提升了系统整体可用性。
除了 unclean.leader.election
是系统全局生效的之外,其余两个配置均可以在 Topic 级别单独设置。
且在一些如银行信用卡消费等 严格要求数据一致性 的场景中,unclean.leader.election
是需要被关闭的,因为银行宁可等几小时后系统恢复也 不会冒险处理错误的数据。
现在,通过配置调整 replication.factor=3 && unclean.leader.election=false
的设置,我们的 Kafka 服务端看起来已经十分可靠了。
但是如果在客户端没有正确地设置和处理服务端的响应,仍然可能会出现诸如生产者数据丢失的情况。
所以在客户端,开发人员仍然需要严谨地设置配置和处理服务端的响应。
生产者的可靠性配置
acks
是生产者可靠性配置的首选项,设置为 all是最保险的配置,但是会 降低生产者的吞吐。
即使设置了 acks=all
仍然可能出现 Leader 收到数据后马上崩溃导致数据丢失 的情况。
所以生产者需要有 足够多的重试次数 保证在可重试的错误下(可能几秒后就能够恢复)可以重新连接上,但是 太多的重试次数仍然会导致生产者效率低下。
所以 自动重试处理次数 是生产者可靠性配置的第二个选项。
除了生产者可以自动重试处理的错误外,如果服务端返回一个 不可重试的错误,比如认证错误、序列化错误、重试次数达到上限等问题时,还需要开发人员手动处理错误,比如将错误数据保存到外部存储等操作。
手动处理异常错误 是生产者的最后一道防线,可以最低限度的保证服务端任何问题出现时数据都可以被处理。
消费者的可靠性配置
由于消费者只会读取 同步的数据,所以在消费者的角度,服务端已经保证了 数据一致性与顺序性。
那么消费者只需要考虑哪些消息已经读过、哪些消息还没有读就可以了。
所以对于消费者来说,如何使用与提交偏移量是消费者不丢数据的关键。
所以之前我们在讨论消费者时提到的3个配置是保证消费者行为的可靠性非常重要的配置参数:
- auto.offset.reset: 没有偏移量的情况下如何取数据
- enable.auto.commit: 是否自动提交偏移量
- auto.commit.interval.ms: 自动提交偏移量的时间间隔
五、应用实践
5.1 软硬件环境
安装部署 Kafka 集群时首先需要对软硬件环境做出选择与配置。
硬件
在硬件方面 磁盘、内存、网络和CPU 是影响 Kafka 集群性能的主要因素。
磁盘
生产者直接受磁盘吞吐影响,服务端通过独立的IO线程进行数据的写入。
所有生产者必须等待一个服务器确认消息成功提交才结束,如果服务端IO磁盘吞吐跟不上,意味着生产者的并发不会很高。
另外一方面,磁盘的大小又决定了 Kafka 集群的数据存储量,如何确定集群机器所需要的磁盘容量可以参考以下评估公式:
(每天收到的总数据量 * 消息最多保留的天数) * 复制因子 + 消息峰值的缓冲区10% + 磁盘预留10%
内存
受内存影响较大的是消费者,消费者在读取消息时会直接通过本地文件缓存发送,服务端读取的消息将会使用 页面缓存(jvm内存可以不用太多)。
因此不建议将 Kafka Broker 与其他重要程序部署在一起,否则将会 共享页面缓存互相影响性能。
网络
网络的吞吐量决定着 Kafka 集群处理和消费数据的上限,建议有条件配备万兆网卡。
CPU
CPU对于生产者、消费者和服务端来说都有一定影响,生产者需要压缩数据、服务端需要解压缩并设置偏移量后重新压缩、最后消费者受到数据后再解压缩。
对于CPU来说视业务数据量而定,越多的CPU核心数性能肯定越好,但是也要注意不要造成浪费。
软件
软件方面,Kafka 对 Zookeeper 集群的依赖是十分重要的,因此建议 Zookeeper 集群:
- 使用独立的物理节点
- 使用5节点集群
- 容错性高(比3节点多1)
- 性能较好(比7节点)
5.2 服务端优化
Kafka 集群安装部署完毕后,可以根据情况对以下配置进行调优:
- num.partitions: 默认分区数配置
- log.retention.ms: 数据最多保留的时间,根据 业务与数据情况、可用磁盘容量 设置数据最多保留的时间期限
- log.segment.bytes: 数据片段的滚动大小,太小将会频繁写文件,太大将会影响日志过期
- log.segment.ms: 日志片段的过期时间
- message.max.bytes: 单条消息最大的大小,默认1M
我们可以根据整体的数据量希望达到的总吞吐量来评估:总吞吐量/生产消费者速率=分区个数
5.3 生产者优化
代码优化
在使用生产者客户端时,建议 bootstrap.servers
建议设置至少2个,可以在出错的时候连到另外一个 Broker。
同时,ack
可以根据业务情况做出调整:
- 0:不关心后续的数据存储
- 1:Leader 收到消息就即可返回
- all:所有参与副本、分区的brober收到才ok
某些业务场景中 ack=0
的设置可以得到极高的并发性能。
配置优化
- buffer.memory: 生产者缓冲区大小
- batch.size: 一个批次的数据字节大小,太大占用缓冲区多,但是不会延迟(有其他触发机制)而 太小会频繁提交消息效率低下
- linger.ms: 发送批次前等待的时间,时间到会触发批次数据的提交,太大延迟高但吞吐高,太小延迟低但频率高开销大
- reties: 提交失败重试次数
- max.in.flight.requests.per.connection: 生产者受到服务器响应之前还可以发送多少消息,设置为1可以保证消息是顺序写入的(即使失败重试),太大内存使用高但是吞吐高,不会因为服务端而阻塞
- compression.type: snappy/gzip/lz4可选,推荐 少CPU,高性价比的snappy
5.4 消费者优化
代码优化
在读请求中,消费者可以设置 poll
方法获取数据的 上限和下限:
- 上限:设置一次最多可以拿多少条数据,避免客户端内存不够
- 下限:设置至少有多少数据时才返回给客户端,避免几ms就发一次请求,每次只获取少量数据的情况,减少请求量提升效率。
在偏移量提交中,同步与异步配合使用:正常流程中使用异步确保低延迟,异常或者退出流程中使用同步确保偏移量提交。
//同步提交、失败重试、安全、延迟高
consumer.commitSync
//异步提交、无失败重试、不安全、延迟低
consumer.commitAsync
但是自动提交偏移量时仍然可能会出现一些问题,例如记录写入db到偏移量提交到kafka之前仍然可能出现错误。
提交自定义偏移量 是最安全的偏移量提交方式:记录和偏移量一起写入db中,这个动作保持事务,并通过 分区再平衡监听器 处理消费者变动影响。
ConsumerRebalanceListener
有两个方法可以在此方案中使用:
- onPartitionsRevoked:消费者停止后,rebalance开始前调用,将记录和最新偏移量一起写入db中,防止偏移量丢失
- onPartitionsAssigned:rebalance开始后,消费者开始读数据前调用,从db中读取偏移量,调用seek从指定的偏移量开始消费
配置优化
- fetch.min.bytes: 消费者从 Broker 最小拉取的字节数,调大可以降低负载、提高吞吐
- fetch.max.wait.ms: 消费者从 Broker 拉取数据时最大的等待单位,太大会提高延迟,拉取数据时要么满足 字节数 要么满足 等待时间。
- session.timeout.ms: 消费者端发送心跳的超时时间(默认3s),调高可以 避免gc等情况下触发的rebalance,容错时间长,太低可能导致容错时间短,可能会触发不必要的rebalance
- auto.offset.reset: 读取没有偏移量的分区时如何处理,lastest:从最新记录开始(默认)、earliest:从头开始
- partition.assignment.strategy: RangeAssignor、RoundRobinAssignor
5.5 Kafka队列规范
Topic规范
Topic默认8个分区,相同属性的业务数据推送至同一Topic中,Topic命名规范如下:
- Topic_业务名/业务表名
- 英文字母统一小写
- 短横杠"-"以下划线代替 " _ "
推送数据格式
推送的数据以标准json形式组织,必带的字段信息如下:
- btype:业务类型标识
- recordgid:数据记录唯一标识
- gtime:数据获取的业务时间
- utime:数据推送时间
- data:业务数据json串
其中data中的业务数据json串需满足以下要求:
- 字段格式命名以下划线分割
- 字段名统一小写
- 必须为utf-8格式
- 每条数据只写在一行中
不满足要求的数据可能会在后续的数据清洗中作为脏数据处理掉。
推送数据格式如下:
| 字段名 | 注释 | 字段类型 | 是否必须 | 备注 |
| -- | -- | -- | -- | -- |
| btype | 业务类型标识 | string | y | |
| recordgid | 数据记录唯一标识 | string | y | 32位uuid |
| gtime | 数据获取的业务时间 | string | y |13位时间戳 |
| utime | 数据推送时间 | string | y |13位时间戳 |
| data | 业务数据json串 | string | y | 视具体业务而定 |