数据丢失和如何保障数据不丢失
数据丢失的情况:
1、broker端: acks设置0 ,1
acks=0:producer把消息发送出去了,就确认发送成功了,但是如果此时leader分区宕机了,根本没有接收这条消息,或者还没有写入日志,导致数据丢失
acks=1:producer把消息发送出去了,leader分区收到并写入日志,,就确认发送成功了,但是如果此时leader分区宕机了,根本没有把这条消息同步给follower分区,follower分区后续继续选举了一个新的leader分区,新leader分区并没有这条新数据,导致数据丢失
保障不丢失措施:
1、设置acks=all,当leader分区收到并写入日志,及同步给follower分区之后,再确认消息发送成功
2、设置副本数>1,如果副本数=1,则就算acks=all ,也可以会造成数据丢失
2、producer: 异步批量发送
一般情况下,为了提高吞吐量,发消息会实行异步批量发送,
这样做有一个风险,当buffer里面有很多数据的时候,此时producer宕机了,会把buffer里面的数据给丢失了,导致数据丢失
保障不丢失措施:
1、把异步改成同步
kafkaTemplate.send("testTopic",gson.toJson(message));
改成:
Future future = kafkaTemplate.send("testTopic",gson.toJson(message));
future.get();
2、不要使用producer.send(msg),而要使用producer.send(msg, callback)。在callback回调函数 中可以把发送失败的消息给保存下来,进行下一次发送
3、设置retries ,当网络发生抖动,配置了retries > 0的producer能够自动重试消息发送,避免消息丢失。
3、comsumer:自动提交
自动提交是当消费者poll过来了,就自动提交offset给broker,如果此时消费者接收到了数据,并没有做处理的时候,突然消费者当前的机器宕机了,导致还没对这条数据做处理(保存数据库或者其他计算),最终丢失了
1、设置enable.auto.commit=false 在处理完这条数据后,手动提交offset
2、设置auto-offset-reset 为 earliest
这个参数指定了在没有偏移量可提交时(比如消费者第l次启动时)或者请求的偏移量在broker上不存在时(比如数据被删了),消费者会做些什么。
这个参数有两种配置。一种是earliest:消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest(默认),如果选择了这种配置, 消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
消息的有序性
kafka的特性是只支持partition有序
之前在生产者讲分区选择的时候,可以指定key的方式发送数据 ,可以用业务id,作为key,达到业务上的有序
重复消费
1、 比如消费者手动提交的时候,处理完这条消息,但是在进行手动提交的时候,突然宕机了,并没有修改broker中的offset值
2、当有人估计把broker中保存offset的topic给删掉的时候,我们没法知道当前分区的offset 是什么,只有从头再读一遍,会造成大量重复消息
3、消费者提交offset不是消费一条就进行提交的,而是按一定的时间周期进行提交的,如果正好在这个时间之内,服务器宕机了,就导致offset没有更新,当消费者启动的时候,会重复消费之前的数据
解决办法是:
1、本地保存处理完消息的offset 值,每条消息都带有一个offset,在表设计的时候,可以topic 分区 offset三维度区设计
2、消费数据的幂等性,可以定义消息的唯一属性,如果数据库存在唯一性的消息就不进行处理或者更新处理
消息堆积
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group yc-testTopic
Consumer group 'yc-testTopic' has no active members.(我当前的消费者服务被停掉了)
CURRENT-OFFSET:表示当前的offset
LOG-END-OFFSET:表示最大的那个offset
则LAG堆积数量为:LOG-END-OFFSET- CURRENT-OFFSET ,造成这个原因是我把消费者线程给停掉了,生产了1千万多条数据没有被消费