之前了解到dubbo 的底层是基于Netty 的,在学习了Netty 之后,简单地模拟一个RPC。
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的字符串
1. HelloService 公共接口
package netty.rpc.publicinterface;

/**
 * Public interface shared by the service consumer and the service provider.
 */
public interface HelloService {

    /**
     * Takes a single string argument and returns a string reply.
     *
     * @param name the string passed by the caller
     * @return the reply string produced by the implementation
     */
    String hello(String name);
}
2. HelloServiceImpl 服务实现类
package netty.rpc.provider;

import netty.rpc.publicinterface.HelloService;

/**
 * Service provider implementation of {@link HelloService}.
 */
public class HelloServiceImpl implements HelloService {

    /** Number of times {@link #hello(String)} has been invoked on this instance. */
    private int count;

    /**
     * Increments the call counter, logs the call to standard output and
     * returns a reply containing the given name and the current count.
     *
     * @param name the string received from the caller
     * @return {@code " this is <name>; count: <count>"}
     */
    @Override
    public String hello(String name) {
        final int invocation = ++count;
        // Both the log line and the reply end with the same "<name>; count: <n>" tail.
        final String tail = name + "; count: " + invocation;
        System.out.println("netty.rpc.server.HelloServiceImpl.hello, name: " + tail);
        return " this is " + tail;
    }
}
3.Netty 相关类
NettyServerHandler:
package netty.rpc.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import netty.rpc.provider.HelloServiceImpl; import netty.rpc.publicinterface.HelloService; import org.apache.commons.lang.StringUtils; public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final String PROTO = "helloservice://"; private HelloService service = new HelloServiceImpl(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收客户端的消息,并调用服务 System.out.println("msg: " + msg); // 以约定的协议开头,就去掉协议之后调用serviceImpl 进行服务调用 if (msg.toString().startsWith(PROTO)) { String param = StringUtils.substring(msg.toString(), PROTO.length()); String result = service.hello(param); // 回显给客户端 ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
NettyServer
package netty.rpc.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyServer { public static void start(String hostName, int port) { startService(hostName, port); } public static void startService(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // 自己的处理器 pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture sync = serverBootstrap.bind(hostname, port).sync(); sync.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("服务器启动成功, hostname: " + hostname + ", port: " + port); } } }); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
NettyClientHandler
package netty.rpc.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.Callable; public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext channelHandlerContext; private String result; private String param; // 该方法第一次被调用(1) @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channelHandlerContext = ctx; } // 4 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); notify(); } // 3 - wait - 5 @Override public synchronized Object call() throws Exception { channelHandlerContext.writeAndFlush(param); wait(); return result; } // 2 public void setParam(String param) { this.param = param; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
NettyClient
package netty.rpc.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NettyClient { // 执行器 private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private NettyClientHandler nettyClientHandler; // 使用代理模式获取对象(JDK 动态代理)= 执行过程实际是走Netty 调用服务 public Object getBean(final Class serviceClazz, String proto) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClazz}, (proxy, method, param) -> { if (nettyClientHandler == null) { init(); } // 设置参数为协议 + 参数 (服务器端会根据协议来解析到对应的参数然后来进行服务调用) nettyClientHandler.setParam(proto + param[0]); return executorService.submit(nettyClientHandler).get(); }); } private void init() { nettyClientHandler = new NettyClientHandler(); EventLoopGroup workerGrop = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGrop).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(nettyClientHandler); } }); try { bootstrap.connect("127.0.0.1", 6666).sync(); } catch (Exception e) { e.printStackTrace(); } } }
4. CustomerBootstrap 服务消费者
package netty.rpc.customer; import netty.rpc.netty.NettyClient; import netty.rpc.publicinterface.HelloService; public class CustomerBootstrap { private static final String PROTO = "helloservice://"; public static void main(String[] args) { // 创建一个消费者 NettyClient nettyClient = new NettyClient(); // 创建代理对象 HelloService bean = (HelloService) nettyClient.getBean(HelloService.class, PROTO); for (int i = 0; i < 20; i++) { String hello = bean.hello("123456"); System.out.println(hello); } } }
5. ServerBootstrap
package netty.rpc.provider;

import netty.rpc.netty.NettyServer;

/**
 * Launches the service provider by starting a {@link NettyServer}.
 */
public class ServerBootstrap {

    /**
     * Starts the server on 127.0.0.1:6666.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final String host = "127.0.0.1";
        final int port = 6666;
        NettyServer.start(host, port);
    }
}
可以看到上面的核心逻辑是:启动一个NettyServer、一个NettyClient。NettyServer 用于启动服务器,接收客户端发送的数据;NettyClientHandler 既是客户端的处理器,也是一个任务(实现了Callable)。NettyClient 通过JDK 的动态代理,在调用接口方法的时候,实际是把NettyClientHandler 提交给线程池,由它向服务器端发送数据并等待返回结果。服务器端会根据接收到的数据进行判断,是否满足约定的协议,如果满足则调用HelloServiceImpl 获取结果,然后返回给客户端。