Storm 简单案例

Topology

package com.zxf.strom;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Entry point that wires {@code MySpout} to {@code MyBolt} and submits the resulting topology.
 *
 * <p>With no command-line arguments the topology is submitted to a {@link LocalCluster}
 * (local run); with at least one argument it is submitted through {@link StormSubmitter}.
 */
public class MyTopology {

    /**
     * Builds the spout-to-bolt topology and submits it.
     *
     * @param args when empty, the topology is run on a local cluster; otherwise it is
     *             submitted through {@code StormSubmitter} with two workers configured
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // The bolt subscribes to the spout's stream with shuffle grouping.
        topologyBuilder.setSpout("myspout", new MySpout(), 2);
        topologyBuilder.setBolt("mybolt", new MyBolt(), 2).shuffleGrouping("myspout");
        try {
            if (args.length == 0) {
                // Local run.
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("submitTopology", new Config(), topologyBuilder.createTopology());
            } else {
                Config config = new Config();
                config.setNumWorkers(2);
                StormSubmitter.submitTopology("wordcount1", config, topologyBuilder.createTopology());
            }
        } catch (Exception e) {
            // Printing e.getStackTrace() would only show the array's identity string
            // (e.g. "[Ljava.lang.StackTraceElement;@1b6d3586"); print the real trace instead.
            e.printStackTrace();
        }
    }
}

Spout

package com.zxf.strom;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Spout that emits an ever-increasing counter as a single-field tuple named {@code num}.
 */
public class MySpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    // Primitive counter: avoids unboxing/re-boxing a Long on every increment.
    // The value is boxed to Long only when it is placed into the emitted Values.
    private long num = 0L;

    /**
     * Initialization hook: stores the collector that {@link #nextTuple()} emits through.
     *
     * @param map                  topology configuration (not used here)
     * @param topologyContext      topology context (not used here)
     * @param spoutOutputCollector collector kept for later emits
     */
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
        System.out.println("RichSpout open 初始化了 ");
    }

    /**
     * Increments the counter and emits it as a one-value tuple.
     */
    public void nextTuple() {
        num++;
        collector.emit(new Values(num));
        System.out.println("RichSpout nextTuple 被调用了 ");
    }

    /**
     * Declares the single output field {@code num}.
     *
     * @param outputFieldsDeclarer declarer that receives the field list
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("num"));
    }
}

Bolt

package com.zxf.strom;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;


/**
 * Bolt that keeps a running sum of the {@code num} values it receives and prints the total.
 */
public class MyBolt extends BaseBasicBolt {

    /** Running total of every {@code num} field seen by this bolt instance. */
    private long count = 0;

    /**
     * Setup hook; it only prints a message saying the bolt was initialized.
     *
     * @param topoConf topology configuration (not used here)
     * @param context  topology context (not used here)
     */
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        System.out.println("BaseBasicBolt 被初始化了");
    }

    /**
     * Adds the tuple's {@code num} field to the running total and prints the new total.
     *
     * @param tuple                incoming tuple; its {@code num} field is read as a Long
     * @param basicOutputCollector collector (not used; this bolt emits nothing)
     */
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        count += tuple.getLongByField("num");
        System.out.println("BaseBasicBolt execute" + count);
    }

    /**
     * Declares no output fields.
     *
     * @param outputFieldsDeclarer declarer (left untouched)
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Intentionally empty: nothing is declared.
    }
}

 

上一篇:关于TiDB数据脱敏的一些想法


下一篇:Storm概述及部署