消费消息的请求(按序)
- org/apache/kafka/common/requests/RequestHeader
- org/apache/kafka/common/requests/ApiVersionsRequest
- org/apache/kafka/common/requests/MetadataRequest 批量查询topic的元数据信息
- org/apache/kafka/common/requests/FindCoordinatorRequest 从拿到的topic的元数据中取出leader节点 作为组协调者
- org/apache/kafka/common/requests/JoinGroupRequest
- org/apache/kafka/common/requests/SyncGroupRequest
- org/apache/kafka/common/requests/OffsetFetchRequest
- org/apache/kafka/common/requests/ListOffsetRequest
- org/apache/kafka/common/requests/FetchRequest
- org/apache/kafka/common/requests/HeartbeatRequest
请求接口文档参考
响应接口文档参考
请求头
name |
type |
defaultValue |
docString |
api_key |
INT16 |
null |
请求接口编号 |
api_version |
INT16 |
null |
api版本 |
correlation_id |
INT32 |
null |
用户提供的一个整数id,用于响应时由响应体带回来 |
client_id |
NULLABLE_STRING |
null |
用户提供的client id |
ApiVersionsRequest
查询API版本信息
请求 version:1
仅仅有请求头
响应 version:1
name |
type |
defaultValue |
docString |
error_code |
INT16 |
null |
错误码 |
api_versions |
ARRAY({api_key:INT16,min_version:INT16,max_version:INT16}) |
null |
broker能支持的api各版本列表。含最低版本,最高版本. |
throttle_time_ms |
INT32 |
0 |
由于配额冲突而阻止请求的持续时间(毫秒)(如果请求未违反任何配额,则为零 |
虽是请求broker端,但是实际还是用的client中的API完成的逻辑:
ApiVersionsResponse.apiVersionsResponse
根据messageFormatVersion
消息格式版本推导出各API版本情况。
API版本 最小的是0 。写的固定的。 最大的是 requestSchemas的length -1 即requestSchemas最大版本。
此处不仅返回每个API的最小版本与最大版本,还返回能支持的API列表。如因版本问题不能支持的API是不会返回的。
能否支持的判断依据是,API依赖的最小消息格式版本小于当前的消息格式版本,那么就支持。
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
versionList.add(neApiVersionsResponse.ApiVersion(apiKey));
}
}
批量查询topic的元数据信息
请求 version:5
name |
type |
defaultValue |
docString |
topics |
ARRAY(STRING) |
null |
需要查元数据的topic的列表,如果不送则查所有topic的元数据 |
allow_auto_topic_creation |
BOOLEAN |
null |
在broker配置了允许自动创建topic时是否自动创建topic |
响应 version:4
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
由于配额冲突而阻止请求的持续时间(毫秒)(如果请求未违反任何配额,则为零) |
brokers |
ARRAY({node_id:INT32,host:STRING,port:INT32,rack:NULLABLE_STRING}) |
null |
所有 活着的 broker的id ip port的信息 |
cluster_id |
NULLABLE_STRING |
null |
集群id |
controller_id |
INT32 |
null |
controller角色的broker的id |
topic_metadata |
ARRAY({error_code:INT16,topic:STRING,is_internal:BOOLEAN,partition_metadata:ARRAY({error_code:INT16,partition:INT32,leader:INT32,replicas:ARRAY(INT32),isr:ARRAY(INT32)})}) |
null |
topic元数据,分区数、leader broker的id、副本所在broker id列表、isr broker id列表 |
broker端处理
在broker端
- 过滤出授权的topics KafkaApis.handleTopicMetadataRequest
- 查询出授权topics的元数据 KafkaApis.getTopicMetadata
2.1 从缓存中拿,拿到(跟topics的size相同)即返回
2.2 处理没拿到的topic
2.2.1 允许创建topic的,就按默认副本数和默认分区数创建,不能创建的或者创建出错的返回出错信息。创建topic前提是协调者可用。否则COORDINATOR_NOT_AVAILABLE。
2.2.2 返回创建后的metadata
元数据信息有缓存 kafka.server.MetadataCache.cache
:topic <--> [partitionNo <--> 分区状态封装]
MetadataCache
中一系列getxxx方法都是用来读取检索的。
元数据缓存的更新参见 《MetadataCache更新》
FindCoordinatorRequest
查询协调者
请求 version:1
name |
type |
defaultValue |
docString |
coordinator_key |
STRING |
null |
组协调时是组id 事务协调时是事务id |
coordinator_type |
INT8 |
null |
协调类型(0 = group, 1 = transaction) |
响应 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
error_message |
NULLABLE_STRING |
null |
Response error message |
coordinator |
{node_id:INT32,host:STRING,port:INT32} |
null |
协调者broker的id ip port |
组协调与事务协调都用这个请求
coordinatorKey 组协调是组id 事务协调时是事务id
分区对应的leader节点就是组协调者
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
.map(_.leader) // SimonNote: leader节点作为协调者
这个请求的响应也就是将协调者信息(node_id,host,port)返回去
JoinGroupRequest
加入消费组的请求
请求 version:2
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
唯一的组标志 |
session_timeout |
INT32 |
null |
会话时间,超过这个时间没收到心跳,协调者就认为这个消费者挂了 |
rebalance_timeout |
INT32 |
null |
协调者在重新平衡组时等待每个成员重新加入的最长时间 |
member_id |
STRING |
null |
由组协调者分配的成员ID,如果是第一次加入,则为空。 |
protocol_type |
STRING |
null |
组协调协议实现类的唯一名称 |
group_protocols |
ARRAY({protocol_name:STRING,protocol_metadata:BYTES}) |
null |
组成员能支持的组协调协议列表 |
响应 version:2
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
generation_id |
INT32 |
null |
组的年代? |
group_protocol |
STRING |
null |
协调者选中的组协议 |
leader_id |
STRING |
null |
组中的leader |
member_id |
STRING |
null |
第一次加入的时候组协调者给分的成员id |
members |
ARRAY({member_id:STRING,member_metadata:BYTES}) |
null |
组内成员? |
kafka.coordinator.group.GroupCoordinator.handleJoinGroup
一系列的check:
- 协调者是否可用
- 是否是本分区的协调者
- 消费组id是否合法(是否为空)
- 是否协调者正在load中,
GroupMetadataManager
会管理当前partition是否在load中
- sessionTimeoutMs是否在组配置的最大最小范围内
向groupManager
加入新建的GroupMetadata
实例(如果没有的话,有就直接下一步了),GroupMetadata
有哪些东西,下面注释写了一部分,但是还包含事务消息用一些offset
/**
* Group contains the following metadata:
*
* Membership metadata:
* 1. Members registered in this group
* 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
* 3. Protocol metadata associated with group members
*
* State metadata:
* 1. group state
* 2. generation id
* 3. leader id
*/
doJoinGroup
一系列的check后,根据group.currentState
做相应处理
group.currentState
GroupMetadata.scala
中有对group状态定义及action及走向到哪的明确详细描述,非常重要
SyncGroupRequest
请求 version:1
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group唯一标志 |
generation_id |
INT32 |
null |
代的标志? |
member_id |
STRING |
null |
第一次加入的时候组协调者给分的成员id |
group_assignment |
ARRAY({member_id:STRING,member_assignment:BYTES}) |
null |
null |
响应 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
member_assignment |
BYTES |
null |
null |
OffsetFetchRequest
请求 version:3
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group id |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32})}) |
null |
topic列表,支持多个topic |
响应 version:3
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,offset:INT64,metadata:NULLABLE_STRING,error_code:INT16})}) |
null |
列表:topic-[{分区号-offset,元数据信息}] |
error_code |
INT16 |
null |
Response error code |
ListOffsetRequest
请求 version:2
name |
type |
defaultValue |
docString |
replica_id |
INT32 |
null |
follower的broker的id. 正常消费用-1. |
isolation_level |
INT8 |
null |
事务消息可见性设置。 使用 READ_UNCOMMITTED (isolation_level = 0)能看到所有消息. 使用 READ_COMMITTED (isolation_level = 1), 非事务消息和已经提交的消息能被看到. 更具体一点, READ_COMMITTED 返回比当前 LSO (last stable offset)小的offset, 并允许返回已经取消的事务 |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,timestamp:INT64})}) |
null |
列表:topic,partitions{分区号,时间戳} |
响应 version:2
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,error_code:INT16,timestamp:INT64,offset:INT64})}) |
null |
列表:topic 分区号 错误码 时间戳 offset |
FetchRequest
请求 version:6
name |
type |
defaultValue |
docString |
replica_id |
INT32 |
null |
follower的broker的id. 正常消费用-1 |
max_wait_time |
INT32 |
null |
等待响应的最大时间 单位ms. |
min_bytes |
INT32 |
null |
最小字节 |
max_bytes |
INT32 |
null |
最大字节. 单条消息如果超过这个大小也将返回 |
isolation_level |
INT8 |
null |
事务隔离级别 |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,fetch_offset:INT64,log_start_offset:INT64,max_bytes:INT32})}) |
null |
列表: topic 分区号 取的offset log开始的 offset?? 最大字节. |
响应 version:6
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition_header:{partition:INT32,error_code:INT16,high_watermark:INT64,last_stable_offset:INT64,log_start_offset:INT64,aborted_transactions:ARRAY({producer_id:INT64,first_offset:INT64})},record_set:RECORDS})}) |
null |
列表: topic 分区头: 分区号 高水位值 LSO(上次稳定offset), log开始offset,取消事务:生产者id 第一个offset。 消息记录集 |
HeartbeatRequest
请求 version:1
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group id |
generation_id |
INT32 |
null |
group的年代 |
member_id |
STRING |
null |
第一次加入的时候组协调者给分的成员id |
响应 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
响应码 |
kafka-clients 1.0 高阶API消费消息(未完)