Kafka Broker Source Code Analysis, Part 2: API Layer Design

1. Introduction

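  • Version: 1.1.1
  • The API layer is a Facade: it wraps all of Kafka's functionality behind one entry point, dispatches each request according to the ApiKeys value it carries, and calls the corresponding API to process it
  • The API layer creates a set of threads (num.io.threads of them) for request logic and I/O; all API work runs on these threads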

2. Overall Architecture

2.1 Core Logic

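Compared with the network layer, the API layer is quite simple: it is only a thin wrapper over the underlying implementation. Because of that, the API layer alone shows almost every feature a Kafka broker provides.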

  1. Start num.io.threads RequestHandler threads
  2. Each RequestHandler polls Requests from the global requestQueue
  3. Based on the Request's ApiKeys value, call the matching handle* method to run the actual business logic
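The three steps above can be sketched with plain JDK classes. This is a minimal, hypothetical model rather than Kafka code: `Request`, `handle` and the result strings are made up, and only two api keys are routed. It shows a fixed number of daemon threads competing for requests on one shared `ArrayBlockingQueue` and routing each request by its api key.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

// Hypothetical request type; the real one is RequestChannel.Request.
final case class Request(apiKey: String, payload: String)

// Step 2: one global bounded queue shared by every handler (the role of requestQueue).
val requestQueue = new ArrayBlockingQueue[Request](16)
// Thread-safe record of what was handled.
val handled = new ConcurrentLinkedQueue[String]()
val allHandled = new CountDownLatch(3)

// Step 3: route by api key; a stand-in for KafkaApis.handle and its handle* methods.
def handle(req: Request): Unit = {
  val result = req.apiKey match {
    case "PRODUCE" => s"produce:${req.payload}"
    case "FETCH"   => s"fetch:${req.payload}"
    case other     => s"unsupported:$other"
  }
  handled.add(result)
  allHandled.countDown()
}

// Step 1: start a fixed number of handler threads (num.io.threads in Kafka).
val numIoThreads = 3
for (id <- 0 until numIoThreads) {
  val t = new Thread(new Runnable {
    def run(): Unit = while (true) {
      // Every handler competes for the next request on the same queue.
      val req = requestQueue.poll(300, TimeUnit.MILLISECONDS)
      if (req != null) handle(req)
    }
  }, s"request-handler-$id")
  t.setDaemon(true) // daemon threads do not keep the JVM alive
  t.start()
}

Seq(Request("PRODUCE", "a"), Request("FETCH", "b"), Request("METADATA", "c")).foreach(r => requestQueue.put(r))
allHandled.await(5, TimeUnit.SECONDS)
println(handled)
```

Which thread picks up which request is not deterministic; only the set of results is.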

2.2 Core Classes and Methods

KafkaRequestHandler
   |-- run()
KafkaApis
   |-- handle()              // entry point for all requests; dispatches by ApiKeys

3. Core Flow Analysis

3.1 Startup Flow

The first half of the startup flow is the same as for the network layer and is not repeated here.

// KafkaServer.scala
def startup() {
    // Initialize the API layer
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
      kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
      fetchManager, brokerTopicStats, clusterId, time, tokenManager)
    // Initialize the I/O thread pool
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
      config.numIoThreads)
}

// KafkaRequestHandler.scala
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {
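    // Not shown in this excerpt: the fields threadPoolSize (the current thread count, read and updated
    // in resizeThreadPool) and aggregateIdleMeter (a meter shared by all handlers to track idle time)
    val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)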
    for (i <- 0 until numThreads) {
        createHandler(i)
    }
    
    def createHandler(id: Int): Unit = synchronized {
        runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
        KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
    }
}

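In effect, startup instantiates the API layer's entry point (KafkaApis) and then starts num.io.threads RequestHandler threads that do the actual request processing. Note that this "thread pool" is not a thread pool in the usual sense: the number of threads is fixed, and it changes only when the pool size is updated through a dynamic config, at which point resizeThreadPool adds or stops threads: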

def resizeThreadPool(newSize: Int): Unit = synchronized {
    val currentSize = threadPoolSize.get
    info(s"Resizing request handler thread pool size from $currentSize to $newSize")
    if (newSize > currentSize) {
      // Create handlers for the missing threads
      for (i <- currentSize until newSize) {
        createHandler(i)
      }
    } else if (newSize < currentSize) {
      for (i <- 1 to (currentSize - newSize)) {
        // Stop threads from the tail until the target size is reached
        runnables.remove(currentSize - i).stop()
      }
    }
    threadPoolSize.set(newSize)
}
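The resize rules can be checked in isolation. The sketch below is a hypothetical, thread-free copy of the logic above: `FakeHandler` stands in for KafkaRequestHandler and only records its id and whether `stop()` was called. Growing appends handlers with ids `currentSize until newSize`; shrinking removes handlers from the tail.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for KafkaRequestHandler: no real thread, just an id and a stop flag.
final class FakeHandler(val id: Int) {
  @volatile var stopped = false
  def stop(): Unit = stopped = true
}

final class FakeHandlerPool(numThreads: Int) {
  val threadPoolSize = new AtomicInteger(numThreads)
  val runnables = new mutable.ArrayBuffer[FakeHandler](numThreads)
  (0 until numThreads).foreach(id => createHandler(id))

  def createHandler(id: Int): Unit = synchronized {
    runnables += new FakeHandler(id)
  }

  // Same branching as resizeThreadPool; also returns the removed handlers for inspection.
  def resizeThreadPool(newSize: Int): Seq[FakeHandler] = synchronized {
    val currentSize = threadPoolSize.get
    val removed = mutable.ArrayBuffer.empty[FakeHandler]
    if (newSize > currentSize) {
      for (i <- currentSize until newSize) createHandler(i)
    } else if (newSize < currentSize) {
      for (i <- 1 to (currentSize - newSize)) {
        val h = runnables.remove(currentSize - i) // always the current last element
        h.stop()
        removed += h
      }
    }
    threadPoolSize.set(newSize)
    removed.toSeq
  }
}

val pool = new FakeHandlerPool(4)
pool.resizeThreadPool(6)               // grows: adds handlers 4 and 5
val removed = pool.resizeThreadPool(2) // shrinks: removes 5, 4, 3, 2 in that order
println(pool.runnables.map(_.id))
```

Removing from the tail keeps each handler's id equal to its index in `runnables`, which `createHandler` relies on when it calls `runnables(id)` right after appending. Assuming the real `stop()` only sets the `stopped` flag checked by `run()` (see 3.2), a removed thread exits once it finishes its current request or its 300 ms poll times out.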

3.2 Request Dispatch Flow

Once started, each RequestHandler thread repeatedly takes a Request from the network layer and hands it to KafkaApis for dispatch.

// KafkaRequestHandler.scala
def run() {
    while (!stopped) {
      // Poll the global blocking queue for a request (one the network layer has already received and deserialized)
      val req = requestChannel.receiveRequest(300)
      req match {
        // Shutdown signal
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return
        // Regular request
        case request: RequestChannel.Request =>
          // Route the request to its handler
          apis.handle(request)

        case null => // poll timed out; loop again
      }
    }
    shutdownComplete.countDown()
}

// RequestChannel.scala
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
  
// KafkaApis.scala
def handle(request: RequestChannel.Request) {
    // Dispatch only on the request's apiKey; the actual logic lives in the corresponding handle* method
    request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
        case ApiKeys.END_TXN => handleEndTxnRequest(request)
        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
    }
}
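
  1. A RequestHandler takes a Request assembled by the network layer from the global blocking queue and calls KafkaApis.handle
  2. handle works like a dispatcher in web development: based on ApiKeys, it calls the matching handle* method to run the actual business logic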
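
Step 1 can be sketched as a handler loop with the two exit paths visible in run(): a ShutdownRequest taken from the queue, or the stopped flag set through stop() (which resizeThreadPool calls). This is a hypothetical, simplified model: `BaseRequest`, `Request`, `Handler` and `onRequest` are illustrative names, and real request handling is replaced by a callback.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simplified request types; the real ones live in RequestChannel.
sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
final case class Request(apiKey: String) extends BaseRequest

// A stand-in for KafkaRequestHandler with the same two exit paths as run().
final class Handler(queue: ArrayBlockingQueue[BaseRequest], onRequest: Request => Unit) extends Runnable {
  @volatile private var stopped = false
  val shutdownComplete = new CountDownLatch(1)

  // Only sets the flag; the loop notices it on its next iteration.
  def stop(): Unit = stopped = true

  override def run(): Unit = {
    while (!stopped) {
      // The timeout bounds how long an idle handler takes to notice stop().
      queue.poll(300, TimeUnit.MILLISECONDS) match {
        case ShutdownRequest =>
          shutdownComplete.countDown()
          return
        case r: Request => onRequest(r)
        case null       => // poll timed out; re-check `stopped`
      }
    }
    shutdownComplete.countDown()
  }
}

def startDaemon(r: Runnable): Unit = {
  val t = new Thread(r)
  t.setDaemon(true)
  t.start()
}

// Exit path 1: a regular request, then a ShutdownRequest, arrive through the queue (FIFO).
val seen = new AtomicInteger(0)
val queue1 = new ArrayBlockingQueue[BaseRequest](8)
val h1 = new Handler(queue1, _ => { seen.incrementAndGet(); () })
startDaemon(h1)
queue1.put(Request("FETCH"))
queue1.put(ShutdownRequest)
val exitedViaShutdownRequest = h1.shutdownComplete.await(2, TimeUnit.SECONDS)

// Exit path 2: stop() on an idle handler; it exits once the pending poll times out.
val h2 = new Handler(new ArrayBlockingQueue[BaseRequest](8), _ => ())
startDaemon(h2)
h2.stop()
val exitedViaStop = h2.shutdownComplete.await(2, TimeUnit.SECONDS)
```

Without the poll timeout, an idle handler blocked on an empty queue would never re-check `stopped`, so stop() alone could not shut it down.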

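So the API layer only encapsulates the execution threads and the routing by ApiKeys. The actual logic lives in the handle* methods of KafkaApis. The most important of these handle PRODUCE and FETCH requests, which correspond to producing and consuming messages, and the following articles analyze the source code of these two APIs in detail.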
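
The routing half of the API layer can be sketched without any Kafka classes: read the api key from the request header, then branch to one handler behind a single entry point. This assumes the standard request header prefix (api_key INT16, api_version INT16, correlation_id INT32, followed by client_id, which the sketch skips). PRODUCE = 0, FETCH = 1 and METADATA = 3 are the real protocol ids, but the handler functions are hypothetical stubs. In the broker itself, the header is parsed by the network layer before the request reaches the queue, and KafkaApis.handle matches on the ApiKeys enum rather than on raw ids.

```scala
import java.nio.ByteBuffer

// The fixed prefix of a Kafka request header (client_id, which follows it, is ignored here).
final case class Header(apiKey: Short, apiVersion: Short, correlationId: Int)

def parseHeader(buf: ByteBuffer): Header =
  Header(buf.getShort(), buf.getShort(), buf.getInt())

// Hypothetical handlers; in the broker these are KafkaApis' handle* methods.
def handleProduce(h: Header): String = s"produce v${h.apiVersion} #${h.correlationId}"
def handleFetch(h: Header): String = s"fetch v${h.apiVersion} #${h.correlationId}"
def handleMetadata(h: Header): String = s"metadata v${h.apiVersion} #${h.correlationId}"

// The single entry point: decode the api key, then route to one handler.
def handle(buf: ByteBuffer): String = {
  val header = parseHeader(buf)
  header.apiKey.toInt match {
    case 0     => handleProduce(header)  // PRODUCE
    case 1     => handleFetch(header)    // FETCH
    case 3     => handleMetadata(header) // METADATA
    case other => s"unsupported api key $other"
  }
}

// Builds just the header prefix of a request.
def requestBytes(apiKey: Int, apiVersion: Int, correlationId: Int): ByteBuffer = {
  val buf = ByteBuffer.allocate(8)
  buf.putShort(apiKey.toShort).putShort(apiVersion.toShort).putInt(correlationId)
  buf.flip()
  buf
}

println(handle(requestBytes(0, 5, 42))) // produce v5 #42
```

Callers only ever see `handle`; which subsystem serves a request is decided inside it, which is what makes the API layer a Facade.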
