netty实现远程调用RPC功能
PRC的功能一句话说白了,就是远程调用其他电脑的api
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.31.Final</version>
</dependency>
服务端功能模块编写
-
项目结构
-
ClassInfo
package test; import java.io.Serializable; public class ClassInfo implements Serializable { private static final long serialVersionUID = -8970942815543515064L; private String className;//类名
private String methodName;//函数名称
private Class<?>[] types;//参数类型
private Object[] objects;//参数列表
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypes() {
return types;
}
public void setTypes(Class<?>[] types) {
this.types = types;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
} -
InvokerHandler
package test; import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class InvokerHandler extends ChannelInboundHandlerAdapter {
public static ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<String,Object>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo)msg;
Object claszz = null;
if(!classMap.containsKey(classInfo.getClassName())){
try {
claszz = Class.forName(classInfo.getClassName()).getDeclaredConstructor().newInstance();
classMap.put(classInfo.getClassName(), claszz);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
e.printStackTrace();
}
}else {
claszz = classMap.get(classInfo.getClassName());
}
Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
Object result = method.invoke(claszz, classInfo.getObjects());
System.out.println("服务器接受客户端的数据" + result);
ctx.write("SeverSendData");
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
} } -
RPCServer
package test; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder; public class RPCServer {
private int port; public RPCServer(int port) {
this.port = port;
} public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class).localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder",
new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new InvokerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("Server start listen at " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new RPCServer(port).start();
}
} -
SeverClass
package test; public class SeverClass implements SeverInterface { @Override
public String fn(String data) {
return data;
} } -
SeverInterface
package test; public interface SeverInterface {
String fn(String data);
}
客户端功能模块编写
-
目录结构
-
App
package test; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder; public class App
{
public static void main(String [] args) throws Throwable{ ClassInfo classInfo = new ClassInfo();
classInfo.setClassName("test.SeverClass");
classInfo.setMethodName("fn");
classInfo.setObjects(new Object[]{"ClientSendData"});
classInfo.setTypes(new Class[]{String.class}); ResultHandler resultHandler = new ResultHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",resultHandler);
}
}); ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
} System.out.println("调用服务器数据返回回来的数据:" + resultHandler.getResponse());
}
} -
ResultHandler
package test; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class ResultHandler extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() {
return response;
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
System.out.println("调用服务器数据返回回来的数据:" + response);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}