storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:
ClusterState协议
(set-ephemeral-node [this path data])
(delete-node [this path])
(create-sequential [this path data])
;; if node does not exist, create persistent with this data
(set-data [this path data])
(get-data [this path watch?])
(get-version [this path watch?])
(get-data-with-version [this path watch?])
(get-children [this path watch?])
(mkdirs [this path])
(close [this])
(register [this callback])
(unregister [this id]))
StormClusterState协议封装了一组storm与zookeeper进行交互的函数,可以将StormClusterState协议中的函数看成ClusterState协议中函数的"组合"。StormClusterState协议定义如下:
StormClusterState协议
(assignments [this callback])
(assignment-info [this storm-id callback])
(assignment-info-with-version [this storm-id callback])
(assignment-version [this storm-id callback])
(active-storms [this])
(storm-base [this storm-id callback])
(get-worker-heartbeat [this storm-id node port])
(executor-beats [this storm-id executor->node+port])
(supervisors [this callback])
(supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
(setup-heartbeats! [this storm-id])
(teardown-heartbeats! [this storm-id])
(teardown-topology-errors! [this storm-id])
(heartbeat-storms [this])
(error-topologies [this])
(worker-heartbeat! [this storm-id node port info])
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
(activate-storm! [this storm-id storm-base])
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
(remove-storm! [this storm-id])
(report-error [this storm-id task-id node port error])
(errors [this storm-id task-id])
(disconnect [this]))
命名空间backtype.storm.cluster除了定义ClusterState和StormClusterState这两个重要协议外,还定义了两个重要函数:mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函数如下:
该函数返回一个实现了ClusterState协议的对象,通过这个对象就可以与zookeeper进行交互了。
mk-distributed-cluster-state函数
;; conf绑定了storm.yaml中的配置信息,是一个map对象
[conf]
;; zk绑定一个zk client,Storm使用CuratorFramework与Zookeeper进行交互
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
;; 创建storm集群在zookeeper上的根目录,默认值为/storm
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
(.close zk))
;; callbacks绑定回调函数集合,是一个map对象
(let [callbacks (atom {})
;; active标示zookeeper集群状态
active (atom true)
;; zk重新绑定新的zk client,该zk client设置了watcher,这样当zookeeper集群的状态发生变化时,zk server会给zk client发送相应的event,zk client设置的watcher会调用callbacks中相应回调函数来处理event
;; 启动nimbus时,callbacks是一个空集合,所以nimbus端收到event后不会调用任何回调函数;但是启动supervisor时,callbacks中注册了回调函数,所以当supervisor收到zk server发送的event后,会调用相应的回调函数
;; mk-client函数定义在zookeeper.clj文件中,请参见其定义部分
zk (zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
:auth-conf conf
:root (conf STORM-ZOOKEEPER-ROOT)
;; :watcher绑定一个函数,指定zk client的默认watcher函数,state标示当前zk client的状态;type标示事件类型;path标示zookeeper上产生该事件的znode
;; 该watcher函数主要功能就是执行callbacks集合中的函数,callbacks集合中的函数是在mk-storm-cluster-state函数中通过调用ClusterState的register函数添加的
:watcher (fn [state type path]
(when @active
(when-not (= :connected state)
(log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
(when-not (= :none type)
(doseq [callback (vals @callbacks)]
(callback type path))))))]
;; reify相当于java中的implements,这里表示实现一个协议
(reify
ClusterState
;; register函数用于将回调函数加入callbacks中,key是一个32位的标识
(register
[this callback]
(let [id (uuid)]
(swap! callbacks assoc id callback)
id))
;; unregister函数用于将指定key的回调函数从callbacks中删除
(unregister
[this id]
(swap! callbacks dissoc id))
;; 在zookeeper上添加一个临时节点
(set-ephemeral-node
[this path data]
(zk/mkdirs zk (parent-path path))
(if (zk/exists zk path false)
(try-cause
(zk/set-data zk path data) ; should verify that it's ephemeral
(catch KeeperException$NoNodeException e
(log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
(zk/create-node zk path data :ephemeral)
))
(zk/create-node zk path data :ephemeral)))
;; 在zookeeper上添加一个顺序节点
(create-sequential
[this path data]
(zk/create-node zk path data :sequential))
;; 修改某个节点数据
(set-data
[this path data]
;; note: this does not turn off any existing watches
(if (zk/exists zk path false)
(zk/set-data zk path data)
(do
(zk/mkdirs zk (parent-path path))
(zk/create-node zk path data :persistent))))
;; 删除指定节点
(delete-node
[this path]
(zk/delete-recursive zk path))
;; 获取指定节点数据。path标示节点路径;watch?是一个布尔类型值,表示是否需要对该节点进行"观察",如果watch?=true,当调用set-data函数修改该节点数据后,
;; 会给zk client发送一个事件,zk client接收事件后,会调用创建zk client时指定的默认watcher函数(即:watcher绑定的函数)
(get-data
[this path watch?]
(zk/get-data zk path watch?))
;; 与get-data函数的区别就是获取指定节点数据的同时,获取节点数据的version,version表示节点数据修改的次数
(get-data-with-version
[this path watch?]
(zk/get-data-with-version zk path watch?))
;; 获取指定节点的version,watch?的含义与get-data函数中的watch?相同
(get-version
[this path watch?]
(zk/get-version zk path watch?))
;; 获取指定节点的子节点列表,watch?的含义与get-data函数中的watch?相同
(get-children
[this path watch?]
(zk/get-children zk path watch?))
;; 在zookeeper上创建一个节点
(mkdirs
[this path]
(zk/mkdirs zk path))
;; 关闭zk client
(close
[this]
(reset! active false)
(.close zk)))))
mk-storm-cluster-state函数定义如下:
mk-storm-cluster-state函数非常重要,该函数返回一个实现了StormClusterState协议的实例,通过该实例storm就可以更加方便与zookeeper进行交互在启动nimbus和supervisor的函数中均调用了
mk-storm-cluster-state函数。关于nimbus和supervisor的启动将在之后的文章中介绍。
mk-storm-cluster-state函数
;; 删除to-kill中包含的节点
(doseq [k to-kill]
(delete-node cluster-state (str path "/" k)))))
;; 得到给定的storm-id component-id下的异常信息
(errors
[this storm-id component-id]
(let [path (error-path storm-id component-id)
_ (mkdirs cluster-state path)
children (get-children cluster-state path false)
errors (dofor [c children]
(let [data (-> (get-data cluster-state (str path "/" c) false)
maybe-deserialize)]
(when data
(struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
)))
]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
;; 关闭连接,在关闭连接前,将回调函数从cluster-state的callbacks中删除
(disconnect
[this]
(unregister cluster-state state-id)
(when solo?
(close cluster-state))))))
zookeeper.clj中mk-client函数定义如下:
mk-client函数创建一个CuratorFramework实例,为该实例注册了CuratorListener,当一个后台操作完成或者指定的watch被触发时将会执行CuratorListener中的eventReceived()。eventReceived中调用的wacher函数就是mk-distributed-cluster-state中:watcher绑定的函数。
mk-client函数
[conf servers port
:root ""
:watcher default-watcher
:auth-conf nil]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
(.. fk
(getCuratorListenable)
(addListener
(reify CuratorListener
(^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
(when (= (.getType e) CuratorEventType/WATCHED)
(let [^WatchedEvent event (.getWatchedEvent e)]
(watcher (zk-keeper-states (.getState event))
(zk-event-types (.getType event))
(.getPath event))))))))
(.start fk)
fk))
以上就是storm与zookeeper进行交互的源码分析,我觉得最重要的部分就是如何给zk client添加"wacher",storm的很多功能都是通过zookeeper的wacher机制实现的,如"分配信息领取"。添加"wacher"大概分为以下几个步骤:
mk-distributed-cluster-state函数创建了一个zk client,并通过:watcher给该zk client指定了"wacher"函数,这个"wacher"函数只是简单调用ClusterState的callbacks集合中的函数,这样这个"wacher"函数执行哪些函数将由ClusterState实例决定
- ClusterState实例提供register函数来更新callbacks集合,ClusterState实例被传递给了mk-storm-cluster-state函数,在mk-storm-cluster-state中调用register添加了一个函数(fn [type path] ... ),这个函数实现了"watcher"函数的全部逻辑
mk-storm-cluster-state中注册的函数执行的具体内容由StormClusterState实例决定,对zookeeper节点添加"观察"也是通过StormClusterState实例实现的,这样我们就可以通过StormClusterState实例对我们感兴趣的节点添加"观察"和"回调函数",当节点或节点数据发生变化后,zk server就会给zk client发送"通知",zk client中的"wather"函数将被调用,进而我们注册的"回到函数"将被执行。
这部分源码与zookeeper联系十分紧密,涉及了很多zookeeper中的概念和特性,如"数据观察"和"节点观察"等,有关zookeeper的wacher机制请参考
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html
storm并没有直接使用zookeeper的api,而是使用Curator框架,Curator框架简化了访问zookeeper的操作。关于Curator框架请参考
http://f.dataguru.cn/thread-120125-1-1.html