流式计算-Jstorm提交Topology过程(上)

Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt、bolt和bolt之间的关系。它能够被提交到Jstorm集群。

本文以Jstorm自带的SequenceTopology简介一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及详细业务,

1、 SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf)。该方法主要依据配置文件,使用TopologyBuilder构造Topology的spout和bolt。以及spout和bolt之间的关系

2、TopologyBuilder构造好Topology之后,通过Jstorm Client的StormSubmitter.submitTopology(streamName, conf,builder.createTopology())提交Topology到Jstorm集群,

3、在StormSubmitter.submitTopology方法中。首先会对配置项进行检查、然后将Topology自己的配置项和Jstorm的配置项组装成一个大的Map。之后上传用户在命令行提交的Jar包,然后通过NimbusClient 的submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) 方法将Topology提交到Jstorm集群,其核心代码例如以下:

if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException(
"Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
putUserInfo(conf, stormConf);
String serConf = JSON.toJSONString(stormConf);
if (localNimbus != null) {
localNimbus.submitTopology(name, null, serConf, topology);
} else {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
if (topologyNameExists(conf, name)) {//检查名字是否反复,Jstorm要求每一个topology名称必须唯一
throw new RuntimeException("Topology with name `" + name
+ "` already exists on cluster");
}
submitJar(conf);//上传Jar包到ZK
client.getClient().submitTopologyWithOpts(name, path,
serConf, topology, opts);//通过Thrift将topology提交到集群

4、NimbusClient提交之后,NimbusSever通过com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)处理接收到的topology。其详细逻辑例如以下(代码已经精简)

public void submitTopologyWithOpts(String topologyname,
String uploadedJarLocation, String jsonConf,
StormTopology topology, SubmitOptions options)
throws AlreadyAliveException, InvalidTopologyException,
TopologyAssignException, TException {
//首先检查topology是否已经存在
checkTopologyActive(data, topologyname, false);
//生成topology的唯一标识
int counter = data.getSubmittedCount().incrementAndGet();
String topologyId = topologyname + "-" + counter + "-"
+ TimeUtils.current_time_secs();
try {
//反序列化topology配置项
Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
.from_json(jsonConf);
if (serializedConf == null) {
LOG.warn("Failed to serialized Configuration");
throw new InvalidTopologyException(
"Failed to serilaze topology configuration");
}
//将topology的名称和ID添加到配置项中
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
Map<Object, Object> stormConf;
stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
topology);
Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
conf);
totalStormConf.putAll(stormConf);
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
stormConf, topology); // this validates the structure of the topology
Common.validate_basic(normalizedTopology, totalStormConf,
topologyId);
// don't need generate real topology, so skip Common.system_topology
// Common.system_topology(totalStormConf, topology);
StormClusterState stormClusterState = data.getStormClusterState();
// 创建topology在ZK上的文件夹
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
normalizedTopology); // 为每个spout或者bolt生成Task,并在ZK上创建对应的task文件夹<span style="font-family: Arial, Helvetica, sans-serif;">/ZK/tasks/topoologyId/xxx</span>
setupZkTaskInfo(conf, topologyId, stormClusterState);
// 进行任务分配
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyname);
assignEvent.setOldStatus(Thrift
.topologyInitialStatusToStormStatus(options
.get_initial_status())); TopologyAssign.push(assignEvent);
LOG.info("Submit for " + topologyname + " with conf "
+ serializedConf); boolean isSuccess = assignEvent.waitFinish();
if (isSuccess == true) {
LOG.info("Finish submit for " + topologyname);
}
上一篇:BlazeDS简介(转自openkk的日志)


下一篇:ubuntu下Qt之android环境配置以及一些常见问题解决