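(1) Time window concepts
What is a time window?
A time window batches incoming data for processing. The size of each batch is defined either by a span of time or by a number of tuples.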
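Use cases:
- Computing sales revenue for every 10-minute period.
- Computing the transaction amount over the past minute.
- Finding the most popular Weibo posts over the past hour.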
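As a rough illustration of the last use case, the plain-Java sketch below (no Storm classes) counts mentions per post inside the window (now - 1 hour, now] and returns the most frequent post. The `HotPostDemo` class, its `hottest` method and the (timestamp, post id) event format are hypothetical, chosen only to show the windowing idea.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: most-mentioned post in the time window (now - windowSec, now].
class HotPostDemo {
    static String hottest(long[] timesSec, String[] postIds, long nowSec, long windowSec) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < timesSec.length; i++) {
            // Keep only events that fall inside the window
            if (timesSec[i] > nowSec - windowSec && timesSec[i] <= nowSec) {
                counts.merge(postIds[i], 1, Integer::sum);
            }
        }
        // Pick the post with the highest count (null if the window is empty)
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] times = {100, 200, 300, 4000, 5000, 6000, 7000};
        String[] posts = {"p1", "p1", "p1", "p2", "p2", "p3", "p2"};
        // At t = 7200 s the last hour is (3600, 7200], so the older p1 events drop out
        System.out.println(hottest(times, posts, 7200, 3600)); // prints p2
    }
}
```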
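Storm's windowing mechanism
Storm can process all the tuples in a window together. A window can be defined by time or by count: its size is set either by a time span or by a number of tuples.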
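For count-based windows, the idea can be sketched in plain Java: every n consecutive tuples form one batch. `CountWindows.tumblingByCount` is a hypothetical helper for illustration, not Storm's implementation; in Storm the corresponding setting would be a count such as `Count.of(n)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Storm): splits a stream into count-based batches.
class CountWindows {
    static <T> List<List<T>> tumblingByCount(List<T> events, int n) {
        List<List<T>> windows = new ArrayList<>();
        // Every n consecutive events form one window; in this sketch a trailing
        // partial batch is not emitted because it has not reached n events yet.
        for (int start = 0; start + n <= events.size(); start += n) {
            windows.add(new ArrayList<>(events.subList(start, start + n)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7");
        System.out.println(tumblingByCount(events, 3));
        // prints [[e1, e2, e3], [e4, e5, e6]]
    }
}
```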
Storm supports two kinds of windows:
- Tumbling Window
- Sliding Window
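Note: windowed computations are usually run with a parallelism of one (a single executor), so that one bolt instance sees every tuple in the window.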
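Why a single executor? The hypothetical simulation below deals the tuples of one window across executors round-robin. Round-robin is an assumption made for simplicity; real shuffle grouping only aims for an even spread. With two executors, each one sees only a partial sum of the window.

```java
import java.util.List;

// Hypothetical simulation: split one window's amounts across executors round-robin.
class ParallelWindowDemo {
    // Returns the window sum each executor would compute on its own share
    static int[] perExecutorSums(List<Integer> amounts, int executors) {
        int[] sums = new int[executors];
        for (int i = 0; i < amounts.size(); i++) {
            sums[i % executors] += amounts.get(i);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> window = List.of(10, 20, 30, 40);
        int[] one = perExecutorSums(window, 1);
        int[] two = perExecutorSums(window, 2);
        System.out.println("1 executor: " + one[0]);                   // 100
        System.out.println("2 executors: " + two[0] + ", " + two[1]); // 40, 60
    }
}
```

Neither executor reports the true total of 100 unless an extra merge step is added downstream.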
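(1) Tumbling Window
The stream is divided into windows by a fixed time interval or a fixed number of tuples. Each tuple belongs to exactly one window, and windows do not overlap.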
Example with a window length of 5 seconds:
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2           w3
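Parameter: the window length, specified either as a time duration or as a tuple count. For a tumbling window, the step (how far the window advances each time) equals the window length.
BaseWindowedBolt.withTumblingWindow(Duration.of(milliseconds)) or BaseWindowedBolt.withTumblingWindow(Count.of(10))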
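A time-based tumbling window can be sketched as "window index = timestamp / length". The plain-Java example below is hypothetical. It assumes non-negative timestamps and windows aligned to t = 0 covering [k*len, (k+1)*len). This alignment is a simplification, not how Storm schedules its evaluations. With made-up timestamps and a 5-second length, each event lands in exactly one window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: assign timestamped events to tumbling windows of lenSec seconds.
class TumblingTimeDemo {
    // Maps window index k to the events whose time falls in [k*len, (k+1)*len)
    static Map<Long, List<String>> assign(String[] names, long[] timesSec, long lenSec) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < names.length; i++) {
            long index = timesSec[i] / lenSec; // each event maps to exactly one window
            windows.computeIfAbsent(index, k -> new ArrayList<>()).add(names[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        String[] names = {"e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9"};
        long[] times = {1, 3, 6, 7, 8, 9, 11, 12, 14};
        System.out.println(assign(names, times, 5));
        // prints {0=[e1, e2], 1=[e3, e4, e5, e6], 2=[e7, e8, e9]}
    }
}
```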
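(2) Sliding Window
Each window has a fixed length (a time interval or a number of tuples) and advances by a fixed sliding interval. When the sliding interval is shorter than the window length, consecutive windows overlap, so a tuple can belong to more than one window.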
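Example with a window length of 10 seconds and a sliding interval of 5 seconds:
........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5             10         15   -> time
|<----- w1 ---->|
        |<-------- w2 -------->|
                |<--------- w3 ---------->|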
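Parameters: the window length and the sliding interval, each specified as a time duration or a tuple count.
BaseWindowedBolt.withWindow(windowLength, slidingInterval), for example withWindow(Duration.seconds(10), Duration.seconds(5)) or withWindow(Count.of(10), Count.of(5))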
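With a window length of 10 seconds and a sliding interval of 5 seconds, the window is evaluated every 5 seconds, and some tuples in the first window also appear in the second.
Each time the window advances by one sliding interval, it takes the most recent window length's worth of data (counting back from the right) and drops the oldest data that has fallen off the left end.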
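The overlap can be checked with a small plain-Java sketch (no Storm classes; the event timestamps are made up). It assumes one evaluation at every multiple of the sliding interval, with each window covering the half-open range (t - length, t]. This is a simplification of how Storm triggers evaluations. `SlidingTimeDemo.windows` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: events contained in each evaluation of a time-based sliding window.
class SlidingTimeDemo {
    static List<List<String>> windows(String[] names, long[] timesSec,
                                      long lengthSec, long slideSec, long endSec) {
        List<List<String>> result = new ArrayList<>();
        // Evaluate at t = slide, 2*slide, ... up to endSec
        for (long t = slideSec; t <= endSec; t += slideSec) {
            List<String> window = new ArrayList<>();
            for (int i = 0; i < names.length; i++) {
                // The window ending at t covers (t - length, t]
                if (timesSec[i] > t - lengthSec && timesSec[i] <= t) {
                    window.add(names[i]);
                }
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9"};
        long[] times = {1, 3, 6, 7, 8, 9, 11, 12, 14};
        // Window length 10 s, sliding interval 5 s
        List<List<String>> ws = windows(names, times, 10, 5, 15);
        for (int i = 0; i < ws.size(); i++) {
            System.out.println("w" + (i + 1) + " = " + ws.get(i));
        }
        // w1 = [e1, e2]
        // w2 = [e1, e2, e3, e4, e5, e6]
        // w3 = [e3, e4, e5, e6, e7, e8, e9]
    }
}
```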
(2) Developing and testing a tumbling-window program
MainTopology:
package com.kfk.window;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 AM
 */
public class MainTopology {
    public static void main(String[] args) {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout
        builder.setSpout("spout", new AmtSpout());
        // Tumbling window of 10 seconds, run by a single executor (parallelism 1)
        builder.setBolt("amtBolt", new TumblingBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)), 1)
                .shuffleGrouping("spout");
        // Turn off debug logging
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                // Cluster mode: the first argument is the topology name
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // Local mode for testing
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
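Storm's windowing documentation advises that, for time-based windows, `topology.message.timeout.secs` be sufficiently larger than window length + sliding interval. Otherwise tuples can time out and be replayed. The sketch below only does that arithmetic for this article's two topologies. It assumes the default 30-second message timeout and uses a conservative strict comparison. `WindowTimeoutCheck` and `fitsTimeout` are hypothetical names.

```java
// Hypothetical arithmetic check: does window length + sliding interval fit in the message timeout?
class WindowTimeoutCheck {
    static boolean fitsTimeout(int windowLengthSec, int slidingIntervalSec, int messageTimeoutSec) {
        return windowLengthSec + slidingIntervalSec < messageTimeoutSec;
    }

    public static void main(String[] args) {
        int timeout = 30; // assumed default topology.message.timeout.secs
        // Tumbling window of 10 s: the sliding interval equals the window length
        System.out.println("tumbling 10s: " + fitsTimeout(10, 10, timeout));   // true
        // Sliding window of 5 s that advances every 2 s
        System.out.println("sliding 5s/2s: " + fitsTimeout(5, 2, timeout));    // true
        // A 1-minute tumbling window would need a larger timeout,
        // e.g. set through Config.setMessageTimeoutSecs(...)
        System.out.println("tumbling 60s: " + fitsTimeout(60, 60, timeout));   // false
    }
}
```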
AmtSpout:
package com.kfk.window;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 PM
 */
public class AmtSpout implements IRichSpout {
    // Sample values used to simulate order records
    Integer[] amt = {10, 20, 30, 40};
    String[] time = {"2020-12-27 12:43", "2020-12-25 12:43", "2020-12-23 12:43", "2020-12-18 12:43"};
    String[] city = {"beijing", "nanjing", "shenzhen", "shanghai", "guangzhou"};
    String[] product = {"java", "python", "c", "scala"};
    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Keep the collector so that nextTuple() can emit tuples
        spoutOutputCollector = collector;
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        try {
            // Simulate one order record
            int _amt = amt[random.nextInt(amt.length)];
            String _time = time[random.nextInt(time.length)];
            String _city = city[random.nextInt(city.length)];
            String _product = product[random.nextInt(product.length)];
            // Emit to the bolt; the amount is sent as a String.
            // The "time" field is only payload: the windows in this article use
            // the tuple's arrival time, not this field.
            spoutOutputCollector.emit(new Values(String.valueOf(_amt), _time, _city, _product));
            // Emit roughly one tuple per second
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output fields
        declarer.declare(new Fields("amt", "time", "city", "product"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
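Given the spout's pacing (one `Thread.sleep(1000)` per emit) and its four possible amounts, each 10-second tumbling window should hold roughly 10 tuples. Their sum should lie between 100 and 400 in steps of 10. Real windows can hold slightly more or fewer tuples because timing is not exact. A hypothetical plain-Java simulation of that arithmetic (`WindowSumEstimate` is not part of the article's code):

```java
import java.util.Random;

// Hypothetical simulation of what one 10-second window receives from AmtSpout.
class WindowSumEstimate {
    static final int[] AMT = {10, 20, 30, 40}; // same amounts as AmtSpout

    // Draws tuplesPerWindow random amounts and returns their sum
    static int simulateWindowSum(int tuplesPerWindow, Random random) {
        int sum = 0;
        for (int i = 0; i < tuplesPerWindow; i++) {
            sum += AMT[random.nextInt(AMT.length)];
        }
        return sum;
    }

    public static void main(String[] args) {
        int tuples = 10; // 10 s window at about 1 tuple per second
        int min = tuples * 10;
        int max = tuples * 40;
        int sum = simulateWindowSum(tuples, new Random(42));
        System.out.println("expected range: " + min + ".." + max + ", simulated: " + sum);
    }
}
```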
TumblingBolt:
package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 PM
 */
public class TumblingBolt extends BaseWindowedBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    // Called once per window with all the tuples that fall into that window
    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> list = inputWindow.get();
        int amt = 0;
        for (Tuple tuple : list) {
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        // Total order amount in this 10-second window
        System.out.println(amt);
    }
}
(3) Developing and testing a sliding-window program
MainTopology:
package com.kfk.window;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 AM
 */
public class MainTopology {
    public static void main(String[] args) {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout
        builder.setSpout("spout", new AmtSpout());
        // Sliding window: length 5 seconds, sliding interval 2 seconds, single executor
        builder.setBolt("amtBolt", new SliderBolt()
                .withWindow(BaseWindowedBolt.Duration.seconds(5),
                        BaseWindowedBolt.Duration.seconds(2)), 1)
                .shuffleGrouping("spout");
        // Turn off debug logging
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                // Cluster mode: the first argument is the topology name
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // Local mode for testing
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
SliderBolt:
package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.List;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 PM
 */
public class SliderBolt extends BaseWindowedBolt {
    // Running total of the amounts currently inside the window
    int amt = 0;
    OutputCollector outputCollector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // Tuples that entered the window since the last evaluation
        List<Tuple> listNew = inputWindow.getNew();
        // Tuples that left the window since the last evaluation
        List<Tuple> listExp = inputWindow.getExpired();
        for (Tuple tuple : listNew) {
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }
        for (Tuple tuple : listExp) {
            int expAmt = Integer.parseInt(tuple.getStringByField("amt"));
            amt -= expAmt;
        }
        // The window length is 5 seconds (see MainTopology)
        System.out.println("Orders in the last 5 seconds: " + amt);
    }
}
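SliderBolt keeps a running total instead of re-summing the whole window on each evaluation. It adds the amounts from `getNew()` and subtracts those from `getExpired()`. The plain-Java sketch below checks that this incremental total matches a full recomputation at every evaluation. It uses no Storm classes and rests on simplifying assumptions: one tuple per second at t = 1, 2, ..., and a 5 s window evaluated every 2 s covering (t - 5, t]. `IncrementalSumDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical check: "new minus expired" running sum vs. full recomputation.
class IncrementalSumDemo {
    // Returns {incrementalSum, fullSum} for each evaluation time t = slide, 2*slide, ...
    static List<int[]> run(int[] amounts, int lengthSec, int slideSec) {
        List<int[]> results = new ArrayList<>();
        Set<Integer> previous = new HashSet<>(); // event times in the previous window
        int running = 0;
        for (int t = slideSec; t <= amounts.length; t += slideSec) {
            // Event at time s (1-based) carries amounts[s - 1]
            Set<Integer> current = new HashSet<>();
            for (int s = 1; s <= amounts.length; s++) {
                if (s > t - lengthSec && s <= t) {
                    current.add(s);
                }
            }
            for (int s : current) {
                if (!previous.contains(s)) {
                    running += amounts[s - 1]; // newly arrived tuple
                }
            }
            for (int s : previous) {
                if (!current.contains(s)) {
                    running -= amounts[s - 1]; // expired tuple
                }
            }
            int full = 0;
            for (int s : current) {
                full += amounts[s - 1];
            }
            results.add(new int[]{running, full});
            previous = current;
        }
        return results;
    }

    public static void main(String[] args) {
        int[] amounts = {10, 40, 20, 30, 10, 20, 40, 30, 10, 20};
        for (int[] r : run(amounts, 5, 2)) {
            System.out.println("incremental=" + r[0] + " full=" + r[1]);
        }
    }
}
```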
The content above is for reference and learning only. If it infringes any rights, please contact me and I will remove it.