Apache Kafka – KIP 32,33 Time Index

32, 33都是和时间相关的,

KIP-32 - Add timestamps to Kafka message

引入版本,0.10.0.0

需要给kafka的message加上时间戳,这样更方便一些, 比如在做retention,rolling,或getMessageByTime的时候

在config里面可以配置,

  1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
  2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.

type,可以定义,是以用户指定的event time,或是kafka处理的processing time来作为时间戳

如果选用用户createTime,会产生的问题是,首先这个时间戳不一定是递增的

max.message.time.difference.ms 的默认值是Long.MaxValue,如果设置该项,会丢弃时间异常的message,即过老或过新的;

 

然后在ProducerRecord,ConsumerRecord,增加timestamp字段

Add a timestamp field to ProducerRecord and ConsumerRecord.

对于createTime,需要user自己在创建ProducerRecord的时候去设置,timestamp

如果是LogAppendTime,broker会在收到message后自动填上这个timestamp

 

使用CreateTime和LogAppendTime的区别?

The key decision we made in this KIP is whether use LogAppendTime(Broker Time) or CreateTime(Application Time)

The good things about LogAppendTime are:

  1. Broker is more robust.
  2. Monotonically increasing.
  3. Deterministic behavior for log rolling and retention.
  4. If CreateTime is required, it can always be put into the message payload.

The good things about CreateTime are:

  1. More intuitive to users.
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
  3. Immutable after entering the pipeline.

场景不一样,CreateTime主要是对于分析的场景,
其实message里面往往是包含有event time的,所以单纯从队列而言,LogAppendTime就足够,而且各种逻辑会简单很多

 

好,现在message里面有时间了,怎么用?

所以提出,

KIP-33 - Add a time based log index

引入版本,0.10.1.0

动机,

Kafka has a few timestamp based functions, including

  1. Searching message by timestamp
  2. Time based log rolling
  3. Time based log retention.

Currently these operations depend on the create time / modification time of the log segment file. This has a few issues.

  1. Searching offset by timestamp has very coarse granularity (log segment level), it also does not work well when replica is reassigned.
  2. The time based log rolling and retention does not work well when replica is reassigned.

之前retention,rolling,或search message都会用到time
而之前的时间都是用的是log segment的创建时间,这样会有些问题

尤其当发生replica reassigned后,log segment的时间会变成最新,所以就不准确了

这里会引入,time-based log index,来建立时间索引

所以log目录下的文件,就从原来的log file,offset index两个,增加time-based log index,变成3个

The log index works for both LogAppendTime and CreateTime.

Create another index file for each log segment with name SegmentBaseOffset.timeindex. The density of the index is upper bounded by index.interval.bytes configuration.

格式,

Time Index Entry => Timestamp Offset

Timestamp => int64

Offset => int32

 

创建timeIndex的过程,

  1. When a new log segment is created, the broker will create a time index file for the log segment. 
  2. The default initial / max size of the time index files is the same as the offset index files. (time index entry is 1.5x of the size of offset index entry, user should set the configuration accordingly).
  3. Each log segment maintains the largest timestamp so far in that segment. The initial value of the largest timestamp is -1 for a newly created segment.
  4. When broker receives a message, if the message is not rejected due to timestamp exceeds threshold, the message will be appended to the log. (The timestamp will either be LogAppendTime or CreateTime depending on the configuration)
  5. When broker appends the message to the log segment, if an offset index entry is inserted, it will also insert a time index entry if the max timestamp so far is greater than the timestamp in the last time index entry.
    • For message format v0, the timestamp is always -1, so no time index entry will be inserted when message is appended.
  6. When the current active segment is rolled out or closed. A time index entry will be inserted into the time index to ensure the last time index entry has the largest timestamp of the log segment.
    1. If largest timestamp in the segment is non-negative (at least one message has a timestamp), the entry will be (largest_timestamp_in_the_segment -> base_offset_of_the_next_segment)
    2. If largest timestamp in the segment is -1 (No message in the segment has a timestamp), the time index will be empty and the largest timestamp would be default to the segment last modification time.

The time index is not monotonically increasing for the segments of a partition. Instead, it is only monotonically increasing within each individual time index file. i.e. It is possible that the time index file for a later log segment contains smaller timestamp than some timestamp in the time index file of an earlier segment.

 

创建新的log segment的同时,会创建time index file,并初始化

当brokers append一条message到log segment时,首先offset index entry 会被插入(index插入都是有inteval的),同时也会插入一条time index entry (当timestamp大于当前TimeIndex中last entry的时间,所以time index是单调递增的)

并且当active segment发生roll或者closed的时候也会插入一条time index entry(因为index插入有间隔,所以在关闭或新开的时候,需要把last记录插入index)

并且上面在插入time index的时候,会判断时间戳是否当前time index中last entry的时间,所以在单个time index file中,时间是单调递增的

但是在多个time index file之间,无法保证,即在新的time index file中会出现比较老的时间戳;

如果message用的是createTime,这个问题应该会经常碰到

 

按上面的场景,这样,我们在做rolling,retention的时候都会用这个time index

步骤如下,

Enforce time based log retention

To enforce time based log retention, the broker will check from the oldest segment forward to the latest segment. For each segment, the broker checks the last time index entry of a log segment. The timestamp will be the latest timestamp of the messages in the log segment. So if that timestamp expires, the broker will delete the log segment. The broker will stop at the first segment which is not expired. i.e. the broker will not expire a segment even if it is expired, unless all the older segment has been expired.

broker会从最老的segment开始遍历,如果该segment的last time index是过期的,就把这个segment删掉
如果没有过期,就停止扫描;这样如果后面还有过期的segment,也不会被过期掉

所以如果用createTime,会让log retention变的有点不确定和混乱

 

Enforce time based log rolling

Currently time based log rolling is based on the creating time of the log segment.
With this KIP, the time based rolling would be changed to only based on the message timestamp.
More specifically, if the first message in the log segment has a timestamp, A new log segment will be rolled out if timestamp in the message about to be appended is greater than the timestamp of the first message in the segment + log.roll.ms.

When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.

During the migration phase, if the first message in a segment does not have a timestamp, the log rolling will still be based on the (current time - create time of the segment).

log segment的rolling也是根据message的时间,所以这里当message.timestamp.type=CreateTime的时候,必须谨慎的设置max.message.time.difference.ms, 以避免rolling

上述比较好理解,唯一注意的是,如果message.timestamp.type=CreateTime, 以message的timestamp作为依据,会有很多的不确定性

 

Search message by timestamp

When searching by timestamp, broker will start from the earliest log segment and check the last time index entry. If the timestamp of the last time index entry is greater than the target timestamp, the broker will do binary search on that time index to find the closest index entry and scan the log from there. Otherwise it will move on to the next log segment.

Searching by timestamp will have better accuracy. The guarantees provided are:

  • The messages whose timestamp are after the searched timestamp will be consumed.
  • Some messages with earlier timestamp might also be consumed.

The OffsetRequest behaves almost the same as before. If timestamp T is set in the OffsetRequest, the first offset in the returned offset sequence means that if user want to consume from T, that is the offset to start with. The guarantee is that any message whose timestamp is greater than T has a bigger offset. i.e. Any message before this offset has a timestamp < T.

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching.

这个feature非常赞,现在kafka终于可以支持按时间replay

之前只能是在segment级别去replay

过程就是,

broker会从最早的earliest log segment开始遍历,check last time index entry,如果小于指定时间,说明这个segment里面所有的message都是早于指定时间的,所以skip

继续直到找到一个segment的last time index entry比指定时间大的,说明这个segment中有我们需要的数据

接着,在该segment的time index中进行二分查找,找到最接近的时间index,从对应的offset开始读取

这里会保证,在指定时间后的数据都会被读取到,但注意之前的数据也是有可能被读到的

首先,因为只是找到最接近time index,所以不是精确的,总会多读点

再者,如果是createTime的时间戳,message不是时间单调递增的,所以后面有可能有老的message

在源码上,

KafkaAPIs.handleOffsetRequestV1

会调用,

              fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
}

而fetchOffsetForTimestamp的实现如下,

private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
logManager.getLog(topicPartition) match {
case Some(log) =>
log.fetchOffsetsByTimestamp(timestamp)
case None =>
throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
}
}

logManager.getLog会返回partition所对应的Log,The log is a sequence of LogSegments

Log.fetchOffsetsByTimestamp的实现,

// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset)) //如果是earliest,返回第一个segment的baseoffset
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset)) //如果是latest,返回logEndOffset val targetSeg = {
// Get all the segments whose largest timestamp is smaller than target timestamp
val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
// We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
if (earlierSegs.length < segmentsCopy.length)
Some(segmentsCopy(earlierSegs.length)) //返回第一个大于targetTimestamp的segment
else
None targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))

根据每个segment里面的time index中的largestTimestamp去比较

找出比targetTimestamp大的,

LogSegment.findOffsetByTimestamp

是在该time index中,继续二分查找,找到最接近的timestamp所对应的offset

上一篇:华东交通大学2016年ACM“双基”程序设计竞赛 1007


下一篇:Date对象