Kafka生产者原理总结
# Kafka架构
1.producer:
消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:
kafka 集群中包含的服务器。
3.topic:
每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4.partition:
partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
5.consumer:
从 kafka 集群中消费消息的终端或服务。
6.Consumer group:
high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一个角色,从 leader 中复制数据。
10.controller:
kafka 集群中的其中一个服务器,用来进行 leader selection。
12.zookeeper:
kafka 通过 zookeeper 来存储集群的信息。
# Kafka的Log存储解析
### 1. 存储形式
为了解决kafka查找效率问题,kafka的message在存储的时候采取了分段+索引的存储方式。
Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
![在这里插入图片描述](https://www.icode9.com/i/ll/?i=20210628143023676.png)
partition是以文件的形式存储在文件系统中,比如,创建了一个名为iss_fw_state_callback的topic,其有2个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样2个目录: iss_fw_state_callback-0,iss_email_send_topic-1,其命名规则为-,里面存储的分别就是这2个partition的数据
![在这里插入图片描述](https://www.icode9.com/i/ll/?i=20210628143040716.png)
partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件
![在这里插入图片描述](https://www.icode9.com/i/ll/?i=20210628143049989.png)
可以看到,这个partition有4个LogSegment。partion全局的第一个segment从0开始,后续每个segment文件名为上一个 segment文件最后一条消息的offset值进行递增
### 2. index与log对应关系
![在这里插入图片描述](https://www.icode9.com/i/ll/?i=20210628143218308.png?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L1dMWTE5OTUwOTA4,size_16,color_FFFFFF,t_70)
在partition中如何通过offset查找message?查找的算法是:
1.索引文件:根据offset的值,查找segment段中的index索引文件。
注:由于索引文件命名是以上一个文件的最后一个offset进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
2.近似position:找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。
注:由于是稀疏索引,所以可能只能定位到一条相近的消息
3.具体消息:得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息。
示例:我们要查找offset=368776这条消息(参照上面的图)
那么先找到00000000000000368769.index,
在索引文件中,找到索引6(对应的offset=368775)的具体位置position=1407(注:这次只能找到近似offset)
到log文件中,根据1407这个position开始查找,比较每条消息的offset是否大于等于368776。最后查找到对应的消息以后返回。
# Kafka生产者
### 1. 分区策略
查看kafka生产者源码:
```java
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, (Iterable)null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers) {
this(topic, partition, (Long)null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, V value) {
this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
```
partition分为三种情况:
1. 指定partition分区序号,则直接将消息投递进指定序号的分区。
2. 没有指定partition分区序号,指定了key的情况下。对key进行hash得到的值,跟topic的分区数量进行取模得到partition分区的值。如一个topic的分区数量为3,则hash(key)%3为消息投递的分区。
3. 没有指定分区partition和没有指定key,则进行轮询RoundRobin投递分区。
### 2. 数据可靠性保证
1.副本数据同步策略:
| 方案 | 优点 | 缺点 |
|--|--|--|
| 半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
|全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka采用的是第二种方案,因为网络延迟对kafka的延迟小。第一种方案会造成大量数据冗余,浪费资源。
2. ISR(同步副本In-sync replicas):
试想,如果有一个follower故障,长时间没有从leader同步数据,那么leader就会一直等下去,直到该follower同步完消息才发送ack。所以,这才有了ISR。
In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的. 那么,ISR中存在是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中. 而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”。
对于"follower副本是否与Leader副本保持了同步"的理解如下:
(1) 上面所说的同步不是指完全的同步,即并不是说一旦follower副本同步滞后与Leader副本,就会被踢出ISR列表.
(2) Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。这就意味着,只要follower副本落后于leader副本的时间间隔不超过10秒,就可以认为该follower副本与leader副本是同步的,所以哪怕当前follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出出局。
(3) 如果follower副本被踢出ISR列表,等到该副本追上了Leader副本的进度,该副本会被再次加入到ISR列表中,所以ISR是一个动态列表,并不是静态不变的.
3. ack应答机制:
acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的配置具体如下:
1).acks=0,生产者不需要等待leader的响应,leader一接收到还没写入磁盘时leader故障,就会造成消息丢失
2).acks=1,leader把消息写入磁盘后返回ack。如果在follower同步之前leader发生故障,会造成消息丢失
3).acks=-1,leader和所有follower把消息写入磁盘后才返回ack。在follower同步完成后,leader发送ack之前,leader发生故障,会导致消息重复发送。虽然-1时基本上不会丢失数据,但是在某种极端情况下还是会的,即所有isr的副本都宕机,而另一些因为一些问题没有宕机也不在isr中的follower恢复了,就重新加入isr了,并重新选出leader,(退化到acks=1的情况了)。。。此时原本isr中的一些最新数据没有被同步到新的isr的副本节点中,那么那些数据就丢失了
4. LEO(Log End Offset) & HW(High Watermark)
![在这里插入图片描述](https://www.icode9.com/i/ll/?i=20210628160319131.png?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L1dMWTE5OTUwOTA4,size_16,color_FFFFFF,t_70)
LEO:指的是每个副本中最大的offset
HW:指的是ISR队列中最小的LEO
(1) follower故障:
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会从磁盘中读取上次的HW,并将log中高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2) leader故障:
leader发生故障后,会从ISR中选出一个新的leader,之后,为了保证多个副本之间的数据一致性,其余的follower会先将各自log文件中高于HW的部分截取掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
5. Exactly Once
· at-least-once:如果producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终consumer,这种策略可能导致重复的工作和不正确的结果。
· at-most-once:如果在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,因此不会传递给consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。
· exactly-once:即使producer重试发送消息,消息也会保证最多一次地传递给最终consumer。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将Kafka consumer的偏移量rollback,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现excactly-once。
at-least-once + 幂等性 = exactly-once
要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true。开启幂等性的Producer会在初始化的时候分配一个PID,发往同一个partition的消息会附带一个sequence number,而broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交时,broker只会持久化一条