kafka 消息回溯
指定 offset 的 api
KafkaConsumer#seek
KafkaConsumer#seekToBeginning
KafkaConsumer#seekToEnd
对应
assignedState(tp).seek(offset); assignedState(partition).reset(offsetResetStrategy); assignedState(partition).reset(offsetResetStrategy);
首先检查当前消费者是否分配到分区,然后发送请求
// org.apache.kafka.clients.consumer.internals.SubscriptionState#assignedState
private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}
KafkaConsumer#seek 和 KafkaConsumer#offsetsForTimes 和结合使用
所以,kafka 的消息回溯,需要给消费者发送指令,让消费者调用 seek 或 seekToBeginning 或 seekToEnd。