Storm 与mysql整合
代码目录结构如下:
?
?
DataSpout
package?com.gec.demo.stormToMysql; import?org.apache.storm.spout.SpoutOutputCollector; import?org.apache.storm.task.TopologyContext; import?org.apache.storm.topology.OutputFieldsDeclarer; import?org.apache.storm.topology.base.BaseRichSpout; import?org.apache.storm.tuple.Fields; import?org.apache.storm.tuple.Values; import?java.util.Map; import?java.util.Random; public?class?DataSpout?extends?BaseRichSpout?{ ????private??static??String[]?words=new?String[]{ ????????"hadoop","yarn","mapreduce" ????}; ????private??static??int??_id=0; ????private??SpoutOutputCollector?collector; ????@Override ????public?void?open(Map?map,?TopologyContext?topologyContext,?SpoutOutputCollector?spoutOutputCollector)?{ ????????this.collector=spoutOutputCollector; ????} ????@Override ????public?void?nextTuple()?{ ????????this.collector.emit(new?Values(_id++,words[new?Random().nextInt(words.length)])); ????} ????@Override ????public?void?declareOutputFields(OutputFieldsDeclarer?outputFieldsDeclarer)?{ ????????outputFieldsDeclarer.declare(new?Fields("_id","word")); ????} }
?
JdbcInsertBoltMain
package?com.gec.demo.stormToMysql; import?com.google.common.collect.Maps; import?org.apache.storm.Config; import?org.apache.storm.LocalCluster; import?org.apache.storm.StormSubmitter; import?org.apache.storm.generated.AlreadyAliveException; import?org.apache.storm.generated.AuthorizationException; import?org.apache.storm.generated.InvalidTopologyException; import?org.apache.storm.jdbc.bolt.JdbcInsertBolt; import?org.apache.storm.jdbc.common.ConnectionProvider; import?org.apache.storm.jdbc.common.HikariCPConnectionProvider; import?org.apache.storm.jdbc.mapper.JdbcMapper; import?org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import?org.apache.storm.topology.TopologyBuilder; import?java.util.Map; public?class?JdbcInsertBoltMain?{ ????public?static?void?main(String[]?args)?throws?InvalidTopologyException,?AuthorizationException,?AlreadyAliveException?{ ????????Map?hikariConfigMap?=?Maps.newHashMap(); ????????hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); ????????hikariConfigMap.put("dataSource.url",?"jdbc:mysql://hadoop-001:3306/storm_db"); ????????hikariConfigMap.put("dataSource.user","root"); ????????hikariConfigMap.put("dataSource.password","123456"); ????????ConnectionProvider?connectionProvider?=?new?HikariCPConnectionProvider(hikariConfigMap); ????????String?tableName="t_word"; ????????JdbcMapper?simpleJdbcMapper=new?SimpleJdbcMapper(tableName,connectionProvider); ????????JdbcInsertBolt?userPersistanceBolt?=?new?JdbcInsertBolt(connectionProvider,?simpleJdbcMapper) ????????????????.withInsertQuery("insert?into?t_word?values?(?,?)") ????????????????.withQueryTimeoutSecs(30); ????????TopologyBuilder?builder?=?new?TopologyBuilder(); ????????builder.setSpout("dataspout",new?DataSpout()); ????????//添加我们storm与jdbc整合的bolt ????????builder.setBolt("userPersistanceBolt"?,userPersistanceBolt).localOrShuffleGrouping("dataspout"); ????????Config?config?=?new?Config(); ????????if(null??!=?args?&&?args.length?>0){ ????????????StormSubmitter.submitTopology(args[0],config,builder.createTopology()); ????????}else{ ????????????LocalCluster?cluster?=?new?LocalCluster(); ????????????cluster.submitTopology("JdbcInsertBoltMain",config,builder.createTopology()); ????????} ????} }
效果图如下:
?
?
?
?
?
Storm 与hdfs整合
代码目录结构如下:
?
DataSpout
package?com.gec.demo.stormtohdfs; import?org.apache.storm.spout.SpoutOutputCollector; import?org.apache.storm.task.TopologyContext; import?org.apache.storm.topology.OutputFieldsDeclarer; import?org.apache.storm.topology.base.BaseRichSpout; import?org.apache.storm.tuple.Fields; import?org.apache.storm.tuple.Values; import?java.util.Map; import?java.util.Random; public?class?DataSpout?extends?BaseRichSpout?{ ????private?SpoutOutputCollector?collector; ????private?static?String?datas[]=new?String[]{ ????????????"hello","world","java","hadoop" ????}; ????@Override ????public?void?open(Map?conf,?TopologyContext?context,?SpoutOutputCollector?collector)?{ ????????this.collector=collector; ????} ????/* ?????*?循环调用 ?????*?*/ ????@Override ????public?void?nextTuple()?{ ????????//生成此数据 ????????String?data=datas[new?Random().nextInt(datas.length)]; ????????//发送数据到下游组件 ????????collector.emit(new?Values(data)); ????} ????/** ?????*?此方法是对发送数据进行声明 ?????*?*/ ????@Override ????public?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{ ????????declarer.declare(new?Fields("word")); ????} }
StromHdfsMain
package?com.gec.demo.stormtohdfs; import?org.apache.storm.Config; import?org.apache.storm.LocalCluster; import?org.apache.storm.StormSubmitter; import?org.apache.storm.generated.AlreadyAliveException; import?org.apache.storm.generated.AuthorizationException; import?org.apache.storm.generated.InvalidTopologyException; import?org.apache.storm.hdfs.bolt.HdfsBolt; import?org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import?org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import?org.apache.storm.hdfs.bolt.format.FileNameFormat; import?org.apache.storm.hdfs.bolt.format.RecordFormat; import?org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import?org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import?org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import?org.apache.storm.hdfs.bolt.sync.SyncPolicy; import?org.apache.storm.topology.TopologyBuilder; import?org.apache.storm.tuple.Tuple; public?class?StromHdfsMain?{ ????public?static?void?main(String[]?args)?throws?InvalidTopologyException,?AuthorizationException,?AlreadyAliveException?{ ???????//?use?"|"?instead?of?","??for?field?delimiter ????????RecordFormat?format=new?DelimitedRecordFormat() ????????????????.withFieldDelimiter("|"); ????????/** ?????????*?文件的控制策略,使用两种方式,第一种:数据条数的多少 ?????????*?第二种:文件的内容大小 ?????????*/ ????????//?sync?the?filesystem?after?every?1k?tuples ????????SyncPolicy?syncPolicy=new?CountSyncPolicy(1000); ????????//rotate?files?when?they?reach?5?MB ????????FileRotationPolicy?rotationPolicy=new?FileSizeRotationPolicy(5.0f,?FileSizeRotationPolicy.Units.MB)?; ????????FileNameFormat?fileNameFormat=new?DefaultFileNameFormat() ????????????????.withPath("/stormToHdfs/"); ????????HdfsBolt?hdfsBolt=new?HdfsBolt() ????????????????.withFsUrl("hdfs://hadoop-001:9000") ????????????????.withFileNameFormat(fileNameFormat) ????????????????.withRecordFormat(format) ????????????????.withRotationPolicy(rotationPolicy) ????????????????.withSyncPolicy(syncPolicy); ????????TopologyBuilder?topologyBuilder=new?TopologyBuilder(); ????????topologyBuilder.setSpout("dataSpout",new?DataSpout()); ????????topologyBuilder.setBolt("hdfsBolt",hdfsBolt).localOrShuffleGrouping("dataSpout"); ????????Config?config??=?new?Config(); ????????if(args?!=null?&&?args.length?>0){ ????????????StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology()); ????????}else{ ????????????LocalCluster?cluster?=?new?LocalCluster(); ????????????cluster.submitTopology("stormToHdfs",config,topologyBuilder.createTopology()); ????????} ????} }
效果图如下:
?
踩
(0)
赞
(0)
举报
评论 一句话评论(0)