Storm: simulating website access logs and printing each received session ID to the console

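Requirements:

(1) Simulate website access log entries, including the website name, session ID, and access time.

(2) Print the session ID of each received log line to the console.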

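Analysis

(1) Create a utility class that generates website access logs.

(2) In the spout, read the log file and emit it line by line.

(3) In the bolt, extract the session ID from each received line and print it to the console.

(4) The main method wires the spout and the bolt together into a topology.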
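Before bringing Storm in, the spout-to-bolt data flow from steps (2) to (4) can be modelled in plain Java. This is only an analogy, assuming a `BlockingQueue` stands in for the stream and a hypothetical `END` marker signals the end of input; in a real topology Storm manages the streams, threads and scheduling itself. `PipelineSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java analogy of the spout -> bolt flow (no Storm involved).
class PipelineSketch {
    // Hypothetical end-of-input marker; real Storm spouts have no such thing.
    private static final String END = "__END__";

    static List<String> run(List<String> logLines) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        List<String> sessionIds = new ArrayList<>();

        // "Spout": emits the log lines one at a time
        Thread spout = new Thread(() -> {
            try {
                for (String line : logLines) {
                    stream.put(line);
                }
                stream.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // "Bolt": takes each line and extracts the session ID (second tab-separated field)
        Thread bolt = new Thread(() -> {
            try {
                String line;
                while (!(line = stream.take()).equals(END)) {
                    sessionIds.add(line.split("\t")[1]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        spout.start();
        bolt.start();
        try {
            spout.join();
            bolt.join();  // join() makes the bolt thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return sessionIds;
    }
}
```

Only one "bolt" thread mutates `sessionIds`, so an unsynchronized `ArrayList` is safe in this sketch.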

Hands-on example

(1) Create the website access log

GenerateData: generates the data:
package storm;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class GenerateData {
    public static void main(String[] args) {
        File logFile = new File("F:\\test\\websit.log");
        Random random = new Random();

        // 1 Website name
        String[] hosts = {"www.zyd.com"};

        // 2 Session IDs
        String[] sessionIds = {"ABYH6Y4V4SCVXTG6DPB4VH9U123",
                "XXYH6YCGFJYERTT834R52FDXV9U34",
                "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567",
                "VVVYH6Y4V4SFXZ56JIPDPB4V678"};

        // 3 Access times
        String[] times = {"2017-08-07 08:40:50",
                "2017-08-07 08:40:51",
                "2017-08-07 08:40:52",
                "2017-08-07 08:40:53",
                "2017-08-07 09:40:49",
                "2017-08-07 10:40:49",
                "2017-08-07 11:40:49",
                "2017-08-07 12:40:49"};

        // 4 Build 40 log lines in the format: host \t sessionId \t time
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 40; i++) {
            sb.append(hosts[0]).append('\t')
              .append(sessionIds[random.nextInt(sessionIds.length)]).append('\t')
              .append(times[random.nextInt(times.length)]).append('\n');
        }

        // 5 Write the data to the file as UTF-8 (the spout reads it as UTF-8);
        //   try-with-resources closes the stream even if opening or writing fails
        try (OutputStream outputStream = new FileOutputStream(logFile)) {
            outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
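Because GenerateData uses an unseeded `Random`, every run produces a different file. For repeatable tests, one option is to pass a fixed seed and return the lines instead of writing them directly. The sketch below assumes that approach; `SeededLogGenerator` and `generate` are hypothetical names, and the ID and time arrays are shortened subsets of the ones above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical seeded variant of GenerateData: same line format
// (host \t sessionId \t time), but reproducible for a given seed.
class SeededLogGenerator {
    private static final String HOST = "www.zyd.com";
    private static final String[] SESSION_IDS = {
            "ABYH6Y4V4SCVXTG6DPB4VH9U123",
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "BBYH61456FGHHJ7JL89RG5VV9UYU7"};
    private static final String[] TIMES = {
            "2017-08-07 08:40:50",
            "2017-08-07 09:40:49",
            "2017-08-07 10:40:49"};

    static List<String> generate(int count, long seed) {
        Random random = new Random(seed);  // fixed seed => same sequence on every run
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            lines.add(HOST + "\t"
                    + SESSION_IDS[random.nextInt(SESSION_IDS.length)] + "\t"
                    + TIMES[random.nextInt(TIMES.length)]);
        }
        return lines;
    }
}
```

The returned list could then be written with `java.nio.file.Files.write(path, lines, StandardCharsets.UTF_8)`, which appends a line separator after each entry.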

Sample of the generated data (in the file, the fields are separated by tabs):

www.zyd.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-08-07 08:40:52
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 11:40:49
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 08:40:53
www.zyd.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-08-07 10:40:49
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 08:40:52
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 12:40:49
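Each line therefore has three tab-separated fields: host, session ID, and access time. A minimal parsing sketch, assuming exactly that layout and the `yyyy-MM-dd HH:mm:ss` time format used above (`WebLogRecord` is a hypothetical name, not part of the tutorial's code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical value class for one log line: host \t sessionId \t time
final class WebLogRecord {
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    final String host;
    final String sessionId;
    final LocalDateTime accessTime;

    private WebLogRecord(String host, String sessionId, LocalDateTime accessTime) {
        this.host = host;
        this.sessionId = sessionId;
        this.accessTime = accessTime;
    }

    // Throws IllegalArgumentException if the line does not have exactly 3 fields;
    // LocalDateTime.parse throws DateTimeParseException for a malformed time.
    static WebLogRecord parse(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 tab-separated fields: " + line);
        }
        return new WebLogRecord(fields[0], fields[1],
                LocalDateTime.parse(fields[2], TIME_FORMAT));
    }
}
```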

WebLogSpout: reads the log file and emits it line by line
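package storm.weblog;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class WebLogSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;
    private BufferedReader br;
    private SpoutOutputCollector collector = null;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Save the collector Storm passes in; nextTuple() uses it to emit
        this.collector = spoutOutputCollector;

        // Open the input file
        try {
            this.br = new BufferedReader(new InputStreamReader(
                    new FileInputStream("F:\\test\\websit.log"), StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        // Release the file handle when the spout shuts down
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void nextTuple() {
        // Storm calls this method repeatedly; emit at most one line per call
        // instead of looping over the whole file inside a single call
        if (br == null) {
            Utils.sleep(100);
            return;
        }
        try {
            String line = br.readLine();
            if (line != null) {
                // Emit the line as a single-field tuple
                collector.emit(new Values(line));
            } else {
                // No new data yet: back off briefly, then retry on the next call
                Utils.sleep(100);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declare the output field name; the bolt reads it by this name
        outputFieldsDeclarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}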

WebLogBolt: extracts the session ID from each line and prints it

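package storm.weblog;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class WebLogBolt implements IRichBolt {
    private OutputCollector collector;
    private int line_number = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        // Keep the collector so execute() can acknowledge tuples
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // 1 Get the data; the field name must match the one declared by the spout ("log")
        String line = input.getStringByField("log");

        // 2 Split the line: host \t sessionId \t time
        String[] split = line.split("\t");
        if (split.length < 2) {
            // Malformed line: skip it instead of failing with ArrayIndexOutOfBoundsException
            collector.ack(input);
            return;
        }
        String session_id = split[1];

        // 3 Count the lines received by this bolt instance
        line_number++;

        // 4 Also print the thread ID, which helps with later testing
        System.out.println(Thread.currentThread().getId() + "session_id" + "   " + session_id + "line_number" + line_number);

        // Acknowledge the tuple (this only matters once the spout emits with message IDs)
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        // Release resources (nothing to release here)
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt emits nothing, so there are no output fields to declare
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // No component-specific configuration
        return null;
    }
}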
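Note that `line_number` is an instance field, so it counts only the tuples seen by one bolt instance (one task). With a parallelism of 1, as in WebLogMain, that equals the total; with a higher parallelism, each task would print its own independent count. The plain-Java model below illustrates this with two hypothetical counter instances receiving lines alternately; it does not use Storm, and real shuffle grouping does not strictly alternate.

```java
import java.util.List;

// Plain-Java model of two bolt tasks, each with its own line counter
// (mirrors WebLogBolt's instance field line_number; no Storm involved).
class PerTaskCounterDemo {
    static final class CountingTask {
        private int lineNumber = 0;

        void execute(String line) {
            lineNumber++;  // only the tuples delivered to this task are counted
        }

        int count() {
            return lineNumber;
        }
    }

    // Delivers lines alternately to two tasks and returns their individual counts.
    static int[] distribute(List<String> lines) {
        CountingTask[] tasks = {new CountingTask(), new CountingTask()};
        for (int i = 0; i < lines.size(); i++) {
            tasks[i % tasks.length].execute(lines.get(i));
        }
        return new int[] {tasks[0].count(), tasks[1].count()};
    }
}
```

A global line count would need a single task (for example `globalGrouping`) or an external counter.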

WebLogMain: wires the spout and the bolt into a topology

package storm.weblog;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class WebLogMain {
    public static void main(String[] args) throws Exception {
        // 1 Create the topology builder
        TopologyBuilder builder = new TopologyBuilder();
        // 2 Component ID, component instance, parallelism hint
        builder.setSpout("WebLogSpout", new WebLogSpout(), 1);
        // Storm provides several stream groupings; shuffleGrouping, the most common one,
        // distributes tuples randomly across the bolt's tasks
        builder.setBolt("WebLogBolt", new WebLogBolt(), 1).shuffleGrouping("WebLogSpout");
        // 3 Create the configuration object
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 4 Submit the topology
        if (args.length > 0) {
            // Cluster mode: args[0] is the topology name
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: runs the topology in-process, which is convenient for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webtopology", conf, builder.createTopology());
        }
    }
}
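Storm's documentation describes shuffle grouping as distributing tuples randomly while giving each task an equal number of tuples. A simplified model of that behaviour, assuming a shuffle-then-round-robin strategy (an illustration only, not Storm's actual `ShuffleGrouping` source; `ShuffleGroupingModel` is a hypothetical name): the target task IDs are shuffled, handed out in order, and reshuffled once exhausted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Simplified model of shuffle grouping (illustrative, not Storm's code):
// shuffle the target task IDs, hand them out in order, reshuffle when exhausted.
class ShuffleGroupingModel {
    private final List<Integer> tasks;
    private final Random random;
    private int next = 0;

    ShuffleGroupingModel(List<Integer> taskIds, long seed) {
        this.tasks = new ArrayList<>(taskIds);
        this.random = new Random(seed);
        Collections.shuffle(this.tasks, this.random);
    }

    // Returns the task ID that should receive the next tuple.
    int chooseTask() {
        if (next == tasks.size()) {
            Collections.shuffle(tasks, random);  // new random order for the next round
            next = 0;
        }
        return tasks.get(next++);
    }
}
```

Each round hands every task exactly one tuple, so the order is random but the load stays even.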

Sample output

158session_id XXYH6YCGFJYERTT834R52FDXV9U34line_number34
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number35
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number36

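While the topology is running, append new lines to the log file (websit.log) in the same format: the console picks up the new entries in real time and processes them. Only appended lines are seen, because the spout reads the file sequentially and never goes back.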
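This relies on `BufferedReader.readLine()` returning `null` at the current end of the file without closing the stream, so a later call can still return lines appended in the meantime; the spout simply retries on its next `nextTuple()` call. A standalone sketch of that tail-style behaviour, assuming a temporary file as a stand-in for websit.log (`TailSketch`, `drain`, and `demo` are hypothetical names):

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Tail-style reading: readLine() returns null at the current end of file,
// and a later call returns lines that were appended afterwards.
class TailSketch {
    // Reads every line currently available; returns an empty list at end of file.
    static List<String> drain(BufferedReader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    // Returns three batches: initial lines, nothing new, then the appended line.
    static List<List<String>> demo() {
        try {
            Path log = Files.createTempFile("websit", ".log");  // stand-in for websit.log
            Files.write(log, List.of("line-1", "line-2"), StandardCharsets.UTF_8);
            List<List<String>> batches = new ArrayList<>();
            // Same reader setup as WebLogSpout: FileInputStream + UTF-8 + BufferedReader
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new FileInputStream(log.toFile()), StandardCharsets.UTF_8))) {
                batches.add(drain(reader));  // both initial lines
                batches.add(drain(reader));  // nothing new yet
                Files.write(log, List.of("line-3"), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
                batches.add(drain(reader));  // only the appended line
            } finally {
                Files.deleteIfExists(log);  // the reader is already closed at this point
            }
            return batches;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```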
