kafka java 客户端发送请求,大量使用 RequestFuture,因此先说明下该类。
RequestFuture 类的成员属性 listeners 是 RequestFutureListener 的集合,调用 complete 方法,会触发 listener 的 onSuccess 方法。
public void complete(T value) { try { if (value instanceof RuntimeException) throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException"); if (!result.compareAndSet(INCOMPLETE_SENTINEL, value)) throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); fireSuccess(); } finally { completedLatch.countDown(); } } private void fireSuccess() { T value = value(); while (true) { RequestFutureListener<T> listener = listeners.poll(); if (listener == null) break; listener.onSuccess(value); } }
值得关注的是 compose 和 chain 方法,这两个方法均是为当前 RequestFuture 添加 listener,listener 的 onSuccess 又是调用另一个 RequestFuture 的方法。
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) { // 创建新的 RequestFuture 对象 final RequestFuture<S> adapted = new RequestFuture<>(); // 为旧的 RequestFuture 添加 listener addListener(new RequestFutureListener<T>() { @Override public void onSuccess(T value) { adapter.onSuccess(value, adapted); } @Override public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); } }); // 返回新的 RequestFuture 对象 return adapted; } public void chain(final RequestFuture<T> future) { // 为当前 RequestFuture 添加 listener addListener(new RequestFutureListener<T>() { @Override public void onSuccess(T value) { future.complete(value); } @Override public void onFailure(RuntimeException e) { future.raise(e); } }); }
rebalance 入口在 ConsumerCoordinator#poll
客户端判断是否需要重新加入组,即 rebalance
//ConsumerCoordinator#needRejoin public boolean needRejoin() { if (!subscriptions.partitionsAutoAssigned()) return false; // 所订阅 topic 的分区数量发生变化 // we need to rejoin if we performed the assignment and metadata has changed if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) return true; // 所订阅的 topic 发生变化 // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) return true; // 消费者加入组,或退出组,由心跳线程设置 rejoinNeeded = true return super.needRejoin(); }
消费者开始 rebalance
// AbstractCoordinator#joinGroupIfNeeded void joinGroupIfNeeded() { while (needRejoin() || rejoinIncomplete()) { ensureCoordinatorReady(); if (needsJoinPrepare) { // 调用用户传入的 ConsumerRebalanceListener onJoinPrepare(generation.generationId, generation.memberId); needsJoinPrepare = false; } // 发送 join group 的请求 RequestFuture<ByteBuffer> future = initiateJoinGroup(); client.poll(future); if (future.succeeded()) { onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value()); resetJoinGroupFuture(); needsJoinPrepare = true; } else { resetJoinGroupFuture(); RuntimeException exception = future.exception(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue; else if (!future.isRetriable()) throw exception; time.sleep(retryBackoffMs); } } }
AbstractCoordinator#initiateJoinGroup
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() { if (joinFuture == null) { disableHeartbeatThread(); state = MemberState.REBALANCING; joinFuture = sendJoinGroupRequest(); joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { @Override public void onSuccess(ByteBuffer value) { // handle join completion in the callback so that the callback will be invoked // even if the consumer is woken up before finishing the rebalance synchronized (AbstractCoordinator.this) { log.info("Successfully joined group with generation {}", generation.generationId); state = MemberState.STABLE; rejoinNeeded = false; if (heartbeatThread != null) heartbeatThread.enable(); } } @Override public void onFailure(RuntimeException e) { // we handle failures below after the request finishes. if the join completes // after having been woken up, the exception is ignored and we will rejoin synchronized (AbstractCoordinator.this) { state = MemberState.UNJOINED; } } }); } return joinFuture; }
AbstractCoordinator#sendJoinGroupRequest
private RequestFuture<ByteBuffer> sendJoinGroupRequest() { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); // send a join group request to the coordinator log.info("(Re-)joining group"); JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder( groupId, this.sessionTimeoutMs, this.generation.memberId, protocolType(), metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs); log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator); return client.send(coordinator, requestBuilder) .compose(new JoinGroupResponseHandler()); }
重点关注 client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
为老的 RequestFuture 添加 listener,返回新的 RequestFuture
ConsumerNetworkClient#send
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) { long now = time.milliseconds(); // 使用 RequestFutureCompletionHandler 作为回调函数 RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, completionHandler); unsent.put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request client.wakeup(); return completionHandler.future; }
JoinGroupResponseHandler#handle
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { Errors error = joinResponse.error(); if (error == Errors.NONE) { log.debug("Received successful JoinGroup response: {}", joinResponse); sensors.joinLatency.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { if (state != MemberState.REBALANCING) { // if the consumer was woken up before a rebalance completes, we may have already left // the group. In this case, we do not want to continue with the sync group. future.raise(new UnjoinedGroupException()); } else { AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol()); if (joinResponse.isLeader()) { onJoinLeader(joinResponse).chain(future); } else { onJoinFollower().chain(future); } } } } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator()); // backoff and retry future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID) { // reset the member id and retry immediately resetGeneration(); log.debug("Attempt to join group failed due to unknown member id."); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff markCoordinatorUnknown(); log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); future.raise(error); } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID) { // log the error and re-throw the exception log.error("Attempt to join group failed due to fatal error: {}", error.message()); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { // unexpected error, throw the exception future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); } }
收到响应后,最终的执行流是 RequestFutureCompletionHandler -> JoinGroupResponseHandler#handle
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members()); SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment); log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder); return sendSyncGroupRequest(requestBuilder); } catch (RuntimeException e) { return RequestFuture.failure(e); } } private RequestFuture<ByteBuffer> onJoinFollower() { // send follower's sync group with an empty assignment SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, Collections.<String, ByteBuffer>emptyMap()); log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder); return sendSyncGroupRequest(requestBuilder); } private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); return client.send(coordinator, requestBuilder) .compose(new SyncGroupResponseHandler()); }
用 RequestFuture 把 JoinGroupResponseHandler 和 SyncGroupResponseHandler 串联起来了
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> { @Override public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) { Errors error = syncResponse.error(); if (error == Errors.NONE) { sensors.syncLatency.record(response.requestLatencyMs()); future.complete(syncResponse.memberAssignment()); } else { requestRejoin(); if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("SyncGroup failed because the group began another rebalance"); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { log.debug("SyncGroup failed: {}", error.message()); resetGeneration(); future.raise(error); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { log.debug("SyncGroup failed: {}", error.message()); markCoordinatorUnknown(); future.raise(error); } else { future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); } } } }
rebalance 过程最后的 listener
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { @Override public void onSuccess(ByteBuffer value) { // handle join completion in the callback so that the callback will be invoked // even if the consumer is woken up before finishing the rebalance synchronized (AbstractCoordinator.this) { log.info("Successfully joined group with generation {}", generation.generationId); state = MemberState.STABLE; rejoinNeeded = false; if (heartbeatThread != null) heartbeatThread.enable(); } } @Override public void onFailure(RuntimeException e) { // we handle failures below after the request finishes. if the join completes // after having been woken up, the exception is ignored and we will rejoin synchronized (AbstractCoordinator.this) { state = MemberState.UNJOINED; } } });