Rebalance from the consumer's perspective

The Kafka Java client relies heavily on RequestFuture when sending requests, so that class is explained first.

RequestFuture has a member field, listeners, which is a collection of RequestFutureListener instances. Calling complete triggers the onSuccess method of each registered listener.

public void complete(T value) {
    try {
        if (value instanceof RuntimeException)
            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");

        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        fireSuccess();
    } finally {
        completedLatch.countDown();
    }
}

private void fireSuccess() {
    T value = value();
    while (true) {
        RequestFutureListener<T> listener = listeners.poll();
        if (listener == null)
            break;
        listener.onSuccess(value);
    }
}

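The compose and chain methods deserve attention. Both add a listener to the current RequestFuture, and that listener's onSuccess in turn drives another RequestFuture: compose hands a newly created future to the adapter, while chain completes a future that already exists.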

public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    // create a new RequestFuture
    final RequestFuture<S> adapted = new RequestFuture<>();
    // add a listener to the original RequestFuture
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {
            adapter.onFailure(e, adapted);
        }
    });
    // return the new RequestFuture
    return adapted;
}

public void chain(final RequestFuture<T> future) {
    // add a listener to the current RequestFuture
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            future.raise(e);
        }
    });
}
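
To make the difference between compose and chain easier to see, here is a minimal single-threaded sketch. MiniFuture is a hypothetical, heavily simplified stand-in for RequestFuture (success path only, no latch, no thread safety); it is not Kafka's implementation, and the BiConsumer adapter merely plays the role of RequestFutureAdapter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class MiniFutureDemo {
    /** Hypothetical, simplified stand-in for RequestFuture (success path only). */
    static class MiniFuture<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private T value;
        private boolean done;

        void addListener(Consumer<T> listener) {
            if (done) listener.accept(value);   // already complete: fire right away
            else listeners.add(listener);
        }

        void complete(T v) {
            if (done) throw new IllegalStateException("already complete");
            value = v;
            done = true;
            // copy first so listeners may safely register further listeners
            List<Consumer<T>> pending = new ArrayList<>(listeners);
            listeners.clear();
            for (Consumer<T> l : pending) l.accept(v);
        }

        // compose: returns a NEW future; the adapter decides how to complete it
        <S> MiniFuture<S> compose(BiConsumer<T, MiniFuture<S>> adapter) {
            MiniFuture<S> adapted = new MiniFuture<>();
            addListener(v -> adapter.accept(v, adapted));
            return adapted;
        }

        // chain: forwards this future's result to an EXISTING future
        void chain(MiniFuture<T> target) {
            addListener(target::complete);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniFuture<String> raw = new MiniFuture<>();
        // compose converts the raw String result into an Integer on a new future
        MiniFuture<Integer> parsed = raw.compose((s, f) -> f.complete(Integer.parseInt(s)));
        MiniFuture<Integer> outer = new MiniFuture<>();
        parsed.chain(outer);                    // outer completes when parsed does
        outer.addListener(v -> log.add("outer=" + v));
        log.add("before complete");
        raw.complete("42");                     // triggers the whole listener chain
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());              // prints [before complete, outer=42]
    }
}
```

Completing the first future is enough to complete every future downstream of it, which is the property the rebalance code relies on.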

The rebalance entry point is ConsumerCoordinator#poll.

The client first decides whether it needs to rejoin the group, i.e. whether to rebalance:

//ConsumerCoordinator#needRejoin
public boolean needRejoin() {
    if (!subscriptions.partitionsAutoAssigned())
        return false;

    // the partition count of a subscribed topic has changed
    // we need to rejoin if we performed the assignment and metadata has changed
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
        return true;

    // the set of subscribed topics has changed
    // we need to join if our subscription has changed since the last join
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
        return true;

    // a consumer joined or left the group; the heartbeat thread sets rejoinNeeded = true
    return super.needRejoin();
}

The consumer then starts the rebalance:

// AbstractCoordinator#joinGroupIfNeeded
void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        ensureCoordinatorReady();

        if (needsJoinPrepare) {
            // invoke the user-supplied ConsumerRebalanceListener
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }

        // send the JoinGroup request
        RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future);

        if (future.succeeded()) {
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());

            resetJoinGroupFuture();
            needsJoinPrepare = true;
        } else {
            resetJoinGroupFuture();
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}
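
The failure branch of the loop above distinguishes three cases: errors that lead to an immediate rejoin, retriable errors that wait for retryBackoffMs first, and everything else, which is rethrown. The sketch below models only that control flow. The Outcome enum and the scripted queue are hypothetical stand-ins for the exception checks and the network round trip; nothing here calls Kafka.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class JoinRetryDemo {
    // Hypothetical categories standing in for the exception checks in the loop
    enum Outcome { SUCCESS, REJOIN_IMMEDIATELY, RETRIABLE, FATAL }

    /** Returns the total simulated backoff in ms; throws on a fatal outcome. */
    static long joinUntilSuccess(Deque<Outcome> scripted, long retryBackoffMs) {
        long sleptMs = 0;
        while (true) {
            Outcome outcome = scripted.poll();  // simulated join attempt
            if (outcome == null)
                throw new IllegalStateException("script exhausted");
            switch (outcome) {
                case SUCCESS:
                    return sleptMs;
                case REJOIN_IMMEDIATELY:
                    continue;                   // retry at once, no backoff
                case FATAL:
                    throw new IllegalStateException("non-retriable join failure");
                default:
                    sleptMs += retryBackoffMs;  // RETRIABLE: back off, then retry
            }
        }
    }

    public static void main(String[] args) {
        Deque<Outcome> script = new ArrayDeque<>(Arrays.asList(
                Outcome.REJOIN_IMMEDIATELY, Outcome.RETRIABLE, Outcome.RETRIABLE, Outcome.SUCCESS));
        System.out.println(joinUntilSuccess(script, 100L)); // prints 200
    }
}
```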

AbstractCoordinator#initiateJoinGroup

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        disableHeartbeatThread();

        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    state = MemberState.STABLE;
                    rejoinNeeded = false;

                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}

AbstractCoordinator#sendJoinGroupRequest

private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group");
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
    return client.send(coordinator, requestBuilder)
            .compose(new JoinGroupResponseHandler());
}

Pay particular attention to client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
It adds a listener to the original RequestFuture (the one returned by client.send) and returns a new RequestFuture.

ConsumerNetworkClient#send

public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
    long now = time.milliseconds();
    // use RequestFutureCompletionHandler as the callback
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            completionHandler);
    unsent.put(node, clientRequest);

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

JoinGroupResponseHandler#handle

public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
    Errors error = joinResponse.error();
    if (error == Errors.NONE) {
        log.debug("Received successful JoinGroup response: {}", joinResponse);
        sensors.joinLatency.record(response.requestLatencyMs());

        synchronized (AbstractCoordinator.this) {
            if (state != MemberState.REBALANCING) {
                // if the consumer was woken up before a rebalance completes, we may have already left
                // the group. In this case, we do not want to continue with the sync group.
                future.raise(new UnjoinedGroupException());
            } else {
                AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                        joinResponse.memberId(), joinResponse.groupProtocol());
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            }
        }
    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
        log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
        // backoff and retry
        future.raise(error);
    } else if (error == Errors.UNKNOWN_MEMBER_ID) {
        // reset the member id and retry immediately
        resetGeneration();
        log.debug("Attempt to join group failed due to unknown member id.");
        future.raise(Errors.UNKNOWN_MEMBER_ID);
    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
            || error == Errors.NOT_COORDINATOR) {
        // re-discover the coordinator and retry with backoff
        markCoordinatorUnknown();
        log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
        future.raise(error);
    } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
            || error == Errors.INVALID_SESSION_TIMEOUT
            || error == Errors.INVALID_GROUP_ID) {
        // log the error and re-throw the exception
        log.error("Attempt to join group failed due to fatal error: {}", error.message());
        future.raise(error);
    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
        future.raise(new GroupAuthorizationException(groupId));
    } else {
        // unexpected error, throw the exception
        future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
    }
}

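Once the response arrives, the resulting execution flow is RequestFutureCompletionHandler -> JoinGroupResponseHandler#handle. On success, handle then continues with onJoinLeader or onJoinFollower: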

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                joinResponse.members());

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}


private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    return client.send(coordinator, requestBuilder)
            .compose(new SyncGroupResponseHandler());
}

In this way RequestFuture chains JoinGroupResponseHandler and SyncGroupResponseHandler together.

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture<ByteBuffer> future) {
        Errors error = syncResponse.error();
        if (error == Errors.NONE) {
            sensors.syncLatency.record(response.requestLatencyMs());
            future.complete(syncResponse.memberAssignment());
        } else {
            requestRejoin();

            if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("SyncGroup failed because the group began another rebalance");
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID
                    || error == Errors.ILLEGAL_GENERATION) {
                log.debug("SyncGroup failed: {}", error.message());
                resetGeneration();
                future.raise(error);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.debug("SyncGroup failed: {}", error.message());
                markCoordinatorUnknown();
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
            }
        }
    }
}
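
The effect of chaining the two handlers is that the future returned to joinGroupIfNeeded completes only after the SyncGroup response has been processed, not when the JoinGroup response arrives. The sketch below shows the same two-stage shape with the JDK's CompletableFuture. It is an analogy, not Kafka's RequestFuture, and the values "member-1" and "topic-0,topic-1" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class JoinSyncChainDemo {
    static String run() {
        // Stand-ins for the raw JoinGroup and SyncGroup responses
        CompletableFuture<String> joinResponse = new CompletableFuture<>();
        CompletableFuture<String> syncResponse = new CompletableFuture<>();

        // The future handed to the caller depends on BOTH steps, in order
        CompletableFuture<String> joinFuture = joinResponse.thenCompose(memberId ->
                syncResponse.thenApply(assignment -> memberId + " -> " + assignment));

        joinResponse.complete("member-1");
        boolean doneAfterJoin = joinFuture.isDone();   // still waiting for the sync step

        syncResponse.complete("topic-0,topic-1");
        return doneAfterJoin + "|" + joinFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints false|member-1 -> topic-0,topic-1
    }
}
```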

The last listener to run in the rebalance process is the one registered in initiateJoinGroup:

joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
        // handle join completion in the callback so that the callback will be invoked
        // even if the consumer is woken up before finishing the rebalance
        synchronized (AbstractCoordinator.this) {
            log.info("Successfully joined group with generation {}", generation.generationId);
            state = MemberState.STABLE;
            rejoinNeeded = false;

            if (heartbeatThread != null)
                heartbeatThread.enable();
        }
    }

    @Override
    public void onFailure(RuntimeException e) {
        // we handle failures below after the request finishes. if the join completes
        // after having been woken up, the exception is ignored and we will rejoin
        synchronized (AbstractCoordinator.this) {
            state = MemberState.UNJOINED;
        }
    }
});

 
