public class BlackListBolt extends BaseRichBolt{ private static Logger logger = Logger.getLogger(BlackListBolt.class); private OutputCollector collector_; private Map<String,List<String>> blacklistMap_ = new ConcurrentHashMap<String,List<String>>(); //实现了从数据库中获取黑名单基础数据的表,加载至内存作为比阿娘blacklistMap_维护 nextTuple()在每个tuple到达时被调用,这里主要实现了车牌在黑名单中的比对。 public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { collector_ = collector; Connection con = null; Statement jjhmd_statement = null; ResultSet jjhmd_resultSet = null; String jjhmd_queryString = "select ID,CPHID,SFSSBJ,SFCL from JJHMD"; try{ con = DBUtil.getDataSource().getConnection(); jjhmd_statement = con.createStatement(); jjhmd_resultSet = jjhmd_statement.executeQuery(jjhmd_queryString); while(jjhmd_resultSet.next()){ String jjhmd_cphid = jjhmd_resultSet.getString("CPHID"); String jjhmd_sfssbj = jjhmd_resultSet.getString("SFSSBJ"); String jjhmd_SFCL = jjhmd_resultSet.getString("SFCL"); String jjhmd_id = jjhmd_resultSet.getString("ID"); List<String> temp_info = new ArrayList<String>(); temp_info.add(jjhmd_sfssbj); temp_info.add(jjhmd_SFCL); temp_info.add(jjhmd_id); blacklistMap_.put(jjhmd_cphid,temp_info); } jjhmd_resultSet.close(); jjhmd_statement.close(); }catch(SQLException e){ e.printStackTrace(); }finally{ if(con!=null){ try{ con.close(); }catch(SQLException e){ logger.warn("",e); } } } } public void execute(Tuple tuple){ String no = tuple.getStringByField("no"); String location = tuple.getStringByFiled("location"); String tpid = tuple.getStringByField(tpidl:); try{ if(blacklistMap_.containsKey(no)){ List<String>temp_info = blacklistMap_.get(no); if(temp_info.get(1).equals("否")){ String msg = convertToMsg(tuple); conllector_.emit(new Values(msg)); } } }catch(Excetption e){ logger.error(e.getMessage()); }finally{ } } …… }
public class BlackListTopology{ //topicSpout接收来自JMS消息中间件的主题数据,且不设置并行度(这是由topic在JMS协议中的语义决定的) public static final String TOPIC_SPOUT = "topic_spout"; //以随机分组的方式接收来自JmsSpout的数据,并行度被设置为2. public static final String BLACKLIST_BOLT = "blacklist_bolt"; //下面这两个均以随机分组的方式接收来自BlackListBolt的数据,分别向消息中间件和数据库写入计算的结果数据. public static final String TOPIC_BOLT = "topic_bolt"; public static final String DB_BOLT = "db_bolt"; public static void main(String[] args) throws Exception{ ConfigUtil cu = ConfigUtil.getInstance(); JmsProvider jmsTopicProvbider_source = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port+")",cu.getMessage_sb_topic(),"",""); //消息中间件的IP地址、端口和主题名称,都是在配置文件中维护的,此处通过ConfigUtil对象从配置文件中获取的。 JmsSpout topicSpout = new JmsSpout(); topicSpout.setJmsProvider(jmsTopicProvider_source); topicSpout.setJmsTupleProducer(new SB_Beijing_TupleProducer()); topicSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); topicSpout.setDistributed(false); JmsProvider jmsTopicProvider_target = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port()+")",cu.getMessage_ijhmdbj_topic(),"","") JmsBolt topicBolt = new JmsBolt(); topicBolt.setJmsProvider(jmsTopicProvider_target); topicBolt.setJmsMessageProducer(new JsonMessageProducer()); topicBolt.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); TopologyBuilder builder = new ToplogyBuilder(); builder.setSpout(TOPIC_SPOUT,topicSpout; builder.setBolt(BLACKLIST_BOLT,new BlackListBolt(),2).shuffleGrouping(TOPIC_SPOUT); builder.setBolt(TOPIC_BOLT,topicBolt,1).shuffleGrouping(BLACKLIST_BOLT); RegisterBlackCarBolt dbBolt = new RegisterBlackCarBolt(); builder.setBolt(DB_BOLT,dbBolt,1).shuffleGrouping(BLACKLIST_BOLT); Config conf = new Config(); conf.setNumWorkers(2) if(args.length >0){ conf.setDebug(false); StormSubmitter.submitTopology(args[0],conf,builder.createTopology()); }else{ conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("storm-traffic-blcaklist",conf,builder.createTopology()); Utils.sleep(6000000); cluster.killTopology("storm-traffic-blacklist"); cluster.shutdown(); } } }
topicBolt是类JmsBolt的对象,它以随机分组的方式,也接受来自BlackListBolt的数据,即黑名单检索的即时结果,然后向消息中间件写入计算的结果数据 public class JmsBolt extends BaseRichBolt{ private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); private Connection connection; …… public void prepare(Map stormConf,TopologyContext context,OutputCollector collector){ if(this.jmsProvider == null || this.producer == null){ throw new IllegalStateException("JMS Provider and MessageProducer not set."); } this.collector = collector; try{ ConnectionFactory cf = this.jmsProvider.connectionFactory(); Destination dest = this.jmsProvider.destination(); this.connection = cf.createConnection(); this.session = connection.createSeesion(this.jmsTransactional,this.jmsAcknowledgeMode); this.messageProducer = session.createProducer(dest); connection.start(); } catch(Exception e){ LOG.warn("Error creating JMS connection.",e); } } public void execute(Tuple input){ try{ Message msg = this.producer.toMessage(this.session,input); if(msg!=null){ if(msg.getJMSDestination()!=null){ this.messageProducer.sen(msg.getJMSDestination(),msg); }else{ this.messageProducer.send(msg); } } if(this.autoAck){ LOG.debug("ACKing tuple:"+input); this.collector.ack(intput); } }catch(JMSException e){ LOG.warn("Failing tuple:" + input + "Exception:" + e); this.collector.fail(input); } } …… }
JmsSpout类继承了BaseRichSpout类并实现了MessageListener接口。作为JMS的客户端,JmsSpout实现了MessageListener接口,这里分析一下该接口声明的方法onMessage().方法onMessage()在Jms消息中间件向它推送一个消息时这里的实现是将得到的消息放入缓存队列queue对象中. public class JmsSpout extends BaseRichSpout implements MessageListener{ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); private LinkedBlockingQueue<Message>queue; private ConcurrentHashMap<String,Message>pendingMessages; …… public void onMessage(Message msg){ try{ LOG.debug("Queuing msg ["+msg.getJMSMessageID()+"]"); }catch(JMSException e){ } this.queue.offer(msg); } //从queue对象获取数据,组织为tuple结构后发送(emit); public void nextTuple(){ Message msg = this.queue.poll(); if(msg == null){ Utils.sleep(50); }else{ LOG.debug("sending tuple:"+msg); try{ Values vals = this.tupleProducer.toTuple(msg); if(this.isDurableSubscription()||(msg.getJMSDeliveryMode()!=Session.AUTO_ACKNOWLEDGE)){ LOG.debug("Requesting acks."); this.collector.emit(vals,msg.getJMSMessageID()); this.pendingMessages.put(msg.getJMSMessageID(),msg); }else{ this.collector.emit(vals); }catch(JMSException e){ LOG.warn("Unable to convert JMS message:"+msg); } } } //在tuple需要被确认处理成功时调用,这里的实现是从中间结果队列pendingMessages移除相应数据项,并对这条消息调用JMS的方法acknowledge()进行确认. public void ack(Object msgId){ Message msg = this.pendingMessage.remove(msgId); if(msg!=null){ try{ msg.acknowledge(); LOG.debug("JMS Message Scked:"+msgId); }catch(JMSException e){ LOG.warn("Error acknowldging JMS message:" + msgId,e); } }else{ LOG.warn("Couldn't acknowledge unknown JMS message ID:"+msgId); } } //在tuple需要被确认处理失败时调用,这里的实现是从中间结果队列pendingMessages移除相应数据项,并设置存在失败的标志位. public void fail(Object msgId){ LOG.warn("Message failed:" + msgId); this.pendingMessages.remove(msgId); synchronized(this.recoveryMutex);{ this.hasFailures = true; } } } …… }
//dbBolt是类RegisterBlackCarBolt的对象,它以随机分组的方式,接受来自BlackListBolt的数据,也即黑名单检索的即时结果,然后向数据库写入计算的结果数据。 public class RegisterBlackCarBolt implements IBasicBolt{ private static Logger log = Logger.getLogger(RegisterBlackCarBolt.class); private Connection con = null; private String tableName = "JJHMDBJ"; private void prepare(Map stormConf,TopologyContext context){ try{ con = DBUtil.getDataSource().getConnection(); }catch(SQLException el){ el.printStackTrace(); } } public void execute(Tuple input,BasicOutputCollector collector){ String json = (String)input.getValue(0); String[] tupleStrs = json.split(","); try{ String stmt = "insert into "+tableName+"("+TPID+","+JCDID+","+HMDID+","+CPHID+","+LRSJ+","+primaryKey+","+FQH+") values(?,?,?,?,?,?,?)"; PreparedStatment prepstmt = con.prepareStatement(stmt); if(tupleStrs.length==5){ prepstmt.setString(1,tupleStrs[0]); prepstmt.setString(2,tupleStrs[1]); prepstmt.setString(3,tupleStrs[2]); prepstmt.setString(4,tupleStrs[3]); prepstmt.setTimestamp(5,new Timestamp((TimeUtil.string2datetime(tupleStrs[4])).getTime())); prepstmt.setInt(6,1); prepstmt.setInt(7,getPartNO(tupleStrs[4])); }else{ log.error("tupple attribte size error!"); } int r = prepstmt.executeUpdate(); log.info("insert"+r+" row"); }catch(Exception e){ e.printStackTrace(); } } …… }