一. 创建 server 端
public class ManualDRPC { private static final Logger LOG = LoggerFactory.getLogger(ManualDRPC.class); public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TopologyBuilder builder = new TopologyBuilder(); DRPCSpout spout = new DRPCSpout("add"); builder.setSpout("drpc", spout); builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc"); builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add"); Config conf = new Config(); StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology()); LOG.warn("=================================================="); LOG.warn("the topology {} is submitted.","ManualDRPC"); LOG.warn("=================================================="); } public static class AddBolt extends BaseBasicBolt{ @Override public void execute(Tuple input, BasicOutputCollector collector) { Object returnInfo = input.getValue(1); String params = input.getString(0); String[] numbers = params.split(","); String conversValue = String.valueOf(Integer.parseInt(numbers[0]) + Integer.parseInt(numbers[1])); collector.emit(new Values(conversValue,returnInfo)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result","return-info")); } } }
二. 创建 client 端
/**
 * Client side of the manual DRPC example: invokes the {@code "add"} DRPC
 * function once with the arguments {@code "1,2"} and logs the result.
 */
public class ManualClientDRPC {
    private static final Logger LOG = LoggerFactory.getLogger(ManualClientDRPC.class);

    /**
     * Connects to the DRPC server at {@code master:3777}, executes {@code add("1,2")}
     * and logs the returned string. Any failure is logged with its cause, and the
     * client connection is always closed.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Config conf = new Config();
        conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
        // Retry settings for the Thrift client (count, interval, interval ceiling).
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);

        DRPCClient client = null;
        try {
            // NOTE(review): Storm's stock drpc.port is 3772 — confirm 3777 matches the cluster config.
            client = new DRPCClient(conf, "master", 3777);
            String result = client.execute("add", "1,2");
            LOG.info("============== result:{}", result);
        } catch (Exception e) {
            // Keep the throwable so the stack trace and root cause reach the log.
            LOG.error("ERR: DRPC call to 'add' failed", e);
        } finally {
            // Release the underlying Thrift transport even when the call fails.
            if (client != null) {
                client.close();
            }
        }
    }
}