Problem description:
Kafka had been set up long before, and a newly built Storm cluster needs to consume a Kafka topic. Because the topic already holds a large backlog of messages, the Storm topology started consuming from the very beginning instead of from the latest messages.
Solution:
The following passage is taken from the official documentation:
How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:
kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
A Unix timestamp aka seconds since the epoch (e.g. via System.currentTimeMillis()): see How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? in the Kafka FAQ
As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot + "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.
Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.
To summarize the quoted passage: the consumption starting point is controlled by the startOffsetTime field of the SpoutConfig object. Its default value is kafka.api.OffsetRequest.EarliestTime(), i.e. consumption starts from the oldest messages; if you want to start from the newest messages you must explicitly set it to kafka.api.OffsetRequest.LatestTime(). Note also that this field only takes effect the first time the topology consumes the topic; on subsequent runs consumption resumes from the offset recorded in ZooKeeper (the state is stored under a path derived from the zkRoot field of the SpoutConfig object; not verified by me).
If you want the current topology to pick up where the previous topology's consumption left off, do not modify the id of the SpoutConfig object. Conversely, if your first run started from the earliest messages and you keep the same id, the spout will keep consuming from the recorded offset all the way up to the newest messages. If at that point you want to skip the intermediate messages and jump straight to the newest ones, change the id of the SpoutConfig object so the old offsets in ZooKeeper are no longer found, and also set startOffsetTime to kafka.api.OffsetRequest.LatestTime() (otherwise the default EarliestTime() would restart from the beginning again); see the sketch below.
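As a concrete illustration, here is a minimal sketch of forcing the spout to start from the newest messages. It is written against the storm-kafka API for Storm 1.x; the ZooKeeper address, topic name, zkRoot and id values are placeholder assumptions. On older storm-kafka versions the flag is named forceFromStart rather than ignoreZkOffsets.

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class LatestOffsetSpoutSketch {
    public static KafkaSpout buildSpout() {
        BrokerHosts hosts = new ZkHosts("zk1:2181"); // placeholder ZooKeeper connect string
        // zkRoot and id are placeholders; consumer state lives under zkRoot + "/" + id
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-topology-spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // deserialize message bytes as strings
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // begin at the newest messages
        spoutConfig.ignoreZkOffsets = true; // skip any offsets previously recorded in ZooKeeper
        return new KafkaSpout(spoutConfig);
    }
}

Changing the id to a fresh value has the same practical effect as ignoreZkOffsets = true, because the spout then finds no prior consumer state at the new ZooKeeper path.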
Below are the meanings of some fields of the SpoutConfig object; they are in fact inherited from KafkaConfig (see the source code):
public int fetchSizeBytes = 1024 * 1024; // total size of messages requested in each FetchRequest sent to Kafka
public int socketTimeoutMs = 10000; // socket timeout for connections to the Kafka broker
public int fetchMaxWait = 10000; // how long the consumer waits when the server has no new messages
public int bufferSizeBytes = 1024 * 1024; // read buffer size of the SocketChannel used by SimpleConsumer
public MultiScheme scheme = new RawMultiScheme(); // how to deserialize the byte[] fetched from Kafka
public boolean forceFromStart = false; // whether to force reading from the smallest offset in Kafka
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // the offset time to start reading from; defaults to the oldest offset
public long maxOffsetBehind = Long.MAX_VALUE; // how far the KafkaSpout's read progress may fall behind the target; if the gap is too large, the spout discards the messages in between
public boolean useStartOffsetTimeIfOffsetOutOfRange = true; // whether to fall back to startOffsetTime when the message for the requested offset no longer exists in Kafka
public int metricsTimeBucketSizeInSecs = 60; // how often metrics are reported
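To show these fields in context, here is a hedged sketch of tuning a few of them and wiring the spout into a topology; the topology name, component names and tuning values are assumptions for illustration, not recommendations.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaTopologySketch {
    public static void main(String[] args) throws Exception {
        BrokerHosts hosts = new ZkHosts("zk1:2181"); // placeholder ZooKeeper connect string
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-topology-spout");
        spoutConfig.fetchSizeBytes = 2 * 1024 * 1024; // ask for up to 2 MB of messages per FetchRequest
        spoutConfig.maxOffsetBehind = 100000L; // if the spout lags more than 100k offsets, skip the gap
        spoutConfig.metricsTimeBucketSizeInSecs = 30; // report metrics every 30 seconds

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // downstream bolts would be attached here, e.g.
        // builder.setBolt("my-bolt", new MyBolt()).shuffleGrouping("kafka-spout");
        StormSubmitter.submitTopology("kafka-consume-topology", new Config(), builder.createTopology());
    }
}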