Netty实现RPC
1.服务接口定义
public interface CityService
{
String getCityName();
}
2.服务提供者
2.1服务接口实现
public class CityServiceImpl implements CityService
{
@Override
public String getCityName()
{
return "厦门";
}
}
2.2服务netty server实现
public class NettyServer
{
public static void main(String[] args) throws Exception
{
new NettyServer().start();
}
private static Map<String, CityService> serviceMap = new HashMap<>();
public void start() throws Exception
{
initService();
initNettyServer();
}
private void initService()
{
serviceMap.put("com.yalong.NIO.rpc.CityService", new CityServiceImpl());
}
private void initNettyServer() throws InterruptedException
{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try
{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.
group(bossGroup, workGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG, 1024).
childOption(ChannelOption.SO_KEEPALIVE, true).
childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel channel) throws Exception
{
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new NettyServerHandler(serviceMap));
}
});
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally
{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
2.3接口对应的处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter
{
private Map<String, CityService> serviceMap;
public NettyServerHandler(Map<String, CityService> serviceMap)
{
this.serviceMap = serviceMap;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
if (msg instanceof Msg)
{
String className = ((Msg) msg).getClassName();
CityService cityService = serviceMap.get(className);
if (cityService != null)
{
String cityName = cityService.getCityName();
ctx.writeAndFlush(cityName);
}
}else
{
System.out.println("++++================++++");
}
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.服务消费者
3.1服务Client端实现
public class NettyClient
{
public static void main(String[] args) throws Exception
{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
NettyClientHandler nettyClientHandler = new NettyClientHandler();
//启动netty client
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(nettyClientHandler);
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
//通过代理模式发送消息
CityService cityService = (CityService) Proxy.newProxyInstance(CityService.class.getClassLoader(),
new Class[]{CityService.class}, new InvocationHandler()
{
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
//发送远程调用
System.out.println("=========开始发送远程调用===========");
String clazzName = CityService.class.getName();
Msg msg = new Msg();
msg.setClassName(clazzName);
try
{
future.channel().writeAndFlush(msg);
future.channel().closeFuture().sync();
} finally
{
group.shutdownGracefully();
}
return nettyClientHandler.getResult();
}
});
String cityName = cityService.getCityName();
System.out.println("=====远程调用结果====="+cityName);
}
}
3.2服务Client端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter{
public Object getResult()
{
return result;
}
private Object result;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
this.result = msg;
super.channelRead(ctx, msg);
}
}