Flink源码解析(三)TaskManager启动分析

1、Flink主节点TaskManager启动分析:

        TaskManager是Flink的worker节点,它负责Flink中本机slot资源的管理以及具体task的执行。
        TaskManager上的基本资源单位是slot,一个作业的task最终会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并来与FlinkMaster和JobManager通信。
        根据前面的启动分析:TaskManager的启动主类:TaskManagerRunner。

2、源码分析

代码执行的大致流程如下:

TaskManagerRunner.main()
    runTaskManagerSecurely(args, ResourceID.generate());
        # 加载配置
        Configuration configuration = loadConfiguration(args);
        # 启动 TaskManager
        runTaskManagerSecurely(configuration, resourceID);
            # 启动 TaskManager
            runTaskManager(configuration, resourceID, pluginManager);
                # 构建 TaskManagerRunner 实例
                taskManagerRunner = new TaskManagerRunner(...);
                    # 初始化一个线程池
                    this.executor = Executors.newScheduledThreadPool(....)
                    # 获取高可用模式
                    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...)
                    # 创建 RPC 服务
                    rpcService = createRpcService(configuration,
                    highAvailabilityServices);
                    # 创建心跳服务
                    heartbeatServices = HeartbeatServices.fromConfiguration(conf);
                    # 创建 BlobCacheService
                    blobCacheService = new BlobCacheService(....)
                    # 创建 TaskManager
                    taskManager = startTaskManager(.....)
                    # 初始化 TaskManagerServices
                    taskManagerServices = TaskManagerServices.fromConfiguration(...)
                    # 初始化 TaskEventDispatcher
                    taskEventDispatcher = new TaskEventDispatcher();
                    # 初始化 IOManagerASync
                    ioManager = new IOManagerAsync(...)
                    # 初始化 NettyShuffleEnvironment
                    shuffleEnvironment = createShuffleEnvironment(...)
                    # 初始化 KVStageService
                    kvStateService =
                    KvStateService.fromConfiguration(...)
                    # 初始化 BroadCastVariableManager
                    broadcastVariableManager = new BroadcastVariableManager();
                    # 初始化 TaskSlotTable
                    taskSlotTable = createTaskSlotTable(...)
                    # 初始化 DefaultJobTable
                    jobTable = DefaultJobTable.create();
                    # 初始化 JobLeaderService
                    jobLeaderService = new DefaultJobLeaderService(....)
                    # 初始化 TaskStateManager
                    taskStateManager = new TaskExecutorLocalStateStoresManager()
                    # 初始化 LibraryCacheManager
                    libraryCacheManager = new BlobLibraryCacheManager()
                    # 返回
                    return new TaskManagerServices(....)
                # 初始化一个 TaskExecutor
                return new TaskExecutor(.....)
                    # 初始化心跳管理器:jobManagerHeartbeatManager
                    this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,resourceId);
                    # 初始化心跳管理器:resourceManagerHeartbeatManager
                    this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices,resourceId);
                    # 转到 TaskExecutor 的 onStart() 方法
                    TaskExecutor.onStart();
                        startTaskExecutorServices();
        # 启动 TaskManagerRunner
        taskManagerRunner.start();

 TaskManagerRunner的启动大致分为三类比较重要的:

  • 一些基础服务
  • TaskManagerService
  • TaskExecutor

上一篇:Flink计算资源的调度是如何实现的?


下一篇:第十七篇:生产环境中的并行度和资源配置