(1)创建带IP地址的数据源GenerateData
package storm.uv;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
/**
 * Generates a small synthetic website access log used as the input of the UV topology.
 *
 * <p>Each generated line has four tab-separated fields: host, session id, access time and
 * client IP, terminated by {@code '\n'}.
 */
public class GenerateData {
    /** Output path used when no path is given on the command line (the original hard-coded path). */
    private static final String DEFAULT_LOG_PATH = "F:\\test\\websit.log";

    /** Number of log lines written per run. */
    private static final int LINE_COUNT = 40;

    /**
     * Writes {@value #LINE_COUNT} random access-log lines to a file.
     *
     * @param args optional; when present, {@code args[0]} is the output file path, otherwise
     *             {@code F:\test\websit.log} is used
     */
    public static void main(String[] args) {
        File logFile = new File(args != null && args.length > 0 ? args[0] : DEFAULT_LOG_PATH);
        Random random = new Random();
        // 1 website host name
        String[] hosts = {"www.zyd.com"};
        // 2 session ids
        String[] sessionIds = {"ABYH6Y4V4SCVXTG6DPB4VH9U123",
                "XXYH6YCGFJYERTT834R52FDXV9U34",
                "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567",
                "VVVYH6Y4V4SFXZ56JIPDPB4V678"};
        // 3 access times
        String[] times = {"2017-08-07 08:40:50",
                "2017-08-07 08:40:51",
                "2017-08-07 08:40:52",
                "2017-08-07 08:40:53",
                "2017-08-07 09:40:49",
                "2017-08-07 10:40:49",
                "2017-08-07 11:40:49",
                "2017-08-07 12:40:49"};
        // 4 client IP addresses
        String[] ips = {"192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104",
                "192.168.1.105", "192.168.1.106", "192.168.1.107", "192.168.1.108"};

        // 5 build the access log: host \t session \t time \t ip \n
        StringBuilder log = new StringBuilder();
        for (int i = 0; i < LINE_COUNT; i++) {
            log.append(hosts[0]).append('\t')
                    .append(sessionIds[random.nextInt(sessionIds.length)]).append('\t')
                    .append(times[random.nextInt(times.length)]).append('\t')
                    .append(ips[random.nextInt(ips.length)]).append('\n');
        }

        // 6 write the log. Create missing parent directories first so a fresh machine
        // does not fail with FileNotFoundException.
        File parent = logFile.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            System.err.println("Could not create directory " + parent);
        }
        // try-with-resources: the stream is closed even on failure, and a failed open no longer
        // causes a NullPointerException in a finally block.
        try (FileOutputStream outputStream = new FileOutputStream(logFile)) {
            outputStream.write(log.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
(2)创建接收数据的UVSpout
package storm.uv;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.*;
import java.util.Map;
/**
 * Spout that reads the generated access log line by line and emits each raw line as a
 * single-field tuple named {@code "log"}.
 *
 * <p>Tuples are emitted without a message id, so they are not tracked for acking and
 * {@link #ack(Object)} / {@link #fail(Object)} are no-ops.
 */
public class UVSpout implements IRichSpout {
    /** Log file produced by GenerateData. */
    private static final String LOG_PATH = "F:\\test\\websit.log";
    /** Minimum delay between two emitted lines (same pacing as the original 500 ms sleep). */
    private static final long EMIT_INTERVAL_MS = 500L;

    private SpoutOutputCollector collector;
    /** Null when the log file could not be opened. */
    private BufferedReader reader;
    /** Time of the last emit; starts at 0 so the first line goes out immediately. */
    private long lastEmitMillis;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // Open the log file.
        try {
            reader = new BufferedReader(new InputStreamReader(
                    new FileInputStream(LOG_PATH), java.nio.charset.StandardCharsets.UTF_8));
        } catch (FileNotFoundException e) {
            // Keep reader null; nextTuple() then emits nothing instead of throwing on every call.
            e.printStackTrace();
        }
    }

    /**
     * Emits at most one log line per call, no more often than every {@value #EMIT_INTERVAL_MS} ms.
     *
     * <p>The method returns immediately instead of sleeping, so the spout thread is never
     * blocked. When it is throttled or at end of file it simply emits nothing. The reader is
     * deliberately kept open at end of file, so lines appended later are picked up by
     * subsequent calls, as in the original loop.
     */
    @Override
    public void nextTuple() {
        if (reader == null) {
            return;
        }
        long now = System.currentTimeMillis();
        if (now - lastEmitMillis < EMIT_INTERVAL_MS) {
            return;
        }
        try {
            String line = reader.readLine();
            if (line != null) {
                collector.emit(new Values(line));
                lastEmitMillis = now;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    /** Releases the log file handle. */
    @Override
    public void close() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            reader = null;
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(3)创建UVSplitBolt
package storm.uv;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * Bolt that extracts the client IP (fourth tab-separated field) from a raw log line and
 * emits {@code (ip, 1)} on the fields {@code "ip", "num"}.
 */
public class UVSplitBolt implements IRichBolt {
    /** Column of the client IP in a line of the form host \t session \t time \t ip. */
    private static final int IP_FIELD_INDEX = 3;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits {@code (ip, 1)} for a well-formed line and acks the input.
     *
     * <p>Lines with fewer than four fields (for example, a blank line) are reported on stderr,
     * acked and dropped. This replaces letting an ArrayIndexOutOfBoundsException escape
     * execute(). Acking rather than failing avoids endless replays of bad data.
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] fields = line == null ? new String[0] : line.split("\t");
        if (fields.length > IP_FIELD_INDEX) {
            // Anchor to the input so the tuple tree is tracked if the spout ever uses message ids.
            collector.emit(input, new Values(fields[IP_FIELD_INDEX], 1));
        } else {
            System.err.println("UVSplitBolt: skipping malformed line: " + line);
        }
        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "num"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(4)创建UVSumBolt，将结果打印到控制台上
package storm.uv;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Terminal bolt that keeps a running hit count per client IP and prints the updated count
 * to stderr for every incoming {@code (ip, num)} tuple.
 *
 * <p>The number of keys in the map is the number of distinct IPs seen, i.e. the UV.
 * NOTE(review): counts live in this task's memory only; correct per-IP totals assume all
 * tuples for an IP reach the same task (e.g. fieldsGrouping on "ip") — confirm in the topology.
 */
public class UVSumBolt implements IRichBolt {
    /** Running count per IP. */
    private Map<String, Integer> countsByIp = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /** Adds {@code num} to the IP's running count, prints it, and acks the input. */
    @Override
    public void execute(Tuple input) {
        String ip = input.getString(0);
        Integer num = input.getInteger(1);
        // Accumulate per IP (first occurrence starts the count).
        Integer previous = countsByIp.get(ip);
        if (previous == null) {
            countsByIp.put(ip, num);
        } else {
            countsByIp.put(ip, previous + num);
        }
        // Print to the console.
        System.err.println(" ip: "+ ip+ " num : "+ countsByIp.get(ip));
        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    /** Declares no output streams: this is a sink bolt. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}