Hand-Written RPC Framework, Chapter 2: Netty Communication

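Case Introduction

When implementing an RPC framework, we have to decide how the two sides talk to each other over sockets. Ordinary socket communication resembles a QQ chat: you send a message and the reply may arrive at any time. RPC communication, by contrast, feels more like an HTTP call: the response has to come back within a bounded time, otherwise the call fails with a timeout.

Here we choose Netty as the socket framework and use a future-based approach: the caller sends a request, then waits on a future until the matching response arrives or the timeout expires.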
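Before looking at the Netty code, the future-based idea can be sketched with the JDK alone. This is a minimal sketch under stated assumptions, not the project's implementation: the class `PendingCalls` and its method names are hypothetical, replies are plain strings, and `CompletableFuture` stands in for the project's own future class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: match replies to requests by id and bound the wait. */
final class PendingCalls {

    // requestId -> future that is completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a call; do this before the request is sent. */
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called by the receiving side; returns false if nobody is waiting for this id. */
    boolean complete(String requestId, String reply) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(reply);
    }

    /** Waits at most timeoutMillis; returns null if no reply arrived in time. */
    String await(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Always unregister, so timed-out calls do not leak map entries.
            pending.remove(requestId);
        }
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> reply = calls.register("req-1");
        // Simulate the network thread delivering the reply.
        new Thread(() -> calls.complete("req-1", "pong")).start();
        System.out.println(calls.await("req-1", reply, 1000));
    }
}
```

The design point is that the caller registers the future before sending, so a reply that arrives very quickly still finds something to complete.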

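Netty is an open-source Java framework that was originally developed at JBoss and is now maintained by the Netty project community. It provides an asynchronous, event-driven network application framework and tools for rapidly building high-performance, highly reliable network servers and clients. In other words, Netty is an NIO-based client/server programming framework that makes it quick and simple to develop a network application, such as a client or server that implements a particular protocol. It simplifies and streamlines network programming tasks such as TCP and UDP socket server development. "Quick" and "simple" here do not come at the cost of maintainability or performance. Netty draws on the experience of implementing many protocols (FTP, SMTP, HTTP, and a variety of binary and text-based protocols) and is a carefully designed project. As a result, it manages to combine ease of development with performance, stability, and scalability.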

Environment

1. JDK 1.8.0

2. IntelliJ IDEA Community Edition 2018.3.1 x64

Code Examples

itstack-demo-rpc-02
└── src
    ├── main
    │    └── java
    │        └── org.itstack.demo.rpc.network
    │             ├── client
    │             │   ├── ClientSocket.java
    │             │   └── MyClientHandler.java
    │             ├── codec
    │             │   ├── RpcDecoder.java
    │             │   └── RpcEncoder.java
    │             ├── future
    │             │   ├── SyncWrite.java
    │             │   ├── SyncWriteFuture.java
    │             │   ├── SyncWriteMap.java
    │             │   └── WriteFuture.java
    │             ├── msg
    │             │   ├── Request.java
    │             │   └── Response.java
    │             ├── server
    │             │   ├── MyServerHandler.java
    │             │   └── ServerSocket.java
    │             └── util
    │                 └── SerializationUtil.java
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── client
                 │   └── StartClient.java
                 └── server
                     └── StartServer.java

ClientSocket.java

package org.itstack.demo.rpc.network.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ClientSocket implements Runnable {

    // Written by the client thread and read by the caller's thread, hence volatile
    private volatile ChannelFuture future;

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Inbound: bytes -> Response; outbound: Request -> bytes
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            // Block until the connection is established, then publish the future
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            this.future = f;
            // Block until the channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}

MyClientHandler.java

package org.itstack.demo.rpc.network.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        // Look up the caller that is waiting for this request id
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        // The entry may already be gone if the caller timed out
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

RpcDecoder.java

package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

import java.util.List;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Wait until the 4-byte length header is available
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        // Body not complete yet: rewind to before the header and wait for more bytes
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }

}

RpcEncoder.java

package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        // Only messages of the configured type are encoded
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            // Frame layout: 4-byte length header followed by the serialized body
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

}
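The encoder and decoder above share one wire format: a 4-byte length header followed by the body. The following Netty-free sketch shows the same framing with `java.nio.ByteBuffer`, including the case where a read ends in the middle of a frame. The class `LengthPrefixedFraming` and its methods are hypothetical names used only for illustration, and the payload is a plain UTF-8 string rather than a Protostuff message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Netty-free sketch of length-prefixed framing. */
final class LengthPrefixedFraming {

    /** Frame = 4-byte big-endian length followed by the payload bytes. */
    static byte[] encode(byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length);
        out.put(payload);
        return out.array();
    }

    /**
     * Extracts every complete frame from the buffer (which must be in read mode).
     * Bytes that belong to an incomplete trailing frame are left unread.
     */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // rewind to before the length header
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("rpc".getBytes(StandardCharsets.UTF_8));
        // Simulate a read that delivers frame one plus only part of frame two.
        ByteBuffer in = ByteBuffer.allocate(first.length + 5);
        in.put(first).put(second, 0, 5);
        in.flip();
        List<byte[]> frames = decode(in);
        System.out.println(frames.size() + " complete frame(s), "
                + in.remaining() + " byte(s) waiting for more data");
    }
}
```

The mark-and-reset step plays the same role as `markReaderIndex` and `resetReaderIndex` in `RpcDecoder`: a partial frame is never consumed.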

SyncWrite.java

package org.itstack.demo.rpc.network.future;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {

        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        // Tag the request with a unique id so the reply can be matched to it
        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        // Register the future before writing, so a fast reply cannot be missed
        WriteFuture<Response> future = new SyncWriteFuture(requestId, timeout);
        SyncWriteMap.syncKey.put(requestId, future);

        try {
            return doWriteAndSync(channel, request, timeout, future);
        } finally {
            // Always unregister, including after a timeout or a failed write
            SyncWriteMap.syncKey.remove(requestId);
        }
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                // On write failure: unregister and wake the caller instead of letting it wait out the timeout
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                    writeFuture.setResponse(null);
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.cause() != null) {
                // The write itself failed
                throw new Exception(writeFuture.cause());
            }
            throw new TimeoutException("no response within " + timeout + " ms");
        }
        return response;
    }

}

SyncWriteFuture.java

package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWriteFuture implements WriteFuture<Response> {

    // Released exactly once, when setResponse is called
    private final CountDownLatch latch = new CountDownLatch(1);
    private final long begin = System.currentTimeMillis();
    private long timeout;
    // Fields below are shared between the I/O thread and the calling thread
    private volatile Response response;
    private final String requestId;
    private volatile boolean writeResult;
    private volatile Throwable cause;
    private volatile boolean isTimeout = false;

    public SyncWriteFuture(String requestId) {
        this.requestId = requestId;
    }

    public SyncWriteFuture(String requestId, long timeout) {
        this.requestId = requestId;
        this.timeout = timeout;
        writeResult = true;
        isTimeout = false;
    }

    public Throwable cause() {
        return cause;
    }

    public void setCause(Throwable cause) {
        this.cause = cause;
    }

    public boolean isWriteSuccess() {
        return writeResult;
    }

    public void setWriteResult(boolean result) {
        this.writeResult = result;
    }

    public String requestId() {
        return requestId;
    }

    public Response response() {
        return response;
    }

    public void setResponse(Response response) {
        this.response = response;
        latch.countDown();
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        // Cancellation is not supported
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        // Done once the latch has been released
        return latch.getCount() == 0;
    }

    public Response get() throws InterruptedException, ExecutionException {
        // CountDownLatch waits via await(); Object.wait() would throw IllegalMonitorStateException here
        latch.await();
        return response;
    }

    public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (latch.await(timeout, unit)) {
            return response;
        }
        // Timed out: record it and signal the caller with null
        isTimeout = true;
        return null;
    }

    public boolean isTimeout() {
        if (isTimeout) {
            return isTimeout;
        }
        // Without a configured timeout (one-argument constructor), elapsed time alone is not a timeout
        return timeout > 0 && System.currentTimeMillis() - begin > timeout;
    }
}
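The core of `SyncWriteFuture` is the hand-off of one value from the I/O thread to the waiting caller through a `CountDownLatch`. Stripped of the RPC details, that hand-off might look like the sketch below. `LatchResult` is a hypothetical class written for this illustration, and swallowing the interrupt into a `null` return is a simplification chosen to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical one-shot result holder built on CountDownLatch. */
final class LatchResult<T> {

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T value;

    /** Publishes the value and wakes every waiting thread. */
    void set(T value) {
        this.value = value;
        latch.countDown();
    }

    /** True once a value has been published. */
    boolean isDone() {
        return latch.getCount() == 0;
    }

    /** Waits up to the timeout; returns null if nothing was published in time. */
    T get(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit) ? value : null;
        } catch (InterruptedException e) {
            // Simplification: restore the flag and report "no result"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchResult<String> result = new LatchResult<>();
        // Simulate the I/O thread delivering a response a little later.
        new Thread(() -> result.set("pong")).start();
        System.out.println(result.get(1, TimeUnit.SECONDS));
    }
}
```

Note that the wait uses `await`, which belongs to `CountDownLatch`; `wait` is the unrelated `Object` monitor method and fails unless the caller holds the object's lock.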

SyncWriteMap.java

package org.itstack.demo.rpc.network.future;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SyncWriteMap {

    // requestId -> future of the call that is still waiting for its response
    public static final Map<String, WriteFuture<?>> syncKey = new ConcurrentHashMap<>();

}

WriteFuture.java

package org.itstack.demo.rpc.network.future;

import java.util.concurrent.Future;

public interface WriteFuture<T> extends Future<T> {

    Throwable cause();

    void setCause(Throwable cause);

    boolean isWriteSuccess();

    void setWriteResult(boolean result);

    String requestId();

    T response();

    // Uses the type parameter so the setter matches response()
    void setResponse(T response);

    boolean isTimeout();

}

Request.java

package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Request {

    private String requestId;
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

}

Response.java

package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Response {

    private String requestId;
    private String param;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }

}

MyServerHandler.java

package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request msg = (Request) obj;
        // Build the reply, echoing the request id so the client can match it
        Response response = new Response();
        response.setRequestId(msg.getRequestId());
        // The appended text means "request succeeded; please accept and process the returned result."
        response.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。");
        ctx.writeAndFlush(response);
        // Release the message (has no effect on a plain object that is not reference-counted)
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}

ServerSocket.java

package org.itstack.demo.rpc.network.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerSocket implements Runnable {

    private ChannelFuture f;

    @Override
    public void run() {
        // bossGroup accepts connections; workerGroup handles I/O on accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // Inbound: bytes -> Request; outbound: Response -> bytes
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });

            // Bind the port and keep the result in the field (the original shadowed it with a local)
            f = b.bind(7397).sync();
            // Block until the server channel is closed
            f.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }

}

SerializationUtil.java

package org.itstack.demo.rpc.network.util;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {

    // Schemas are expensive to build, so they are cached per class
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static final Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {

    }

    /**
     * Serialization (object -> byte array)
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserialization (byte array -> object)
     *
     * @param data the serialized bytes
     * @param cls  the target class
     * @param <T>  the target type
     * @return the deserialized object
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            // Objenesis creates the instance without calling a constructor
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }

}
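The `getSchema` method uses a check-then-put sequence, which is safe here but may build the same schema more than once when several threads miss the cache at the same moment. One possible alternative, shown as a sketch rather than as the author's method, is `ConcurrentHashMap.computeIfAbsent`. The class `PerClassCache` is hypothetical, and the factory function stands in for an expensive call such as building a schema.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical per-class cache; the factory stands in for an expensive schema builder. */
final class PerClassCache<V> {

    private final Map<Class<?>, V> cache = new ConcurrentHashMap<>();
    private final Function<Class<?>, V> factory;

    PerClassCache(Function<Class<?>, V> factory) {
        this.factory = factory;
    }

    /** Builds the value at most once per class, even under concurrent access. */
    V get(Class<?> type) {
        return cache.computeIfAbsent(type, factory);
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        PerClassCache<String> cache = new PerClassCache<>(type -> {
            builds.incrementAndGet();
            return "schema-for-" + type.getSimpleName();
        });
        cache.get(String.class);
        cache.get(String.class);  // served from the cache
        cache.get(Integer.class);
        System.out.println("factory calls: " + builds.get());
    }
}
```

Whether the extra guarantee matters depends on how costly the factory is; for a demo with two message classes the original approach is perfectly adequate.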

StartClient.java

package org.itstack.demo.test.client;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                // Fetch the future; the client thread needs some time to connect
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // Build the request ("查询用户信息" means "query user info")
                Request request = new Request();
                request.setResult("查询用户信息");
                SyncWrite s = new SyncWrite();
                // Send the request and wait at most 1000 ms for the response
                Response response = s.writeAndSync(future.channel(), request, 1000);
                // "调用结果" means "call result"
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

StartServer.java

package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartServer {

    public static void main(String[] args) {
        // "启动服务端开始" means "server startup begins"
        System.out.println("启动服务端开始");
        new Thread(new ServerSocket()).start();
        // "启动服务端完成" means "server startup finished"
        System.out.println("启动服务端完成");
    }

}

Test Results

Running StartServer

启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Running StartClient

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}

...