consumer_offsets深度剖析(十三)

consumer_offsets深度剖析

      位移提交是使用消费者客户端过程中一个比较“讲究”的操作。位移提交的内容最终会保存到 Kafka 的内部主题 __consumer_offsets 中,对于主题 __consumer_offsets 的深度掌握也可以让我们更好地理解和使用好位移提交。

       一般情况下,当集群中第一次有消费者消费消息时会自动创建主题 __consumer_offsets,不过它的副本因子还受 offsets.topic.replication.factor 参数的约束,这个参数的默认值为3(下载安装的包中此值可能为1),分区数可以通过 offsets.topic.num.partitions 参数设置,默认为50。客户端提交消费位移是使用 OffsetCommitRequest 请求实现的,OffsetCommitRequest 的结构如下图所示。

consumer_offsets深度剖析(十三)

 

        请求体第一层中的 group_id、generation_id 和 member_id 在前面的内容中已经介绍过多次了,retention_time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1。也就是说,按照 broker 端的配置 offsets.retention.minutes 来确定保留时长。offsets.retention.minutes 的默认值为10080,即7天,超过这个时间后消费位移的信息就会被删除(使用墓碑消息和日志压缩策略)。注意这个参数在 2.0.0 版本之前的默认值为1440,即1天,很多关于消费位移的异常也是由这个参数的值配置不当造成的。

      有些定时消费的任务在执行完某次消费任务之后保存了消费位移,之后隔了一段时间再次执行消费任务,如果这个间隔时间超过 offsets.retention.minutes 的配置值,那么原先的位移信息就会丢失,最后只能根据客户端参数 auto.offset.reset 来决定开始消费的位置,遇到这种情况时就需要根据实际情况来调配 offsets.retention.minutes 参数的值。

       OffsetCommitRequest 中的其余字段大抵也是按照分区的粒度来划分消费位移的:topic 表示主题名称,partition 表示分区编号等。注意这里还有一个 metadata 字段。在3.2.5节中讲到手动位移提交时提到了可以通过 Map<TopicPartition, OffsetAndMetadata> offsets 参数来指定要提交的分区位移。OffsetAndMetadata 中包含2个成员变量(offset 和 metadata),与此对应的有两个构造方法,详细如下:

public OffsetAndMetadata(long offset)
public OffsetAndMetadata(long offset, String metadata)

        metadata 是自定义的元数据信息,如果不指定这个参数,那么就会被设置为空字符串,注意 metadata 的长度不能超过 offset.metadata.max.bytes 参数(broker 端配置,默认值为4096)所配置的大小。

          同消费组的元数据信息一样,最终提交的消费位移也会以消息的形式发送至主题 __consumer_offsets,与消费位移对应的消息也只定义了 key 和 value 字段的具体内容,它不依赖于具体版本的消息格式,以此做到与具体的消息格式无关。

consumer_offsets深度剖析(十三)

 

       上图中展示了消费位移对应的消息内容格式,上面是消息的 key,下面是消息的 value。可以看到 key 和 value 中都包含了 version 字段,这个用来标识具体的 key 和 value 的版本信息,不同的版本对应的内容格式可能并不相同。就目前版本而言,key 和 value 的 version 值都为1。key 中除了 version 字段还有 group、topic、partition 字段,分别表示消费组的 groupId、主题名称和分区编号。虽然key中包含了4个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。

        value 中包含了5个字段,除 version 字段外,其余的 offset、metadata、commit_timestamp、expire_timestamp 字段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳。其中 offset 和 metadata 与 OffsetCommitRequest 请求体中的 offset 和 metadata 对应,而 expire_timestamp 和 OffsetCommitRequest 请求体中的 retention_time 也有关联,commit_timestamp 值与 offsets.retention.minutes 参数值之和即为 expire_timestamp(默认情况下)。

        在处理完消费位移之后,Kafka 返回 OffsetCommitResponse 给客户端,OffsetCommitResponse 的结构如下图所示。OffsetCommitResponse 中各个域的具体含义可以通过前面内容中推断出来,这里就不再赘述了。

consumer_offsets深度剖析(十三)

 

      我们可以通过 kafka-console-consumer.sh 脚本来查看 __consumer_offsets 中的内容,不过要设定 formatter 参数为       kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter。假设我们要查看消费组“consumerGroupId”的位移提交信息,首先可以根据代码清单12-1中的计算方式得出分区编号为20,然后查看这个分区中的消息,相关示例如下:

[root@node1 kafka_2.11-2.0.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets -–partition 20 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

[consumerGroupId,topic-offsets,30]::[OffsetMetadata[2130,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,8]::[OffsetMetadata[2310,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,21]::[OffsetMetadata[1230,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,27]::[OffsetMetadata[1230,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,9]::[OffsetMetadata[1233,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,35]::[OffsetMetadata[1230,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,41]::[OffsetMetadata[3210,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,33]::[OffsetMetadata[1310,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
[consumerGroupId,topic-offsets,23]::[OffsetMetadata[2123,NO_METADATA],CommitTime 1538843128354,ExpirationTime 1539447928354]
(…省略若干)

一般情况下,使用 OffsetsMessageFormatter 打印的格式可以概括为:

"[%s,%s,%d]:: [OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]".format (group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)

       这里需要说明的是,如果某个 key(version + group + topic + partition 的组合)对应的消费位移过期了,那么对应的 value 就会被设置为 null,也就是墓碑消息(主题 __consumer_offsets 使用的是日志压缩策略),对应的打印结果也会变成如下的形式:

" [%s,%s,%d]::null".format(group, topic, partition)

有时候在查看主题 __consumer_offsets 中的内容时有可能出现下面这种情况:

[consumerGroupId,topic-offsets,21]::null

          这说明对应的消费位移已经过期了。在 Kafka 中有一个名为“delete-expired-group-metadata”的定时任务来负责清理过期的消费位移,这个定时任务的执行周期由参数 offsets.retention.check.interval.ms 控制,默认值为600000,即10分钟。

          还有 metadata,一般情况下它的值要么为 null 要么为空字符串,出现这种情况时,OffsetsMessageFormatter 会把它展示为“NO_METADATA”,否则就按实际值进行展示。

冷门知识:如果有若干消费者消费了某个主题中的消息,并且也提交了相应的消费位移,那么在删除这个主题之后会一并将这些消费位移信息删除。

上一篇:[ZJOI2007]最大半连通子图【tarjan缩点】【拓扑排序+DP】


下一篇:CF118E Bertown roads