Storm 与 MySQL 整合 + 与 HDFS 整合

Storm 与 MySQL 整合

代码目录结构如下:

Storm 与 MySQL 整合 + 与 HDFS 整合

?

?

DataSpout


package?com.gec.demo.stormToMysql;

import?org.apache.storm.spout.SpoutOutputCollector;
import?org.apache.storm.task.TopologyContext;
import?org.apache.storm.topology.OutputFieldsDeclarer;
import?org.apache.storm.topology.base.BaseRichSpout;
import?org.apache.storm.tuple.Fields;
import?org.apache.storm.tuple.Values;

import?java.util.Map;
import?java.util.Random;

public?class?DataSpout?extends?BaseRichSpout?{
????private??static??String[]?words=new?String[]{
????????"hadoop","yarn","mapreduce"
????};
????private??static??int??_id=0;
????private??SpoutOutputCollector?collector;
????@Override
????public?void?open(Map?map,?TopologyContext?topologyContext,?SpoutOutputCollector?spoutOutputCollector)?{
????????this.collector=spoutOutputCollector;
????}

????@Override
????public?void?nextTuple()?{
????????this.collector.emit(new?Values(_id++,words[new?Random().nextInt(words.length)]));
????}

????@Override
????public?void?declareOutputFields(OutputFieldsDeclarer?outputFieldsDeclarer)?{
????????outputFieldsDeclarer.declare(new?Fields("_id","word"));
????}
}


?
JdbcInsertBoltMain


package?com.gec.demo.stormToMysql;

import?com.google.common.collect.Maps;
import?org.apache.storm.Config;
import?org.apache.storm.LocalCluster;
import?org.apache.storm.StormSubmitter;
import?org.apache.storm.generated.AlreadyAliveException;
import?org.apache.storm.generated.AuthorizationException;
import?org.apache.storm.generated.InvalidTopologyException;
import?org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import?org.apache.storm.jdbc.common.ConnectionProvider;
import?org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import?org.apache.storm.jdbc.mapper.JdbcMapper;
import?org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import?org.apache.storm.topology.TopologyBuilder;

import?java.util.Map;

public?class?JdbcInsertBoltMain?{
????public?static?void?main(String[]?args)?throws?InvalidTopologyException,?AuthorizationException,?AlreadyAliveException?{
????????Map?hikariConfigMap?=?Maps.newHashMap();
????????hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
????????hikariConfigMap.put("dataSource.url",?"jdbc:mysql://hadoop-001:3306/storm_db");
????????hikariConfigMap.put("dataSource.user","root");
????????hikariConfigMap.put("dataSource.password","123456");
????????ConnectionProvider?connectionProvider?=?new?HikariCPConnectionProvider(hikariConfigMap);
????????String?tableName="t_word";
????????JdbcMapper?simpleJdbcMapper=new?SimpleJdbcMapper(tableName,connectionProvider);
????????JdbcInsertBolt?userPersistanceBolt?=?new?JdbcInsertBolt(connectionProvider,?simpleJdbcMapper)
????????????????.withInsertQuery("insert?into?t_word?values?(?,?)")
????????????????.withQueryTimeoutSecs(30);


????????TopologyBuilder?builder?=?new?TopologyBuilder();
????????builder.setSpout("dataspout",new?DataSpout());
????????//添加我们storm与jdbc整合的bolt
????????builder.setBolt("userPersistanceBolt"?,userPersistanceBolt).localOrShuffleGrouping("dataspout");

????????Config?config?=?new?Config();
????????if(null??!=?args?&&?args.length?>0){
????????????StormSubmitter.submitTopology(args[0],config,builder.createTopology());
????????}else{
????????????LocalCluster?cluster?=?new?LocalCluster();
????????????cluster.submitTopology("JdbcInsertBoltMain",config,builder.createTopology());
????????}
????}
}

效果图如下:
?

?


?

?

?

Storm 与 HDFS 整合

代码目录结构如下:

Storm 与 MySQL 整合 + 与 HDFS 整合

?

DataSpout

package?com.gec.demo.stormtohdfs;

import?org.apache.storm.spout.SpoutOutputCollector;
import?org.apache.storm.task.TopologyContext;
import?org.apache.storm.topology.OutputFieldsDeclarer;
import?org.apache.storm.topology.base.BaseRichSpout;
import?org.apache.storm.tuple.Fields;
import?org.apache.storm.tuple.Values;

import?java.util.Map;
import?java.util.Random;

public?class?DataSpout?extends?BaseRichSpout?{
????private?SpoutOutputCollector?collector;
????private?static?String?datas[]=new?String[]{
????????????"hello","world","java","hadoop"
????};


????@Override
????public?void?open(Map?conf,?TopologyContext?context,?SpoutOutputCollector?collector)?{

????????this.collector=collector;

????}

????/*
?????*?循环调用
?????*?*/
????@Override
????public?void?nextTuple()?{

????????//生成此数据
????????String?data=datas[new?Random().nextInt(datas.length)];
????????//发送数据到下游组件
????????collector.emit(new?Values(data));

????}

????/**
?????*?此方法是对发送数据进行声明
?????*?*/
????@Override
????public?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{

????????declarer.declare(new?Fields("word"));

????}
}


StromHdfsMain

package?com.gec.demo.stormtohdfs;

import?org.apache.storm.Config;
import?org.apache.storm.LocalCluster;
import?org.apache.storm.StormSubmitter;
import?org.apache.storm.generated.AlreadyAliveException;
import?org.apache.storm.generated.AuthorizationException;
import?org.apache.storm.generated.InvalidTopologyException;
import?org.apache.storm.hdfs.bolt.HdfsBolt;
import?org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import?org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import?org.apache.storm.hdfs.bolt.format.FileNameFormat;
import?org.apache.storm.hdfs.bolt.format.RecordFormat;
import?org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import?org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import?org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import?org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import?org.apache.storm.topology.TopologyBuilder;
import?org.apache.storm.tuple.Tuple;

public?class?StromHdfsMain?{
????public?static?void?main(String[]?args)?throws?InvalidTopologyException,?AuthorizationException,?AlreadyAliveException?{
???????//?use?"|"?instead?of?","??for?field?delimiter
????????RecordFormat?format=new?DelimitedRecordFormat()
????????????????.withFieldDelimiter("|");
????????/**
?????????*?文件的控制策略,使用两种方式,第一种:数据条数的多少
?????????*?第二种:文件的内容大小
?????????*/
????????//?sync?the?filesystem?after?every?1k?tuples
????????SyncPolicy?syncPolicy=new?CountSyncPolicy(1000);
????????//rotate?files?when?they?reach?5?MB
????????FileRotationPolicy?rotationPolicy=new?FileSizeRotationPolicy(5.0f,?FileSizeRotationPolicy.Units.MB)?;
????????FileNameFormat?fileNameFormat=new?DefaultFileNameFormat()
????????????????.withPath("/stormToHdfs/");
????????HdfsBolt?hdfsBolt=new?HdfsBolt()
????????????????.withFsUrl("hdfs://hadoop-001:9000")
????????????????.withFileNameFormat(fileNameFormat)
????????????????.withRecordFormat(format)
????????????????.withRotationPolicy(rotationPolicy)
????????????????.withSyncPolicy(syncPolicy);
????????TopologyBuilder?topologyBuilder=new?TopologyBuilder();
????????topologyBuilder.setSpout("dataSpout",new?DataSpout());
????????topologyBuilder.setBolt("hdfsBolt",hdfsBolt).localOrShuffleGrouping("dataSpout");
????????Config?config??=?new?Config();
????????if(args?!=null?&&?args.length?>0){
????????????StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
????????}else{
????????????LocalCluster?cluster?=?new?LocalCluster();
????????????cluster.submitTopology("stormToHdfs",config,topologyBuilder.createTopology());

????????}
????}
}


效果图如下:

Storm 与 MySQL 整合 + 与 HDFS 整合

?

(0)
(0)
   
举报
评论 一句话评论(0
上一篇:windows用Navicat连接虚拟机的Mysql


下一篇:一.数据库基础