案例介绍
在我们实现rpc框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们rpc框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。
这里我们选择netty作为我们的socket框架,采用future方式进行通信。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
代码示例
1itstack-demo-rpc-02 2└── src 3 └── main 4 │ └── java 5 │ └── org.itstack.demo.rpc.network 6 │ ├── client 7 │ │ ├── ClientSocket.java 8 │ │ └── MyClientHandler.java 9 │ ├── codec 10 │ │ ├── RpcDecoder.java 11 │ │ └── RpcEncoder.java 12 │ ├── future 13 │ │ ├── SyncWrite.java 14 │ │ ├── SyncWriteFuture.java 15 │ │ ├── SyncWriteMap.java 16 │ │ └── WriteFuture.java 17 │ ├── msg 18 │ │ ├── Request.java 19 │ │ └── Response.java 20 │ ├── server 21 │ │ ├── MyServerHandler.java 22 │ │ └── ServerSocket.java 23 │ └── util 24 │ └── SerializationUtil.java 25 └── test 26 └── java 27 └── org.itstack.demo.test 28 ├── client 29 │ └── StartClient.java 30 └── server 31 └── StartServer.java
ClientSocket.java
1package org.itstack.demo.rpc.network.client; 2 3import io.netty.bootstrap.Bootstrap; 4import io.netty.channel.ChannelFuture; 5import io.netty.channel.ChannelInitializer; 6import io.netty.channel.ChannelOption; 7import io.netty.channel.EventLoopGroup; 8import io.netty.channel.nio.NioEventLoopGroup; 9import io.netty.channel.socket.SocketChannel; 10import io.netty.channel.socket.nio.NioSocketChannel; 11import org.itstack.demo.rpc.network.codec.RpcDecoder; 12import org.itstack.demo.rpc.network.codec.RpcEncoder; 13import org.itstack.demo.rpc.network.msg.Request; 14import org.itstack.demo.rpc.network.msg.Response; 15 16/** 17 * http://www.itstack.org 18 * create by fuzhengwei on 2019/5/6 19 */ 20public class ClientSocket implements Runnable { 21 22 private ChannelFuture future; 23 24 @Override 25 public void run() { 26 EventLoopGroup workerGroup = new NioEventLoopGroup(); 27 try { 28 Bootstrap b = new Bootstrap(); 29 b.group(workerGroup); 30 b.channel(NioSocketChannel.class); 31 b.option(ChannelOption.AUTO_READ, true); 32 b.handler(new ChannelInitializer<SocketChannel>() { 33 @Override 34 public void initChannel(SocketChannel ch) throws Exception { 35 ch.pipeline().addLast( 36 new RpcDecoder(Response.class), 37 new RpcEncoder(Request.class), 38 new MyClientHandler()); 39 } 40 }); 41 ChannelFuture f = b.connect("127.0.0.1", 7397).sync(); 42 this.future = f; 43 f.channel().closeFuture().sync(); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } finally { 47 workerGroup.shutdownGracefully(); 48 } 49 } 50 51 public ChannelFuture getFuture() { 52 return future; 53 } 54 55 public void setFuture(ChannelFuture future) { 56 this.future = future; 57 } 58}
MyClientHandler.java
1package org.itstack.demo.rpc.network.client; 2 3import io.netty.channel.ChannelHandlerContext; 4import io.netty.channel.ChannelInboundHandlerAdapter; 5import org.itstack.demo.rpc.network.future.SyncWriteFuture; 6import org.itstack.demo.rpc.network.future.SyncWriteMap; 7import org.itstack.demo.rpc.network.msg.Response; 8 9/** 10 * http://www.itstack.org 11 * create by fuzhengwei on 2019/5/6 12 */ 13public class MyClientHandler extends ChannelInboundHandlerAdapter { 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { 17 Response msg = (Response) obj; 18 String requestId = msg.getRequestId(); 19 SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId); 20 if (future != null) { 21 future.setResponse(msg); 22 } 23 } 24 25 @Override 26 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 27 cause.printStackTrace(); 28 ctx.close(); 29 } 30 31}
RpcDecoder.java
1package org.itstack.demo.rpc.network.codec; 2 3import io.netty.buffer.ByteBuf; 4import io.netty.channel.ChannelHandlerContext; 5import io.netty.handler.codec.ByteToMessageDecoder; 6import org.itstack.demo.rpc.network.util.SerializationUtil; 7 8import java.util.List; 9 10/** 11 * http://www.itstack.org 12 * create by fuzhengwei on 2019/5/6 13 */ 14public class RpcDecoder extends ByteToMessageDecoder { 15 16 private Class<?> genericClass; 17 18 public RpcDecoder(Class<?> genericClass) { 19 this.genericClass = genericClass; 20 } 21 22 @Override 23 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 24 if (in.readableBytes() < 4) { 25 return; 26 } 27 in.markReaderIndex(); 28 int dataLength = in.readInt(); 29 if (in.readableBytes() < dataLength) { 30 in.resetReaderIndex(); 31 return; 32 } 33 byte[] data = new byte[dataLength]; 34 in.readBytes(data); 35 out.add(SerializationUtil.deserialize(data, genericClass)); 36 } 37 38}
RpcEncoder.java
1package org.itstack.demo.rpc.network.codec; 2 3import io.netty.buffer.ByteBuf; 4import io.netty.channel.ChannelHandlerContext; 5import io.netty.handler.codec.MessageToByteEncoder; 6import org.itstack.demo.rpc.network.util.SerializationUtil; 7 8/** 9 * http://www.itstack.org 10 * create by fuzhengwei on 2019/5/6 11 */ 12public class RpcEncoder extends MessageToByteEncoder { 13 14 private Class<?> genericClass; 15 16 public RpcEncoder(Class<?> genericClass) { 17 this.genericClass = genericClass; 18 } 19 20 @Override 21 protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) { 22 if (genericClass.isInstance(in)) { 23 byte[] data = SerializationUtil.serialize(in); 24 out.writeInt(data.length); 25 out.writeBytes(data); 26 } 27 } 28 29}
SyncWrite.java
1package org.itstack.demo.rpc.network.future; 2 3import io.netty.channel.Channel; 4import io.netty.channel.ChannelFuture; 5import io.netty.channel.ChannelFutureListener; 6import org.itstack.demo.rpc.network.msg.Request; 7import org.itstack.demo.rpc.network.msg.Response; 8 9import java.util.UUID; 10import java.util.concurrent.TimeUnit; 11import java.util.concurrent.TimeoutException; 12 13public class SyncWrite { 14 15 public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception { 16 17 if (channel == null) { 18 throw new NullPointerException("channel"); 19 } 20 if (request == null) { 21 throw new NullPointerException("request"); 22 } 23 if (timeout <= 0) { 24 throw new IllegalArgumentException("timeout <= 0"); 25 } 26 27 String requestId = UUID.randomUUID().toString(); 28 request.setRequestId(requestId); 29 30 WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId()); 31 SyncWriteMap.syncKey.put(request.getRequestId(), future); 32 33 Response response = doWriteAndSync(channel, request, timeout, future); 34 35 SyncWriteMap.syncKey.remove(request.getRequestId()); 36 return response; 37 } 38 39 private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception { 40 41 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { 42 public void operationComplete(ChannelFuture future) throws Exception { 43 writeFuture.setWriteResult(future.isSuccess()); 44 writeFuture.setCause(future.cause()); 45 //失败移除 46 if (!writeFuture.isWriteSuccess()) { 47 SyncWriteMap.syncKey.remove(writeFuture.requestId()); 48 } 49 } 50 }); 51 52 Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS); 53 if (response == null) { 54 if (writeFuture.isTimeout()) { 55 throw new TimeoutException(); 56 } else { 57 // write exception 58 throw new Exception(writeFuture.cause()); 59 } 60 } 61 return response; 62 } 63 64}
SyncWriteFuture.java
1package org.itstack.demo.rpc.network.future; 2 3 4import org.itstack.demo.rpc.network.msg.Response; 5 6import java.util.concurrent.CountDownLatch; 7import java.util.concurrent.ExecutionException; 8import java.util.concurrent.TimeUnit; 9import java.util.concurrent.TimeoutException; 10 11public class SyncWriteFuture implements WriteFuture<Response> { 12 13 private CountDownLatch latch = new CountDownLatch(1); 14 private final long begin = System.currentTimeMillis(); 15 private long timeout; 16 private Response response; 17 private final String requestId; 18 private boolean writeResult; 19 private Throwable cause; 20 private boolean isTimeout = false; 21 22 public SyncWriteFuture(String requestId) { 23 this.requestId = requestId; 24 } 25 26 public SyncWriteFuture(String requestId, long timeout) { 27 this.requestId = requestId; 28 this.timeout = timeout; 29 writeResult = true; 30 isTimeout = false; 31 } 32 33 34 public Throwable cause() { 35 return cause; 36 } 37 38 public void setCause(Throwable cause) { 39 this.cause = cause; 40 } 41 42 public boolean isWriteSuccess() { 43 return writeResult; 44 } 45 46 public void setWriteResult(boolean result) { 47 this.writeResult = result; 48 } 49 50 public String requestId() { 51 return requestId; 52 } 53 54 public Response response() { 55 return response; 56 } 57 58 public void setResponse(Response response) { 59 this.response = response; 60 latch.countDown(); 61 } 62 63 public boolean cancel(boolean mayInterruptIfRunning) { 64 return true; 65 } 66 67 public boolean isCancelled() { 68 return false; 69 } 70 71 public boolean isDone() { 72 return false; 73 } 74 75 public Response get() throws InterruptedException, ExecutionException { 76 latch.wait(); 77 return response; 78 } 79 80 public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 81 if (latch.await(timeout, unit)) { 82 return response; 83 } 84 return null; 85 } 86 87 public boolean isTimeout() { 88 if (isTimeout) { 89 return isTimeout; 90 } 91 return System.currentTimeMillis() - begin > timeout; 92 } 93}
SyncWriteMap.java
1package org.itstack.demo.rpc.network.future; 2 3import java.util.Map; 4import java.util.concurrent.ConcurrentHashMap; 5 6public class SyncWriteMap { 7 8 public static Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>(); 9 10}
WriteFuture.java
1package org.itstack.demo.rpc.network.future; 2 3import org.itstack.demo.rpc.network.msg.Response; 4 5import java.util.concurrent.Future; 6 7public interface WriteFuture<T> extends Future<T> { 8 9 Throwable cause(); 10 11 void setCause(Throwable cause); 12 13 boolean isWriteSuccess(); 14 15 void setWriteResult(boolean result); 16 17 String requestId(); 18 19 T response(); 20 21 void setResponse(Response response); 22 23 boolean isTimeout(); 24 25 26}
Request.java
1package org.itstack.demo.rpc.network.msg; 2 3/** 4 * http://www.itstack.org 5 * create by fuzhengwei on 2019/5/6 6 */ 7public class Request { 8 9 private String requestId; 10 private Object result; 11 12 public String getRequestId() { 13 return requestId; 14 } 15 16 public void setRequestId(String requestId) { 17 this.requestId = requestId; 18 } 19 20 public Object getResult() { 21 return result; 22 } 23 24 public void setResult(Object result) { 25 this.result = result; 26 } 27 28}
Response.java
1package org.itstack.demo.rpc.network.msg; 2 3/** 4 * http://www.itstack.org 5 * create by fuzhengwei on 2019/5/6 6 */ 7public class Response { 8 9 private String requestId; 10 private String param; 11 12 public String getRequestId() { 13 return requestId; 14 } 15 16 public void setRequestId(String requestId) { 17 this.requestId = requestId; 18 } 19 20 public String getParam() { 21 return param; 22 } 23 24 public void setParam(String param) { 25 this.param = param; 26 } 27 28}
MyServerHandler.java
1package org.itstack.demo.rpc.network.server; 2 3import io.netty.channel.ChannelHandlerContext; 4import io.netty.channel.ChannelInboundHandlerAdapter; 5import io.netty.util.ReferenceCountUtil; 6import org.itstack.demo.rpc.network.msg.Request; 7import org.itstack.demo.rpc.network.msg.Response; 8 9/** 10 * http://www.itstack.org 11 * create by fuzhengwei on 2019/5/6 12 */ 13public class MyServerHandler extends ChannelInboundHandlerAdapter{ 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object obj){ 17 Request msg = (Request) obj; 18 //反馈 19 Response request = new Response(); 20 request.setRequestId(msg.getRequestId()); 21 request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。"); 22 ctx.writeAndFlush(request); 23 //释放 24 ReferenceCountUtil.release(msg); 25 } 26 27 @Override 28 public void channelReadComplete(ChannelHandlerContext ctx) { 29 ctx.flush(); 30 } 31 32}
ServerSocket.java
1package org.itstack.demo.rpc.network.server; 2 3import io.netty.bootstrap.ServerBootstrap; 4import io.netty.channel.ChannelFuture; 5import io.netty.channel.ChannelInitializer; 6import io.netty.channel.ChannelOption; 7import io.netty.channel.EventLoopGroup; 8import io.netty.channel.nio.NioEventLoopGroup; 9import io.netty.channel.socket.SocketChannel; 10import io.netty.channel.socket.nio.NioServerSocketChannel; 11import org.itstack.demo.rpc.network.codec.RpcDecoder; 12import org.itstack.demo.rpc.network.codec.RpcEncoder; 13import org.itstack.demo.rpc.network.msg.Request; 14import org.itstack.demo.rpc.network.msg.Response; 15 16/** 17 * http://www.itstack.org 18 * create by fuzhengwei on 2019/5/6 19 */ 20public class ServerSocket implements Runnable { 21 22 private ChannelFuture f; 23 24 @Override 25 public void run() { 26 EventLoopGroup bossGroup = new NioEventLoopGroup(); 27 EventLoopGroup workerGroup = new NioEventLoopGroup(); 28 try { 29 ServerBootstrap b = new ServerBootstrap(); 30 b.group(bossGroup, workerGroup) 31 .channel(NioServerSocketChannel.class) 32 .option(ChannelOption.SO_BACKLOG, 128) 33 .childHandler(new ChannelInitializer<SocketChannel>() { 34 @Override 35 public void initChannel(SocketChannel ch){ 36 ch.pipeline().addLast( 37 new RpcDecoder(Request.class), 38 new RpcEncoder(Response.class), 39 new MyServerHandler()); 40 } 41 }); 42 43 ChannelFuture f = null; 44 f = b.bind(7397).sync(); 45 f.channel().closeFuture().sync(); 46 47 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } finally { 51 workerGroup.shutdownGracefully(); 52 bossGroup.shutdownGracefully(); 53 } 54 55 } 56 57}
SerializationUtil.java
1package org.itstack.demo.rpc.network.util; 2 3import com.dyuproject.protostuff.LinkedBuffer; 4import com.dyuproject.protostuff.ProtostuffIOUtil; 5import com.dyuproject.protostuff.Schema; 6import com.dyuproject.protostuff.runtime.RuntimeSchema; 7import org.objenesis.Objenesis; 8import org.objenesis.ObjenesisStd; 9 10import java.util.Map; 11import java.util.concurrent.ConcurrentHashMap; 12 13/** 14 * Created by fuzhengwei1 on 2016/10/20. 15 */ 16public class SerializationUtil { 17 18 private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap(); 19 20 private static Objenesis objenesis = new ObjenesisStd(); 21 22 private SerializationUtil() { 23 24 } 25 26 /** 27 * 序列化(对象 -> 字节数组) 28 * 29 * @param obj 对象 30 * @return 字节数组 31 */ 32 public static <T> byte[] serialize(T obj) { 33 Class<T> cls = (Class<T>) obj.getClass(); 34 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); 35 try { 36 Schema<T> schema = getSchema(cls); 37 return ProtostuffIOUtil.toByteArray(obj, schema, buffer); 38 } catch (Exception e) { 39 throw new IllegalStateException(e.getMessage(), e); 40 } finally { 41 buffer.clear(); 42 } 43 } 44 45 /** 46 * 反序列化(字节数组 -> 对象) 47 * 48 * @param data 49 * @param cls 50 * @param <T> 51 */ 52 public static <T> T deserialize(byte[] data, Class<T> cls) { 53 try { 54 T message = objenesis.newInstance(cls); 55 Schema<T> schema = getSchema(cls); 56 ProtostuffIOUtil.mergeFrom(data, message, schema); 57 return message; 58 } catch (Exception e) { 59 throw new IllegalStateException(e.getMessage(), e); 60 } 61 } 62 63 private static <T> Schema<T> getSchema(Class<T> cls) { 64 Schema<T> schema = (Schema<T>) cachedSchema.get(cls); 65 if (schema == null) { 66 schema = RuntimeSchema.createFrom(cls); 67 cachedSchema.put(cls, schema); 68 } 69 return schema; 70 } 71 72}
StartClient.java
1package org.itstack.demo.test.client; 2 3import com.alibaba.fastjson.JSON; 4import io.netty.channel.ChannelFuture; 5import org.itstack.demo.rpc.network.client.ClientSocket; 6import org.itstack.demo.rpc.network.future.SyncWrite; 7import org.itstack.demo.rpc.network.msg.Request; 8import org.itstack.demo.rpc.network.msg.Response; 9 10/** 11 * http://www.itstack.org 12 * create by fuzhengwei on 2019/5/6 13 */ 14public class StartClient { 15 16 private static ChannelFuture future; 17 18 public static void main(String[] args) { 19 ClientSocket client = new ClientSocket(); 20 new Thread(client).start(); 21 22 while (true) { 23 try { 24 //获取future,线程有等待处理时间 25 if (null == future) { 26 future = client.getFuture(); 27 Thread.sleep(500); 28 continue; 29 } 30 //构建发送参数 31 Request request = new Request(); 32 request.setResult("查询用户信息"); 33 SyncWrite s = new SyncWrite(); 34 Response response = s.writeAndSync(future.channel(), request, 1000); 35 System.out.println("调用结果:" + JSON.toJSON(response)); 36 Thread.sleep(1000); 37 } catch (Exception e) { 38 e.printStackTrace(); 39 } 40 } 41 } 42 43}
StartServer.java
1package org.itstack.demo.test.server; 2 3import org.itstack.demo.rpc.network.server.ServerSocket; 4 5/** 6 * http://www.itstack.org 7 * create by fuzhengwei on 2019/5/6 8 */ 9public class StartServer { 10 11 public static void main(String[] args) { 12 System.out.println("启动服务端开始"); 13 new Thread(new ServerSocket()).start(); 14 System.out.println("启动服务端完成"); 15 } 16 17}
测试结果
启动StartServer
1启动服务端开始 2启动服务端完成 3log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). 4log4j:WARN Please initialize the log4j system properly. 5log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 6
启动StartClient
1log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). 2log4j:WARN Please initialize the log4j system properly. 3log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 4调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"} 5调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"} 6调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"} 7调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"} 8调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"} 9调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"} 10 11...