Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。
首先,我们先看下它的成员变量:
/* the state of each nodes connection */ // 每个节点连接的状态KafkaClient实例client private final KafkaClient client; /* the record accumulator that batches records */ // 批量记录的记录累加器RecordAccumulator实例accumulator private final RecordAccumulator accumulator; /* the metadata for the client */ // 客户端元数据Metadata实例metadata private final Metadata metadata; /* the maximum request size to attempt to send to the server */ // 试图发送到server端的最大请求大小maxRequestSize private final int maxRequestSize; /* the number of acknowledgements to request from the server */ // 从server端获得的请求发送的已确认数量acks private final short acks; /* the number of times to retry a failed request before giving up */ // 一个失败请求在被放弃之前的重试次数retries private final int retries; /* the clock instance used for getting the time */ // 获取时间的时钟Time实例time private final Time time; /* true while the sender thread is still running */ // Sender线程运行的标志位,为true表示Sender线程一直在运行 private volatile boolean running; /* true when the caller wants to ignore all unsent/inflight messages and force close. */ // 强制关闭的标志位forceClose private volatile boolean forceClose; /* metrics */ // 度量指标 private final SenderMetrics sensors; /* param clientId of the client */ // 客户端的clientId private String clientId; /* the max time to wait for the server to respond to the request*/ // 等到server端响应请求的超时时间requestTimeout private final int requestTimeout;既然是一个线程,我们看下它的主要运行逻辑run()方法,代码如下:
/** * The main run loop for the sender thread * sender线程的主循环 */ public void run() { log.debug("Starting Kafka producer I/O thread."); // main loop, runs until close is called // 主循环,一直运行直到close被调用 while (running) {// 标志位running为true,则一直循环 try { // 调用待参数的run()方法 run(time.milliseconds()); } catch (Exception e) { // 截获异常后记录err级别log信息,输出异常 log.error("Uncaught error in kafka producer I/O thread: ", e); } } log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求 while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { try { // 调用调用待参数的run()方法继续处理 run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求 if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. this.accumulator.abortIncompleteBatches(); } // 关闭客户端 try { this.client.close(); } catch (Exception e) { log.error("Failed to close network client", e); } log.debug("Shutdown of Kafka producer I/O thread has completed."); }Sender线程的主循环在run()方法内,其主要处理逻辑为:
1、首先进入一个while主循环,当标志位running为true时一直循环,直到close被调用:
调用带参数的run(long now)方法,处理消息的发送;
2、当close被调用时,running被设置为false,while主循环退出:
2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;
2.2、如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求;
2.3、关闭客户端。