Kafka
笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》
内容还会不断充实~
概述
定义
传统定义:
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
最新定义:
Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
消息队列
应用场景
- 异步处理
- 削峰
优势
-
解耦
允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束
-
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
-
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况
-
灵活性&峰值处理能力
- 灵活性:动态上下线,在流量高峰时可以动态增加服务器
- 峰值处理:削峰,延迟处理
使系统能够应对突发的高峰流量
-
异步通信
允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理
模式
-
点对点模式
一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。
-
发布/订阅模式
-
一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费
-
发布/订阅模式中,又分为两种:
-
消费者主动拉取消息
Kafka就是属于这种类型
优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息
弊端:需要轮询,查看队列中是否有消息,浪费资源
-
队列推送消息
类似于公众号推送
弊端:
推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)
-
-
Kafka基础架构
架构图
zk在这里的作用:
- 存储kafka集群信息
- 存储消费者消费到的位置信息
- 即:消费到第几条了
- 消费者本地内存也会存储该条数信息,平时就是读取并维护本地的信息;但当机器挂掉重启后,会先去zk获取该信息,然后再在本地内存继续维护)
- 0.9之后将位置信息存储到kafka里一个系统创建的topic中
- 为何改存到kafka?
- 因为消费者本身就需要维护与kafka的连接,去获取消息,如果将位置信息放在zk,则还需要请求zk获取信息,速度不如kafka(注:Kafka消息存到磁盘,默认存七天)
名词解释
- Broker:可以理解为起了Kafka进程的服务器
- Topic:主题,可以理解为一个队列,用来将消息分类,便于发送和消费
- Partition:分区,用来提高Kafka集群的并发处理能力,每一个partition是一个有序的队列(个人理解Partition就是对Topic的又一次细分,分布式的多个Broker是为了避免单个机器的性能造成的阻塞,多个partition是为了避免同一内存区域的IO阻塞)
- Replication:副本。为保证集群中某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
- Leader:每个分区多个副本的”主“,生产者发送数据的对象,消费者消费数据的对象都是leader
- Follower:
- Leader的副本,每个分区多个副本的”从“,实时从leader同步数据,保持和leader数据的同步。用来备份数据(不直接对生产者和消费者提供读写),避免Leader所在机器挂掉后数据丢失。(所以,同一topic同一partition的Follower和Leader一定不在同一台机器)
- leader发生故障时,某个Follower会成为新的leader
- Consumer Group:消费者组,可以理解为一个大的消费者,目的是提高消费能力
- 和其他普通消费者一样,需要订阅一个topic
- 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
- 一个消费者组里面的不同消费者,只能消费kafka中这个topic的不同partition的数据。(个人理解:建立partition就是为了提高消息的读写速度,对于同一个topic,消息的写入根据partition区分开了,消费者消费的时候如果不分开,会降低消息消费速度且造成重复消费,那partition的意义就不大了)
基本操作
安装
命令行操作
topic
查看该机器上所有topic:
kafka-topics.sh --list --zookeeper ip:zk端口
创建topic:
kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数
#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower
删除topic:
kafka-topics.sh --delete --topic first --zookeeper ip:zk端口
#注:执行效果:
#Topic first is marked for deletion. 标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true. 只有当delete.topic.enable设为true时才会真正删除
查看topic详情:
kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称
消息
生产者发送消息
kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口
消费者消费消息
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费
架构深入
工作流程
说明:
- 偏移量不是全局唯一的,是分区唯一的
- kafka只保证消息的分区内有序,不保证全局有序
- 有序是指消费消息的顺序和生产消息的顺序一致
- kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
-
topic是逻辑上的概念,而partition是物理上的概念
- 每个partition对应一个文件夹(文件夹名 = topic - partition),文件夹内有.log文件,该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。
- 消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
生产者
消息发送流程
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord
- ProducerBatch是一个消息批次,包含ProducerRecord,使字节使用更加紧凑
- 将较小的ProducerRecord拼凑成一个较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量
如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。
在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。
ProducerBatch的大小和batch.size的关系:
当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。
如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;
如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。
我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据
元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
元数据的更新(二者满足其一即可触发更新):
- 当客户端中没有要使用的元数据时
- 超过metedata.max.age.ms时间没有更新元数据(默认5分钟)
当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。
重要参数
参数名称 | 说明 |
---|---|
bootstrap.servers | 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 |
buffer.memory | RecordAccumulator 缓冲区总大小, 默认 32m。 |
batch.size | 缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据, Leader 收到数据后应答。 -1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字 |
retries | 当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性, 默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。 |
生产者分区
分区好处
-
便于合理使用存储资源, 每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一
块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。 - 提高并行度, 生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据
分区策略
老版本
分区原则
发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性
- 指明partition的情况下,直接将指明的值作为partition值
- 没有指明partition值,但有key值的情况下,将key的hash值与当前topic的partition存活数进行取余得到partition值
- 既没有指明partition值又没有key值的情况下,采用轮询的方式选取分区。但谁来做第一个分区?kafka采用的机制是:在第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与当前topic可用的partition总数取余得到partition值。(由于每次调用都会在这个整数上自增,所以取余后的结果也是自增或等于初始值,也就达到了轮询每个partition的效果)。这就是round-robin算法
源码
//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取当前topic的partition数目
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//没有指定key
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
//当前topic存活的partition数
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
//从存活的partition里选取一个partition返回
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
//选取一个partition返回
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//有key值,将key的hash值与当前topic的partition总数进行取余得到partition值
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
新版本
分区原则
源码
DefaultPartitioner:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param numPartitions The number of partitions of the given {@code topic}
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
//没有指定key,采用粘性分区策略
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
//指定了key,使用key的哈希值与【分区总数】进行求模
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
StickyPartitionCache:
/**
* An internal class that implements a cache used for sticky partitioning behavior. The cache tracks the current sticky
* partition for any given topic. This class should not be used externally.
*/
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
//尽可能使用上一个分区(所以叫黏性分区策略)
Integer part = indexCache.get(topic);
if (part == null) {
//没办法了(该分区batch已满或已完成),找下一个分区
return nextPartition(topic, cluster, -1);
}
return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
//当前topic的分区数
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
//当前topic可用分区数
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
//没有可用分区,从所有分区里随机选一个算了
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
//就一个可用的,别无选择
newPart = availablePartitions.get(0).partition();
} else {
//有多个可用的,这就得挑三拣四一下了
while (newPart == null || newPart.equals(oldPart)) {
//新分区不能和上一个分区一样,若是一样,就继续选!就是这么倔
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
//现在有多个可用分区,当然是从可用分区里选
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
}
吞吐量提升
数据可靠性
概述
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。因此,kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡
ack应答级别
acks参数配置:
- 0: 不等待。producer不等待Broker的ack,这一操作提供了一个最低的延迟,Broker一接收到还没有写入磁盘就已经返回,当Broker故障时有可能丢失数据
- 1: 只等待leader。producer等待Broker的ack,leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
- -1(all): 等待leader和ISR中的follower。producer等待Broker的ack,leader和ISR中的follower全部落盘成功后才会返回ack。
- 极限情况下也会造成数据丢失:比如当所有follower同步消息都比较慢时,此时ISR中只有leader自己,这种情快下当leader落盘成功,就等于ISR中全部落盘成功了,便会返回ack,在follower同步完成之前,若leader挂掉,则会导致数据丢失
- 数据重复:follower同步完成之后,Broker发送ack之前,leader发生故障,producer没有收到ack便会重发消息,故造成数据重复
图示
ISR
- leader维护了一个动态的ISR(in-sync replica set),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack(不应该是follower给leader发送么)。如果follower长时间未向leader同步数据,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定(默认10秒)。leader发生故障后,就会从ISR中选举新leader
- 由于上述条件限制,所以ISR中的follower都是同步速度很快的follower,未来leader也会在ISR中选举,所以leader只要等ISR中的follower都同步完成,就可以向生产者发送ack
老版本中,follower进入ISR有两个条件:
- follower中消息条数与leader中的消息条数差值要小于设定的参数阈值(默认10000条)
- follower向leader同步数据的时间要小于设定的阈值
0.9版本移除了条数限制,原因:
生产者一般都是批量发送数据,假设条数阈值为10条,但生产者一次就发来了12条,这时leader中比所有follower都多12条数据,所有follower都会被移除ISR,但很快一些follower同步完成,又会把他们移入ISR,ISR存在内存中,这就会导致频繁操作内存。而且kafka会将ISR信息写入zookeeper,这也会导致kafka频繁请求zookeeper。
数据去重
数据传递语义
幂等性
原理
开启幂等性
开启参数 enable.idempotence ,默认为 true
生产者事务
原理
API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
数据有序
数据乱序
如果未开启幂等性,且max.in.flight.requests.per.connection大于1的话,可能会出现:其中某条消息发送失败,在重试时,该消息后面的消息发送成功,导致乱序。
单分区内有序条件:
Broker
Zookeeper中存储的Kafka信息
总体工作流程
Broker重要参数
参数名称 | 说明 |
---|---|
replica.lag.time.max.ms | ISR 中, 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值, 默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是 10%。 每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。 |
log.index.interval.bytes | 默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间, 默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间, 分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间, 毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔, 默认是 5 分钟 |
log.retention.bytes | 默认等于-1,表示无穷大。 超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略 |
num.io.threads | 默认是 8。 负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。 数据传输线程数,这个参数占总核数的50%的 2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理 |
副本
基本信息
- 作用:提高数据可靠性
- Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率
- Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader同步数据
- Kafka分区中的所有副本统称AR(Assigned Replicas)
- AR = ISR + OSR
- ISR,表示和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。 Leader 发生故障之后,就会从 ISR 中选举新的 Leader
- OSR, 表示 Follower 与 Leader 副本同步时,延迟过多的副本
Leader选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
故障处理细节
Follower故障
Leader故障
注:
这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。数据不丢失或不重复还得看ack、幂等性和事务
Leader Partition自动平衡
文件存储
文件存储机制
Topic数据存储机制
.log文件和.index文件
注:index文件中存储的是相对offset,绝对offset那一列只是为了方便看图加上的
.index文件中存储:
- 消息的偏移量
- 对应消息的存储地址偏移量
- 对应消息的大小
.log文件中存储:
- 消息内容
注:.indx文件中存的内容是固定的,就是存消息偏移量、存储地址偏移量、消息大小等信息,所以在根据消息偏移量找对应消息时,可以直接采用 消息偏移量 * .index中单个内容的大小,快速找到要读取的消息信息地址,加快查询速度
参数名称 | 说明 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。 |
log.index.interval.bytes | 默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引 |
消息读取机制
- 根据要读取的消息偏移量,采用二分查找,找到对应对的.index文件
- 在改.index文件中,根据消息偏移量,找到要读取的消息的存储地址偏移量和该消息大小
- 根据2中结果,去.log文件中直接读取 地址偏移量 到 地址偏移量 + 消息大小 这一段的数据
文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:
- log.retention.hours, 最低优先级小时,默认 7 天。
- log.retention.minutes, 分钟。
- log.retention.ms, 最高优先级毫秒。
- log.retention.check.interval.ms, 负责设置检查周期,默认 5 分钟
日志超时后,Kafka提供的清理策略:
- delete 日志删除:将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
- 基于时间:默认打开。 以 segment 中所有记录中的最大时间戳作为该文件时间戳
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment(log.retention.bytes,默认等于-1,表示无穷大)
- log.cleanup.policy = delete 所有数据启用删除策略
- compact 日志压缩
高效读写数据
Kafka能够高效读写的原因
-
Kafka本身是分布式集群,可以采用分区技术,并行度高
-
读数据采用稀疏索引,可以快速定位要消费的数据
-
顺序写磁盘
- Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,
为顺序写。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这
与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间
- Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,
-
页缓存 + 零拷贝技术
-
相关参数
-
参数名称 说明 log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。 一般不建议修改,交给系统自己管理。 log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。
-
消费者
消费方式
Consumer采用pull(拉)的方式从Broker中读取数据
kafka中没有数据时,pull模式可能会使消费者陷入空转。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时间即为timeout
消费者总体工作流程
订阅主题与分区
一个消费者可以订阅一个或多个主题
订阅
-
subscribe
-
定义
-
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern)
-
-
对于该方法,可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题
-
如果前后两次订阅了不同的主题,那么消费者以最后一次的为准
-
如果采用正则表达式的方式订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息
-
-
assign
-
定义
-
public void assign(Collection<TopicPartition> partitions)
-
-
可直接订阅某些主题的特定分区
-
只有一个参数,用来指定订阅的分区集合
-
TopicPartition类
-
public class TopicPartition { private final int partition; private final String topic; //构造器、hashCode... }
-
-
注:
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费者组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。assign()方法订阅时无该功能。
如何获取主题的分区信息?
KafkaConsumer中有方法:
public List<PartitionInfo> partitionsFor(String topic)
PartitionInfo:
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
// AR集合
private final Node[] replicas;
//ISR集合
private final Node[] inSyncReplicas;
//OSR集合
private final Node[] offlineReplicas;
}
取消订阅
public void unsubscribe()
该方法可以取消通过subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过assign(Collection)方式实现的订阅。
注:如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法
订阅状态:
- AUTO_TOPICS
- subscribe(Collection)方式订阅时即为该状态
- AUTO_PATTERN
- subscribe(Pattern)方式订阅时即为该状态
- USER_ASSIGNED
- assign(Collection)方式订阅时即为该状态
- NONE
- 没有订阅
这些状态是互斥的,一个消费者只能出现其中一种,否则会报出IllegalStateException
消息消费
public ConsumerRecords<K, V> poll(final Duration timeout)
timeout方法用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞
timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE
poll()方法返回值类型是ConsumerRecords,表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了iterator()方法来遍历消息集内部的消息:
public Iterator<ConsumerRecord<K, V>> iterator()
消费者组
原理
初始化流程
消费流程
重要参数
参数名称 | 说明 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名 |
group.id | 标记消费者所属的消费者组 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率, 默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest: 默认, 自动重置偏移量为最新的偏移量。 none:如果消费组原来的( previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数, 默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间, 默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间, 默认 45s。超过该值,该消费者被移除,消费者组执行再平衡 |
max.poll.interval.ms | 消费者处理消息的最大时长, 默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据 |
fetch.max.bytes | 默认 Default: 52428800( 50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig) or max.message.bytes (topic config) 影响 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数, 默认是 500 条 |
分区分配策略
一个Consumer Group中有多个Consumer,一个Topic有多个Partition,所以必然会涉及到Partition的分配问题,即确定哪个Partition由哪个Consumer来消费
kafka分配策略:
- Range
- RoundRobin
- Sticky
- CooperativeSticky
可通过partition.assignment.strategy参数来设置分区策略,默认是Range + CooperativeSticky.(Kafka可同时使用多个分区策略)
当消费者组中消费者个数发生变更时,就会触发重新分配。即使消费者数目增加到大于分区数,也会重新分配
Range分区策略原理
RoundRobin分区策略原理
Sticky分区
粘性分区定义: 可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化
offset的维护
由于Consumer在消费过程中可能宕机,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了哪个offset,以便故障后能够继续消费。
存储位置
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。 key 是 group.id+topic+分区号, value 就是当前 offset 的值。 每隔一段时间, kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
提交
自动提交
手动提交
指定offset消费
参数auto.offset.reset = earliest | latest | none
默认是 latest
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
- earliest:自动将偏移量重置为最早的偏移量, --from-beginning。
- latest(默认值):自动将偏移量重置为最新偏移量。
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
kafkaConsumer.seek(TopicPartition, 指定消费的offset);
注意:每次执行完,需要修改消费者组名;
重复消费和漏消费
重复消费: 已经消费了数据,但是 offset 没提交。
漏消费: 先提交 offset 后消费,有可能会造成数据的漏消费。
消费者事务
提高消费者吞吐量
代码实操
基操
生产者
需要用到的类:
- KafkaProducer:需要创建一个生产者对象,用来发送数据
- ProducerConfig:获取所需的一系列配置参数
- ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
发送消息
/**
* 简单发送
*/
@Test
public void testProducer() {
KafkaProducer<String, String> producer = getKafkaProducer();
// 异步发送
producer.send(new ProducerRecord<>("first", "test", "hello kafka"));
//同步发送
producer.send(new ProducerRecord<>("first", "test", "hello kafka")).get();
//关闭资源
producer.close();
}
/**
* 发送后触发回调函数
*/
@Test
public void testProducerWithCallBack() {
KafkaProducer<String, String> producer = getKafkaProducer();
producer.send(new ProducerRecord<>("first", "test", "hello kafka"), new Callback() {
//回调方法,会在producer收到ack时触发
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (Objects.isNull(e)) {
//没有异常
System.out.println(recordMetadata.toString());
} else {
e.printStackTrace();
}
}
});
//关闭资源
producer.close();
}
private KafkaProducer<String, String> getKafkaProducer() {
//配置
Properties properties = new Properties();
properties.put("bootstrap.servers","shangxiaoying.cn:9092");
properties.put("acks", "all");
//配置的key可以在ProducerConfig中找到
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//这里指定要使用的分区策略
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
事务内发送消息
public class CustomProducerTransactions {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// key,value 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
}
// int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}
自定义分区策略
编写类实现Partitioner接口,重写partition()方法
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义选择分区的逻辑
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
在生产者配置中指定分区策略
private KafkaProducer<String, String> getKafkaProducer() {
//配置
...
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//这里指定要使用的分区策略
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
//发送消息
...
消费者
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 所以 offset 的维护是 Consumer 消费数据时必须考虑的问题。
需要用到的类:
- KafkaConsumer:需要创建一个消费者对象,用来消费数据
- ConsumerConfig:获取所需的一系列配置参数
- ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。 自动提交 offset 的相关参数:
- enable.auto.commit:是否开启自动提交 offset 功能
- auto.commit.interval.ms:自动提交 offset 的时间间隔
自动提交offset
@Test
public void testConsumer() {
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
//订阅topic,可以订阅多个
kafkaConsumer.subscribe(Collections.singletonList("first"));
while (true) {
//拉取消息,参数为没有消息时的等待时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
private KafkaConsumer<String, String> getKafkaConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
//开启自动提交(这里提交是指提交offset)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//这里是反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
return kafkaConsumer;
}
手动提交offset
虽然自动提交 offset 十分简洁便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。
因此 Kafka 还提供了手动提交 offset 的 API。 手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步 提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
@Test
public void testConsumer() {
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
//订阅topic,可以订阅多个
kafkaConsumer.subscribe(Collections.singletonList("first"));
while (true) {
//拉取消息,参数为没有消息时的等待时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
//手动提交,不要忘记关掉自动提交
//同步提交,当前线程会阻塞直到 offset 提交成功
// kafkaConsumer.commitSync();
//异步提交
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
//业务逻辑
}
});
}
}
private KafkaConsumer<String, String> getKafkaConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
//关闭自动提交(这里提交是指提交offset)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//这里是反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
return kafkaConsumer;
}
数据漏消费和重复消费:
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。
- 先提交 offset 后消费,有可能造成数据的漏消费;
- 提交offset后,数据还没处理完,消费者挂掉,重启后,本地offset丢失,获取kafka存储的offset,这个offset是挂掉前拉取的最新数据的kafka,kafka以为消费者都消费完了,其实并未真正处理完毕,再拉取消息会按照kafka存储的offset拉取,故造成喽消费。
- 而先消费后提交 offset,有可能会造成数据的重复消费
- 消费者拉取到数据在处理,处理了一部分数据后消费者挂掉,此时offset并未提交,消费者重启后,本地offset已经丢失,所以会读取kafka存储的offset,这个offset是上次拉取消息之前的,所以又会将挂掉前拉取的消息再拉取一遍,造成重复。
自定义存储offset
要解决上述问题,关键是要保证offset存储和数据处理的一致性。我们可以将offset存入MySQL,使得消息处理和offset存储在同一个事务中,从而保证一致性。
但是:
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。 当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
如何感知到是否发生Rebalance?
- Kafka有一个类:ConsumerRebalanceListener可以实现
思路:
- 在Rebalance之前,每个分区要提交自己最新的offset
- 在Rebalance之后,每个分区获取自己最新的offset,继续消费
实现:
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
/**
* 自定义存储offset
*/
@Test
public void testCustomOffset() {
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
//订阅topic,ConsumerRebalanceListener帮助我们实现自定义存储offset
kafkaConsumer.subscribe(Collections.singletonList("first"), new ConsumerRebalanceListener() {
//该方法会在 Rebalance 之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//Rebalance之前,将每个分区的最新offset提交,这里可以自定义提交至MySQL
commitOffset(currentOffset);
}
//该方法会在 Rebalance 之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
currentOffset.clear();
for (TopicPartition partition : collection) {
kafkaConsumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费
}
}
});
while (true) {
//最佳实践:将数据处理和offset保存放在一个事务中,从而保证数据不会丢失或重复
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf(record.toString());
//维护currentOffset
currentOffset.put(new TopicPartition(record.topic(),
record.partition()), record.offset());
}
commitOffset(currentOffset);
}
}
//获取某分区的最新 offset
private static long getOffset(TopicPartition partition) {
//业务逻辑,比如去MySQL中查询最新offset
return 0;
}
//提交该消费者所有分区的 offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
//保存offset
}
拦截器
原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
- configure(configs)
- 获取配置信息和初始化数据时调用。
- onSend(ProducerRecord)
- 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
- onAcknowledgement(RecordMetadata, Exception)
- 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
- close
- 关闭 interceptor,主要用于执行一些资源清理工作
- 如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
- 另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中 要特别留意。
实操
实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
时间戳拦截器
public class TimeInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
return new ProducerRecord(producerRecord.topic(),
producerRecord.partition(),
producerRecord.timestamp(),
producerRecord.key(),
System.currentTimeMillis() + "-" + producerRecord.value());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
计数拦截器
public class CountInterceptor implements ProducerInterceptor {
private Integer successCount = 0;
private Integer errorCount = 0;
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (Objects.isNull(e)) {
successCount ++;
} else {
errorCount ++;
}
}
@Override
public void close() {
System.out.println("success: " + successCount + ", error: " + errorCount);
}
@Override
public void configure(Map<String, ?> map) {
}
}
使用拦截器
@Test
public void testInterceptorChain() {
//配置
Properties properties = new Properties();
properties.put("bootstrap.servers","shangxiaoying.cn:9092");
properties.put("aacks", "all");
//配置的key可以在ProducerConfig中找到
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//这里指定要使用的分区策略
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
//这里可以指定拦截器
List<String> interceptorList = new ArrayList<>();
interceptorList.add("cn.shangxiaoying.kafka.interceptors.TimeInterceptor");
interceptorList.add("cn.shangxiaoying.kafka.interceptors.CountInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
producer.send(new ProducerRecord<>("first", 0, "testInterceptor", "hello,Interceptor"));
//该close方法也会调用拦截器的close
producer.close();
}
集成SpringBoot
生产者
配置文件
application.properties
# 应用名称
spring.application.name=kafka-learn
# 指定 kafka 的地址
spring.kafka.bootstrapservers=ip1:port, ip2:port
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
发送数据
@RestController
public class ProducerController {
// Kafka 模板用来向 kafka 发送数据
@Autowired
KafkaTemplate<String, String> kafka;
@RequestMapping("/send")
public String send(String msg) {
kafka.send("first", msg);
return "ok";
}
}
消费者
配置文件
application.properties
# 指定 kafka 的地址
spring.kafka.bootstrapservers=ip1:port, ip2:port
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=kafka-test
消费数据
@Configuration
public class KafkaConsumer {
// 指定要监听的 topic
@KafkaListener(topics = "first")
public void consumeTopic(String msg) { // 参数: 收到的 value
System.out.println("收到的信息: " + msg);
}
}