Offset存储模型
由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以 groupid-topic-partition -> offset 的方式保存。
如图所示:
Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中。
__consumers_offsets默认拥有50个partition,可以通过
Math.abs(groupId.hashCode() % offsets.topic.num.partitions)
的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个partition中。
下图展示了__consumers_offsets中保存的offset消息的格式:
如图所示,一条offset消息的格式为groupid-topic-partition -> offset。
因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。
Offset查询
前面我们已经描述过offset的存储模型,它是按照groupid-topic-partition -> offset的方式存储的。然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢?
答案就是Offsets Cache!!
如图所示,Consumer提交offset时,Kafka Offset Manager会首先追加一条条新的commit消息到__consumers_offsets topic中,然后更新对应的缓存。读取offset时从缓存中读取,而不是直接读取__consumers_offsets这个topic。
Offset管理方式
通常由如下几种 Kafka Offset 的管理方式: Spark Checkpoint:在 Spark Streaming 执行Checkpoint 操作时,将 Kafka Offset 一并保存到 HDFS 中。这种方式的问题在于:当 Spark Streaming 应用升级或更新时,以及当Spark 本身更新时,Checkpoint 可能无法恢复。 因而,不推荐采用这种方式。 HBASE、Redis 等外部 NOSQL 数据库:这一方式可以支持大吞吐量的 Offset 更新,但它最大的问题在于:用户需要自行编写 HBASE 或 Redis 的读写程序,并且需要维护一个额外的组件。 ZOOKEEPER:老版本的位移offset是提交到zookeeper中的,目录结构是 :/consumers/<group.id>/offsets/ <topic>/<partitionId> ,但是由于 ZOOKEEPER 的写入能力并不会随着 ZOOKEEPER 节点数量的增加而扩大, 因而,当存在频繁的 Offset 更新时,ZOOKEEPER 集群本身可能成为瓶颈。因而,不推荐采用这种方式。 KAFKA 自身的一个特殊 Topic(__consumer_offsets)中:这种方式支持大吞吐量的Offset 更新,又不需要手动编写 Offset 管理程序或者维护一套额外的集群,因而是迄今为止最为理想的一种实现方式。
另外几个与 Kafka Offset 管理相关的要点如下: Kafka 默认是定期帮你自动提交位移的(enable.auto.commit=true)。 有时候,我们需要采用自己来管理位移提交,这时候需要设置 enable.auto.commit=false。 属性 auto.offset.reset 值含义解释如下: earliest : 当各分区下有已提交的 Offset 时,从“提交的 Offset”开始消费;无提交的Offset 时,从头开始消费; latest : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数 none : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。
kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中); kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面);