worker进程中线程的分类及用途
本文重点分析storm的worker进程在正常启动之后有哪些类型的线程,针对每种类型的线程,剖析其用途及消息的接收与发送流程。
概述
worker进程启动过程中最重要的两个函数是mk-worker和worker-data,代码就不一一列出了。worker顺利启动之后会拥有如下图所示的各类线程。
接收和发送线程
worker在启动的时候会生成进程级别的消息接收和消息发送线程,它们视具体配置而定,可以是基于zmq,也可以基于netty,这个没有太多好说的。socket connection的建立过程可以在tuple消息传递一文中找到说明。
zk client
worker需要定期的向zk server发送心跳消息,与zk server之间的连接处理就落到zk client这个线程身上了。具体代码见函数do-heartbeat及do-executor-heartbeats。
定时器线程
worker进程需要定期的做些事情,比如发送心跳消息,刷新socket连接,这些定时器归为如下几类,每类定时器运行在各自的线程。
- :heartbeat-timer worker
- :refresh-connections-timer worker
- :refresh-active-timer worker
- :executor-heartbeat-timer worker
- :user-timer worker
上述定时器分类见于worker的shutdown函数,有时候在分析代码的时候,如果从入口看不清楚的话,不妨试试从退出的处理逻辑哪里找找答案。
SystemBolt
在topology提交的时候曾经见过函数system-topology!,这个函数会创建SystemBolt,每个worker内有且只有一个SystemBolt,可以见SystemBolt.java中注释的说明或参考github上storm对该改变的说明,https://github.com/nathanmarz/storm/pull/517。
SystemBolt主要进行进程相关的统计功能,比如内存使用情况,网络包的吞吐量,具体可见SystemBolt.java。SystemBolt是不接收tuple,只有出度,没有入度。
Metrics Bolt线程
MetricsBolt主要也是处理统计工作,与systembolt不同的是,metricsbolt主要处理executor级别的,如果用户在配置文件中定义了相关的MetricsConsumer类,那么这些类会在此被执行。
与之相关的配置内容,
## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "backtype.storm.metrics.LoggingMetricsConsumer"
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
Shared Executor
这个是在storm 0.8中引入的,其用途可在0.8的release notes中找到,创建共享线程池,具体用途没太搞清楚,:).
Metrics的执行流程
metrics所做的计量工作是在什么时候被唤醒的呢,也就是说如何一步步的触发直到MetricsConsumeBolt的execute函数被调用。
下图勾勒出与metrics相关的线程间的消息传递过程。
简要说明如下
- worker在启动的时候,会往:user-timer中注册metrics timer(见setup-metrics!函数).
- 一旦metrics timer超时,会发送一个stream-id为metrics-tick-stream-id的tuple到非metrics类型的bolt,如user/acker/system bolt.
- 接收到tuple之后,会调用metrics-tick函数发送task-data给MetricsConsumerBolt, stream-id为metrics-stream-id
- MetricsConsumerBolt接收到stream-id为metrics-stream-id的tuple后,会执行execute
注:在worker内部还有另一套计量api,定义于builtin-metrics.clj中,与MetricsConsumerBolt的区别在于,builtin-metrics是在处理外部进程发送过来的tuple时进行计量统计,而MetricsConsumerBolt是定时触发。
worker进程内部消息传递处理和数据结构分析
本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。
Topology到Worker的映射关系
Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。
无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。
- worker是进程,executor对应于线程,spout或bolt是一个个的task
- 同一个worker只会执行同一个topology相关的task
- 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是spout类的task
- 运行的时候,spout和bolt需要被包装成一个又一个task
worker,executor, task三者之间的关系可以用下图表示
小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.
每一个executor使用的是actor pattern,high level的处理逻辑如下图所示
外部消息的接收和处理
在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?
换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?
先图示一下,外部消息处理的大概流程。
注:圈起来的数字表示消息转换和处理的序列。
步骤一
监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.
那么在worker-data中哪些数据项与这个过程相关呢
- :port
- :executor-receive-queue-map
- :short-executor-receive-queue-map
- :task->short-executor
- :transfer-local-fn
transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。
步骤二
接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。
executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。
executor中与步骤2相关的数据项
- :type executor-type
- :receive-queue
- :deserializer (executor-data中的数据项)
步骤三:
步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer-thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。
一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.
executor中与步骤3相关的数据项
- :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
- :batch-transfer-queue
在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题
- tuple中含有哪些字段 -- 该问题的解答由spout或bolt中的declareOutFields来解决
- 由哪个node+port来接收该tuple -- 由grouping来解决,这个时候就可以看出为什么需要task这一层的逻辑抽象了,有关grouping的详细解释,请参考fxjwind撰写的Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)
步骤四:
处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.
start-batch-transfer->worker-handler
(defn start-batch-transfer->worker-handler! [worker executor-data] (let [worker-transfer-fn (:transfer-fn worker) cached-emit (MutableObject. (ArrayList.)) storm-conf (:storm-conf executor-data) serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) ] (disruptor/consume-loop* (:batch-transfer-queue executor-data) (disruptor/handler [o seq-id batch-end?] (let [^ArrayList alist (.getObject cached-emit)] (.add alist o) (when batch-end? (worker-transfer-fn serializer alist) (.setObject cached-emit (ArrayList.)) ))) :kill-fn (:report-error-and-die executor-data))))
worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。
(defn mk-transfer-fn [worker] (let [local-tasks (-> worker :task-ids set) local-transfer (:transfer-local-fn worker) ^DisruptorQueue transfer-queue (:transfer-queue worker)] (fn [^KryoTupleSerializer serializer tuple-batch] (let [local (ArrayList.) remote (ArrayList.)] (fast-list-iter [[task tuple :as pair] tuple-batch] (if (local-tasks task) (.add local pair) (.add remote pair) )) (local-transfer local) ;; not using map because the lazy seq shows up in perf profiles (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])] (disruptor/publish transfer-queue serialized-pairs) )))))
步骤五:
处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。
(defn mk-transfer-tuples-handler [worker] (let [^DisruptorQueue transfer-queue (:transfer-queue worker) drainer (ArrayList.) node+port->socket (:cached-node+port->socket worker) task->node+port (:cached-task->node+port worker) endpoint-socket-lock (:endpoint-socket-lock worker) ] (disruptor/clojure-handler (fn [packets _ batch-end?] (.addAll drainer packets) (when batch-end? (read-locked endpoint-socket-lock (let [node+port->socket @node+port->socket task->node+port @task->node+port] ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead) ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering) (fast-list-iter [[task ser-tuple] drainer] ;; TODO: consider write a batch of tuples here to every target worker ;; group by node+port, do multipart send (let [node-port (get task->node+port task)] (when node-port (.send ^IConnection (get node+port->socket node-port) task ser-tuple)) )))) (.clear drainer))))))
tuple发送的时候需要用到connection,但目前只知道task-id,所以在worker中需要保存task-id到node+port的映射,node+port与outgoing connections之间的映射。
worker中与步骤5相关的数据项:
- :cached-node+port->socket
- :cached-task->node+port
- :component->stream->fields
- :component->sorted-tasks
- :endpoint-socket-lock
- :transfer-queue (线程内部的消息队列)
- :task->component
其它的数据项
上述五个步骤并没有涵盖worker-data所有的数据项,那么其它的数据项归一归类,大体如下
timer相关,timer相关的数据项包括timer及其对应的处理句柄
- :heartbeat-timer
- :refresh-connection-timer
- :refresh-active-timer
- :executor-heartbeat-timer
- :user-timer
zk相关
- :storm-cluster-state
- :storm-active-atom
- :cluster-state
配置相关
- :conf
- :mq-context 在transport layer是使用zmq还是netty
Assignment相关
- :storm-id
- :assigment-id
- :worker-id
- :executors
- :task-ids
- :storm-conf
- :topology
- :system-topology
进程关闭相关
- :suicide-fn
其它的其它
- :uptime 运行时间,统计用
- :default-shared-resources 线程池
- :user-shared-resources 未启用
小结
设计的时候,一定是先画出一个大概的蓝图,然后逐步的细化并加以实现。具体来说,步骤如下
- manifest 定义主要的功能
- draw skeleton 画出实现草图,定义主要的接口
- discussion 与团队讨论
- data structures 数据结构
- function 函数实现
- testing 测试