Storm分组策略

Spout数据源的思考

数据库:只适合读取数据库的配置文件

文件:只适合测试(开发过程中是分布式的集群)

企业产生的log文件处理步骤:

  1. 读取内容写入到MQ
  2. Storm再处理

分组策略(Stream Grouping)

stream grouping用来定义一个stream应该如何分配给Bolts上面的多个executor(多线程并发)

Storm分组策略

Strom里面有7种类型的Stream grouping

1)Shuffle Grouping:随机分组,轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

Storm分组策略

2Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

3)All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

4)Global Grouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

5)Non Grouping:不分组**,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping**是一样的效果。在多线程情况下不平均分配。

6)Direct Grouping**:直接分组**,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

7)Local or shuffle grouping**:**如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。

8)测试

​ (1)spout并发度修改为2,bolt并发度修改为1,shuffleGrouping模式

builder.setSpout(“WebLogSpout”, new WebLogSpout(),2); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 1).shuffleGrouping(“WebLogSpout”); Spout开两个线程会对数据读取两份,打印出来就是2份。如果数据源是消息队列,就不会出来读取两份的数据(统一消费者组,只能有一个消费者) 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number102 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number103 158session_id VVVYH6Y4V4SFXZ56JIPDPB4V678line_number104

(2)spout并发度修改为1,bolt并发度修改为2,noneGrouping模式

builder.setSpout(“WebLogSpout”, new WebLogSpout(),1);

builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).noneGrouping(“WebLogSpout”);

每个bolt接收到的单词不同。

Thread-33-WebLogBolt-executor[1 1]lines:14 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

Thread-34-WebLogBolt-executor[2 2]lines:16 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

(3)spout并发度修改为1,bolt并发度修改为2,fieldsGrouping

builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).fieldsGrouping(“WebLogSpout”, new Fields(“log”)); 基于web案例不明显,后续案例比较明显

(4)spout并发度修改为1,bolt并发度修改为2,allGrouping(“spout”);

builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).allGrouping(“WebLogSpout”); 每一个bolt获取到的数据都是一样的。 Thread-43-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 Thread-23-WebLogBolt-executor[2 2]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678 ,

(5)spout并发度修改为1,bolt并发度修改为2,globalGrouping(“spout”);

builder.setSpout(“WebLogSpout”, new WebLogSpout(),1); builder.setBolt(“WebLogBolt”, new WebLogBolt(), 2).globalGrouping(“WebLogSpout”); Task的id最低的bolt获取到了所有数据。 Thread-28-WebLogBolt-executor[1 1]lines:30 session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

上一篇:Storm实现单词统计案例


下一篇:Storm模拟将接收到日志的会话id打印在控制台