五:storm-wordcount 实时版开发
1:编写Spout
package wc; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; /** * @author Dawn * @date 2019年6月5日15:59:04 * @version 1.0 * 需求:单词计数 hello dawn hello dawn indicate * 实现接口:IRichSpout IRichBolt * 继承抽象类:BaseRichSpout BaseRichSpout 常用 */ public class WordCountSpout extends BaseRichSpout{ //定义收集器 private SpoutOutputCollector collector; //发送数据 @Override public void nextTuple() { //1:发送数据到blot collector.emit(new Values("hello dawn hello dawn indicate")); //2.设置延迟 try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //创建收集器 @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector=collector; } //声明 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { //起别名 declare.declare(new Fields("dawn")); } }
2:编写分词bolt
package wc; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * @author Dawn * @date 2019年6月5日16:09:58 * @version 1.0 * 单词切分Bolt组件 */ public class WordCountSplitBolt extends BaseRichBolt{ //数据继续发送到下一个bolt private OutputCollector collector; @Override public void execute(Tuple in) { //1.获取数据 String line = in.getStringByField("dawn"); //2.切分数据 String[] fields = line.split(" "); //3.<单词,1> 发送出去 下一个bolt(累加求和) for(String w:fields) { collector.emit(new Values(w,1)); } } //初始化 @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { this.collector=collector; } //声明描述 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { declare.declare(new Fields("word","sum")); } }
3:编写计数bolt
package wc; import java.util.HashMap; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; /** * @author Dawn * @date 2019年6月5日16:17:45 * @version 1.0 * 计数bolt */ public class WordCount extends BaseRichBolt{ private Map<String, Integer> map=new HashMap<>(); //累加求和 @Override public void execute(Tuple in) { //1.获取数据 String word = in.getStringByField("word"); Integer sum = in.getIntegerByField("sum"); //2.业务处理 if(map.containsKey(word)) { //之前出现几次 Integer count = map.get(word); //已有的 map.put(word, count+sum); }else { map.put(word, sum); } //3.打印控制台 System.err.println(Thread.currentThread().getName() + "\t 单词为:" + word + "\t 当前已出现次数为:" + map.get(word)); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { } //没有下一个阶段就不用写 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { } }
4:编写driver驱动类
package wc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * @author Dawn * @date 2019年6月5日16:18:52 * @version 1.0 * 驱动类,以及使用不同的分组策略演示(字段,随机,全局) */ public class WordCountDriver { public static void main(String[] args) { //1.hadoop->Job storm->topology 创建拓扑 TopologyBuilder builder = new TopologyBuilder(); //2.指定设置 //设置任务的spout组件 builder.setSpout("WordCountSpout", new WordCountSpout(),2);//拓扑名,数据源,并行度 //设置任务的单词拆分的bolt组件,是字段分组 并行度为2,总任务数 4 builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).fieldsGrouping("WordCountSpout", new Fields("dawn")); //设置任务的单词计数的bolt组件,是字段分组 ,并行度为2 builder.setBolt("WordCount", new WordCount(),4).fieldsGrouping("WordCountSplitBolt", new Fields("word","sum")); //============================================================================================================》 // //设置任务的单词拆分的bolt组件,是随机分组 并行度为2,总任务数 4 // builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).shuffleGrouping("WordCountSpout"); // // //设置任务的单词计数的bolt组件,是随机分组 ,并行度为2 // builder.setBolt("WordCount", new WordCount(),4).shuffleGrouping("WordCountSplitBolt"); //============================================================================================================》 // //设置任务的单词拆分的bolt组件,是全局分组 并行度为2,总任务数 4 // //分配给task id值最小的 根据线程id判断,只分噢诶给线程id最小的 // builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(),2).setNumTasks(4).globalGrouping("WordCountSpout"); // // //设置任务的单词计数的bolt组件,是全局分组 ,并行度为2 // builder.setBolt("WordCount", new WordCount(),4).globalGrouping("WordCountSplitBolt"); // //3.创建配置信息 Config conf = new Config(); //conf.setNumWorkers(10);可以设置work数 // //集群模式运行 // try { // StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); // } catch (AlreadyAliveException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } catch (InvalidTopologyException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } catch (AuthorizationException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } //4.提交任务(本地模式) LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordcounttopology", conf, builder.createTopology()); } }
六:storm-wordcount 提交到集群上运行
1:打包程序到Linux上
2:提交任务
3:在Storm UI上看任务执行情况
七:分组策略
使用上面word count程序来学习分组策略
总图:参照这个来看
一个executor就是一个线程数
一个task就是一个任务数
1) Fields Grouping
按照字段分组。相同字段发送到一个task中。
运行结果:
可以看出都是进行字段进行分组的,为什么了?应为我这里字段(hello dawn hello dawn indicate)也就只有3个,而且我这里并行度设置的是4(就理解为线程数)。从结果中只有3个线程在使用!!!也就是相同的字段放入一个task中。。。
2)shuffle Grouping
随机分组。轮询。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同。
运行结果:
可以看出明显不是字段分组。因为这里并行度为4,并且这4个线程都用上了。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同。而且这里次数出现的都有点问题。个人觉得有点像线程中的那个同步问题。(个人觉得哈!!只是助于理解,具体是不是我也不知道)
3)Non Grouping
不分组
采用这种策略每个bolt中接收的单词不同。
4)All Grouping
广播发送
5)Global Grouping
全局分组
分配给task id值最小的
根据线程id判断,只分噢诶给线程id最小的
运行结果:
可以看出虽然我们设置了4个线程。但是这个全局分组分配给task id值(线程id)最小的。根据线程id判断,只分给线程id最小的。只用到了一个线程id最小的线程
总结:一般来说,就字段分组和随机分组用的多点。其他用的都很少