使用 Storm 实时计算网站 PV（页面浏览量）

PVBolt1 以多个并发实例进行局部汇总，PVSumBolt 以单线程进行全局汇总。

（1）创建数据输入源 PVSpout

package storm.test;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Spout that reads a UTF-8 log file line by line and emits each line as a single-field
 * tuple named {@code "log"}.
 *
 * <p>Storm calls {@link #nextTuple()} repeatedly on the spout's executor thread, so this
 * spout emits at most one line per call instead of draining the whole file in one
 * invocation. Tuples are emitted without a message id, so Storm does not track them and
 * {@link #ack(Object)} / {@link #fail(Object)} are intentionally empty.
 */
public class PVSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    /** Log file read when the no-arg constructor is used. */
    private static final String DEFAULT_LOG_PATH = "F:\\test\\websit.log";

    /** Pause after each emitted line; throttles the demo to roughly two lines per second. */
    private static final long EMIT_INTERVAL_MS = 500;

    /** Short pause when there is nothing to emit, so the spout loop does not spin the CPU. */
    private static final long IDLE_SLEEP_MS = 1;

    private final String logPath;
    private SpoutOutputCollector collector;
    private BufferedReader reader;

    /** Creates a spout that reads the default log file {@value #DEFAULT_LOG_PATH}. */
    public PVSpout() {
        this(DEFAULT_LOG_PATH);
    }

    /**
     * Creates a spout that reads the given log file.
     *
     * @param logPath path of the UTF-8 encoded log file; must not be null
     */
    public PVSpout(String logPath) {
        this.logPath = Objects.requireNonNull(logPath, "logPath");
    }

    /**
     * Stores the collector and opens the log file.
     *
     * @throws UncheckedIOException if the file cannot be opened. Failing here surfaces a bad
     *     path immediately instead of leaving a spout that silently emits nothing.
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(logPath), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot open log file: " + logPath, e);
        }
    }

    /** Closes the log file, if it is still open. */
    @Override
    public void close() {
        closeReader();
    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    /**
     * Emits the next log line, if one is available.
     *
     * <p>At end of file the reader is kept open, so lines appended later are still picked
     * up on subsequent calls.
     */
    @Override
    public void nextTuple() {
        if (reader == null) {
            // Reader was closed after an I/O error (or by close()); nothing left to emit.
            pause(IDLE_SLEEP_MS);
            return;
        }

        String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            // Stop reading instead of retrying a broken stream on every call.
            e.printStackTrace();
            closeReader();
            return;
        }

        if (line == null) {
            pause(IDLE_SLEEP_MS);
            return;
        }

        collector.emit(new Values(line));
        pause(EMIT_INTERVAL_MS);
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Closes the reader (logging any failure) and marks it as gone. */
    private void closeReader() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            reader = null;
        }
    }

    /** Sleeps for the given time, restoring the interrupt flag if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

（2）创建局部汇总 Bolt：PVBolt1

package storm.test;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that counts, per task, the incoming log lines that carry a session id and emits its
 * running local total as {@code (taskId, pv)}.
 *
 * <p>Several instances run in parallel. A downstream summing bolt combines the latest local
 * total of each instance (keyed by task id) into a global page-view count.
 */
public class PVBolt1 implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    /** Storm task id of this instance; unique across the whole topology, unlike a thread id. */
    private long taskId;
    /** Number of lines with a session id seen by this task so far. */
    private long pv = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Thread ids are only unique within one JVM. With several workers, two tasks could
        // report the same thread id and overwrite each other's count downstream.
        this.taskId = context.getThisTaskId();
    }

    /**
     * Counts the line if its second tab-separated column (the session id) is non-empty, then
     * emits the updated local total. Lines without that column are skipped instead of
     * throwing {@code ArrayIndexOutOfBoundsException}.
     */
    @Override
    public void execute(Tuple input) {
        String logLine = input.getString(0);

        // NOTE(review): assumes the session id is the 2nd tab-separated column; confirm
        // against the log format.
        String[] columns = logLine.split("\t");
        if (columns.length > 1 && !columns[1].isEmpty()) {
            pv++;
            // Emitted as long so consumers can read field 0 with getLong().
            collector.emit(new Values(taskId, pv));
            System.err.println("threadid:" + Thread.currentThread().getId() + "  pv:" + pv);
        }

        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    /** Declares {@code (taskId, pv)}; the downstream summing bolt reads these by position. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("taskId", "pv"));
    }

    @Override
    public void cleanup() {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

（3）创建全局汇总 Bolt：PVSumBolt

package storm.test;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Sink bolt that keeps the latest running total reported by each upstream counter and
 * prints their sum as the global page-view count.
 *
 * <p>It must run as a single instance: each instance only sees the partial counts routed
 * to it, so multiple instances would each print only a partial sum.
 */
public class PVSumBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    /** Latest running total per upstream source id (field 0 of each input tuple). */
    private final Map<Long, Long> counts = new HashMap<>();
    private OutputCollector collector;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Records the upstream total carried by {@code input} and prints the new global sum.
     * The input is expected to be {@code (Long sourceId, Long runningTotal)}.
     */
    @Override
    public void execute(Tuple input) {
        Long sourceId = input.getLong(0);
        Long localPv = input.getLong(1);

        // Each upstream value is already cumulative, so it replaces the previous one.
        counts.put(sourceId, localPv);

        long pvTotal = 0;
        for (long partial : counts.values()) {
            pvTotal += partial;
        }

        System.err.println("pv_all:" + pvTotal);

        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    /** Terminal bolt: declares no output streams. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

（4）创建驱动类 PVMain

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Driver that builds the page-view topology and submits it.
 *
 * <p>The topology is PVSpout → PVBolt1 (4 parallel local counters) → PVSumBolt (one global
 * aggregator).
 * <ul>
 *   <li>With a command-line argument, it is submitted to a cluster under that name.</li>
 *   <li>Without one, it runs in an in-process {@link LocalCluster}.</li>
 * </ul>
 */
public class PVMain {

    private static final String SPOUT_ID = "PVSpout";
    private static final String LOCAL_COUNT_BOLT_ID = "PVBolt1";
    private static final String SUM_BOLT_ID = "PVSumBolt";

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws IllegalStateException if cluster submission fails (cause preserved)
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SPOUT_ID, new PVSpout(), 1);
        // Shuffle grouping spreads log lines evenly across the local counters.
        builder.setBolt(LOCAL_COUNT_BOLT_ID, new PVBolt1(), 4).shuffleGrouping(SPOUT_ID);
        // A single summing task must receive every partial count to compute the global total.
        builder.setBolt(SUM_BOLT_ID, new PVSumBolt(), 1).shuffleGrouping(LOCAL_COUNT_BOLT_ID);

        Config conf = new Config();

        conf.setNumWorkers(2);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                // Propagate so the launcher exits with a failure status instead of reporting success.
                throw new IllegalStateException("Failed to submit topology '" + args[0] + "'", e);
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("pvtopology", conf, builder.createTopology());
        }
    }
}

结果（注：下面的示例输出格式与上文代码中的打印语句并不一致。代码实际打印的是 `threadid:<线程ID>  pv:<局部计数>` 和 `pv_all:<全局计数>`；此处保留原文中的输出）：

134pv_num : 9
sumnum ==>33
128pv_num : 7
sumnum ==>34
134pv_num : 10
sumnum ==>35
128pv_num : 8
sumnum ==>36
128pv_num : 9
sumnum ==>37
128pv_num : 10

上一篇:hibernate-mapping-3.0.dtd;hibernate-configuration-3.0.dtd;hibernate.properties所在路径


下一篇:8-3 持久化存储PersistantVolume