Storm实时计算网站的UV

(1)创建带IP地址的数据源GenerateData

package storm.uv;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class GenerateData {

    /** Default output path; UVSpout reads the same file. */
    private static final String DEFAULT_LOG_PATH = "F:\\test\\websit.log";

    /** Number of synthetic log lines to generate. */
    private static final int LINE_COUNT = 40;

    /**
     * Generates a synthetic, tab-separated website access log and writes it to disk.
     *
     * <p>Each line has the form {@code host \t sessionId \t time \t ip}, with every field
     * picked at random from a small fixed pool.
     *
     * @param args optional; when non-empty, {@code args[0]} overrides the output path
     *     (defaults to {@value #DEFAULT_LOG_PATH})
     * @throws UncheckedIOException if the log file cannot be written
     */
    public static void main(String[] args) {
        File logFile = new File(args != null && args.length > 0 ? args[0] : DEFAULT_LOG_PATH);
        Random random = new Random();

        // 1 website host name
        String[] hosts = {"www.zyd.com"};

        // 2 session ids
        String[] sessionIds = {"ABYH6Y4V4SCVXTG6DPB4VH9U123",
                "XXYH6YCGFJYERTT834R52FDXV9U34",
                "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567",
                "VVVYH6Y4V4SFXZ56JIPDPB4V678"};

        // 3 access timestamps
        String[] times = {"2017-08-07 08:40:50",
                "2017-08-07 08:40:51",
                "2017-08-07 08:40:52",
                "2017-08-07 08:40:53",
                "2017-08-07 09:40:49",
                "2017-08-07 10:40:49",
                "2017-08-07 11:40:49",
                "2017-08-07 12:40:49"};

        // 4 visitor IP addresses
        String[] ips = {"192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104",
                "192.168.1.105", "192.168.1.106", "192.168.1.107", "192.168.1.108"};

        // 5 build the log: host \t sessionId \t time \t ip \n
        StringBuilder log = new StringBuilder();
        for (int i = 0; i < LINE_COUNT; i++) {
            log.append(hosts[0]).append('\t')
                    .append(pick(sessionIds, random)).append('\t')
                    .append(pick(times, random)).append('\t')
                    .append(pick(ips, random)).append('\n');
        }

        // 6 write to the file; try-with-resources closes the stream even if the write fails
        // (the previous finally block threw an NPE when the stream could not be opened).
        File parent = logFile.getParentFile();
        if (parent != null) {
            parent.mkdirs();
        }
        try (FileOutputStream out = new FileOutputStream(logFile)) {
            out.write(log.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write log file " + logFile.getAbsolutePath(), e);
        }
    }

    /** Returns a uniformly random element of {@code values}; bounds come from the array itself. */
    private static String pick(String[] values, Random random) {
        return values[random.nextInt(values.length)];
    }
}

(2)创建接收数据的UVSpout

package storm.uv;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.*;
import java.util.Map;

public class UVSpout implements IRichSpout {
    /** Log file written by GenerateData. */
    private static final String LOG_PATH = "F:\\test\\websit.log";
    /** Pause after each emitted line so the demo output stays readable. */
    private static final long EMIT_INTERVAL_MS = 500;
    /** Back-off when there is nothing to emit, so an idle spout does not spin the CPU. */
    private static final long IDLE_SLEEP_MS = 100;

    private SpoutOutputCollector collector;
    private BufferedReader reader;

    /**
     * Stores the collector and opens the log file for line-by-line reading.
     *
     * @throws IllegalStateException if the log file does not exist (fail fast instead of
     *     leaving {@code reader} null and failing on every {@code nextTuple} call)
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(new InputStreamReader(
                    new FileInputStream(LOG_PATH), java.nio.charset.StandardCharsets.UTF_8));
        } catch (FileNotFoundException e) {
            throw new IllegalStateException("Cannot open log file " + LOG_PATH, e);
        }
    }

    /**
     * Emits at most one log line per call as a single-field tuple ("log").
     *
     * <p>Storm calls {@code nextTuple} repeatedly on the spout thread, so this method
     * returns after one line instead of draining the whole file. At end of file it
     * backs off briefly rather than busy-looping.
     */
    @Override
    public void nextTuple() {
        String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read log file " + LOG_PATH, e);
        }
        if (line == null) {
            pause(IDLE_SLEEP_MS);
            return;
        }
        collector.emit(new Values(line));
        pause(EMIT_INTERVAL_MS);
    }

    /** Sleeps for {@code millis}; restores the interrupt flag if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    /** Releases the file handle opened in {@link #open}. */
    @Override
    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException ignored) {
                // Shutting down; nothing useful to do if close fails.
            }
        }
    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void ack(Object msgId) {

    }

    @Override
    public void fail(Object msgId) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(3)创建UVSplitBolt,从每行日志中截取IP地址

package storm.uv;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class UVSplitBolt implements IRichBolt {
    /** Column index of the IP in a log line laid out as host \t sessionId \t time \t ip. */
    private static final int IP_FIELD = 3;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Extracts the visitor IP from a raw log line and emits {@code (ip, 1)}.
     *
     * <p>Lines with fewer than four tab-separated fields are reported and skipped instead
     * of throwing {@code ArrayIndexOutOfBoundsException} and killing the bolt. Every input
     * tuple is acked, as {@code IRichBolt} requires.
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] fields = line == null ? new String[0] : line.split("\t");
        if (fields.length > IP_FIELD) {
            // Anchored to the input; this only matters if the spout emits with message ids.
            collector.emit(input, new Values(fields[IP_FIELD], 1));
        } else {
            System.err.println("Skipping malformed log line: " + line);
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "num"));
    }

    @Override
    public void cleanup() {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(4)创建UVSumBolt,按IP去重计数,并将结果打印到控制台上

package storm.uv;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class UVSumBolt implements IRichBolt {
    /** Visit count per distinct IP; the number of keys is the UV (unique visitors). */
    private final Map<String, Integer> map = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Accumulates {@code num} into the count for {@code ip}, prints the per-IP count and the
     * current UV to stderr, and acks the input tuple.
     *
     * <p>NOTE(review): the UV is only global if every tuple reaches this single task (for example,
     * parallelism 1 or global grouping). The topology wiring is not shown here, so confirm it.
     */
    @Override
    public void execute(Tuple input) {
        String ip = input.getString(0);
        Integer num = input.getInteger(1);
        // Dedup: each distinct IP is one map key; repeat visits add to its count.
        int count = map.merge(ip, num, Integer::sum);
        System.err.println(" ip: " + ip + " num : " + count);
        System.err.println(" uv: " + map.size());
        collector.ack(input);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
上一篇:【linux】PV UV 并发量概念的讲解


下一篇:牛客网后端项目实战(四十一):网站统计数据