Storm 手动 DRPC(Manual DRPC)远程调用示例

一. 创建 server 端

/**
 * Server side of a manually wired Storm DRPC topology.
 *
 * <p>The topology has three stages: a {@code DRPCSpout} registered as "drpc",
 * an {@link AddBolt} registered as "add", and a {@code ReturnResults} bolt
 * registered as "return". Each stage is shuffle-grouped onto the previous one.
 */
public class ManualDRPC {

    private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class);

    /**
     * Builds the topology and submits it to the cluster under the name "ManualDRPC".
     *
     * @param args command-line arguments (unused)
     * @throws AlreadyAliveException propagated from {@code StormSubmitter.submitTopology}
     * @throws InvalidTopologyException propagated from {@code StormSubmitter.submitTopology}
     * @throws AuthorizationException propagated from {@code StormSubmitter.submitTopology}
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

        TopologyBuilder builder = new TopologyBuilder();

        // The spout is created for the DRPC function name "add".
        DRPCSpout spout = new DRPCSpout("add");
        builder.setSpout("drpc", spout);
        // Both bolts run with a parallelism hint of 3.
        builder.setBolt("add", new AddBolt(), 3).shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("add");

        Config conf = new Config();
        StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
        LOG.warn("==================================================");
        LOG.warn("the topology {} is submitted.","ManualDRPC");
        LOG.warn("==================================================");

    }

    /**
     * Bolt that adds the two comma-separated integers carried by each request
     * and emits the sum together with the request's return info.
     */
    public static class AddBolt extends BaseBasicBolt {

        /**
         * Reads the request arguments from tuple position 0 and the return info
         * from position 1, then emits {@code (sum, returnInfo)}.
         *
         * <p>NOTE(review): the positional layout is assumed to match what
         * {@code DRPCSpout} emits (arguments first, return info second) - confirm
         * against the Storm version in use.
         *
         * @param input tuple carrying the argument string and the return info
         * @param collector collector used to emit the result tuple
         * @throws NumberFormatException if either operand is not a valid {@code int}
         * @throws ArrayIndexOutOfBoundsException if fewer than two operands are supplied
         */
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object returnInfo = input.getValue(1);
            String params = input.getString(0);
            collector.emit(new Values(sum(params), returnInfo));
        }

        /**
         * Declares the two output fields: "result" and "return-info".
         *
         * @param declarer declarer that receives the output field names
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result","return-info"));
        }

        /**
         * Parses the first two comma-separated operands of {@code params} as
         * {@code int} values and returns their sum as a decimal string.
         */
        private static String sum(String params) {
            String[] numbers = params.split(",");
            // Trim so that input such as "1, 2" is accepted.
            int left = Integer.parseInt(numbers[0].trim());
            int right = Integer.parseInt(numbers[1].trim());
            // Add in long arithmetic: two ints can sum past the int range,
            // which would otherwise wrap around silently.
            return String.valueOf((long) left + right);
        }

    }
}

二. 创建 client 端

/**
 * Client that invokes the DRPC function "add" once and logs the result.
 */
public class ManualClientDRPC {

    private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);

    /**
     * Connects to the DRPC server at host "master", port 3777, executes
     * "add" with the argument string "1,2", and logs the returned value.
     *
     * <p>A failure is logged with its stack trace rather than rethrown, and
     * the client connection is closed whether or not the call succeeds.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);

        // Declared outside the try block so that finally can release it.
        // NOTE(review): Storm's stock drpc.port is 3772 - confirm that 3777
        // matches the cluster configuration.
        DRPCClient client = null;
        try {
            client = new DRPCClient(conf, "master", 3777);
            String result = client.execute("add", "1,2");
            LOG.info("============== result:{}",result);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause is not lost.
            LOG.error("DRPC call to function 'add' on master:3777 failed", e);
        } finally {
            if (client != null) {
                // Release the underlying connection.
                client.close();
            }
        }
    }
}

 

上一篇:教你大数据必修三大技能 快快记录下来


下一篇:实时流处理 Storm、Spark Streaming、Samza、Flink 比较