Netty 是一个高性能、异步事件驱动的NIO 框架,基于 JAVA NIO 提供的 API 实现。它提供了对TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果.
一、Netty 高性能的原因
在 IO 编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者 IO 多路复用技术进行处理。IO 多路复用技术通过把多个 IO 的阻塞复用到同一个select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O 多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。
与 Socket 类和 ServerSocket 类相对应,NIO 也提供了SocketChannel 和ServerSocketChannel 两种不同的套接字通道实现。
1.1、多路复用通讯方式
Netty 架构按照 Reactor 模式设计和实现,它的服务端通信序列图如下:
Reactor 模式设计
客户端通信序列图如下:
客户端通信
Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端 Channel,由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 IO 阻塞导致的线程挂起。
1.2、异步通讯 NIO
由于Netty 采用了异步通信模式,一个IO 线程可以并发处理N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 IO 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
1.3零拷贝(DIRECT BUFFERS 使用堆外直接内存)
- Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写, 不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写, JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存, 消息在发送过程中多了一次缓冲区的内存拷贝。
- Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的Buffer。
- Netty 的文件传输采用了transferTo 方法,它可以直接将文件缓冲区的数据发送到目标Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题。
1.4内存池(基于内存池的缓冲区重用机制)
随着 JVM 虚拟机和 JIT 即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区 Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty 提供了基于内存池的缓冲区重用机制。
1.5、高效的 Reactor 线程模型
常用的 Reactor 线程模型有三种,Reactor 单线程模型, Reactor 多线程模型, 主从 Reactor 多线程模型。
1.5.1、Reactor 单线程模型
Reactor 单线程模型,指的是所有的 IO 操作都在同一个 NIO 线程上面完成,NIO 线程的职责如下:
1) 作为 NIO 服务端,接收客户端的 TCP 连接;
2) 作为 NIO 客户端,向服务端发起 TCP 连接;
3) 读取通信对端的请求或者应答消息
4) 向通信对端发送消息请求或者应答消息。
Reactor 单线程模型
由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO 操作都不会导致阻塞,理论上一个线程可以独立处理所有 IO 相关的操作。从架构层面看,一个 NIO 线程确实可以完成其承担的职责。例如,通过Acceptor 接收客户端的 TCP 连接请求消息,链路建立成功之后,通过 Dispatch 将对应的 ByteBuffer 派发到指定的 Handler 上进行消息解码。用户 Handler 可以通过 NIO 线程将消息发送给客户端。
1.5.2、Reactor多线程模型
Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO 操作。 有专门一个NIO 线程-Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求; 网络 IO 操作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送;
Reactor多线程模型
1.5.3、主从Reactor多线程模型
服务端用于接收客户端连接的不再是个 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。
主从Reactor多线程模型
1.6、无锁设计、线程绑定
Netty 采用了串行无锁化设计,在 IO 线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎 CPU 利用率不高,并发程度不够。但是,通过调整NIO 线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列- 多个工作线程模型性能更优。
无锁
Netty 的NioEventLoop 读取到消息之后,直接调用 ChannelPipeline 的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop 调用到用户的 Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。
1.7、 高性能的序列化框架
Netty 默认提供了对 Google Protobuf 的支持,通过扩展 Netty 的编解码接口,用户可以实现其它的高性能序列化框架,例如 Thrift 的压缩二进制编解码框架。
-
SO_RCVBUF 和 SO_SNDBUF:通常建议值为 128K 或者 256K。小包封大包,防止网络阻塞
-
SO_TCPNODELAY:NAGLE 算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率。但是对于时延敏感的应用场景需要关闭该优化算法。软中断 Hash* 值和 CPU* 绑定
-
软中断:开启 RPS 后可以实现软中断,提升网络吞吐量。RPS 根据数据包的源地址,目的地址以及目的和源端口,计算出一个 hash 值,然后根据这个 hash 值来选择软中断运行的 cpu,从上层来看,也就是说将每个连接和cpu 绑定,并通过这个hash 值,来均衡软中断在多个cpu 上,提升网络并行处理性能。
二、Netty与RPC
RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC 可以很好的解耦系统,如 WebService 就是一种基于 Http 协议的 RPC。这个 RPC 整体框架如下:
RPC 整体框架
2.1、关键技术
-
服务发布与订阅:服务端使用 Zookeeper 注册服务地址,客户端从 Zookeeper 获取可用的服务地址。
-
通信:使用 Netty 作为通信框架。
-
Spring:使用 Spring 配置服务,加载 Bean,扫描注解。
-
动态代理:客户端使用代理模式透明化服务调用。
-
消息编解码:使用 Protostuff 序列化和反序列化消息。
2.2、核心流程
-
服务消费方(client)调用以本地调用方式调用服务;
-
client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
-
client stub 找到服务地址,并将消息发送到服务端;
-
server stub 收到消息后进行解码;
-
server stub 根据解码结果调用本地的服务;
-
本地服务执行并将结果返回给 server stub;
-
server stub 将返回结果打包成消息并发送至消费方;
-
client stub 接收到消息,并进行解码;
-
服务消费方得到最终结果。
RPC 的目标就是要 2~8 这些步骤都封装起来,让用户对这些细节透明。JAVA 一般使用动态代理方式实现远程调用。
RPC调用流程
2.2.1、消息编解码
消息数据结构(接口名称+方法名+参数类型和参数值+超时时间+ requestID)
客户端的请求消息结构一般需要包括以下内容:
-
接口名称:在我们的例子里接口名是“HelloWorldService”,如果不传,服务端就不知道调用哪个接口了;
-
方法名:一个接口内可能有很多方法,如果不传方法名服务端也就不知道调用哪个方法;
-
参数类型和参数值:参数类型有很多,比如有 bool、int、long、double、string、map、list, 甚至如 struct(class);以及相应的参数值;
-
超时时间:
-
requestID,标识唯一请求 id,在下面一节会详细描述 requestID 的用处。
-
服务端返回的消息 : 一般包括以下内容。返回值+状态 code+requestID
2.2.2、通讯过程
核心问题(线程暂停、消息乱序)
如果使用 netty 的话,一般会用 channel.writeAndFlush()方法来发送消息二进制串,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说, 将请求发送出来后,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:
-
怎么让当前线程“暂停”,等结果回来后,再向后执行?
-
如果有多个线程同时进行远程方法调用,这时建立在 client server 之间的 socket 连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server 处理完结果后,将结果消息发送给 client,client 收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?如下图所示,线程A 和线程B 同时向client socket 发送请求requestA 和requestB, socket 先后将 requestB 和 requestA 发送至 server,而 server 可能将 responseB 先返回, 尽管 requestB 请求到达时间更晚。我们需要一种机制保证 responseA 丢给ThreadA,responseB 丢给 ThreadB。
image-20220117213907275
2.2.3、通讯流程
- client 线程每次通过 socket 调用一次远程接口前,生成一个唯一的 ID,即 requestID(requestID 必需保证在一个 Socket 连接里面是唯一的),一般常常使用AtomicLong 从 0 开始累计数字生成唯一 ID;
- 将 处 理 结 果 的 回 调 对 象 callback , 存 放 到 全 局 ConcurrentHashMap 里 面put(requestID, callback);
- 当线程调用 channel.writeAndFlush()发送消息后,紧接着执行 callback 的 get()方法试图获取远程返回的结果。在get()内部,则使用synchronized 获取回调对象callback 的锁,再先检测是否已经获取到结果,如果没有,然后调用 callback 的 wait()方法,释放callback 上的锁,让当前线程处于等待状态。
- 服务端接收到请求并处理后,将response 结果(此结果中包含了前面的requestID)发送给客户端,客户端 socket 连接上专门监听消息的线程收到消息,分析结果,取到requestID , 再从前面的 ConcurrentHashMap 里面 get(requestID) , 从而找到callback 对象,再用 synchronized 获取 callback 上的锁,将方法调用结果设置到callback 对象里,再调用 callback.notifyAll()唤醒前面处于等待状态的线程。
三、基于Netty 手写 Dubbo 框架
3.1、实现方案
查看官网dubbo结构图
dubbo结构图
1、首先通过register
将服务提供者的url注册到Registry
注册中心中。
2、客户端Consumer
从注册中心获取被调用服务端注册信息,如:接口名称,URL地址等信息。
3、将获取的url地址返回到Consumer
客户端,客户端通过获取的URL地址支持invoke
反射机制获取服务的实现。
3.2、整体项目结构信息
|-- netty-to-dubbo
|-- netty-dubbo-api
|-- cn.org.july.netty.dubbo.api
|-- Iservice : 对外服务暴露接口
|-- RpcRequest :服务请求对象Bean
|-- netty-dubbo-common
|-- cn.org.july.netty.dubbo.annotation
|-- RpcAnnotation : 定义一个接口标识注解
|-- netty-dubbo-server
|-- cn.org.july.netty.dubbo.registry
|-- IRegisterCenter :服务注册接口
|-- RegisterCenterImpl:服务注册实现类
|-- ZkConfig:ZK配置文件
|-- cn.org.july.netty.dubbo.rpc
|-- NettyRpcServer:基于netty实现的Rpc通讯服务
|-- RpcServerHandler:Rpc服务处理流程
|-- cn.org.july.netty.dubbo.service
|-- ServiceImpl:对外接口IService接口实现类
|-- netty-dubbo-client
|-- cn.org.july.netty.dubbo.loadbalance
|-- LoadBalance :负载均衡实现接口
|-- RandomLoadBalance:负载均衡实现类随机获取服务提供者
|-- cn.org.july.netty.dubbo.proxy
|-- RpcClientProxy:netty客户端通讯组件
|-- RpcProxyHandler:netty与服务端通讯消息处理组件
|-- cn.org.july.netty.dubbo.registry
|-- IServiceDiscover:从注册中心获取注册的服务接口
|-- ServiceDiscoverImpl:接口IServiceDiscover的实现类
|-- ZkConfig:zk配置文件。
3.3、服务提供者`Provider`端
3.3.1、实现`Iservice`接口
首先我们看下Iservice
接口的内容:
package cn.org.july.netty.dubbo.api;
/**
* @author july_whj
*/
public interface IService {
/**
* 计算加法
*/
int add(int a, int b);
/**
* @param msg
*/
String sayHello(String msg);
}
我们编写ServiceImpl
实现以上两个接口类。
package cn.org.july.netty.dubbo.service;
import cn.org.july.netty.dubbo.annotation.RpcAnnotation;
import cn.org.july.netty.dubbo.api.IService;
/**
* @author july_whj
*/
@RpcAnnotation(IService.class)
public class ServiceImpl implements IService {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public String sayHello(String msg) {
System.out.println("rpc say :" + msg);
return "rpc say: " + msg;
}
}
该类实现比较简单,不做多处理,下面分析服务注册。
3.3.2、服务注册到ZK
首先我们定义一个接口类`IRegisterCenter`,里面定义一个`registry`方法,该方法实现服务注册。服务注册需要将服务的名称、服务的地址注册到注册中心中,我们定义接口如下:
package cn.org.july.netty.dubbo.registry;
/**
* @author july_whj
*/
public interface IRegisterCenter {
/**
* 服务注册
* @param serverName 服务名称(实现方法路径)
* @param serviceAddress 服务地址
*/
void registry(String serverName,String serviceAddress);
}
第二,我们使用zookeerper作为服务注册中心,在`netty-dubbo-server`模块中引入zk的客户端操作类,pom文件如下:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.5.0</version>
</dependency>
注意:这里版本使用的2.5.0,我使用的zk版,
第三,实现该接口编写接口实现类`RegisterCenterImpl`。
package cn.org.july.netty.dubbo.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @author july_whj
*/
public class RegisterCenterImpl implements IRegisterCenter {
private CuratorFramework curatorFramework;
{
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(ZkConfig.addr).sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
curatorFramework.start();
}
@Override
public void registry(String serverName, String serviceAddress) {
String serverPath = ZkConfig.ZK_REGISTER_PATH.concat("/").concat(serverName);
try {
if (curatorFramework.checkExists().forPath(serverPath) == null) {
curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(serverPath, "0".getBytes());
}
String addr = serverPath.concat("/").concat(serviceAddress);
String rsNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL)
.forPath(addr, "0".getBytes());
System.out.println("服务注册成功," + rsNode);
} catch (Exception e) {
e.printStackTrace();
}
}
}
我们分析下以上代码:
定义一个`CuratorFramework`对象,通过代码块来实例化该对象,并通过`curatorFramework.start();`来连接ZKConfig中配置好的地址连接ZK。
使用zk作为注册中心,我们了解下ZK的存储结构。zookeeper的命名空间的结构和文件系统很像。一个名字和文件一样使用/的路径表现,zookeeper的每个节点都是被路径唯一标识的。
在这里插入图片描述
分析一下registry
方法,首先从ZkConfig中获取要注册数据的根节点信息,并将该信息和服务名称进行拼接,判断该路径是否存在,如果不存在使用PERSISTENT
方式创建该服务名称路径信息。PERSISTENT
方式为持久方式,我们使用这种方式创建因为服务名称不是动态变化的,不用每次去监听它的变化。而我们服务的地址是有可能存在多个,并且有可能发生变化,我们使用EPHEMERAL
方式来创建服务的实现地址。
我们将`ServiceImpl`服务注册到zk上,我们首先获取这个服务的服务名称,和服务实现的地址,将该服务的服务名称和服务地址注册到zk上,下面看下我们的注册服务的测试类`RegTest`。
import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import cn.org.july.netty.dubbo.registry.RegisterCenterImpl;
import java.io.IOException;
public class RegTest {
public static void main(String[] args) throws IOException {
IRegisterCenter registerCenter = new RegisterCenterImpl();
registerCenter.registry("cn.org.july.test", "127.0.0.1:9090");
System.in.read();
}
}
我们将`cn.org.july.test`服务,和服务实现的地址127.0.0.1:9090注册到zk中。
看下服务执行效果:
在这里插入图片描述
服务端显示注册成功,我们看以下zk服务中有没有该数据,
在这里插入图片描述
最后,我们可以看到数据注册成功。
3.3.3、实现NettyRpcServer
我们要将`ServiceImpl`服务发布到zk上,并通过netty监听某个端口信息。
我们先看下
package cn.org.july.netty.dubbo.rpc;
import cn.org.july.netty.dubbo.annotation.RpcAnnotation;
import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.HashMap;
import java.util.Map;
/**
* @author july_whj
*/
public class NettyRpcServer {
private IRegisterCenter registerCenter;
private String serviceAddress;
private Map<String, Object> handlerMap = new HashMap<>(16);
public NettyRpcServer(IRegisterCenter registerCenter, String serviceAddress) {
this.registerCenter = registerCenter;
this.serviceAddress = serviceAddress;
}
/**
* 发布服务
*/
public void publisher() {
for (String serviceName : handlerMap.keySet()) {
registerCenter.registry(serviceName, serviceAddress);
}
try {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动netty服务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline channelPipeline = channel.pipeline();
channelPipeline.addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
channelPipeline.addLast(new ObjectEncoder());
channelPipeline.addLast(new RpcServerHandler(handlerMap));
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
String[] addr = serviceAddress.split(":");
String ip = addr[0];
int port = Integer.valueOf(addr[1]);
ChannelFuture future = bootstrap.bind(ip, port).sync();
System.out.println("服务启动,成功。");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 子对象的实现
*
* @param services 对象实现类
*/
public void bind(Object... services) {
//将实现类通过注解获取实现类的名称、实现类的实现放入map集合中。
for (Object service : services) {
RpcAnnotation annotation = service.getClass().getAnnotation(RpcAnnotation.class);
String serviceName = annotation.value().getName();
handlerMap.put(serviceName, service);
}
}
}
分析下以上代码:
通过bind方法,将服务提供者通过`RpcAnnotation`注解获取服务名称,并将服务名称,服务实现类放入handlerMap 中。
通过publisher方法,获取handlerMap 中的服务实现,将这些服务实现通过`registerCenter.registry(serviceName, serviceAddress)`将这些服务注册到zk注册中心中,完成服务的注册。下面代码是netty的基础代码,创建两个工作线程池,启动netty服务,通过channelPipeline定义序列化对象和RpcServerHandler实现。这里不做过多解析。
我们看下`RpcServerHandler`的代码实现。
package cn.org.july.netty.dubbo.rpc;
import cn.org.july.netty.dubbo.api.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.nio.Buffer;
import java.util.HashMap;
import java.util.Map;
public class RpcServerHandler extends ChannelInboundHandlerAdapter {
private Map<String, Object> handlerMap = new HashMap<>();
public RpcServerHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws UnsupportedEncodingException {
System.out.println("channelActive:" + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务端接收到消息:" + msg);
RpcRequest rpcRequest = (RpcRequest) msg;
Object result = new Object();
if (handlerMap.containsKey(rpcRequest.getClassName())) {
Object clazz = handlerMap.get(rpcRequest.getClassName());
Method method = clazz.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getTypes());
result = method.invoke(clazz, rpcRequest.getParams());
}
ctx.write(result);
ctx.flush();
ctx.close();
}
}
这里复写了`channelRead`方法,接收客户端传递的`RpcRequest`对象信息。下面判断handlerMap中是否存在客户端调用的实现类,如果存在通过反射机制获取服务端实现类,通过`invoke`方法调用方法实现,并将执行结果`result`对象通过`ctx.write(result);`将执行结果返回客户端。
3.3.4、编写服务启动类`ServerTest`
import cn.org.july.netty.dubbo.api.IService;
import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import cn.org.july.netty.dubbo.registry.RegisterCenterImpl;
import cn.org.july.netty.dubbo.rpc.NettyRpcServer;
import cn.org.july.netty.dubbo.service.ServiceImpl;
/**
* Created with IntelliJ IDEA.
* User: wanghongjie
* Date: 2019/5/3 - 23:03
* <p>
* Description:
*/
public class ServerTest {
public static void main(String[] args) {
IService service = new ServiceImpl();
IRegisterCenter registerCenter = new RegisterCenterImpl();
NettyRpcServer rpcServer = new NettyRpcServer(registerCenter, "127.0.0.1:8080");
rpcServer.bind(service);
rpcServer.publisher();
}
}
启动netty服务,将服务实现类service通过bind方法绑定到handlerMap中,通过publisher方法,将service、服务实现地址发布到zk,并启动netty服务,监听8080端口。
3.4、实现服务消费者
做为服务消费者,我们首先要连接zk注册中心,获取服务实现的地址,并实时监听获取最新的地址信息。通过远程调用实现该服务。如果服务实现是多个我们需实现客户端负载,选取我们的服务地址。
3.4.1、负载均衡实现
定义`loadbalance`接口.
package cn.org.july.netty.dubbo.loadbalance;
import java.util.List;
public interface LoadBalance {
String select(List<String> repos);
}
定义`select`选择方法。
通过RandomLoadBalance
实现loadbalance
接口,从实现名称可以看到Random随机获取。
package cn.org.july.netty.dubbo.loadbalance;
import java.util.List;
import java.util.Random;
public class RandomLoadBalance implements LoadBalance {
@Override
public String select(List<String> repos) {
int len = repos.size();
if (len == 0)
throw new RuntimeException("未发现注册的服务。");
Random random = new Random();
return repos.get(random.nextInt(len));
}
}
3.4.2、获取注册中心服务注册信息
定义`IServiceDiscover`接口,定义`discover`方法,进行服务发现。
package cn.org.july.netty.dubbo.registry;
public interface IServiceDiscover {
String discover(String serviceName);
}
通过ServiceDiscoverImpl
实现IServiceDiscover
接口。
package cn.org.july.netty.dubbo.registry;
import cn.org.july.netty.dubbo.loadbalance.LoadBalance;
import cn.org.july.netty.dubbo.loadbalance.RandomLoadBalance;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.ArrayList;
import java.util.List;
/**
* @author july_whj
*/
public class ServiceDiscoverImpl implements IServiceDiscover {
List<String> repos = new ArrayList<String>();
private CuratorFramework curatorFramework;
public ServiceDiscoverImpl() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(ZkConfig.addr).sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorFramework.start();
}
@Override
public String discover(String serviceName) {
String path = ZkConfig.ZK_REGISTER_PATH.concat("/").concat(serviceName);
try {
repos = curatorFramework.getChildren().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
registerWatch(path);
LoadBalance loadBalance = new RandomLoadBalance();
return loadBalance.select(repos);
}
/**
* 监听ZK节点内容刷新
*
* @param path 路径
*/
private void registerWatch(final String path) {
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
repos = curatorFramework.getChildren().forPath(path);
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
try {
childrenCache.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
和服务注册同样定义`CuratorFramework`对象,并通过`curatorFramework.start();`连接ZK。
连接成功后通过zk注册的根节点加服务名称,获取该服务的服务地址。
获取的服务地址有可能不是最新的服务地址,我们需要监听zk节点的内容刷新,通过调用`registerWatch`方法,监听该节点的数据变化。
最后,将获取到的地址集合,通过`LoadBalance`随机选出一个地址,实现该服务。
3.4.3、客户端netty实现RPC远程调用
定义客户端实现类RpcClientProxy
.
package cn.org.july.netty.dubbo.proxy;
import cn.org.july.netty.dubbo.api.RpcRequest;
import cn.org.july.netty.dubbo.registry.IServiceDiscover;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* Created with IntelliJ IDEA.
* User: wanghongjie
* Date: 2019/5/3 - 23:08
* <p>
* Description:
*/
public class RpcClientProxy {
private IServiceDiscover serviceDiscover;
public RpcClientProxy(IServiceDiscover serviceDiscover) {
this.serviceDiscover = serviceDiscover;
}
public <T> T create(final Class<T> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass}, new InvocationHandler() {
//封装RpcRequest请求对象,然后通过netty发送给服务等
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setTypes(method.getParameterTypes());
rpcRequest.setParams(args);
//服务发现,zk进行通讯
String serviceName = interfaceClass.getName();
//获取服务实现url地址
String serviceAddress = serviceDiscover.discover(serviceName);
//解析ip和port
System.out.println("服务端实现地址:" + serviceAddress);
String[] arrs = serviceAddress.split(":");
String host = arrs[0];
int port = Integer.parseInt(arrs[1]);
System.out.println("服务实现ip:" + host);
System.out.println("服务实现port:" + port);
final RpcProxyHandler rpcProxyHandler = new RpcProxyHandler();
//通过netty方式进行连接发送数据
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
channelPipeline.addLast(new ObjectEncoder());
//netty实现代码
channelPipeline.addLast(rpcProxyHandler);
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
//将封装好的对象写入
future.channel().writeAndFlush(rpcRequest);
future.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
group.shutdownGracefully();
}
return rpcProxyHandler.getResponse();
}
});
}
}
我们看下`create`方法,通过动态代理newProxyInstance方法,传入待调用的接口对象,获取getClassLoader后,实现invoke方法。定义`RpcRequest`对象,封装请求参数。通过`interfaceClass`对象获取服务实现名称,调用`discover`方法获取服务提供者的地址信息,netty通过该信息连接服务,并将`RpcRequest`对象发送到服务端,服务端解析对象,获取接口请求参数等信息,执行方法,并将结果返回到客户端`RpcProxyHandler`对象接收返回结果。`RpcProxyHandler`代码实现:
package cn.org.july.netty.dubbo.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created with IntelliJ IDEA.
* User: wanghongjie
* Date: 2019/5/3 - 23:21
* <p>
* Description:
*/
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将服务端返回的内容返回
response = msg;
}
}
我们复写`channelRead`方法,获取服务端返回的结果信息`msg`,并将`msg`赋值给`response`,通过`getResponse`获取返回信息。
3.4.4、客户单调用测试
import cn.org.july.netty.dubbo.api.IService;
import cn.org.july.netty.dubbo.proxy.RpcClientProxy;
import cn.org.july.netty.dubbo.registry.IServiceDiscover;
import cn.org.july.netty.dubbo.registry.ServiceDiscoverImpl;
/**
* Created with IntelliJ IDEA.
* User: wanghongjie
* Date: 2019/5/3 - 23:06
* <p>
* Description:
*/
public class ClientTest {
public static void main(String[] args) {
IServiceDiscover serviceDiscover = new ServiceDiscoverImpl();
RpcClientProxy rpcClientProxy = new RpcClientProxy(serviceDiscover);
IService iService = rpcClientProxy.create(IService.class);
System.out.println(iService.sayHello("netty-to-dubbo"));
System.out.println(iService.sayHello("你好"));
System.out.println(iService.sayHello("成功咯,很高兴"));
System.out.println(iService.add(10, 4));
}
}
我们看下执行效果。
服务端启动:
服务端启动
客户端调用:
客户端调用
远程调用完成。