定义RPC协议
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
 * RPC contract shared by the client and the server.
 *
 * <p>Extends Hadoop's {@code VersionedProtocol}; {@link #versionID} is the
 * protocol version that both sides refer to.
 */
public interface ClientProtocol extends VersionedProtocol {

    /** Protocol version (interface fields are implicitly {@code public static final}). */
    long versionID = 1L;

    /**
     * Remote greeting call.
     *
     * @param msg text supplied by the caller
     * @return the reply string produced by the implementation
     * @throws IOException declared so that RPC/transport failures can be reported
     */
    String hello(String msg) throws IOException;
}
实现RPC协议
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
/**
 * Server-side implementation of {@link ClientProtocol}.
 *
 * <p>Created: 2014-12-28 09:41:38
 */
public class ClientProtocolImpl implements ClientProtocol {

    /** Text placed in front of the caller's message by {@link #hello(String)}. */
    private static final String GREETING_PREFIX = "hello ";

    /**
     * Replies with the greeting prefix followed by the given message.
     *
     * @param msg message received from the client
     * @return {@code "hello "} concatenated with {@code msg}
     */
    @Override
    public String hello(String msg) throws IOException {
        return GREETING_PREFIX + msg;
    }

    /**
     * Reports the protocol version; the arguments are not consulted.
     *
     * @return {@link ClientProtocol#versionID}
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return ClientProtocol.versionID;
    }

    /**
     * Builds a signature carrying only the protocol version; the arguments
     * are not consulted.
     *
     * @return a {@code ProtocolSignature} for {@link ClientProtocol#versionID}
     */
    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
            long clientVersion, int clientMethodsHash) throws IOException {
        // The second constructor argument is deliberately left null, as in the
        // original example (no method list is supplied).
        return new ProtocolSignature(ClientProtocol.versionID, null);
    }
}
构建并启动RPC Server
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
 * Builds a Hadoop RPC server that serves {@link ClientProtocol} through a
 * {@link ClientProtocolImpl} instance and keeps it running until it is stopped.
 */
public class RPCServer {

    /** Address the server binds to. */
    private static final String HOST = "localhost";

    /**
     * Port the server listens on; must match the port used by the client.
     * NOTE(review): 2181 is also ZooKeeper's default client port - choose a
     * different value if ZooKeeper runs on the same host.
     */
    private static final int PORT = 2181;

    /**
     * Starts the server and blocks until it is stopped or this thread is
     * interrupted.
     *
     * @param args unused
     * @throws IOException if the server cannot be built
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Server server = new RPC.Builder(conf)
                .setProtocol(ClientProtocol.class)
                .setInstance(new ClientProtocolImpl())
                .setBindAddress(HOST)
                .setPort(PORT)
                .setNumHandlers(2)
                .build();

        server.start();
        try {
            // Wait explicitly instead of returning from main and relying on
            // the server's internal threads to keep the JVM alive.
            server.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever observes this thread.
            Thread.currentThread().interrupt();
        } finally {
            server.stop();
        }
    }
}
构造并启动RPC Client并发送RPC请求
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
/**
 * Minimal RPC client: obtains a {@link ClientProtocol} proxy, issues a single
 * {@code hello} call and prints the reply.
 *
 * <p>Created: 2014-12-28 09:52:19
 */
public class RPCClient {

    /** Host of the RPC server. */
    private static final String HOST = "localhost";

    /** Port of the RPC server; must match the port the server listens on. */
    private static final int PORT = 2181;

    /**
     * Calls {@code hello("world")} on the server and writes the result to
     * standard output.
     *
     * @param args unused
     * @throws IOException if the proxy cannot be created or the call fails
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        InetSocketAddress serverAddress = new InetSocketAddress(HOST, PORT);
        ClientProtocol proxy = RPC.getProxy(
                ClientProtocol.class, ClientProtocol.versionID, serverAddress, conf);
        try {
            String result = proxy.hello("world");
            System.out.println(result);
        } finally {
            // Release the proxy even when the call throws.
            RPC.stopProxy(proxy);
        }
    }
}