基于netty的RPC框架

1 Socket网络编程

1.1 socket概述

  Socket,套接字就是两台主机之间逻辑连接的端点。TCP/IP协议是传输层协议,主要解决数据如何 在网络中传输,而HTTP是应用层协议,主要解决如何包装数据。Socket是通信的基石,是支持TCP/IP协 议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信 息:连接使用的协议、本地主机的IP地址、本地进程的协议端口、远程主机的IP地址、远程进程的协议 端口。

1.2 Socket整体流程

  Socket编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字 (ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。端口号的范围是0到 65536,但是0到1024是为特权服务保留的端口号,可以选择任意一个当前没有被其他进程使用的端 口。

  客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接 字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。

基于netty的RPC框架

 

 

 1.3 代码实现

服务端代码:

public class ServerDemo {
public static void main(String[] args) throws Exception {
//1.创建一个线程池,如果有客户端连接就创建一个线程, 与之通信
ExecutorService executorService = Executors.newCachedThreadPool();
//2.创建 ServerSocket 对象
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务器已启动");
while (true) {
//3.监听客户端
Socket socket = serverSocket.accept();
System.out.println("有客户端连接");
//4.开启新的线程处理
executorService.execute(new Runnable() {
@Override
public void run() {
handle(socket);
}
});
}
}
public static void handle(Socket socket) {
try {
System.out.println("线程ID:" + Thread.currentThread().getId()
+ " 线程名称:" + Thread.currentThread().getName());
//从连接中取出输入流来接收消息
InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("客户端:" + new String(b, 0, read));
//连接中取出输出流并回话
OutputStream os = socket.getOutputStream();
os.write("你好".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//关闭连接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

客户端代码:

public class ClientDemo {
public static void main(String[] args) throws Exception {
while (true) {
//1.创建 Socket 对象
Socket s = new Socket("127.0.0.1", 9999);
//2.从连接中取出输出流并发消息
OutputStream os = s.getOutputStream();
System.out.println("请输入:");
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
os.write(msg.getBytes());
//3.从连接中取出输入流并接收回话
InputStream is = s.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("你好:" + new String(b, 0, read).trim());
//4.关闭
s.close();
}
}
}

2 I/O模型  

https://www.cnblogs.com/jeolyli/p/15087007.html

3 netty核心原理

3.1 原生NIO存在的问题

  1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、 SocketChannel、ByteBuffer等。

  2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须 对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥 塞和异常流的处理等等。

  4. JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7版本该问题仍旧存在,没有被根本解决。

  在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方 法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场 景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影 响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。

3.2 概述

  Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框 架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为 当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了 广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

  Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、 WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。

3.3 线程模型

  目前存在的线程模型有:

  ·传统阻塞 I/O 服务模型

  ·Reactor 模型

  根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现: 单 Reactor 单线程, 单 Reactor 多线程 ,主从 Reactor 多线程

3.3.1 传统阻塞 I/O 服务模型

采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回 工作。

基于netty的RPC框架

 

 

 

存在问题: 1. 当并发数很大,就会创建大量的线程,占用很大系统资源 2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

3.3.2 Reactor 模型

  Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个 请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用 IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.

1.单Reactor单线程

 

基于netty的RPC框架

 

 

 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点:

1. 性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时, 整个进程无法处理其他连接事件,很容易导致性能瓶颈

2. 可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处 理外部消息,造成节点故障

2. 单 Reactor多线程

基于netty的RPC框架

 

 

 Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发

如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求

如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理

handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务

worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler

handler 收到响应后,通过 send 将结果返回给 client

优点: 可以充分的利用多核 cpu 的处理能力

缺点: 多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在 高并发场景容易出现性能瓶颈。

3. 主从 Reactor 多线程

基于netty的RPC框架

 

 

 优点:

1. MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要 接收新连接,SubReactor 线程完成后续的业务处理

2. MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接 传给 SubReactor 线程,SubReactor 线程无需返回数据

3. 多个 SubReactor 线程能够应对更高的并发请求

缺点:

这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括 Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开 发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理 线程。这是业界成熟的服务器程序设计模式。

3.4 netty线程模型

基于netty的RPC框架

 

 

 ·Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的 线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是 NioEventLoopGroup。

·NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLoop。

·NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel) ·NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop

·每个 BossNioEventLoop 中循环执行以下三个步骤

   select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)

   processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上    runAllTasks:再去以此循环处理任务队列中的其他任务

·每个 WorkerNioEventLoop 中循环执行以下三个步骤

  select:轮训注册在其上的 NioSocketChannel 的 read/write 事件 (OP_READ/OP_WRITE 事件)

  processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件            runAllTasks:再去以此循环处理任务队列中的其他任务

·在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器 (拦截处理器、过滤处理器、自定义处理器等)。

3.4 核心API

3.4.1 ChannelHandler及其实现类

  ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻 辑。

基于netty的RPC框架

 

 

 Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类,然后 通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法    public void channelActive(ChannelHandlerContext ctx),通道就绪事件

        public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件

  public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件           public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事 件。

3.4.2 ChannelPipeline

  ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和 操作,相当于一个贯穿 Netty 的责任链.

基于netty的RPC框架

 

 

 

  如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型 或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的 Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象 中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler 是按照Pipeline的加载顺序,逆序执行。

3.4.3  ChannelHandlerContext

  这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点 ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时 ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对 ChannelHandler 进行调用。常用方法如下所示:

   ChannelFuture close(),关闭通道

  ChannelOutboundInvoker flush(),刷新

   ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前

  ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

3.4.3 ChannelOption

  Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标 准参数,而非 Netty 独创的。常用的参数配置有:                      

  · ChannelOption.SO_BACKLOG

对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理 客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服 务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。

·ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。该参数用于设置TCP连接,当设置该选 项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以 后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

3.4.4 ChannelFuture

  表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返 回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。 常用方法如下所示: Channel channel(),返回当前正在进行 IO 操作的通道 ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

3.4.5 EventLoopGroup和实现类NioEventLoopGroup

  EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

  EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任 务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如: BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel 对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理。

  BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通 常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup, WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注 册到其维护的 Selector 并对其后续的 IO 事件进行处理。

3.4.6 ServerBootstrap和Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置; Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop

public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop public B channel(Class channelClass),该方法用来设置一个服务器端的通道 实现

public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置。

public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通 道添加配置。

public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处 理类(自定义的 handler)。

public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服 务器端 。

3.4.7 Unpooled类

  这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示: public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和 字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)。

3.5 基于Netty的http及WebSocket开发

  WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间 的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器 只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

WebSocket和HTTP的区别

  http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发 送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一 个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服 务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端 要想实时获取服务端消息就得不断发送长连接到服务端.

  WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送 信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head 的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了 部分多余的信息。

http服务端:

public class NettyHttpServer {
//端口号
private int port;
public NettyHttpServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
//1. 创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup bossGroup = null;
//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//3. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//4. 设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //5. 设置服务端通道
实现为NIO
.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
.childOption(ChannelOption.SO_KEEPALIVE,
Boolean.TRUE)//6. 参数设置
.childHandler(new ChannelInitializer<SocketChannel>() {
//7. 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//8. 向pipeline中添加自定义业务处理handler
//添加编解码器
ch.pipeline().addLast(new HttpServerCodec());
// 自定义业务处理类
ch.pipeline().addLast(new
NettyHttpServerHandler());
}
 NettyHttpServerHandle
});
//9. 启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws
Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("http服务端启动成功.");
//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyHttpServer(8080).run();
}
}
/**
* http服务器处理类
*/
public class NettyHttpServerHandler extends
SimpleChannelInboundHandler<HttpObject> {
/**
* 读取就绪事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
//1.判断请求是不是HTTP请求
if (msg instanceof HttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg;
System.out.println("浏览器请求路径:" + request.uri());
if ("/favicon.ico".equals(request.uri())) {
System.out.println("图标不响应");
return;
}
//2.给浏览器进行响应
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器 ",
CharsetUtil.UTF_8);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, byteBuf);
//2.1 设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
"text/html;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,
byteBuf.readableBytes());
ctx.writeAndFlush(response);
}
}
}

websocket服务器:

**
* Netty服务器
*/
5. 通道初始化对象
@Component
public class NettyWebSocketServer implements Runnable {
@Autowired
NettyConfig nettyConfig;
@Autowired
WebSocketChannelInit webSocketChannelInit;
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* 资源关闭--在容器销毁是关闭
*/
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void run() {
try {
//1.创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2.设置线程组
serverBootstrap.group(bossGroup, workerGroup);
//3.设置参数
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(webSocketChannelInit);
//4.启动
ChannelFuture channelFuture =
serverBootstrap.bind(nettyConfig.getPort()).sync();
System.out.println("--Netty服务端启动成功---");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  

/**
* 通道初始化对象
*/
@Component
public class WebSocketChannelInit extends ChannelInitializer {
@Autowired
NettyConfig nettyConfig;
@Autowired
WebSocketHandler webSocketHandler;
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//对http协议的支持.
pipeline.addLast(new HttpServerCodec());
// 对大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//post请求分三部分. request line / request header / message body
// HttpObjectAggregator将多个信息转化成单一的request或者response对象
pipeline.addLast(new HttpObjectAggregator(8000));
// 将http协议升级为ws协议. websocket的支持
pipeline.addLast(new
WebSocketServerProtocolHandler(nettyConfig.getPath()));
// 自定义处理handler
pipeline.addLast(webSocketHandler);
}
}

  自定义处理类:

public class WebSocketHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
public static List<Channel> channelList = new ArrayList<>();
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//当有新的客户端连接的时候, 将通道放入集合
channelList.add(channel);
System.out.println("有新的连接.");
}
/**
* 通道未就绪--channel下线
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
Channel channel = ctx.channel();
//当有客户端断开连接的时候,就移除对应的通道
channelList.remove(channel);
}
/**
* 读就绪事件
*
* @param ctx
* @param textWebSocketFrame
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame textWebSocketFrame) throws Exception {
String msg = textWebSocketFrame.text();
System.out.println("msg:" + msg);
//当前发送消息的通道, 当前发送的客户端连接
Channel channel = ctx.channel();
for (Channel channel1 : channelList) {
//排除自身通道
if (channel != channel1) {
channel1.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
/**
* 异常处理事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
//移除集合
channelList.remove(channel);
}
}

3.6 Netty中粘包和拆包的解决方案

  粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息 的时候,都需要考虑TCP底层的粘包/拆包机制。 TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含 义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分 成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包 问题。

1. 业内解决方案

  由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个 问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

  ·消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息

  ·将换行符作为消息结束符

  ·将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符

  ·通过在消息头中定义长度字段来标识消息的总长度

2. Netty中的粘包和拆包解决方案

Netty提供了4种解码器来解决,分别如下:

  ·固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度 的大小

  ·行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割 拆分

  ·分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔 符,进行分割拆分

  ·基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为 接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要 求,就是应用层协议中包含数据包的长度。

4 自定义RPC框架

4.1 分布式架构网络通信

  在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现 远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢?

4.1.1 基本原理

  要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络 通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输 协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输 协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现.

4.1.2 什么是RPC

  RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样调用远程服 务,是一种进程间的通信方式.

  比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用 B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来 表达调用的语义和传达调用的数据。需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调 用过程。

基于netty的RPC框架

 

 

 RPC架构

一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。

·客户端(Client),服务的调用方。

·客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后 通过网络远程发送给服务方。

·服务端(Server),真正的服务提供者。

·服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。

基于netty的RPC框架

 

 

 RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11

注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要 将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。 在java中RPC框架比较多,常见的有Hessian、gRPC、Dubbo 等,其实对 于RPC框架而言,核心模块 就是通讯和序列化

4.1.3 RMI

  Java RMI,即远程方法调用(Remote Method Invocation),一种用于实现远程过程调用(RPCRemote procedure call)的Java API, 能直接传输序列化后的Java对象。它的实现依赖于Java虚拟机,因 此它仅支持从一个JVM到另一个JVM的调用.

/**
* 服务端
*/
public class RMIServer {
public static void main(String[] args) {
try {
//1.注册Registry实例. 绑定端口
Registry registry = LocateRegistry.createRegistry(9998);
//2.创建远程对象
IUserService userService = new UserServiceImpl();
//3.将远程对象注册到RMI服务器上即(服务端注册表上)
registry.rebind("userService", userService);
System.out.println("---RMI服务端启动成功----");
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
/**
* 客户端
*/
public class RMIClient {
public static void main(String[] args) throws RemoteException,
NotBoundException {
//1.获取Registry实例
Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998);
//2.通过Registry实例查找远程对象
IUserService userService = (IUserService)
registry.lookup("userService");
User user = userService.getByUserId(2);
System.out.println(user.getId() + "----" + user.getName());
}
}
public interface IUserService extends Remote {
User getById(int id) throws RemoteException;
}
public class UserServiceImpl extends UnicastRemoteObject implements
IUserService {
Map<Object, User> userMap = new HashMap();
protected UserServiceImpl() throws RemoteException {
super();
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
@Override
public User getById(int id) throws RemoteException {
return userMap.get(id);
}
}

4.2 基于Netty实现RPC框架

  dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架,消费者和提 供者约定接口和协议,消费者远程调用提供者的服务,

1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定,

2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据

3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 进行数据通信

4. 提供者与消费者数据传输使用json字符串数据格式

5. 提供者使用netty集成spring boot 环境实现 

案例: 客户端远程调用服务端提供根据ID查询user对象的方法.

注解RpcService:

/**
* 对外暴露服务接口
*/
@Target(ElementType.TYPE) // 用于接口和类上
@Retention(RetentionPolicy.RUNTIME)// 在运行时可以获取到
public @interface RpcService {
}

实现类UserServiceImpl:

@RpcService
@Service
public class UserServiceImpl implements IUserService {
Map<Object, User> userMap = new HashMap();
@Override
public User getById(int id) {
if (userMap.size() == 0) {
User user1 = new User();
服务Netty启动类RpcServer
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
return userMap.get(id);
}
}

服务Netty启动类RpcServer :

/**
* 启动类
*/
@Service
public class RpcServer implements DisposableBean {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@Autowired
RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) {
try {
//1. 创建线程组
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//2. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3. 设置参数
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>
() {
@Override
protected void initChannel(SocketChannel
channel) throws Exception {
ChannelPipeline pipeline =
channel.pipeline();
//添加String的编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//业务处理类
pipeline.addLast(rpcServerHandler);
}
});
//4.绑定端口
ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
System.out.println("==========服务端启动成功==========");
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
@Override
public void destroy() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}

服务业务处理类RpcServerHandler 

/**
* 服务端业务处理类
* 1.将标有@RpcService注解的bean缓存
* 2.接收客户端请求
* 3.根据传递过来的beanName从缓存中查找到对应的bean
* 4.解析请求中的方法名称. 参数类型 参数信息
* 5.反射调用bean的方法
* 6.给客户端进行响应
*/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends
SimpleChannelInboundHandler implements ApplicationContextAware {
private static final Map SERVICE_INSTANCE_MAP = new
ConcurrentHashMap();
/**
* 1.将标有@RpcService注解的bean缓存
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext
applicationContext) throws BeansException {
Map serviceMap =
applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size() > 0) {
Set> entries =
serviceMap.entrySet();
for (Map.Entry item : entries) {
Object serviceBean = item.getValue();
if (serviceBean.getClass().getInterfaces().length == 0)
{
throw new RuntimeException("服务必须实现接口");
}
//默认取第一个接口作为缓存bean的名称
String name = serviceBean.getClass().getInterfaces()
[0].getName();
SERVICE_INSTANCE_MAP.put(name, serviceBean);
}
}
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext
channelHandlerContext, String msg) throws Exception {
//1.接收客户端请求- 将msg转化RpcRequest对象
RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
try {
//业务处理
rpcResponse.setResult(handler(rpcRequest));
} catch (Exception exception) {
exception.printStackTrace();
rpcResponse.setError(exception.getMessage());
}
//6.给客户端进行响应
channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* 业务处理逻辑
*
* @return
*/
public Object handler(RpcRequest rpcRequest) throws
InvocationTargetException {
// 3.根据传递过来的beanName从缓存中查找到对应的bean
Object serviceBean =
SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
if (serviceBean == null) {
throw new RuntimeException("根据beanName找不到服务,beanName:"
+ rpcRequest.getClassName());
}
//4.解析请求中的方法名称. 参数类型 参数信息
Class serviceBeanClass = serviceBean.getClass();
String methodName = rpcRequest.getMethodName();
Class[] parameterTypes = rpcRequest.getParameterTypes();
Object[] parameters = rpcRequest.getParameters();
//5.反射调用bean的方法- CGLIB反射调用
FastClass fastClass = FastClass.create(serviceBeanClass);
FastMethod method = fastClass.getMethod(methodName,
parameterTypes);
return method.invoke(serviceBean, parameters);
}
}

启动类ServerBootstrap:

@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.01", 8899);
}
}).start();
}
}

客户端代码实现:

/**
* 客户端
* 1.连接Netty服务端
* 2.提供给调用者主动关闭资源的方法
* 3.提供消息发送的方法
*/
public class RpcClient {
private EventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
private ExecutorService executorService =
Executors.newCachedThreadPool();
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
initClient();
}
/**
* 初始化方法-连接Netty服务端
*/
public void initClient() {
try {
//1.创建线程组
group = new NioEventLoopGroup();
//2.创建启动助手
Bootstrap bootstrap = new Bootstrap();
//3.设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel
channel) throws Exception {
ChannelPipeline pipeline =
channel.pipeline();
//String类型编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加客户端处理类
pipeline.addLast(rpcClientHandler);
}
});
//4.连接Netty服务端
channel = bootstrap.connect(ip, port).sync().channel();
} catch (Exception exception) {
exception.printStackTrace();
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
/**
* 提供给调用者主动关闭资源的方法
*/
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 提供消息发送的方法
*/
public Object send(String msg) throws ExecutionException,
InterruptedException {
rpcClientHandler.setRequestMsg(msg);
Future submit = executorService.submit(rpcClientHandler);
return submit.get();
}
}

客户端业务处理类RpcClientHandler

 

/**
* 客户端处理类
* 1.发送消息
* 2.接收消息
*/
public class RpcClientHandler extends
SimpleChannelInboundHandler<String> implements Callable {
ChannelHandlerContext context;
//发送的消息
String requestMsg;
//服务端的消息
String responseMsg;
public void setRequestMsg(String requestMsg) {
this.requestMsg = requestMsg;
}
/**
* 通道连接就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws
Exception {
context = ctx;
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext
channelHandlerContext, String msg) throws Exception {
responseMsg = msg;
//唤醒等待的线程
notify();
}
/**
* 发送消息到服务端
*
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
//消息发送
context.writeAndFlush(requestMsg);
//线程等待
wait();
return responseMsg;
}

RPC代理类:

/**
* 客户端代理类-创建代理对象
* 1.封装request请求对象
* 2.创建RpcClient对象
* 3.发送消息
* 4.返回结果
*/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass) {
return
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//1.封装request请求对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//2.创建RpcClient对象
RpcClient rpcClient = new RpcClient("127.0.0.1",
8899);
try {
//3.发送消息
Object responseMsg =
rpcClient.send(JSON.toJSONString(rpcRequest));
RpcResponse rpcResponse =
JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null) {
throw new
RuntimeException(rpcResponse.getError());
}
//4.返回结果
Object result = rpcResponse.getResult();
return JSON.parseObject(result.toString(),
method.getReturnType());
} catch (Exception e) {
throw e;
} finally {
rpcClient.close();
}
}
});
}
}

客户端启动类ClientBootStrap:

/**
* 测试类
*/
public class ClientBootStrap {
public static void main(String[] args) {
IUserService userService = (IUserService)
RpcClientProxy.createProxy(IUserService.class);
User user = userService.getById(1);
System.out.println(user);
}
}


<string, object=""><map.entry<string, object=""><string, object="">

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现 远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢

上一篇:包机制和JavaDoc


下一篇:P1088 [NOIP2004 普及组] 火星人