处理流程:
方法原型:
(defn sync-processes [supervisor])
函数说明:
Supervisor是一个supervisor-data对象.
- 从local-state中获取LS_LOCAL_ASSIGNMENTS集合<port,Assignment>集合,保存到assigned-executors。
- 调用read-allocated-workers获取当前已经分配的Worker信息,<worker-id,<worker state,worker heartbeat>>集合,保存到allocated,其中记录了与当前分配的Worker相对应的状态和心态信息。
- 从allocated中过滤,保留其中worker-state为:valid的Worker,保存到keepers。
- 从keepers数据项中的心跳信息中获取其所对应的端口信息,保存到keep-ports.
- 根据assigned-executor和keep-ports,确定需重新分配的executor信息,返回<port,Assignment>集合,保存到reassign-executors.
- 为reassign-executors中的每个端口创建一个新的worker-id,返回<port,worker-id> 集合,保存到new-worker-ids.
- 对allocated集合中worker-state不为:valid的Worker调用shutdown-worker方法 关闭。
- 为new-worker-ids集合中的每个worker-id创建pid文件夹,路径为STORM_LOCAL_DIR/workers/<worker-id>/pids/
- 更新local-state中存储的LS_APPROVED-WORKERS信息。 获取当前local-state中存储的LS-APPROVED-WORKERS信息,再根据 将keepers过滤出有效的Worker信息最后与new-worker-ids合并进行保存到local-state。
- 调用launch-worker方法启动 Worker,返回启动的Worker的worker-id,最后调用wait-for-workers-launch方法等待这些Worker启动起来.
Supervisor启动流程