Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于 NIO 的客户端、服务器端编程框架,使用 Netty 可以确保你快速而简单地开发出一个网络应用,例如实现了某种协议的客户端、服务端应用。Netty 简化并流线化了网络应用的编程开发过程,例如基于 TCP 和 UDP 的 socket 服务开发。“快速”和“简单”并不意味着最终的应用会产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括 FTP、SMTP、HTTP 以及各种二进制和文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功地找到了一种方式,在保证易于开发的同时还保证了其应用的性能、稳定性和伸缩性。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
代码示例
ClientSocket.java
package org.itstack.demo.rpc.network.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.itstack.demo.rpc.network.codec.RpcDecoder; import org.itstack.demo.rpc.network.codec.RpcEncoder; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class ClientSocket implements Runnable { private ChannelFuture future; @Override public void run() { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.AUTO_READ, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new RpcDecoder(Response.class), new RpcEncoder(Request.class), new MyClientHandler()); } }); ChannelFuture f = b.connect("127.0.0.1", 7397).sync(); this.future = f; f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } public ChannelFuture getFuture() { return future; } public void setFuture(ChannelFuture future) { this.future = future; } }
MyClientHandler.java
package org.itstack.demo.rpc.network.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.itstack.demo.rpc.network.future.SyncWriteFuture; import org.itstack.demo.rpc.network.future.SyncWriteMap; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { Response msg = (Response) obj; String requestId = msg.getRequestId(); SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId); if (future != null) { future.setResponse(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
RpcDecoder.java
package org.itstack.demo.rpc.network.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.itstack.demo.rpc.network.util.SerializationUtil; import java.util.List; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class RpcDecoder extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); out.add(SerializationUtil.deserialize(data, genericClass)); } }
RpcEncoder.java
package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 *
 * Encodes instances of the configured class as a 4-byte int length followed by
 * the serialized payload.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    /**
     * @param genericClass the only message type this encoder handles
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Accepts only instances of the configured class. Other messages are passed
     * on untouched to the next outbound handler instead of being silently
     * encoded as an empty buffer, so a wrongly-typed write is not lost unnoticed.
     */
    @Override
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return genericClass.isInstance(msg);
    }

    /**
     * Writes the payload length followed by the serialized bytes of {@code in}.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        byte[] data = SerializationUtil.serialize(in);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}
SyncWrite.java
package org.itstack.demo.rpc.network.future; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.network.msg.Response; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SyncWrite { public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception { if (channel == null) { throw new NullPointerException("channel"); } if (request == null) { throw new NullPointerException("request"); } if (timeout <= 0) { throw new IllegalArgumentException("timeout <= 0"); } String requestId = UUID.randomUUID().toString(); request.setRequestId(requestId); WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId()); SyncWriteMap.syncKey.put(request.getRequestId(), future); Response response = doWriteAndSync(channel, request, timeout, future); SyncWriteMap.syncKey.remove(request.getRequestId()); return response; } private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { writeFuture.setWriteResult(future.isSuccess()); writeFuture.setCause(future.cause()); //失败移除 if (!writeFuture.isWriteSuccess()) { SyncWriteMap.syncKey.remove(writeFuture.requestId()); } } }); Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS); if (response == null) { if (writeFuture.isTimeout()) { throw new TimeoutException(); } else { // write exception throw new Exception(writeFuture.cause()); } } return response; } }
SyncWriteFuture.java
package org.itstack.demo.rpc.network.future; import org.itstack.demo.rpc.network.msg.Response; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SyncWriteFuture implements WriteFuture<Response> { private CountDownLatch latch = new CountDownLatch(1); private final long begin = System.currentTimeMillis(); private long timeout; private Response response; private final String requestId; private boolean writeResult; private Throwable cause; private boolean isTimeout = false; public SyncWriteFuture(String requestId) { this.requestId = requestId; } public SyncWriteFuture(String requestId, long timeout) { this.requestId = requestId; this.timeout = timeout; writeResult = true; isTimeout = false; } public Throwable cause() { return cause; } public void setCause(Throwable cause) { this.cause = cause; } public boolean isWriteSuccess() { return writeResult; } public void setWriteResult(boolean result) { this.writeResult = result; } public String requestId() { return requestId; } public Response response() { return response; } public void setResponse(Response response) { this.response = response; latch.countDown(); } public boolean cancel(boolean mayInterruptIfRunning) { return true; } public boolean isCancelled() { return false; } public boolean isDone() { return false; } public Response get() throws InterruptedException, ExecutionException { latch.wait(); return response; } public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (latch.await(timeout, unit)) { return response; } return null; } public boolean isTimeout() { if (isTimeout) { return isTimeout; } return System.currentTimeMillis() - begin > timeout; } }
SyncWriteMap.java
package org.itstack.demo.rpc.network.future; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class SyncWriteMap { public static Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>(); }
WriteFuture.java
package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.Future;

/**
 * A {@link Future} for a request written to a channel. In addition to the
 * eventual result it carries the request id, the outcome of the write itself
 * and a timeout indicator.
 *
 * @param <T> type of the awaited result
 */
public interface WriteFuture<T> extends Future<T> {

    // ---- identity ----

    /** @return id of the request this future belongs to */
    String requestId();

    // ---- write outcome ----

    /** @return whether the write is recorded as successful */
    boolean isWriteSuccess();

    /** @param result whether the write succeeded */
    void setWriteResult(boolean result);

    /** @return the recorded failure cause of the write, if any */
    Throwable cause();

    /** @param cause failure cause of the write */
    void setCause(Throwable cause);

    // ---- response ----

    /** @return the response stored so far, without blocking */
    T response();

    /** @param response the response to store */
    void setResponse(Response response);

    // ---- timeout ----

    /** @return whether this future is considered timed out */
    boolean isTimeout();

}
Request.java
package org.itstack.demo.rpc.network.msg; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class Request { private String requestId; private Object result; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } }
Response.java
package org.itstack.demo.rpc.network.msg; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class Response { private String requestId; private String param; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getParam() { return param; } public void setParam(String param) { this.param = param; } }
MyServerHandler.java
package org.itstack.demo.rpc.network.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class MyServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object obj){ Request msg = (Request) obj; //反馈 Response request = new Response(); request.setRequestId(msg.getRequestId()); request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。"); ctx.writeAndFlush(request); //释放 ReferenceCountUtil.release(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } }
ServerSocket.java
package org.itstack.demo.rpc.network.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.itstack.demo.rpc.network.codec.RpcDecoder; import org.itstack.demo.rpc.network.codec.RpcEncoder; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class ServerSocket implements Runnable { private ChannelFuture f; @Override public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch){ ch.pipeline().addLast( new RpcDecoder(Request.class), new RpcEncoder(Response.class), new MyServerHandler()); } }); ChannelFuture f = null; f = b.bind(7397).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
SerializationUtil.java
package org.itstack.demo.rpc.network.util; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * Created by fuzhengwei1 on 2016/10/20. */ public class SerializationUtil { private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap(); private static Objenesis objenesis = new ObjenesisStd(); private SerializationUtil() { } /** * 序列化(对象 -> 字节数组) * * @param obj 对象 * @return 字节数组 */ public static <T> byte[] serialize(T obj) { Class<T> cls = (Class<T>) obj.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema<T> schema = getSchema(cls); return ProtostuffIOUtil.toByteArray(obj, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } /** * 反序列化(字节数组 -> 对象) * * @param data * @param cls * @param <T> */ public static <T> T deserialize(byte[] data, Class<T> cls) { try { T message = objenesis.newInstance(cls); Schema<T> schema = getSchema(cls); ProtostuffIOUtil.mergeFrom(data, message, schema); return message; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } private static <T> Schema<T> getSchema(Class<T> cls) { Schema<T> schema = (Schema<T>) cachedSchema.get(cls); if (schema == null) { schema = RuntimeSchema.createFrom(cls); cachedSchema.put(cls, schema); } return schema; } }
StartClient.java
package org.itstack.demo.test.client; import com.alibaba.fastjson.JSON; import io.netty.channel.ChannelFuture; import org.itstack.demo.rpc.network.client.ClientSocket; import org.itstack.demo.rpc.network.future.SyncWrite; import org.itstack.demo.rpc.network.msg.Request; import org.itstack.demo.rpc.network.msg.Response; /** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */ public class StartClient { private static ChannelFuture future; public static void main(String[] args) { ClientSocket client = new ClientSocket(); new Thread(client).start(); while (true) { try { //获取future,线程有等待处理时间 if (null == future) { future = client.getFuture(); Thread.sleep(500); continue; } //构建发送参数 Request request = new Request(); request.setResult("查询用户信息"); SyncWrite s = new SyncWrite(); Response response = s.writeAndSync(future.channel(), request, 1000); System.out.println("调用结果:" + JSON.toJSON(response)); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } } }
StartServer.java
package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 *
 * Demo entry point that launches the server on a separate thread.
 */
public class StartServer {

    /**
     * Prints a start marker, starts the server thread and prints a completion
     * marker; the second marker is printed right after the thread is started.
     */
    public static void main(String[] args) {
        System.out.println("启动服务端开始");
        Thread serverThread = new Thread(new ServerSocket());
        serverThread.start();
        System.out.println("启动服务端完成");
    }

}
测试结果
启动StartServer
启动服务端开始 启动服务端完成 log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
启动StartClient
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"} 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"} 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"} 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"} 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"} 调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"} ...