Storm Time Windows


(1) Time Window Concepts

What is a time window?
A time window defines how many tuples are batched together for processing, measured either by time or by tuple count.

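Use cases:

  • Compute sales revenue every 10 minutes.
  • Compute the transaction amount over the last minute.
  • Find the most popular Weibo posts over the last hour, etc.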

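Storm's windowing mechanism
Storm can process all the tuples that fall into a window together. Windows can be defined by time or by count, i.e. the window size is determined either by a time span or by a number of tuples.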

There are two kinds of windows:

  • Tumbling Window
  • Sliding Window

Note: window computations usually run with a single executor (parallelism 1).
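One likely reason for the single-executor convention: if a windowed bolt ran with parallelism greater than one, each instance would see only part of the stream and would print a partial total per window. The sketch below is plain Java with no Storm classes; the class name `ParallelWindowSketch` is hypothetical, and round-robin distribution is used as a simplified stand-in for `shuffleGrouping` (which is randomized in Storm).

```java
import java.util.List;

// Hypothetical simulation, not Storm code: distribute one window's tuples across
// several bolt instances (round-robin as a stand-in for shuffle grouping) and
// compare each instance's partial sum with the single-executor total.
class ParallelWindowSketch {

    // Partial sum each of `executors` instances would compute on its own share.
    static long[] partialSums(List<Integer> windowAmounts, int executors) {
        long[] partial = new long[executors];
        for (int i = 0; i < windowAmounts.size(); i++) {
            partial[i % executors] += windowAmounts.get(i);
        }
        return partial;
    }

    // The total a single executor would compute for the same window.
    static long total(List<Integer> windowAmounts) {
        long sum = 0;
        for (int amt : windowAmounts) {
            sum += amt;
        }
        return sum;
    }
}
```

With amounts 10, 20, 30, 40, 10, 20 and two executors, the instances would report 50 and 80 instead of the full 130, so a separate merge step would be needed to recover the total.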

(1) Tumbling Window

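Tumbling windows divide the stream into consecutive windows of a fixed time interval or a fixed number of tuples. Each tuple belongs to exactly one window, and windows do not overlap.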

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

With a 5-second tumbling window, w1 = {e1, e2}, w2 = {e3, e4, e5, e6} and w3 = {e7, e8, e9}.
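The "each tuple belongs to exactly one window" rule can be sketched as integer division of the timestamp by the window size. This is plain Java, not Storm's internal implementation; the class name `TumblingAssignSketch` is hypothetical, and the half-open interval [k*size, (k+1)*size) is a boundary convention assumed for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: assign each event timestamp to exactly one tumbling window.
// Window k covers [k * size, (k + 1) * size); this boundary choice is an assumption.
class TumblingAssignSketch {

    static Map<Long, List<Long>> assign(long[] timestampsSec, long windowSizeSec) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsSec) {
            // Integer division maps every timestamp to a single window index.
            long index = Math.floorDiv(ts, windowSizeSec);
            windows.computeIfAbsent(index, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

For events at seconds 1, 3, 6, 7, 8, 9, 11, 12, 14 and a 5-second window, this yields three disjoint groups: {1, 3}, {6, 7, 8, 9} and {11, 12, 14}.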

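Parameters: the window size, given as a duration or a tuple count, is set through a parameter. For a tumbling window the “step” (how far the window advances each time) equals the window size.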

BaseWindowedBolt.withTumblingWindow(Duration.of(milliseconds)) or BaseWindowedBolt.withTumblingWindow(Count.of(10))
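For the count-based variant, the idea is that tuples are buffered and the window fires once the configured number has arrived, after which the buffer starts empty. The sketch below is plain Java in the spirit of `Count.of(n)`, not Storm's implementation; the class `CountTumblingSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a count-based tumbling window: the window fires as soon
// as `size` tuples have been buffered, then the buffer is cleared (no overlap).
class CountTumblingSketch {
    private final int size;
    private final List<Integer> buffer = new ArrayList<>();
    private final List<List<Integer>> fired = new ArrayList<>();

    CountTumblingSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.size = size;
    }

    // Adds one tuple; a full buffer is emitted as one window batch.
    void add(int value) {
        buffer.add(value);
        if (buffer.size() == size) {
            fired.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<Integer>> firedWindows() { return fired; }

    // Tuples still waiting for their window to fill up.
    List<Integer> pending() { return buffer; }
}
```

Feeding the values 1 to 7 into a window of size 3 fires [1, 2, 3] and [4, 5, 6], while 7 waits in the buffer until two more tuples arrive.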

(2) Sliding Window

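Sliding windows group tuples into windows of a fixed length (a time span or a tuple count) that advance by a sliding interval. When the sliding interval is shorter than the window length, consecutive windows overlap, so a tuple can belong to more than one window.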

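(window length 10 s, sliding interval 5 s)

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5      0       5            10          15   -> time
|<------- w1 -->|
        |<---------- w2 ----->|
                |<-------------- w3 ---->|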

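Parameters: the window length and the sliding interval, each given as a duration or a tuple count. Here the “step” is the sliding interval, which is usually smaller than the window length.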

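BaseWindowedBolt.withWindow(Duration.of(window length in milliseconds), Duration.of(sliding interval in milliseconds)) or BaseWindowedBolt.withWindow(Count.of(10), Count.of(5))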

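For example, with a 10-second window length and a 5-second sliding interval, the window is evaluated every 5 seconds, and some of the tuples in the first window also belong to the second window.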
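The 10-second/5-second example can be sketched by listing, at each evaluation time t, the tuples whose timestamps fall in the last 10 seconds. This is plain Java, not Storm's implementation; `SlidingWindowSketch` is a hypothetical name, and the (t - length, t] boundary convention is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: contents of a time-based sliding window evaluated every
// `slideSec` seconds; each evaluation at time t covers timestamps in (t - lengthSec, t].
class SlidingWindowSketch {

    static List<List<Long>> evaluate(long[] timestampsSec, long lengthSec, long slideSec, long endSec) {
        List<List<Long>> windows = new ArrayList<>();
        for (long t = slideSec; t <= endSec; t += slideSec) {
            List<Long> current = new ArrayList<>();
            for (long ts : timestampsSec) {
                // A timestamp can satisfy this test for several consecutive t values,
                // which is exactly the overlap between sliding windows.
                if (ts > t - lengthSec && ts <= t) {
                    current.add(ts);
                }
            }
            windows.add(current);
        }
        return windows;
    }
}
```

For events at seconds 1, 3, 6, 7, 8, 9, 11, 12, 14, evaluations at t = 5, 10 and 15 return {1, 3}, {1, 3, 6, 7, 8, 9} and {6, 7, 8, 9, 11, 12, 14}; tuples 1 and 3 appear in the first two windows and tuples 6 to 9 in the last two.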

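At each step the window advances by one sliding interval. The window length, counted back from the newest tuple (right to left), determines which tuples are processed, and the oldest tuples that drop out at the left end expire.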
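Because each step only adds the tuples that entered and removes the ones that expired, an aggregate such as a sum can be updated incrementally instead of being recomputed over the whole window. The sketch below is plain Java in the spirit of `TupleWindow.getNew()` and `getExpired()`; the class and method names are hypothetical, and it assumes one tuple per second and a sliding interval no larger than the window length.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: maintain a sliding-window sum by adding new tuples and
// subtracting expired ones. amounts[i] is assumed to arrive at second i + 1.
class IncrementalSlidingSum {

    // Window sum at each evaluation time t = slide, 2*slide, ...; covers seconds (t - length, t].
    static List<Integer> runningSums(int[] amounts, int lengthSec, int slideSec) {
        if (slideSec <= 0 || slideSec > lengthSec) {
            throw new IllegalArgumentException("assumes 0 < slide <= length");
        }
        List<Integer> sums = new ArrayList<>();
        int sum = 0;
        int prevT = 0;
        for (int t = slideSec; t <= amounts.length; t += slideSec) {
            // New tuples: arrived in (prevT, t].
            for (int s = prevT + 1; s <= t; s++) {
                sum += amounts[s - 1];
            }
            // Expired tuples: in the previous window but at or before t - length.
            for (int s = Math.max(1, prevT - lengthSec + 1); s <= Math.min(prevT, t - lengthSec); s++) {
                sum -= amounts[s - 1];
            }
            sums.add(sum);
            prevT = t;
        }
        return sums;
    }

    // Recomputes the window ending at t from scratch, for comparison.
    static int fullSum(int[] amounts, int lengthSec, int t) {
        int sum = 0;
        for (int s = Math.max(1, t - lengthSec + 1); s <= t; s++) {
            sum += amounts[s - 1];
        }
        return sum;
    }
}
```

With amounts 10, 20, 30, 40, 10, 20, 30, 40, 10, 20, a 5-second window and a 2-second slide, the incremental sums are 30, 100, 120, 140, 120, matching a full recomputation at each evaluation.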

(2) Developing and Testing a Tumbling Window Program

MainTopology:

package com.kfk.window;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 AM
 */
public class MainTopology {
    public static void main(String[] args) {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();

        // set Spout
        builder.setSpout("spout",new AmtSpout());

        // Tumbling window: each window covers 10 seconds; bolt parallelism is 1
        builder.setBolt("amtBolt",new TumblingBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1)
                .shuffleGrouping("spout");

        // Disable debug logging
        Config conf = new Config();
        conf.setDebug(false);

        try {
            if (args != null && args.length > 0) {

                conf.setNumWorkers(3);

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.window;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:24 PM
 */
public class AmtSpout implements IRichSpout {

    Integer[] amt = {10,20,30,40};
    String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {"java","python","c","scala"};

    Random random = new Random();
    SpoutOutputCollector spoutOutputCollector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            spoutOutputCollector = collector;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void nextTuple() {

        try {
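            // Generate simulated order data
            int _amt = amt[random.nextInt(amt.length)];
            String _time = time[random.nextInt(time.length)];
            String _city = city[random.nextInt(city.length)];
            String _product = product[random.nextInt(product.length)];
            // Emit the tuple to the bolt (amt is sent as a String)
            spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
            // Throttle to roughly one tuple per second
            Thread.sleep(1000);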
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {

    }

    @Override
    public void fail(Object msgId) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

TumblingBolt:

package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.util.List;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:09 PM
 */
public class TumblingBolt extends BaseWindowedBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> list = inputWindow.get();

        int amt = 0;
        for (Tuple tuple:list){
            amt += Integer.parseInt(tuple.getStringByField("amt"));
        }

        System.out.println(amt);
    }
}
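To sanity-check the numbers TumblingBolt prints, the same aggregation can be reproduced offline. This plain-Java sketch assumes exactly one tuple per second (the spout sleeps 1000 ms per `nextTuple()` call, so the real rate is slightly lower) and keeps only complete windows; `WindowSumSketch` is a hypothetical name, and `sumAmt` mirrors the parsing loop in `execute()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical offline check of TumblingBolt's output, assuming one tuple per second.
class WindowSumSketch {

    // Same logic as the loop in TumblingBolt.execute(): "amt" arrives as a String.
    static int sumAmt(List<String> amtValues) {
        int total = 0;
        for (String amt : amtValues) {
            total += Integer.parseInt(amt);
        }
        return total;
    }

    // Splits per-second values into complete tumbling windows of `windowSec` tuples.
    static List<Integer> tumblingSums(List<String> amtPerSecond, int windowSec) {
        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + windowSec <= amtPerSecond.size(); start += windowSec) {
            sums.add(sumAmt(amtPerSecond.subList(start, start + windowSec)));
        }
        return sums;
    }
}
```

Under this assumption each 10-second window holds about 10 amounts drawn from {10, 20, 30, 40}, so the printed values should fall roughly between 100 and 400.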

(3) Developing and Testing a Sliding Window Program

MainTopology:

package com.kfk.window;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:58 AM
 */
public class MainTopology {
    public static void main(String[] args) {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();

        // set Spout
        builder.setSpout("spout",new AmtSpout());

        // Sliding window: 5-second window length, evaluated every 2 seconds
        builder.setBolt("amtBolt",new SliderBolt()
                .withWindow(BaseWindowedBolt.Duration.seconds(5),
                        BaseWindowedBolt.Duration.seconds(2)),1)
                .shuffleGrouping("spout");

        // Disable debug logging
        Config conf = new Config();
        conf.setDebug(false);

        try {
            if (args != null && args.length > 0) {

                conf.setNumWorkers(3);

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("filePrint", conf, builder.createTopology());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

SliderBolt:

package com.kfk.window;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.util.List;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 12:18 PM
 */
public class SliderBolt extends BaseWindowedBolt {

    int amt = 0;
    OutputCollector outputCollector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        outputCollector = collector;

        super.prepare(stormConf, context, collector);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        super.declareOutputFields(declarer);
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> listNew = inputWindow.getNew();
        List<Tuple> listExp = inputWindow.getExpired();

        for (Tuple tuple : listNew){
            amt += Integer.parseInt(tuple.getStringByField("amt"));

        }

        for (Tuple tuple : listExp){
            int expAmt= Integer.parseInt(tuple.getStringByField("amt"));
            amt -=expAmt;

        }

        System.out.println("Order amount in the last 5 seconds: "+amt);
    }
}
