Spout数据源的思考
数据库:只适合读取数据库的配置文件
文件:只适合测试(开发过程中是分布式的集群)
企业产生的log文件处理步骤:
- 读取内容写入到MQ
- Storm再处理
分组策略(Stream Grouping)
stream grouping用来定义一个stream应该如何分配给Bolts上面的多个executor(多线程并发)
Strom里面有7种类型的Stream grouping
1)Shuffle Grouping:随机分组,轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
2)Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
3)All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
4)Global Grouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5)Non Grouping:不分组**,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping**是一样的效果。在多线程情况下不平均分配。
6)Direct Grouping**:直接分组**,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
7)Local or shuffle grouping**:**如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。
8)测试
(1)spout并发度修改为2,bolt并发度修改为1,shuffleGrouping模式
builder.setSpout(“WebLogSpout”, new WebLogSpout(),2); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 1).shuffleGrouping(“WebLogSpout”); Spout开两个线程会对数据读取两份,打印出来就是2份。如果数据源是消息队列,就不会出来读取两份的数据(统一消费者组,只能有一个消费者) 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number102 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number103 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number104
(2)spout并发度修改为1,bolt并发度修改为2,noneGrouping模式
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1);
builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).noneGrouping(“WebLogSpout”);
每个bolt接收到的单词不同。
Thread-33-WebLogBolt-executor[1 1]lines:14 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678
Thread-34-WebLogBolt-executor[2 2]lines:16 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678
(3)spout并发度修改为1,bolt并发度修改为2,fieldsGrouping
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).fieldsGrouping(“WebLogSpout”, new Fields(“log”)); 基于web案例不明显,后续案例比较明显
(4)spout并发度修改为1,bolt并发度修改为2,allGrouping(“spout”);
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).allGrouping(“WebLogSpout”); 每一个bolt获取到的数据都是一样的。 Thread-43-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-23-WebLogBolt-executor[2 2]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 ,
(5)spout并发度修改为1,bolt并发度修改为2,globalGrouping(“spout”);
builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).globalGrouping(“WebLogSpout”); Task的id最低的bolt获取到了所有数据。 Thread-28-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678