Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

文章目录


Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明


官方说明

https://kafka.apache.org/documentation/

选择对应的版本,我这里选的是 2.4.X

https://kafka.apache.org/24/documentation.html

选择

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

https://kafka.apache.org/24/documentation.html#consumerconfigs

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

查找 auto.offset.reset
Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

让我们来品一品官方的解读

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明


参数解读

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)

no initial offset || current offset does not exist

no initial offset

举个例子 在消费组ConsumerGroupA里有个消费者A1, 已经消费到了100条数据, 这个时候你又新起了一个消费者, 但是呢这个新起的消费者的消费组和消费组A的名称不同,我们暂且称之为消费组ConsumerGroupB.
根据kafka的机制, 这个新起的消费组中的消费者再消费分区数据的时候,auto.offset.reset参数就起作用了

current offset does not exist

我们知道kafka提供了API可以按照消费offset记录继续消费,如果指定的offset不存在,那么 这个参数也会生效

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明
啥意思?。。。。。。。

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费

  • latest(默认) :只消费自己启动之后发送到主题的消息
  • earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

Code

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明


POM依赖

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- 引入 Spring-Kafka 依赖 -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>


配置文件

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明


生产者

 package com.artisan.springkafka.producer;

import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */

@Component
public class ArtisanProducerMock {


    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate ;


    /**
     * 同步发送
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public SendResult sendMsgSync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"artisanTestMessage-" + id);
        // 同步等待
       return  kafkaTemplate.send(TOPIC.TOPIC, messageMock).get();
    }



    public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);
        // 异步发送消息
        ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);
        return result ;

    }

}
    
     

消费者

 package com.artisan.springkafka.consumer;

import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */

@Component
public class ArtisanCosumerMock {


    private Logger logger = LoggerFactory.getLogger(getClass());
    private static final String CONSUMER_GROUP_PREFIX = "ConsumerGroup-" ;

    /**
     * 消费者的groupId ,每次启动都通过SPEL 随机一个出来,确保每次都是一个新的消费组 用于测试 auto.offset.reset 参数 设置为 earliest的情况
     * @param messageMock
     */
    @KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC +  "-" + "#{T(java.util.UUID).randomUUID()})")
    public void onMessage(MessageMock messageMock){
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }

}
    
    

看注释


单元测试

 
 package com.artisan.springkafka.produceTest;

import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {

    private Logger logger = LoggerFactory.getLogger(getClass());


    @Autowired
    private ArtisanProducerMock artisanProducerMock;

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        SendResult sendResult = artisanProducerMock.sendMsgSync();

        logger.info("testSyncSend Result =  topic:[{}] , partition:[{}], offset:[{}]",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().partition(),
                sendResult.getRecordMetadata().offset());

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }


    @Test
    public void testAsynSend() throws ExecutionException, InterruptedException {
            artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    logger.info(" 发送异常{}]]", throwable);

                }
                @Override
                public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                    logger.info("回调结果 Result =  topic:[{}] , partition:[{}], offset:[{}]",
                            objectObjectSendResult.getRecordMetadata().topic(),
                            objectObjectSendResult.getRecordMetadata().partition(),
                            objectObjectSendResult.getRecordMetadata().offset());
                }
            });

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();

    }

}
    
    

测试

看结果之前,先看看当前topic中的数据

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

earliest

earliest 按照预期,新起了一个ConsumerGroup , 肯定会从头消费 ,让我们来验证下

2021-02-23 16:54:38.014  INFO 5684 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[51]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=1, name='artisanTestMessage9'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=68, name='artisanTestMessage-68'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=48, name='artisanTestMessage-48'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=81, name='artisanTestMessage-81'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=51, name='artisanTestMessage-51'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=60, name='artisanTestMessage-60'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='artisanTestMessage-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=23, name='artisanTestMessage-23'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=37, name='messageSendByAsync-37'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=16, name='messageSendByAsync-16'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='messageSendByAsync-79'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=19, name='messageSendByAsync-19'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=42, name='messageSendByAsync-42'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=70, name='messageSendByAsync-70'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=84, name='messageSendByAsync-84'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=56, name='messageSendByAsync-56'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=82, name='messageSendByAsync-82'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=6, name='messageSendByAsync-6'}]
2021-02-23 16:54:38.064  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=94, name='messageSendByAsync-94'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=20, name='messageSendByAsync-20'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=52, name='messageSendByAsync-52'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=24, name='messageSendByAsync-24'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=32, name='messageSendByAsync-32'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=45, name='messageSendByAsync-45'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=18, name='messageSendByAsync-18'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=13, name='messageSendByAsync-13'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=57, name='messageSendByAsync-57'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=53, name='messageSendByAsync-53'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=46, name='messageSendByAsync-46'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=33, name='messageSendByAsync-33'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='artisanTestMessage-49'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=83, name='artisanTestMessage-83'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=61, name='artisanTestMessage-61'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]
2021-02-23 16:54:38.067  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=99, name='artisanTestMessage-99'}]
2021-02-23 16:54:38.075  INFO 5684 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=43, name='artisanTestMessage-43'}]


latest(默认)

2021-02-23 16:55:21.541  INFO 21472 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[52]
2021-02-23 16:55:21.587  INFO 21472 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=79, name='artisanTestMessage-79'}]


none

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)-1, groupId=ConsumerGroup-MOCK_TOPIC-d52dc4e5-535f-4151-ad5a-85b7c0488b9d)] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [MOCK_TOPIC-0]

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) [kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) [kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) [spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) [spring-kafka-2.6.4.jar:2.6.4]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : No offset and no reset policy

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [MOCK_TOPIC-0]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:671) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2422) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1718) ~[kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1031) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:462) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2631) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[kafka-clients-2.6.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1269) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1160) ~[spring-kafka-2.6.4.jar:2.6.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1073) ~[spring-kafka-2.6.4.jar:2.6.4]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

2021-02-23 16:55:44.726 ERROR 19956 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container
2021-02-23 16:55:44.730  INFO 19956 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-23 16:55:44.749  INFO 19956 --- [           main] c.a.s.produceTest.ProduceMockTest        : testSyncSend Result =  topic:[MOCK_TOPIC] , partition:[0], offset:[53]


exception

Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

后两种设置,看看就行

懂了么,老兄 ~


源码地址

https://github.com/yangshangwei/boot2/tree/master/springkafka_auto.offset.reset
Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明

上一篇:interface Consumer, package java.util.function, since jre1.8


下一篇:RocketMQ从基础到应用(延迟消息队列)