PVBolt1进行多并发局部汇总,PVSumBolt单线程进行全局汇总
(1)创建数据输入源PVSpout
package storm.test;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that reads a log file line by line and emits every line as a
 * one-field tuple named {@code "log"}.
 *
 * <p>Tuples are emitted without a message id, so {@link #ack(Object)} and
 * {@link #fail(Object)} have nothing to do.
 */
public class PVSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    /** Location of the log file that feeds the topology. */
    private static final String LOG_FILE = "F:\\test\\websit.log";

    /** Pause after each emitted line, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 500L;

    private SpoutOutputCollector collector;
    private BufferedReader reader;

    /**
     * Stores the collector and opens the log file as UTF-8 text.
     *
     * <p>If the file cannot be opened the error is printed and {@code reader}
     * stays {@code null}; {@link #nextTuple()} then emits nothing.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit the log lines
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(LOG_FILE), "UTF-8"));
        } catch (IOException e) {
            // Covers both a missing file and an unsupported charset name.
            e.printStackTrace();
        }
    }

    /** Releases the underlying file handle, if one is open. */
    @Override
    public void close() {
        closeReader();
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    /**
     * Emits at most one log line per invocation and then pauses briefly.
     *
     * <p>Handling a single line per call keeps the call short instead of
     * looping over the whole file inside one invocation. Once the end of the
     * file is reached, or if the file could not be opened, the call returns
     * without emitting.
     */
    @Override
    public void nextTuple() {
        if (reader == null) {
            // open() failed or a previous read error closed the file.
            return;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                // End of file: nothing more to emit.
                return;
            }
            collector.emit(new Values(line));
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (IOException e) {
            // Report the failure once and stop reading rather than failing on every call.
            e.printStackTrace();
            closeReader();
        } catch (InterruptedException e) {
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    /** Declares the single output field {@code "log"} holding one raw log line. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    /** @return {@code null}; no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Closes the reader if it is open and forgets it, so later calls are no-ops. */
    private void closeReader() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            reader = null;
        }
    }
}
(2)创建数据处理PVBolt1
package storm.test;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that keeps a running page-view count for this bolt instance and emits
 * the pair (current thread id, running count) after every counted log line.
 */
public class PVBolt1 implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Number of log lines counted so far by this instance. */
    private long pv = 0;

    /**
     * Stores the collector used to emit partial counts.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used for emitting and acking
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Counts one page view for a tab-separated log line that carries a session
     * id in its second column, then emits the updated count.
     *
     * <p>Lines without a second column are acked and skipped instead of
     * raising {@link ArrayIndexOutOfBoundsException}.
     *
     * @param input tuple whose first field is the raw log line
     */
    @Override
    public void execute(Tuple input) {
        // Fetch the incoming log line.
        String logline = input.getString(0);

        // The session id is the second tab-separated column.
        String[] columns = (logline == null) ? new String[0] : logline.split("\t");
        if (columns.length < 2) {
            // Malformed line: nothing to count, but the tuple is still handled.
            collector.ack(input);
            return;
        }

        // One page view per line that carries a session id.
        pv++;

        // NOTE(review): a thread id is only unique inside one JVM; with several
        // workers two instances could report the same id — confirm whether the
        // task id would be a safer key.
        long threadId = Thread.currentThread().getId();

        // Emit anchored to the input, then mark the input as processed.
        collector.emit(input, new Values(threadId, pv));
        System.err.println("threadid:" + threadId + " pv:" + pv);
        collector.ack(input);
    }

    /** Declares the two output fields: the emitting thread's id and the running count. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("thireadID", "pv"));
    }

    @Override
    public void cleanup() {
    }

    /** @return {@code null}; no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(3)创建PVSumBolt
package storm.test;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
/**
 * Bolt that merges the partial counts it receives into a global total.
 *
 * <p>It remembers the latest count reported under each id and prints the sum
 * of all remembered counts after every incoming tuple.
 */
public class PVSumBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    /** Latest count received per id (first tuple field to second tuple field). */
    private Map<Long, Long> counts = new HashMap<>();

    private OutputCollector collector;

    /**
     * Stores the collector so that processed tuples can be acked.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used for acking
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Records the count carried by the tuple and prints the new global total.
     *
     * @param input tuple of (id, count), both read as {@code Long}
     */
    @Override
    public void execute(Tuple input) {
        Long threadID = input.getLong(0);
        Long pv = input.getLong(1);

        // Each id reports a running count, so the newest value replaces the old one.
        counts.put(threadID, pv);

        long totalPv = 0;
        for (long partial : counts.values()) {
            totalPv += partial;
        }
        System.err.println("pv_all:" + totalPv);

        // Mark the input as fully processed.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    /** Declares nothing: this bolt does not emit tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** @return {@code null}; no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
(4)驱动
package storm.test;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires PVSpout, PVBolt1 and PVSumBolt into one topology and
 * submits it.
 */
public class PVMain {

    /**
     * Builds the topology and submits it.
     *
     * <p>With at least one argument the topology is submitted through
     * {@link StormSubmitter} under the name {@code args[0]}; without arguments
     * it is submitted to a {@link LocalCluster} as {@code "pvtopology"}.
     *
     * @param args optional; {@code args[0]} is the topology name
     */
    public static void main(String[] args) {
        TopologyBuilder topology = new TopologyBuilder();

        // One spout instance feeds four PVBolt1 instances, which feed one PVSumBolt.
        topology.setSpout("PVSpout", new PVSpout(), 1);
        topology.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");
        topology.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");

        Config config = new Config();
        config.setNumWorkers(2);

        if (args.length == 0) {
            // No topology name given: submit to a local cluster.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("pvtopology", config, topology.createTopology());
            return;
        }

        try {
            StormSubmitter.submitTopology(args[0], config, topology.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
结果:
134pv_num : 9
sumnum ==>33
128pv_num : 7
sumnum ==>34
134pv_num : 10
sumnum ==>35
128pv_num : 8
sumnum ==>36
128pv_num : 9
sumnum ==>37
128pv_num : 10