Jstorm调度规则

调度细则

1.任务调度算法以worker为维度;
2.调度过程中正在进行的调度动作不会对已发生的调度动作产生影响;
3.调度过程中用户可以自定义 useDefined Assignment,和使用已有的old Assignment,这两者的优先级是:useDefined Assignment>old Assignment;
4.用户可以设置task.on.differ.node参数,强制要求同组件的task分布到不同supervisor上;
5.worker只会被唯一的拓扑使用
6.在启动supervisor时,worker不会启动,只有在有Task时才会启动,每个拓扑可以指定worker的资源占用

默认调度算法

1.以worker为维度,尽量将worker平均分配到各个supervisor上;(让Worker获取尽量多的资源)
2.以worker为单位,确认worker与task数目大致的对应关系(注意在这之前已经其他拓扑占用利用的worker不再参与本次动作);
3.建立task-worker关系的优先级依次为:尽量避免同类task在同一work和supervisor下的情况(避免资源争用,例如多个kafka_reader在同一个worker,大量拉取,会争抢CPU,带宽资源),尽量保证task在worker和supervisor基准上平均分配,尽量保证有直接信息流传输的task在同一worker下(避免进程通信,进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率);

Worker资源足够,调度分配

task.on.differ.node=false
topology_nums=1
topology_level=1
workers=6
spout.parallel=6
bolt_0.parallel=6
acker.executors=6
Worker 1 2 3 4 5 6
Worker 1 2 3 4 5 6
spout-task 1 1 1 1 1 1
Bolt0-task 1 1 1 1 1 1
acker-task 1 1 1 1 1 1

当有Supervisor超过6个时,Worker会平均分配到每一个

集群组件变化对调度的影响 1. 增加superivsor,不影响已有拓扑任务调度 2. 杀死worker,重启worker,且调度正常 3. 杀死supervisor,拓扑继续跑,一切正常 4. 其中一台机子宕机,之前该机子跑的task将重新参与调度,调度结果符合要求 5. 增加supervisor,再rebalance,新增加的supervisor会加入任务调度过程

用户自定义任务调度测试

代码例子:

com.alipay.dw.jstorm.example.userdefined.scheduler.UserDefinedWorkerTopology
user.worker.num=1 #设置work数量
1_worker_hostname=glowd.ali.com #指定supervisor的地址
1_worker_port=6801    #指定supervisor地址想要适配的worker端口号
1_worker_mem=10    #worker的内存
1_worker_cpu=2     #worker的cpu占用
1_worker_jvm=2     #worker的JVM占用
1_worker_component=__acker:1;kafka_reader:2 #worker中运行的所有组件

task.on.differ.node

    builder.setBolt(TopologyDef.ANALYSIS_COMPONENT, new AnalysisBolt(), 3).localOrShuffleGrouping(TopologyDef.KAFKA_READER_COMPONENT).addConfiguration("task.on.differ.node", "true");

1
此处将TopologyDef.ANALYSIS_COMPONENT强制分配到不同的superviser的worker中执行。如果此拓扑配置的Worker数量小于3,或者superviser小于3,无法进行调度分配,因为和task.on.differ.node相互矛盾

作者:glowd
原文:https://blog.csdn.net/zengqiang1/article/details/78444872
版权声明:本文为博主原创文章,转载请附上博文链接!

上一篇:自动安装带nginx_upstream_check_module模块的Nginx脚本


下一篇:详解百度应用的 WormHole 后门