2021SC@SDUSC
The supervisor-data function returns a map containing the supervisor's metadata. It is defined as follows:
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
  ;; the cluster configuration
  {:conf conf
   ;; shared-context is nil when the supervisor is started
   :shared-context shared-context
   ;; the ISupervisor instance
   :isupervisor isupervisor
   ;; whether the supervisor is active (active by default)
   :active (atom true)
   ;; the supervisor's start time
   :uptime (uptime-computer)
   ;; the ids of the worker threads/processes
   :worker-thread-pids-atom (atom {})
   ;; the StormClusterState object
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   ;; the supervisor's LocalState object, rooted at "{storm.local.dir}/supervisor/localstate"
   :local-state (supervisor-state conf)
   ;; the supervisor's id
   :supervisor-id (.getSupervisorId isupervisor)
   ;; the supervisor's assignment id, which is the same as the supervisor id
   :assignment-id (.getAssignmentId isupervisor)
   ;; the supervisor's hostname: if conf (a map) contains "storm.local.hostname", that value is used;
   ;; otherwise the hostname is obtained via InetAddress.getLocalHost().getCanonicalHostName()
   :my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
                  (conf STORM-LOCAL-HOSTNAME)
                  (local-hostname))
   ;; all current assignments, reported when heartbeating
   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
   ;; a storm timer; kill-fn is called when the timer thread throws an exception
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (exit-process! 20 "Error when processing an event")
                               ))
   ;; a map holding versioned assignment information
   :assignment-versions (atom {})
   })
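Later functions such as sync-processes and shutdown-worker read this metadata with plain keyword lookups. A minimal sketch of how the map might be consumed (the bindings below assume conf and isupervisor are already available, as in the supervisor startup path; the swap! call is made up for illustration):

;; sketch only: reading fields of the map returned by supervisor-data;
;; conf and isupervisor are assumed to be bound as during supervisor startup
(let [supervisor (supervisor-data conf nil isupervisor)]
  (:supervisor-id supervisor)                               ;; plain keyword lookup
  @(:active supervisor)                                     ;; atoms are dereferenced with @
  (swap! (:assignment-versions supervisor) assoc "topo-1" {:version 1}))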
The sync-processes function is defined as follows:
;; sync-processes manages workers: it shuts down abnormal or dead workers and launches new ones
;; supervisor is the supervisor metadata map
(defn sync-processes [supervisor]
  ;; conf is bound to the storm configuration map
  (let [conf (:conf supervisor)
        ;; local-state is bound to the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; the local assignment map port -> LocalAssignment, read from the supervisor's LocalState;
        ;; a LocalAssignment wraps a storm-id and the executors assigned to it
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        ;; now is bound to the current time
        now (current-time-secs)
        ;; allocated is bound to a map of worker-id -> [state heartbeat]; see read-allocated-workers below
        allocated (read-allocated-workers supervisor assigned-executors now)
        ;; keep only the entries of allocated whose state is :valid and bind them to keepers
        keepers (filter-val
                  (fn [[state _]] (= state :valid))
                  allocated)
        ;; keep-ports is the set of ports appearing in the heartbeats of keepers
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        ;; reassign-executors is the submap of assigned-executors whose ports are NOT in keep-ports,
        ;; i.e. assignments whose worker process has died and must be reassigned
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        ;; new-worker-ids maps port -> worker-id for the workers that must be (re)launched
        new-worker-ids (into
                         {}
                         (for [port (keys reassign-executors)]
                           [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;    - read pids, kill -9 and individually remove file
    ;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch
    (log-debug "Syncing processes")
    (log-debug "Assigned executors: " assigned-executors)
    (log-debug "Allocated: " allocated)
    ;; allocated maps worker-id -> [state heartbeat]; id is the worker-id, state the worker state,
    ;; heartbeat the worker's heartbeat
    (doseq [[id [state heartbeat]] allocated]
      ;; shut down every worker whose state is not :valid
      (when (not= :valid state)
        (log-message
          "Shutting down and clearing state for id " id
          ". Current supervisor time: " now
          ". State: " state
          ", Heartbeat: " (pr-str heartbeat))
        ;; shutdown-worker kills the worker process; see its definition below
        (shutdown-worker supervisor id)
        ))
    ;; new-worker-ids holds the worker-ids that must be launched; create the local directory
    ;; "{storm.local.dir}/workers/{worker_id}/pids" for each of them
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    ;; store the merged map back into LS-APPROVED-WORKERS of local-state
    (.put local-state LS-APPROVED-WORKERS
          ;; invert new-worker-ids from port -> worker-id to worker-id -> port and merge it with
          ;; the LS-APPROVED-WORKERS entry of local-state
          (merge
            ;; select-keys keeps only the entries of LS-APPROVED-WORKERS whose keys appear in keepers,
            ;; returning a map
            (select-keys (.get local-state LS-APPROVED-WORKERS)
                         (keys keepers))
            ;; zipmap builds the worker-id -> port map from new-worker-ids
            (zipmap (vals new-worker-ids) (keys new-worker-ids))
            ))
    ;; wait-for-workers-launch waits until all workers have started; see its definition below
    (wait-for-workers-launch
      conf
      ;; assignment is bound to the executor information to run on this port
      (dofor [[port assignment] reassign-executors]
        ;; id is the worker-id chosen for this port
        (let [id (new-worker-ids port)]
          (log-message "Launching worker with assignment "
                       (pr-str assignment)
                       " for this supervisor "
                       (:supervisor-id supervisor)
                       " on port "
                       port
                       " with id "
                       id
                       )
          ;; launch-worker starts the worker; worker startup will be analyzed in a later article
          (launch-worker supervisor
                         (:storm-id assignment)
                         port
                         id)
          id)))
    ))
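filter-val and select-keys-pred are helpers from backtype.storm.util. A minimal sketch of their assumed behavior on made-up data (the worker ids, ports, and storm-id below are invented):

;; sketch only: assumed semantics of filter-val and select-keys-pred on toy data
;; filter-val keeps the entries whose VALUE satisfies the predicate
(filter-val (fn [[state _]] (= state :valid))
            {"w1" [:valid {:port 6700}]
             "w2" [:timed-out {:port 6701}]})
;;=> {"w1" [:valid {:port 6700}]}

;; select-keys-pred keeps the entries whose KEY satisfies the predicate;
;; (complement #{6700}) is truthy for every port not in the keep-ports set
(select-keys-pred (complement #{6700})
                  {6700 {:storm-id "topo-1"} 6701 {:storm-id "topo-1"}})
;;=> {6701 {:storm-id "topo-1"}}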
The read-allocated-workers function is defined as follows:
;; returns a map of worker-id -> [state heartbeat]; if the heartbeat is nil the worker is "dead"
(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  ;; supervisor is the supervisor metadata map, assigned-executors is the supervisor's
  ;; port -> LocalAssignment assignment map, now is the current time
  [supervisor assigned-executors now]
  ;; the cluster configuration
  (let [conf (:conf supervisor)
        ;; the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; id->heartbeat maps the worker-ids of the processes running on this supervisor to their heartbeats
        id->heartbeat (read-worker-heartbeats conf)
        ;; approved-ids is the set of worker-ids stored in the supervisor's LocalState
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
    ;; build the worker-id -> [state hb] map
    (into
      {}
      (dofor [[id hb] id->heartbeat]
        ;; cond behaves like nested if...else
        (let [state (cond
                      ;; if the heartbeat is nil, the state is :not-started
                      (not hb)
                      :not-started
                      ;; if approved-ids does not contain id, or matches-an-assignment? returns false,
                      ;; the state is :disallowed
                      (or (not (contains? approved-ids id))
                          ;; matches-an-assignment? compares the storm-id and executor ids in the
                          ;; heartbeat with those in the assignment to decide whether this worker is assigned
                          (not (matches-an-assignment? hb assigned-executors)))
                      :disallowed
                      ;; if (current time - last heartbeat time) > heartbeat timeout, the state is :timed-out
                      (> (- now (:time-secs hb))
                         (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                      :timed-out
                      ;; otherwise the state is :valid
                      true
                      :valid)]
          (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
          [id [state hb]]
          ))
      )))
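To make the :timed-out branch concrete, here is a minimal standalone sketch of the same check with made-up numbers (timeout-secs stands in for the value of SUPERVISOR-WORKER-TIMEOUT-SECS):

;; sketch only: the :timed-out check in isolation, with made-up numbers
(let [now 1000                      ;; current supervisor time in seconds
      timeout-secs 30               ;; stands in for (conf SUPERVISOR-WORKER-TIMEOUT-SECS)
      hb {:time-secs 960}]          ;; last heartbeat written 40 seconds ago
  (> (- now (:time-secs hb)) timeout-secs))
;;=> true, so read-allocated-workers would mark this worker :timed-out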
The read-worker-heartbeats function is defined as follows:
;; builds the worker-id -> heartbeat map for the processes running on this supervisor
(defn read-worker-heartbeats
  "Returns map from worker id to heartbeat"
  [conf]
  ;; ids is the set of worker-ids of the processes on this supervisor
  (let [ids (my-worker-ids conf)]
    ;; build the worker-id -> heartbeat map
    (into {}
          (dofor [id ids]
            ;; read-worker-heartbeat reads the heartbeat of the given worker-id from
            ;; "{storm.local.dir}/workers/{worker-id}/heartbeats" on the supervisor
            [id (read-worker-heartbeat conf id)]))
    ))
The my-worker-ids function is defined as follows:
;; returns the worker-ids of the processes running on this supervisor
(defn my-worker-ids [conf]
  ;; worker-root returns the supervisor's local directory "{storm.local.dir}/workers";
  ;; read-dir-contents returns the names of all files in that directory, i.e. the worker-ids
  ;; of all processes currently running on this supervisor
  (read-dir-contents (worker-root conf)))
The matches-an-assignment? function is defined as follows:
;; worker-heartbeat is the heartbeat, assigned-executors is the supervisor's port -> LocalAssignment map
(defn matches-an-assignment? [worker-heartbeat assigned-executors]
  ;; look up the port used by the worker in the heartbeat, then fetch the LocalAssignment for that port
  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
    ;; returns true only if local-assignment is non-nil, the storm-id in the heartbeat equals the
    ;; storm-id in the assignment, and the executor ids in the heartbeat equal those in the assignment
    (and local-assignment
         (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
         ;; Constants/SYSTEM_EXECUTOR_ID is the executor id of the "system bolt"; besides the spouts
         ;; and bolts we define, a topology also contains some "system bolts"
         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
            (set (:executors local-assignment))))))
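The disj call removes the system executor that the heartbeat reports in addition to the user executors. A minimal sketch with made-up executor ids, assuming the system executor id is [-1 -1] as in the Storm source:

;; sketch only (made-up executor ids): the heartbeat also reports the system executor,
;; so it is removed with disj before comparing against the assignment
(let [hb-executors       #{[1 1] [2 2] [-1 -1]}   ;; [-1 -1] stands in for Constants/SYSTEM_EXECUTOR_ID
      assigned-executors #{[1 1] [2 2]}]
  (= (disj hb-executors [-1 -1]) assigned-executors))
;;=> true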
The shutdown-worker function is defined as follows:
;; kills a worker process; supervisor is the supervisor metadata map, id is the worker_id
(defn shutdown-worker [supervisor id]
  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
  ;; conf is bound to the cluster configuration
  (let [conf (:conf supervisor)
        ;; note: when the storm cluster runs in "distributed mode", the supervisor's
        ;; "{storm.local.dir}/workers/{worker_id}/pids" directory holds the ids of the JVM processes
        ;; that actually back the worker
        ;; read the process ids from "{storm.local.dir}/workers/{worker_id}/pids"; worker_id is the id
        ;; we assigned to the worker, while the pids directory holds its actual JVM process ids
        pids (read-dir-contents (worker-pids-root conf id))
        ;; note: when the storm cluster runs in "local mode", the map under :worker-thread-pids-atom
        ;; in the supervisor metadata holds worker_id -> thread id entries
        ;; look up id in that map (worker-id, the id we assigned to the worker, -> JVM process id);
        ;; thread-pid is effectively the worker's JVM process id
        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
    ;; when thread-pid is non-nil, kill that process
    (when thread-pid
      ;; kill-process from backtype.storm.process-simulator kills the process
      (psim/kill-process thread-pid))
    ;; kill every process in pids
    (doseq [pid pids]
      ;; kill-process-with-sig-term from backtype.storm.util calls send-signal-to-process, which simply
      ;; runs the system command "kill -15 pid" to kill the process
      ;; note: the worker process registers a shutdown callback when it is created (added in mk-worker
      ;; in worker.clj); "kill -15 pid" triggers that callback
      (kill-process-with-sig-term pid))
    ;; if pids is non-empty, sleep for 1 second so the worker's cleanup callback can finish
    (if-not (empty? pids) (sleep-secs 1)) ;; allow 1 second for execution of cleanup threads on worker.
    ;; processes that "kill -15 pid" failed to stop are killed with force-kill-process, which simply
    ;; runs "kill -9 pid"
    (doseq [pid pids]
      (force-kill-process pid)
      (try
        ;; delete "{storm.local.dir}/workers/{worker_id}/pids/{pid}"
        (rmpath (worker-pid-path conf id pid))
        (catch Exception e))) ;; on windows, the supervisor may still hold the lock on the worker directory
    ;; try-cleanup-worker cleans up the local directories; see its definition below
    (try-cleanup-worker conf id))
  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
The try-cleanup-worker function is defined as follows:
;; cleans up the worker's local directories
(defn try-cleanup-worker [conf id]
  (try
    ;; delete the "{storm.local.dir}/workers/{worker_id}/heartbeats" directory
    (rmr (worker-heartbeats-root conf id))
    ;; this avoids a race condition with worker or subprocess writing pid around same time
    ;; delete the "{storm.local.dir}/workers/{worker_id}/pids" directory
    (rmpath (worker-pids-root conf id))
    ;; delete the "{storm.local.dir}/workers/{worker_id}" directory
    (rmpath (worker-root conf id))
    (catch RuntimeException e
      (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
      )
    (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
    (catch java.io.IOException e (log-message (.getMessage e)))
    ))