一、主要作用
- 异步处理:提升响应速度
- 系统解耦:降低系统各模块之间的耦合
- 流量削峰:高并发下 作为缓冲,减轻系统压力
- 数据分发:广播到多个消费者, 实现数据的分发和共享。
二、特点
- 分布式:集群方式、多副本,高可靠性 高扩展性。
- 高吞吐,低延迟,百万级tps。
- 支持实时的流式处理。
- 持久化日志:可重复读取 和 无限期保留。
三、Broker
-
定义:Kafka集群中的服务器节点,负责存储和转发消息。
-
功能:
- 存储消息:Broker负责存储Topic中的消息,并保证消息的有序性和可靠性。
- 转发消息:Broker负责将生产者发送的消息转发给消费者,并保证消息的可靠性和顺序性。
- 管理元数据:Broker维护Topic和Partition的元数据信息,供生产者和消费者使用。
- 提供API接口:Broker提供了API接口,供生产者和消费者使用。
四、Topic
- 定义:消息的逻辑分组,是Kafka消息系统中的基本单元,用于将消息进行逻辑上的分组和归类。
- 作用:通过创建不同的Topic,可以根据需求将消息进行分类、分割和分发,以满足不同业务场景下的消息传递需求。
- 特点:每个Topic可以有一个或多个分区(Partition),每个分区可以保存不同时间段的消息序列。
五、Partition
-
定义:分区,是Topic的物理分割,是Kafka中存储消息的最小单位。每个分区都有一个对应的日志文件,用于顺序写入和追加消息。
-
作用:
- 提高吞吐量:通过分区,Kafka可以分散数据和负载,支持并行处理,从而提高整体吞吐量。
- 有序存储:在同一个分区内,消息是按照写入的顺序存储的,消费者可以保证读取到的消息是有序的。
- 负载均衡:通过合理分配分区,可以实现负载均衡,避免单点瓶颈。
-
特点:
- 不可变性:一旦消息写入分区,它就是不可变的,不能被修改或删除,直到超过保留时间或被压缩。
- 分段存储:每个分区在底层是一个日志文件,由多个段(Segment)组成,每个段是一个物理文件,存储一批有序的消息。
- 索引文件:每个段都有一个对应的索引文件,用于快速查找消息的偏移量(Offset)。
六、副本(Replication)
-
定义:Partition的副本机制,也可以称之为备份机制,通过多副本保证数据的高可用性和容错性。Kafka将每个分区的消息复制到多个副本中,这些副本分布在不同的Broker上。这样即使某个Broker出现故障,其他Broker上的副本仍然可以保留和恢复消息。
-
角色:
- Leader:每个分区有一个主副本(Leader),负责处理所有的读写请求。
- Follower:其余副本是从副本(Follower),负责从主副本复制数据。称之为ISR副本集合。当Leader副本出现故障时,可以从ISR中选举出一个新的Leader副本。
- Unclean领导者选举:当ISR集合为空时,Kafka需要重新选举一个新的领导者副本。此时,Kafka可以选择从非同步副本中选举领导者副本,这称为Unclean领导者选举。开启Unclean领导者选举可能会造成数据丢失,但提升了系统的高可用性;而禁止Unclean领导者选举则维护了数据的一致性。
-
作用:
- 数据冗余:提供数据冗余,即使系统部分组件失效,系统依然能够继续运转,增加了整体可用性以及数据持久性。
- 故障恢复:当主副本不可用时,从副本可以自动提升为主副本,确保系统的连续性。
-
同步原理:Kafka采用基于领导者(Leader)的副本机制。从副本(Follower)不对外提供服务,只是定期地异步拉取 领导者副本(Leader)中的数据,从而实现与领导者副本的同步。当领导者副本挂掉时,Kafka会依托于ZooKeeper提供的监控功能实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
七、ISR机制
指分区中的一组与Leader副本保持同步的副本集合; 这些副本包含了Leader副本中的所有已确认消息。当Leader副本出现故障时,可以从ISR中选举出一个新的Leader副本。只有ISR中的副本才能参与消息的同步复制和数据的读写操作,从而确保数据的一致性和可靠性。
八、Controller机制
Kafka使用Controller机制来管理Broker、Partition、Leader等元数据信息。Controller是一个独立的组件,负责监控和管理整个集群的状态和配置。当出现异常情况时,Controller可以自动进行故障转移和数据恢复操作,确保集群的稳定性和高可用性。
快速故障恢复的机制。当某个Broker出现故障时,可以通过ISR机制和Controller机制快速进行主副本的选举和数据的恢复,从而确保集群的高可用性和数据的可靠性。
九、ACK应答机制
是生产者发送数据后Kafka的接收确认方式。
在生产者发送消息到Kafka集群时,可以设置不同的ACK级别来确保消息在发送过程中的可靠性和持久性。
Kafka的ACK机制有三种级别,由acks参数的值控制:
- acks=0:无需等待确认,可靠性最低
- acks=1:默认值。生产者发送消息后会等待Broker的领导者(Leader)确认, 会确认消息已经被领导者接收写入,但不一定被复制到所有的副本。
- acks=-1(all):等待主题分区中所有ISR 副本都成功写入消息后才返回确认。acks=all时,Kafka只有在消息被写入ISR中的所有副本后才会返回确认,这确保了消息即使在Leader故障的情况下也不会丢失,因为ISR中的其他副本可以选举为新的Leader。
日志持久化:
- Kafka将消息持久化到磁盘上,即使Broker宕机或重启,消息也不会丢失。Kafka使用顺序写入和分段存储的方式来提高性能,同时保证数据的可靠性。每个主题都有一个或多个分区,每个分区都有一个对应的日志文件,用于顺序写入和追加消息。
持久性保证:
- Kafka提供了多种消息传递语义,包括至少一次传递和最多一次传递。这些语义可以确保消息在发送和接收之间的可靠传递,并保证消息不会丢失或重复。
十、Kafka中Zookeeper的作用
-
选举leader:
- 在Kafka集群中,每个分区都会有一个leader来负责处理读写请求。
- 当leader节点出现故障时,Zookeeper会利用选举算法协助进行新的leader选举,确保集群能够继续正常运行,并处理来自生产者和消费者的请求。
-
管理消费者组的偏移量:
- 消费者组在消费消息时,需要记录当前已经消费的消息偏移量。
- Zookeeper用来保存这些偏移量信息,以便在消费者重启或发生故障时,能够继续从上次消费的位置开始,从而保证消息消费的连续性和准确性。
-
发现和注册服务:
- Kafka的broker节点和消费者在启动时会向Zookeeper注册自己的信息,包括IP地址、端口号和可用的分区等。
- 其他节点可以通过查询Zookeeper来发现可用的broker节点和分区,从而方便地进行消息的生产和消费。
-
维护集群的元数据:
- Zookeeper保存了Kafka集群的元数据信息,这些信息包括但不限于broker的列表、topic的分区信息、以及消费者组的偏移量等。
- Kafka通过与Zookeeper的交互来获取这些元数据,以确保集群内部各组件之间的协调一致。
-
监控集群的健康状态:
- Zookeeper负责监控Kafka集群中各个节点的状态,包括节点的上下线情况、存活状态以及partition的副本分配情况等。
- 通过提供心跳检测和故障检测功能,Zookeeper能够及时发现并通知集群中可能存在的问题,从而保障集群的稳定运行。
十一、如何保证消息有序?
1、生产者如何有序
(1)序列号:为每条消息分配一个唯一的序列号,标记数据的顺序。消息排序,重新发送。消费者 按序列号来消费消息。
(2)指定分区发送。
2、存储有序
一个分区内有序,多个分区无法保证。获取数据后会将数据顺序写入日志文件。
3、消费着如何有序
(1)一个消费者 单线程单实例 消费。同一个Topic、同一个分区,一个Partition分区只会被一个消费者组中的一个消费者消费,从而保证顺序性。
(2)按序号消费。
生产者如何将消息 发布到指定分区?
kafka默认分区策略:轮询 选出一个partition分区
(1)自定义分区策略:实现Partitioner接口,重写partition( )方法
(2)每条消息设置同一个key:计算key的hash值路由到指定分区,同一个key的消息都在同一个分区
消费者如何消费同一个分区消息?
(1)消费者消费方法上使用 @KafkaListener的topicPartitions属性 和 @TopicPartition @PartitionOffsets 注解指定topic、partition、offset。
十二、如何保证消息不丢失?
- 生产者(Producer):通过
同步阻塞
的方式的发送,设置acks=all,等待所有ISR 副本都成功写入消息后才返回确认;超时或者失败 会重试
- Broker:支持同步刷盘(
持久化到硬盘中
)、异步刷盘的策略,支持 1主N从的策略。持久化的 offset (偏移量),消息 支持持久化到Commitlog里面。 - 消费者(Consumer):自动提交偏移量,改为手动提交,确保业务逻辑
真正执行成功
再提交偏移量。
- 自动提交: 不是立即提交,默认每5秒 自动提交消费偏移量。可能导致在处理消息时发生错误时无法回滚偏移量,造成消息的重复消费或丢失。
- 手动提交: 需要显式地调用提交方法来提交消费偏移量。 可在 成功处理完消息后 提交偏移量, 确保消息至少被处理一次。
- 同步提交(commitSync): 消费者在提交偏移量时会阻塞等待Kafka的确认。会导致消费者在提交偏移量时发生阻塞,可能会影响消费者的吞吐量。
-
异步提交(commitAsync): 不会阻塞等待Kafka的确认,会立即返回,消费者可以继续处理其他任务。 如果提交失败,可能不会立即被发现,可能导致消息的重复消费或丢失。但 Kafka允许为异步提交 提供
回调函数
(OffsetCommitCallback),以便在提交成功或失败时执行相应的操作。
通信流程:拉消息,拉取消息的过程通常是通过一个循环来实现的,在循环中,消费者不断调用poll
方法来拉取新的消息记录 。poll
方法的参数指定了拉取消息的超时时间 。 拉取到的消息记录包含多个消息,每个消息都包含了消息的元数据(如主题、分区、偏移量等)和实际的消息内容。
指定消息在分区中的偏移量offset,消费特定分区的消息,并且可向后回滚重复消费。
十三、消费端收到两条一样的消息,应该怎样处理?
(1)幂等性:消费端 处理消息 保持幂等性,不管来多少条重复消息,最后处理的结果都一样
(2)记录消息ID和消费状态:保证每条消息都有唯一ID编号, 消费者端 维护张日志表 记录已经处理成功的消息的ID
十四、消息堆积怎么办?
(1)生产者:限流,设置合理的发送速率、使用滑动窗口控制速率
(2)消息队列:增加队列的分区数;设置消息过期时间。 可以将堆积的消息进行持久化存储,以便后续处理。
(3)消费端:服务器扩容;建Consumer消费者集群,增加消费者数量,批量消费 调整批量大小和拉取间隔,异步消费、异步提交;优化消费者性能(优化数据库查询、减少网络调用次数、使用多线程或异步处理)。
十五、消费失败怎么办?
(1)重试:设置合理的重试次数和重试间隔。
(2)记录:将消费失败的消息存储到数据库。消费者可以记录错误信息并跳过有问题的消息。
(3)死信队列:对于无法处理的消息,应将其移动到死信队列Topic
中,并定期分析和处理这些消息。这有助于避免消息堆积对正常业务的影响。
十六、Kafka性能优化
通过压缩和批量发送优化。
Kafka支持Gzip、Snappy、LZ4、zstd等数据压缩算法。
zstd有最高的压缩比(zstd > LZ4 > gzip > Snappy),LZ4效率最高(LZ4 > Snappy > zstd > gzip)
十七、Kafka常见配置参数
kafka:
# Broker的全局设置
global:
broker.id: 0 # Kafka broker的唯一标识符,每个broker的id都应该是唯一的
log.dirs: /var/lib/kafka/data # Kafka存储消息的目录
zookeeper.connect: localhost:2181 # Kafka连接到Zookeeper的地址
# 网络设置
network:
listeners: PLAINTEXT://:9092 # Kafka监听的地址和端口,用于客户端连接
advertised.listeners: PLAINTEXT://your.host.name:9092 # 客户端连接broker的地址,通常是在集群外部访问的地址
num.network.threads: 3 # 用于处理网络请求的线程数
num.io.threads: 8 # 用于处理磁盘IO的线程数
socket.send.buffer.bytes: 102400 # Socket发送缓冲区大小
socket.receive.buffer.bytes: 102400 # Socket接收缓冲区大小
socket.request.max.bytes: 104857600 # 请求的最大字节数
# 日志设置
log:
log.retention.hours: 168 # 日志保留的小时数
log.segment.bytes: 1073741824 # 每个日志段的大小(以字节为单位)
log.retention.check.interval.ms: 300000 # 检查日志是否过期的间隔时间(以毫秒为单位)
log.cleaner.enable: true # 是否启用日志清理器(用于删除过期的日志)
# 复制设置
replication:
default.replication.factor: 1 # 默认的消息复制因子
min.insync.replicas: 1 # 在提交消息之前,必须有多少副本是同步的
replica.lag.time.max.ms: 10000 # 副本落后时间的最大阈值(以毫秒为单位)
replica.socket.timeout.ms: 30000 # 副本之间连接的超时时间(以毫秒为单位)
replica.fetch.max.bytes: 1048576 # 副本从leader拉取数据的最大字节数
# 消费者和生产者设置(这些通常在客户端配置,但可以在broker上设置默认值)
consumer_producer:
message.max.bytes: 1000000 # 消息的最大字节数(包括键和值)
replica.fetch.min.bytes: 1 # 副本从leader拉取的最小字节数
request.timeout.ms: 30000 # 请求的超时时间(以毫秒为单位)
group.min.session.timeout.ms: 6000 # 消费者组的最小会话超时时间(以毫秒为单位)
auto.commit.interval.ms: 5000 # 自动提交偏移量的间隔时间(以毫秒为单位)
# 其他设置
other:
num.partitions: 1 # 默认的主题分区数
controlled.shutdown.enable: true # 是否启用受控关闭
delete.topic.enable: true # 是否允许删除主题
# 消费者配置
fetch.min.bytes=1 消费者拉取数据的最小批次大小
fetch.max.bytes=5242880 # 5MB 消费者每次拉取数据的最大量
max.poll.records=100 消费者每次拉取的最大记录数(即消息数)
fetch.max.wait.ms=500 消费者在拉取数据时,如果数据不足fetch.min.bytes所指定的量,则消费者会等待的最长时间(以毫秒为单位)。
min.insync.replicas 在提交消息之前,必须有多少副本是同步的。
replica.*参数 与副本相关的配置,如副本落后时间的最大阈值、副本之间连接的超时时间等。
message.max.bytes 消息的最大字节数。
request.timeout.ms 请求的超时时间。
group.min.session.timeout.ms 消费者组的最小会话超时时间。
auto.commit.interval.ms 自动提交偏移量的间隔时间。
num.partitions 默认的主题分区数。
controlled.shutdown.enable 是否启用受控关闭。
delete.topic.enable 是否允许删除主题。
auto.leader.rebalance.enable 是否启用自动领导者选举平衡。
leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.percentage 领导者不平衡检查和阈值设置。
# 性能和调优设置
queued.max.requests=500 等待处理的请求队列的最大大小。
compression.type=producer 生产者消息的压缩类型。
metric.reporters=[] 用于报告指标的类列表。
num.replica.fetchers=1 每个broker用于从其他broker拉取数据的线程数。