kafka --- 常见性问题

数据丢失和如何保障数据不丢失

数据丢失的情况:

1、broker端: acks设置0 ,1

acks=0:producer把消息发送出去了,就确认发送成功了,但是如果此时leader分区宕机了,根本没有接收这条消息,或者还没有写入日志,导致数据丢失

acks=1:producer把消息发送出去了,leader分区收到并写入日志,,就确认发送成功了,但是如果此时leader分区宕机了,根本没有把这条消息同步给follower分区,follower分区后续继续选举了一个新的leader分区,新leader分区并没有这条新数据,导致数据丢失

保障不丢失措施:

1、设置acks=all,当leader分区收到并写入日志,及同步给follower分区之后,再确认消息发送成功

2、设置副本数>1,如果副本数=1,则就算acks=all ,也可以会造成数据丢失

2、producer: 异步批量发送

一般情况下,为了提高吞吐量,发消息会实行异步批量发送,

kafka --- 常见性问题

这样做有一个风险,当buffer里面有很多数据的时候,此时producer宕机了,会把buffer里面的数据给丢失了,导致数据丢失

保障不丢失措施:

1、把异步改成同步

kafkaTemplate.send("testTopic",gson.toJson(message));

改成:

Future future = kafkaTemplate.send("testTopic",gson.toJson(message));
future.get();

 2、不要使用producer.send(msg),而要使用producer.send(msg, callback)。在callback回调函数  中可以把发送失败的消息给保存下来,进行下一次发送

3、设置retries ,当网络发生抖动,配置了retries > 0的producer能够自动重试消息发送,避免消息丢失。

3、comsumer:自动提交

自动提交是当消费者poll过来了,就自动提交offset给broker,如果此时消费者接收到了数据,并没有做处理的时候,突然消费者当前的机器宕机了,导致还没对这条数据做处理(保存数据库或者其他计算),最终丢失了

1、设置enable.auto.commit=false 在处理完这条数据后,手动提交offset

2、设置auto-offset-reset 为 earliest

这个参数指定了在没有偏移量可提交时(比如消费者第l次启动时)或者请求的偏移量在broker上不存在时(比如数据被删了),消费者会做些什么。

这个参数有两种配置。一种是earliest:消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest(默认),如果选择了这种配置, 消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。

消息的有序性

 kafka的特性是只支持partition有序

之前在生产者讲分区选择的时候,可以指定key的方式发送数据 ,可以用业务id,作为key,达到业务上的有序

重复消费

1、 比如消费者手动提交的时候,处理完这条消息,但是在进行手动提交的时候,突然宕机了,并没有修改broker中的offset值

2、当有人估计把broker中保存offset的topic给删掉的时候,我们没法知道当前分区的offset 是什么,只有从头再读一遍,会造成大量重复消息

3、消费者提交offset不是消费一条就进行提交的,而是按一定的时间周期进行提交的,如果正好在这个时间之内,服务器宕机了,就导致offset没有更新,当消费者启动的时候,会重复消费之前的数据

解决办法是:

1、本地保存处理完消息的offset 值,每条消息都带有一个offset,在表设计的时候,可以topic 分区 offset三维度区设计

2、消费数据的幂等性,可以定义消息的唯一属性,如果数据库存在唯一性的消息就不进行处理或者更新处理

消息堆积

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group yc-testTopic
Consumer group 'yc-testTopic' has no active members.(我当前的消费者服务被停掉了)

kafka --- 常见性问题  CURRENT-OFFSET:表示当前的offset

 LOG-END-OFFSET:表示最大的那个offset

则LAG堆积数量为:LOG-END-OFFSET- CURRENT-OFFSET ,造成这个原因是我把消费者线程给停掉了,生产了1千万多条数据没有被消费

上一篇:SpringSecurity 微服务权限方案


下一篇:停止线程的错误方法