Storm-源码分析-Topology Submit-Supervisor


;; Entry point that builds the supervisor daemon (excerpt).
;;   conf           - the storm configuration map
;;   shared-context - passed through to supervisor-data
;;   isupervisor    - the ISupervisor implementation to prepare and query
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  (log-message "Starting Supervisor with conf " conf)
  ;; Prepare the ISupervisor; per the article this initialises the supervisor-id
  ;; and stores it in the local state (see the ISupervisor implementation).
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
  ;; Empty the supervisor tmp directory on this machine.
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        ;; Two event managers, used to run functions in the background.
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        ;; sync-processes with the supervisor argument already applied.
        sync-processes (partial sync-processes supervisor)
        ;; The main work of mk-supervisor; see mk-synchronize-supervisor.
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        ;; Function that writes one supervisor heartbeat to the cluster state.
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                              (:storm-cluster-state supervisor)
                              (:supervisor-id supervisor)
                              (SupervisorInfo. (current-time-secs)
                                               (:my-hostname supervisor)
                                               (:assignment-id supervisor)
                                               (keys @(:curr-assignment supervisor))
                                               ;; used ports
                                               (.getMetadata isupervisor)
                                               (conf SUPERVISOR-SCHEDULER-META)
                                               ((:uptime supervisor)))))]
    ;; Send one heartbeat right away, before the recurring schedule starts.
    (heartbeat-fn)
    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
    (schedule-recurring (:timer supervisor)
                        0
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)
    ;; NOTE(review): the excerpt stops here. synchronize-supervisor and managers
    ;; are bound above but unused in the visible part, so the original function
    ;; presumably continues (scheduling the sync functions and building its
    ;; return value) -- confirm against the Storm source before relying on this.
    ))


supervisor很简单, 主要管两件事, 
当assignment发生变化时, 从nimbus同步topology的代码到本地 
当assignment发生变化时, check workers状态, 保证被分配的worker的状态都是valid

1. 当assignment发生变化时触发 
    怎样通过zookeeper的watcher实现这个反复触发机制, 参考
Storm-源码分析- Storm中Zookeeper的使用

2. 因为比较耗时, 后台执行 
    创建两个event-manager, 分别用于后台执行mk-synchronize-supervisor和sync-processes

mk-synchronize-supervisor, 比较特别的是内部用了一个有名字的匿名函数this来封装这个函数体 
刚开始看到非常诧异, 其实目的是为了可以在sync-callback中将这个函数add到event-manager里面去 
即每次被调用, 都需要再一次把sync-callback注册到zk, 以保证下次可以被继续触发


;; Returns a zero-argument function that synchronises this supervisor with the
;; assignments stored in the cluster state: it downloads code for newly assigned
;; topologies, records the new assignment locally, removes code that is no longer
;; assigned, and finally queues sync-processes on processes-event-manager.
;; The returned fn is named `this` so that sync-callback can re-queue it.
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ;; local cache database
          ^LocalState local-state (:local-state supervisor)
          ;; callback that re-queues this very function on the event manager
          sync-callback (fn [& ignored] (.add event-manager this))
          ;; read the assignments and register the callback, so it fires when
          ;; the assignments in zk change
          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
          ;; where to download each topology's code from
          storm-code-map (read-storm-code-locations assignments-snapshot)
          ;; topologies whose code is already downloaded
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          ;; which executors are assigned to this supervisor's ports
          ;; NOTE(review): the snapshot argument was missing from the excerpt and
          ;; is restored here -- confirm against read-assignments' signature.
          all-assignment (read-assignments
                           assignments-snapshot
                           (:assignment-id supervisor))
          ;; per the article new == all, because confirmAssigned has no real
          ;; implementation and always returns true
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          ;; set of topology ids assigned to this supervisor
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
          ;; local assignments currently saved in local-state
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      ;; download code for topologies that are assigned but not yet downloaded
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (download-storm-code conf storm-id master-code-dir)))
      ;; persist new-assignment under the key that sync-processes reads back
      (.put local-state
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      ;; cache new-assignment on the supervisor object
      (reset! (:curr-assignment supervisor) new-assignment)
      ;; remove any downloaded code that's no longer assigned or active
      (doseq [storm-id downloaded-storm-ids]
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id " storm-id)
          (rmr (supervisor-stormdist-root conf storm-id))))
      ;; run sync-processes in the background
      (.add processes-event-manager sync-processes))))


sync-processes用于管理workers, 比如处理不正常的worker或dead worker, 并创建新的workers 
首先从本地读出workers的hb, 来判断worker状况, shutdown所有状态非valid的workers 
并为已被分配(assigned)但worker状态非valid的slot, 创建新的worker

;; Reconciles the worker processes on this machine with the local assignment:
;; shuts down every worker whose state is not :valid, then launches a new worker
;; for each assigned port that has no valid worker, and waits for them to start.
(defn sync-processes [supervisor]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        now (current-time-secs)
        ;; 1. read the current state of every worker
        allocated (read-allocated-workers supervisor assigned-executors now)
        ;; workers whose state is :valid
        keepers (filter-val
                 (fn [[state _]] (= state :valid))
                 allocated)
        ;; ports used by the keepers
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        ;; select-keys-pred filters a map's keys with a predicate: keep the
        ;; assigned ports that are NOT covered by a valid worker
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        ;; a fresh worker id for every port that needs a new worker
        new-worker-ids (into
                        {}
                        (for [port (keys reassign-executors)]
                          [port (uuid)]))]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;     - read pids, kill -9 and individually remove file
    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch

    ;; shut down every worker whose state is not :valid
    (doseq [[id [state heartbeat]] allocated]
      (when (not= :valid state)
        (shutdown-worker supervisor id)))
    ;; create the pid directory for each new worker
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    ;; approved workers = still-valid approved workers + the new workers
    (.put local-state LS-APPROVED-WORKERS
          (merge
           (select-keys (.get local-state LS-APPROVED-WORKERS)
                        (keys keepers))
           (zipmap (vals new-worker-ids) (keys new-worker-ids))))
    ;; 2. launch the new workers, then wait for them to come up
    (wait-for-workers-launch
     conf
     (dofor [[port assignment] reassign-executors]
       (let [id (new-worker-ids port)]
         (launch-worker supervisor
                        (:storm-id assignment)
                        port
                        id)
         id)))))

1. read-allocated-workers

(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  [supervisor assigned-executors now]
  ;; Each map value is a [state hb] pair, where state is one of :disallowed,
  ;; :not-started, :timed-out or :valid (sync-processes keeps only :valid).
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        ;; heartbeat of every worker as read locally; per the article each
        ;; worker process keeps updating its own local heartbeat
        id->heartbeat (read-worker-heartbeats conf)
        ;; ids of the approved workers recorded in local-state
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
    (into
     {}
     ;; classify each worker from its heartbeat
     (dofor [[id hb] id->heartbeat]
            (let [state (cond
                         (or (not (contains? approved-ids id))
                             (not (matches-an-assignment? hb assigned-executors)))
                           :disallowed  ;; not approved, or matches no assignment
                         (not hb)
                           :not-started ;; no heartbeat yet
                         (> (- now (:time-secs hb))
                            (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                           :timed-out   ;; heartbeat too old, treated as dead
                         true
                           :valid)]
              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
              ;; one map entry per worker: id -> [state hb]
              [id [state hb]])))))


2. wait-for-workers-launch


最终调用wait-for-workers-launch, 等待worker被成功launch

逻辑也比较简单, check hb, 如果没有就不停的sleep, 直到超时, 打印failed to start

;; Blocks until worker `id` has written a heartbeat or the start timeout has
;; elapsed since start-time; logs a failure message if there is still no
;; heartbeat afterwards.
(defn- wait-for-worker-launch [conf id start-time]
  (let [state (worker-state conf id)]
    (loop []
      (let [hb (.get state LS-WORKER-HEARTBEAT)]
        ;; keep polling only while there is no heartbeat AND we are still
        ;; inside the start-timeout window
        ;; NOTE(review): the config key was missing from the excerpt and is
        ;; restored from Storm's supervisor.worker.start.timeout.secs -- confirm.
        (when (and
               (not hb)
               (<
                (- (current-time-secs) start-time)
                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)))
          (log-message id " still hasn't started")
          (Time/sleep 500)
          (recur))))
    (when-not (.get state LS-WORKER-HEARTBEAT)
      (log-message "Worker " id " failed to start"))))

;; Waits, one worker after another, for every worker id in `ids` to start.
;; All waits share a single start-time taken on entry.
(defn- wait-for-workers-launch [conf ids]
  (let [start-time (current-time-secs)]
    (doseq [id ids]
      (wait-for-worker-launch conf id start-time))))


