Supervisor(二)

2021SC@SDUSC

supervisor-data函数定义如下:

supervisor-data函数返回一个包含了supervisor元数据的map对象。

supervisor-data函数

;; Builds the supervisor's metadata map; the values are computed once, when
;; this function is called.
;; conf: Storm configuration map
;; shared-context: stored as-is under :shared-context (presumably nil outside
;;                 local mode — TODO confirm against callers)
;; isupervisor: ISupervisor instance that supplies the supervisor and assignment ids
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
  {:conf conf
   :shared-context shared-context
   :isupervisor isupervisor
   ;; the supervisor starts out active
   :active (atom true)
   :uptime (uptime-computer)
   ;; worker-id -> pid; shutdown-worker looks a worker id up here and passes the
   ;; pid to psim/kill-process
   :worker-thread-pids-atom (atom {})
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   ;; the supervisor's LocalState (built by supervisor-state)
   :local-state (supervisor-state conf)
   :supervisor-id (.getSupervisorId isupervisor)
   :assignment-id (.getAssignmentId isupervisor)
   ;; Use the configured STORM-LOCAL-HOSTNAME only when it has a non-nil,
   ;; non-empty value; otherwise fall back to (local-hostname). Checking mere key
   ;; presence would let an empty config entry produce a nil/"" hostname.
   :my-hostname (let [configured (conf STORM-LOCAL-HOSTNAME)]
                  (if (or (nil? configured) (= "" configured))
                    (local-hostname)
                    configured))
   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
   ;; if the timer thread fails, log the error and exit the process with code 20
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (exit-process! 20 "Error when processing an event")))
   ;; versioned assignment information, starts empty
   :assignment-versions (atom {})})

sync-processes函数定义如下:

sync-processes函数

;; Reconciles the worker processes on this supervisor with its local assignment:
;; shuts down every allocated worker whose state is not :valid, then launches a new
;; worker (with a fresh id) for every assigned port that has no valid worker.
;; supervisor: the supervisor metadata map built by supervisor-data
( defn  sync-processes  [ supervisor ]
  ;; conf: the Storm configuration map
 ( let  [ conf ( :conf  supervisor)
        ;; local-state: the supervisor's LocalState instance
        ^ LocalState  local-state ( :local-state  supervisor)
        ;; port -> LocalAssignment (storm-id + executors) read from local state; {} when absent
        assigned-executors ( defaulted ( .get  local-state  LS-LOCAL-ASSIGNMENTS)  {})
        ;; current time in seconds
        now ( current-time-secs)
        ;; worker-id -> [state heartbeat], see read-allocated-workers
        allocated ( read-allocated-workers  supervisor  assigned-executors  now)
        ;; only the workers whose state is :valid
        keepers ( filter-val
                ( fn  [[ state  _ ]] ( =  state  :valid))
                 allocated)
        ;; ports occupied by the valid workers, taken from their heartbeats
        keep-ports ( set ( for  [[ id  [ _  hb ]]  keepers ] ( :port  hb)))
        ;; assignments whose port has no valid worker: these ports need a new worker
        reassign-executors ( select-keys-pred ( complement  keep-ports)  assigned-executors)
        ;; port -> freshly generated worker id, one per port that needs a new worker
        new-worker-ids ( into
                        {}
                       ( for  [ port ( keys  reassign-executors )]
                          [ port ( uuid )]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;     - read pids, kill -9 and individually remove file
    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 6. launch new workers (give worker-id, port, and supervisor-id)
    ;; 7. wait for workers launch
 
   ( log-debug  "Syncing processes")
   ( log-debug  "Assigned executors: "  assigned-executors)
   ( log-debug  "Allocated: "  allocated)
    ;; walk worker-id -> [state heartbeat]
   ( doseq  [[ id  [ state  heartbeat ]]  allocated ]
      ;; any worker that is not :valid is shut down and its local state cleared
     ( when ( not=  :valid  state)
       ( log-message
         "Shutting down and clearing state for id "  id
         ". Current supervisor time: "  now
         ". State: "  state
         ", Heartbeat: " ( pr-str  heartbeat))
        ;; shutdown-worker kills the worker's processes and cleans up its directories
       ( shutdown-worker  supervisor  id)
       ))
    ;; create the pids directory (worker-pids-root) for every new worker id; the
    ;; exact path is decided by worker-pids-root (presumably
    ;; {storm.local.dir}/workers/{worker-id}/pids — confirm there)
   ( doseq  [ id ( vals  new-worker-ids )]
     ( local-mkdirs ( worker-pids-root  conf  id)))
    ;; rewrite the approved-workers entry (worker-id -> port) in local state
   ( .put  local-state  LS-APPROVED-WORKERS
          ;; previously approved entries of the still-valid workers, merged with the new workers
         ( merge
           ;; keep only the approved entries whose worker id is in keepers
          ( select-keys ( .get  local-state  LS-APPROVED-WORKERS)
                       ( keys  keepers))
           ;; invert new-worker-ids (port -> worker-id) into worker-id -> port
          ( zipmap ( vals  new-worker-ids) ( keys  new-worker-ids))
          ))
    ;; launch one worker per port needing one, then wait for the launched ids
    ;; (wait-for-workers-launch is defined outside this section)
   ( wait-for-workers-launch
     conf
     ;; port -> assignment to run on that port
    ( dofor  [[ port  assignment ]  reassign-executors ]
       ;; the worker id pre-generated for this port
      ( let  [ id ( new-worker-ids  port )]
        ( log-message  "Launching worker with assignment "
                     ( pr-str  assignment)
                      " for this supervisor "
                     ( :supervisor-id  supervisor)
                      " on port "
                      port
                      " with id "
                      id
                     )
         ;; start the worker (launch-worker is defined elsewhere); the id is the
         ;; value collected by dofor and handed to wait-for-workers-launch
        ( launch-worker  supervisor
                       ( :storm-id  assignment)
                        port
                        id)
         id)))
   ))

read-allocated-workers函数定义如下:

read-allocated-workers函数

;; Classifies every worker returned by read-worker-heartbeats.
;; Returns a map of worker-id -> [state heartbeat]; state is decided in this order:
;;   :not-started - no heartbeat was read for the worker (heartbeat is nil/false)
;;   :disallowed  - the id is not among the approved workers in local state, or
;;                  matches-an-assignment? rejects the heartbeat
;;   :timed-out   - now minus the heartbeat's :time-secs exceeds
;;                  SUPERVISOR-WORKER-TIMEOUT-SECS
;;   :valid       - otherwise
(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  ;; supervisor: supervisor metadata map
  ;; assigned-executors: port -> LocalAssignment map from local state
  ;; now: current supervisor time in seconds
  [supervisor assigned-executors now]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        heartbeats-by-id (read-worker-heartbeats conf)
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))
        state-of (fn [worker-id hb]
                   (cond
                     (not hb)
                     :not-started

                     ;; a worker must be approved AND match its local assignment
                     (not (and (contains? approved-ids worker-id)
                               (matches-an-assignment? hb assigned-executors)))
                     :disallowed

                     (> (- now (:time-secs hb))
                        (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                     :timed-out

                     :else
                     :valid))]
    (reduce (fn [acc [worker-id hb]]
              (let [state (state-of worker-id hb)]
                (log-debug "Worker " worker-id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
                (assoc acc worker-id [state hb])))
            {}
            heartbeats-by-id)))

read-worker-heartbeats函数定义如下:

read-worker-heartbeats函数

;; Builds a worker-id -> heartbeat map for every id returned by my-worker-ids,
;; reading each heartbeat with read-worker-heartbeat (defined elsewhere).
(defn read-worker-heartbeats
  "Returns map from worker id to heartbeat"
  [conf]
  (reduce (fn [heartbeats worker-id]
            (assoc heartbeats worker-id (read-worker-heartbeat conf worker-id)))
          {}
          (my-worker-ids conf)))

my-worker-ids函数定义如下:

my-worker-ids函数

;; Returns the worker ids known to this supervisor: the entry names that
;; read-dir-contents yields for (worker-root conf). The directory is whatever
;; worker-root resolves to (presumably {storm.local.dir}/workers — confirm there).
(defn my-worker-ids [conf]
  (-> conf
      worker-root
      read-dir-contents))

matches-an-assignment?函数定义如下:

matches-an-assignment?函数

;; Checks whether a worker heartbeat agrees with the local assignment for the
;; heartbeat's :port.
;; worker-heartbeat: heartbeat map (read here via :port, :storm-id, :executors)
;; assigned-executors: port -> LocalAssignment map
;; Returns the (falsy) lookup result when the port has no assignment, false when the
;; storm ids differ, and otherwise whether the heartbeat's executor set with
;; Constants/SYSTEM_EXECUTOR_ID removed equals the assignment's executor set.
(defn matches-an-assignment? [worker-heartbeat assigned-executors]
  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
    (cond
      (not local-assignment)
      local-assignment

      (not= (:storm-id worker-heartbeat) (:storm-id local-assignment))
      false

      :else
      ;; the system executor id is removed from the heartbeat side only
      (= (-> worker-heartbeat :executors set (disj Constants/SYSTEM_EXECUTOR_ID))
         (-> local-assignment :executors set)))))

shutdown-worker函数定义如下:

shutdown-worker函数

;; Shuts down the worker with the given worker id and removes its local files.
;; supervisor: supervisor metadata map; id: worker id.
;; Order of operations:
;;   1. if a pid is registered for this id in :worker-thread-pids-atom, kill it with
;;      psim/kill-process and drop the registration;
;;   2. call kill-process-with-sig-term for every entry in the worker's pids directory;
;;   3. if there were any such pids, sleep 1 second so the workers can run cleanup;
;;   4. force-kill-process each pid and (best-effort) delete its pid file;
;;   5. try-cleanup-worker removes the worker's directories.
(defn shutdown-worker [supervisor id]
  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
  (let [conf (:conf supervisor)
        ;; pid entries listed in the worker's pids directory
        pids (read-dir-contents (worker-pids-root conf id))
        ;; pid registered for this worker id in the supervisor's thread-pid atom
        ;; (per the original notes, only populated in local mode)
        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
    (when thread-pid
      (psim/kill-process thread-pid)
      ;; Forget the killed pid: otherwise the atom keeps an entry per dead worker
      ;; and a retried shutdown for the same id would kill it again.
      (swap! (:worker-thread-pids-atom supervisor) dissoc id))
    (doseq [pid pids]
      (kill-process-with-sig-term pid))
    ;; allow 1 second for execution of cleanup threads on worker.
    (when (seq pids)
      (sleep-secs 1))
    ;; whatever survived the first signal is force-killed
    (doseq [pid pids]
      (force-kill-process pid)
      (try
        (rmpath (worker-pid-path conf id pid))
        ;; on windows, the supervisor may still holds the lock on the worker directory;
        ;; deletion stays best-effort, but the failure is recorded instead of discarded
        (catch Exception e
          (log-debug "Failed to remove pid file " pid " of worker " id ": " (.getMessage e)))))
    (try-cleanup-worker conf id))
  (log-message "Shut down " (:supervisor-id supervisor) ":" id))

try-cleanup-worker函数定义如下:

try-cleanup-worker函数

;; Best-effort removal of a worker's local directories: the heartbeats root
;; (recursively, via rmr), then the pids root, then the worker's own root.
;; conf: Storm configuration map; id: worker id.
;; Nothing is rethrown: a RuntimeException is logged as a warning saying the cleanup
;; will be retried later; any IOException is logged with its message.
(defn try-cleanup-worker [conf id]
  (try
    (rmr (worker-heartbeats-root conf id))
    ;; this avoids a race condition with worker or subprocess writing pid around same time
    (rmpath (worker-pids-root conf id))
    (rmpath (worker-root conf id))
    (catch RuntimeException e
      (log-warn-error e "Failed to cleanup worker " id ". Will retry later"))
    ;; java.io.FileNotFoundException is a subclass of IOException and had an
    ;; identical handler, so this one clause covers both.
    (catch java.io.IOException e
      (log-message (.getMessage e)))))

上一篇:linux 配置启动supervisor详细


下一篇:【填坑之旅-hadoop-09】2.10.1 jdk1.8 Storm1.2.3 流式计算 nimbus ui supervisor topo spouts bolts tuple tas