kafka篇二
- 11、讲一讲 kafka 的 ack 的三种机制
- 12、消费者如何不自动提交偏移量,由应用提交?
- 13、消费者故障,出现活锁问题如何解决?
- 14、如何控制消费的位置
- 15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?
- 16、kafka 的高可用机制是什么?
- 17、kafka 如何减少数据丢失
- 18、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。
11、讲一讲 kafka 的 ack 的三种机制
request.required.acks 有三个值 0 1 -1(all)
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失
12、消费者如何不自动提交偏移量,由应用提交?
将 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者异步提交
commitAsync()
即:
ConsumerRecords<> records = consumer.poll(); for (ConsumerRecord<> record : records){
。。。
tyr{
consumer.commitSync()
}
。。。
13、消费者故障,出现活锁问题如何解决?
出现“活锁”的情况,是它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。 在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到 offset 提交失败(调用commitSync()引发的 CommitFailedException)。这是一种安全机制,保障只有活动
成员能够提交 offset。所以要留在组中,你必须持续调用 poll。
消费者提供两个配置设置来控制 poll 循环:
max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的
对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll。 但是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。
14、如何控制消费的位置
kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的 offset的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))
15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?
Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证 FIFO的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 partition。
Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数。partiton 和 key 是可选的。
如果你指定了 partition,那就是所有消息发往同 1个 partition,就是有序的。并且在消费端,Kafka 保证,1 个 partition 只能被1 个 consumer 消费。或者你指定 key( 比如 order id),具有同 1 个 key的所有消息,会发往同 1 个 partition。
16、kafka 的高可用机制是什么?
这个问题比较系统,回答出 kafka 的系统特点,leader 和 follower 的关系,消息读写的顺序即可。
17、kafka 如何减少数据丢失
Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。
- block.on.buffer.full = true
- acks = all
- retries = MAX_VALUE
- max.in.flight.requests.per.connection = 1
- 使用KafkaProducer.send(record, callback)
- callback逻辑中显式关闭producer:close(0)
- unclean.leader.election.enable=false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit=false
- 消息处理完成之后再提交位移
18、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。
其实还是得结合业务来思考,我这里给几个思路:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。