1、Flink主节点TaskManager启动分析:
TaskManager是Flink的worker节点,它负责Flink中本机slot资源的管理以及具体task的执行。
TaskManager上的基本资源单位是slot,一个作业的task最终会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并来与FlinkMaster和JobManager通信。
根据前面的启动分析:TaskManager的启动主类:TaskManagerRunner。
2、源码分析
代码执行的大致流程如下:
TaskManagerRunner.main()
runTaskManagerSecurely(args, ResourceID.generate());
# 加载配置
Configuration configuration = loadConfiguration(args);
# 启动 TaskManager
runTaskManagerSecurely(configuration, resourceID);
# 启动 TaskManager
runTaskManager(configuration, resourceID, pluginManager);
# 构建 TaskManagerRunner 实例
taskManagerRunner = new TaskManagerRunner(...);
# 初始化一个线程池
this.executor = Executors.newScheduledThreadPool(....)
# 获取高可用模式
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...)
# 创建 RPC 服务
rpcService = createRpcService(configuration,
highAvailabilityServices);
# 创建心跳服务
heartbeatServices = HeartbeatServices.fromConfiguration(conf);
# 创建 BlobCacheService
blobCacheService = new BlobCacheService(....)
# 创建 TaskManager
taskManager = startTaskManager(.....)
# 初始化 TaskManagerServices
taskManagerServices = TaskManagerServices.fromConfiguration(...)
# 初始化 TaskEventDispatcher
taskEventDispatcher = new TaskEventDispatcher();
# 初始化 IOManagerASync
ioManager = new IOManagerAsync(...)
# 初始化 NettyShuffleEnvironment
shuffleEnvironment = createShuffleEnvironment(...)
# 初始化 KVStageService
kvStateService =
KvStateService.fromConfiguration(...)
# 初始化 BroadCastVariableManager
broadcastVariableManager = new BroadcastVariableManager();
# 初始化 TaskSlotTable
taskSlotTable = createTaskSlotTable(...)
# 初始化 DefaultJobTable
jobTable = DefaultJobTable.create();
# 初始化 JobLeaderService
jobLeaderService = new DefaultJobLeaderService(....)
# 初始化 TaskStateManager
taskStateManager = new TaskExecutorLocalStateStoresManager()
# 初始化 LibraryCacheManager
libraryCacheManager = new BlobLibraryCacheManager()
# 返回
return new TaskManagerServices(....)
# 初始化一个 TaskExecutor
return new TaskExecutor(.....)
# 初始化心跳管理器:jobManagerHeartbeatManager
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,resourceId);
# 初始化心跳管理器:resourceManagerHeartbeatManager
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices,resourceId);
# 转到 TaskExecutor 的 onStart() 方法
TaskExecutor.onStart();
startTaskExecutorServices();
# 启动 TaskManagerRunner
taskManagerRunner.start();
TaskManagerRunner的启动大致分为三类比较重要的:
- 一些基础服务
- TaskManagerService
- TaskExecutor