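This post is fairly long. If you would rather go straight to the code, the project source is at https://github.com/hetutu5238/rpc-demo.git
Before building a distributed service-invocation (RPC) framework, it helps to list the features such a framework usually needs. Even the simplest one has to cover the following:
service registration and discovery, wrapping of the call process, consumer-side load balancing, serialization and deserialization, and a gateway (which nginx can provide). This post walks through those features step by step
to build a simple service-invocation framework.
1. In IDEA, create the parent project rpc-parent with the child modules rpc-common, rpc-client (test client) and rpc-server (test server)
The project is Maven-based. The creation steps are omitted; the result should look like this: [screenshot: project structure]
The pom.xml of rpc-parent is as follows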
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.maglith</groupId>
<artifactId>rpc-parent</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>rpc-server</module>
<module>rpc-client</module>
<module>rpc-common</module>
</modules>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
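2. Download and start Nacos
This project uses Nacos as the registry.
Setup is covered in step 2 of https://www.cnblogs.com/hetutu-5238/p/11089577.html
Now we can write the core common module.
Create the com.rpc package in the common module; everything below in the common module lives under this package.
1. Create the util package and the Assert class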
public class Assert {

    private Assert() {
    }

    // Throws if obj is null
    public static void notNull(Object obj, String message) {
        if (obj == null) {
            throw new RuntimeException(message);
        }
    }

    // Throws if flag is true, i.e. flag describes the failure condition
    public static void on(boolean flag, String message) {
        if (flag) {
            throw new RuntimeException(message);
        }
    }
}
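2. Create the anno package and the RpcService annotation. The annotation marks a class as a service provider; the annotated class must implement the interface given as the annotation value
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {

    // The service interface that the annotated class implements
    Class<?> value();
}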
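To make the role of the annotation concrete, here is a minimal, self-contained sketch of how a runtime-retained annotation can be read through reflection to derive a registry key. DemoRpcService, GreetingService, GreetingServiceImpl and serviceKey are hypothetical names used only for this illustration; they are not part of the project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for the article's RpcService annotation
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoRpcService {
    Class<?> value();
}

// Hypothetical service interface and implementation
interface GreetingService {
    String greet(String name);
}

@DemoRpcService(GreetingService.class)
class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

class AnnotationLookupDemo {

    // Returns the interface name used as registry key, or null if the class is not annotated
    static String serviceKey(Class<?> impl) {
        DemoRpcService anno = impl.getAnnotation(DemoRpcService.class);
        if (anno == null) {
            return null;
        }
        Class<?> iface = anno.value();
        // Enforce the rule that the annotated class implements the declared interface
        if (!iface.isAssignableFrom(impl)) {
            throw new IllegalStateException(impl.getName() + " does not implement " + iface.getName());
        }
        return iface.getName();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(GreetingServiceImpl.class));
    }
}
```

The isAssignableFrom check is an extra guard added in this sketch; it turns the "must implement the interface" convention into an error at startup rather than at call time.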
3. Create the repository package
ClientRepository stores the connections used for service calls
public class ClientRepository {

    private static Map<String, Channel> repo = new ConcurrentHashMap<>();

    public static void put(String key , Channel channel) {
        if ( key == null || channel == null ) {
            System.err.println("channel or its name can't be null");
            return;
        }
        repo.put(key , channel);
    }

    public static void remove(String key) {
        repo.remove(key);
    }

    public static Channel getChannel(String key) {
        Channel channel = repo.get(key);
        // Drop a connection that is no longer active so the caller creates a new one
        if ( channel != null && !channel.isActive() ) {
            remove(key);
            return null;
        }
        return channel;
    }
}
ServiceRepository stores the services this node can provide
public class ServiceRepository { private static final Map<String, Object> repo = new HashMap<>(); public static void put(String serviceName , Object service) {
repo.put(serviceName , service);
} public static void remove(String key) {
repo.remove(key);
} public static Object getService(String serviceName) {
return repo.get(serviceName);
} public static Set<String> getAllServiceName() {
return repo.keySet();
}
}
ServiceConnectionFactory creates new connections for service calls
public class ServiceConnectionFactory {

    // One event loop group shared by all client connections; creating a new group per connection would leak threads
    private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup(10);

    public static Channel createConnection(String host , int port) throws InterruptedException {
        String key = host + ":" + port;
        System.out.println("Creating new connection: " + key);
        if ( ClientRepository.getChannel(key) != null ) {
            ClientRepository.remove(key);
        }
        Bootstrap bootstrap = new Bootstrap();
        Channel c = bootstrap.group(WORKER_GROUP)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host , port))
                .handler(new MyClientChannelInitializer())
                .connect()
                .sync()
                .channel();
        ClientRepository.put(key , c);
        return c;
    }
}
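The "look in the cache, otherwise connect" idea can also be expressed without Netty. The following is only a sketch under assumptions: ConnectionCache and its connector function are hypothetical, and the connection type is left generic so the example runs on the JDK alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical generic cache keyed by "host:port"
class ConnectionCache<C> {

    private final Map<String, C> cache = new ConcurrentHashMap<>();
    // Creates a connection for a "host:port" key; supplied by the caller
    private final Function<String, C> connector;

    ConnectionCache(Function<String, C> connector) {
        this.connector = connector;
    }

    // Returns the cached connection or creates exactly one per key, even under concurrent calls
    C get(String host, int port) {
        return cache.computeIfAbsent(host + ":" + port, connector);
    }

    // Removes a connection, for example after it was closed
    void evict(String host, int port) {
        cache.remove(host + ":" + port);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        ConnectionCache<String> cache = new ConnectionCache<>(key -> "connection to " + key);
        System.out.println(cache.get("127.0.0.1", 8080));
    }
}
```

computeIfAbsent runs the connector while holding an internal lock for that key, so a slow connect delays other callers that hit the same bin; whether that trade-off is acceptable depends on the workload.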
The MyClientChannelInitializer used in this class is written later
4. Create the config package and the RegisterConfig class
package com.rpc.config; import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService; import com.rpc.repository.ServiceRepository;
import com.rpc.util.Assert;
import org.apache.commons.lang3.StringUtils; import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.util.*; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 16:24
*/
public class RegisterConfig {
    //Configuration loaded from register.properties
    private static Properties register;
    //Nacos naming service
    private static NamingService namingService;

    private static volatile boolean init = false;

    private static volatile boolean serverInit = false;

    public static boolean getInit() {
        return init;
    }

    public static boolean getServerInit() {
        return serverInit;
    }

    /**
     * serverFlag == true means server-side startup; serverFlag == false means client-side startup
     * @param serverFlag
     */
public static void init(boolean serverFlag) {
if ( (serverFlag && getServerInit()) || (!serverFlag && getInit()) ) {
return;
}
System.out.println((serverFlag ? "Server" : "Client") + " initialization started " + new Date());
Properties properties = new Properties();
try (InputStream resourceAsStream = RegisterConfig.class.getClassLoader().getResourceAsStream("register.properties");) {
Assert.notNull(resourceAsStream,"register.properties is required");
properties.load(resourceAsStream);
register = properties;
//Obtain the Nacos naming service
namingService = NamingFactory.createNamingService(String.valueOf(register.get("register.host")));
if ( serverFlag ){
regis();
serverInit = true;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println((serverFlag ? "Server" : "Client") + " initialization finished " + new Date());
init = true;
} public static Integer getPort() {
Assert.notNull(register,"RegisterConfig must be initialized");
return Integer.valueOf(Objects.toString(register.get("server.port")));
} public static NamingService getNamingService() {
Assert.notNull(namingService,"RegisterConfig must be initialized");
return namingService;
    }

    /**
     * Register the services with the registry
     */
    private static void regis() {
        try {
            //getProperty returns null for a missing key, so the blank check below also catches an absent setting
            String scanpkgs = register.getProperty("scanpkgs");
            Assert.on(StringUtils.isBlank(scanpkgs),"scan service package can't be null");
            initService(scanpkgs.split(","));
            //Register every discovered service with the registry
            Set<String> serviceNames = ServiceRepository.getAllServiceName();
            if ( !serviceNames.isEmpty() ) {
                for (String serviceName : serviceNames) {
                    namingService.registerInstance(serviceName , String.valueOf(register.get("server.host")) , Integer.valueOf(Objects.toString(register.get("server.port"))));
                    System.out.println(String.format("Registered service: %s, address: %s:%s" , serviceName , register.get("server.host") , register.get("server.port")));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
    }

    /**
     * Initialize the services
     * @param pkgs packages to scan
     */
    private static void initService(String[] pkgs) {
        List<Class<?>> classes = new ArrayList<>();
        //Collect the matching service classes from every package
try {
for (String pkg : pkgs) {
classes.addAll(getRpcClass(pkg));
}
for (Class<?> c : classes) {
Object o = c.newInstance();
//Store the service instance in the service repository
ServiceRepository.put(c.getAnnotation(com.rpc.anno.RpcService.class).value().getName() , o);
}
} catch (Exception e) {
throw new RuntimeException(e);
        }
    }

    private static List<Class<?>> getRpcClass(String pkg) throws ClassNotFoundException {
List<Class<?>> result = new ArrayList<>();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Assert.notNull(classLoader,"Can't get ClassLoader,Thread: " + Thread.currentThread().getName());
//Resolve the package to a directory
String packageName = pkg.replace('.' , '/');
URL resource = classLoader.getResource(packageName);
Assert.notNull(resource,"Can't get package,package doesn't exist: " + packageName);
String fileUrl = resource.getFile();
File f = new File(fileUrl);
if ( f.exists() ) {
File[] files = f.listFiles();
if ( files != null ) {
for (File file : files) {
String fileName;
//A regular .class file
if ( !file.isDirectory() && (fileName = file.getName()).endsWith(".class") ) {
Class<?> c = Class.forName(pkg + "." + fileName.substring(0 , fileName.length() - 6));
if ( c.getAnnotation(com.rpc.anno.RpcService.class) != null ) {
result.add(c);
}
} else if ( file.isDirectory() ) {
List<Class<?>> rpcClass = getRpcClass(pkg + "." + file.getName());
if ( rpcClass != null && !rpcClass.isEmpty() ) {
result.addAll(rpcClass);
}
}
}
}
} else {
throw new RuntimeException("Can't get package,package doesn't exist: " + pkg);
}
return result; } }
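This class initializes either the client or the server; the argument passed to init() selects which.
register.properties is the configuration file. Add it later to the client or server project, not to the common project. Its content is as follows
#Registry address (required)
register.host=127.0.0.1:8848
#Server only (required): packages to scan, separated by ","
scanpkgs=com.rpc
#Server only (required): local service address
server.host=127.0.0.1
#Server only (required): local service port
server.port=8080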
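As an illustration of how these keys can be read and validated, here is a small sketch that uses only java.util.Properties. RegisterPropertiesDemo and its helper methods are hypothetical; the article's RegisterConfig reads the same keys but from the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that parses the same keys as register.properties
class RegisterPropertiesDemo {

    // Parses properties text; a real application would load the file from the classpath instead
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // Splits scanpkgs on "," and drops blank entries; a missing key yields an empty list
    static List<String> scanPackages(Properties p) {
        List<String> result = new ArrayList<>();
        for (String part : p.getProperty("scanpkgs", "").split(",")) {
            String pkg = part.trim();
            if (!pkg.isEmpty()) {
                result.add(pkg);
            }
        }
        return result;
    }

    // Reads server.port and fails with a clear message when it is missing
    static int port(Properties p) {
        String raw = p.getProperty("server.port");
        if (raw == null) {
            throw new IllegalStateException("server.port is required on the server side");
        }
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties p = parse("register.host=127.0.0.1:8848\nscanpkgs=com.rpc\nserver.port=8080\n");
        System.out.println(scanPackages(p) + " on port " + port(p));
    }
}
```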
5. Create the support package; its classes are the objects sent over the wire
RpcRequest, the service request class
package com.rpc.support; import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 10:52
*/
public class RpcRequest implements Serializable {
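    //Request id generator
    private static final AtomicLong REQUEST_ID = new AtomicLong(0);
    //Request id
    private long id;
    //Name of the requested service; in this project it is the fully qualified interface name
    private String serviceName;
    //Name of the requested method
    private String methodName;
    //Parameter types of the request
    private Class<?>[] paramsTypes;
    //Arguments of the request
    private Object[] params;

    public long getId() {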
return id;
} public void setId(long id) {
this.id = id;
} public void newId() {
this.id = REQUEST_ID.getAndIncrement();
} public String getServiceName() {
return serviceName;
} public void setServiceName(String serviceName) {
this.serviceName = serviceName;
} public String getMethodName() {
return methodName;
} public void setMethodName(String methodName) {
this.methodName = methodName;
} public Class<?>[] getParamsTypes() {
return paramsTypes;
} public void setParamsTypes(Class<?>[] paramsTypes) {
this.paramsTypes = paramsTypes;
} public Object[] getParams() {
return params;
} public void setParams(Object[] params) {
this.params = params;
} @Override
public String toString() {
return "RpcRequest{" +
"id=" + id +
", serviceName='" + serviceName + '\'' +
", methodName='" + methodName + '\'' +
'}';
}
}
RpcResponse, the response to a request
package com.rpc.support; import java.io.Serializable; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 10:52
*/
public class RpcResponse implements Serializable {

    //Error information
    private Throwable error;
    //Returned data
    private Object response;
    //Id of the matching request
    private long id;

    public long getId() {
        return id;
    }

    public void setId(long id) {
this.id = id;
} public Throwable getError() {
return error;
} public void setError(Throwable error) {
this.error = error;
} public Object getResponse() {
return response;
} public void setResponse(Object response) {
this.response = response;
} }
RpcFuture stores each sent request and obtains its result. The design borrows from the way Dubbo wraps this
package com.rpc.support; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 10:18
*/
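public class RpcFuture {
    //Requests that have been sent but have not received a response yet
    private static final Map<Long, RpcFuture> REPO = new ConcurrentHashMap<>();
    //Lock for this request
    private final Object lock = new Object();
    //Timeout in seconds
    private final int timeOut = 10;
    //Request id
    private long id;
    //Request content
    private RpcRequest rpcRequest;
    //Response that belongs to this request
    private volatile RpcResponse rpcResponse;

    public RpcFuture(long id , RpcRequest rpcRequest) {
        this.id = id;
        this.rpcRequest = rpcRequest;
    }

    public static void receive(RpcResponse resp) {
        RpcFuture future = REPO.remove(resp.getId());
        //The request may already have timed out and been removed, so ignore late or unknown responses
        if ( future != null ) {
            future.doReceive(resp);
        }
    }

    public static void putFuture(Long id , RpcFuture rpcFuture) {
        REPO.put(id , rpcFuture);
    }

    private boolean done() {
        return this.rpcResponse != null;
    }

    private void doReceive(RpcResponse resp) {
        synchronized (lock) {
            this.rpcResponse = resp;
            //Wake up the caller waiting in getResponse()
            lock.notifyAll();
        }
    }

    public Object getResponse() {
        long deadline = System.currentTimeMillis() + timeOut * 1000L;
        //A BlockingQueue could replace the lock, but it would block the thread; on Java 8 with low contention, synchronized is recommended over an explicit lock
        synchronized (lock) {
            try {
                //Loop to guard against spurious wakeups and wait only for the time that is left
                long remaining = deadline - System.currentTimeMillis();
                while ( !done() && remaining > 0 ) {
                    lock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                //Do not leave a timed-out or interrupted request in the repository
                if ( !done() ) {
                    REPO.remove(id);
                }
            }
            if ( !done() ) {
                throw new RuntimeException("Request timed out, id: " + id + ", request: " + rpcRequest);
            }
        }
        //Surface a server-side failure instead of returning null
        if ( rpcResponse.getError() != null ) {
            throw new RuntimeException("remote call failed" , rpcResponse.getError());
        }
        return rpcResponse.getResponse();
    }
}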
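The same request/response correlation can be sketched with the JDK's CompletableFuture in place of wait/notify. This is an alternative illustration, not the article's implementation; PendingCalls and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry of in-flight calls, keyed by request id
class PendingCalls {

    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request is written to the channel
    CompletableFuture<Object> register(long id) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    // Called by the I/O thread when a response arrives; returns false for unknown or expired ids
    boolean complete(long id, Object value) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(value);
    }

    // Blocks the caller for at most timeoutMillis and always removes the entry afterwards
    Object await(long id, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("request " + id + " timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for request " + id, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request " + id + " failed", e.getCause());
        } finally {
            pending.remove(id);
        }
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<Object> future = calls.register(1L);
        calls.complete(1L, "pong");
        System.out.println(calls.await(1L, future, 1000));
    }
}
```

The future handles the timed wait and wakeups itself, which removes the hand-written lock and loop at the cost of one more object per request.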
6. Create the client package; its classes are called by the client or the server
RpcClient is used in the client role to send request messages
package com.rpc.client; import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.rpc.config.RegisterConfig;
import com.rpc.repository.ServiceConnectionFactory;
import com.rpc.support.RpcRequest;
import com.rpc.repository.ClientRepository;
import com.rpc.support.RpcFuture;
import com.rpc.util.Assert;
import io.netty.channel.Channel; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 11:41
*/
public class RpcClient {

    public static Object transfer(RpcRequest rpcRequest) {
        try {
            //Initialize the client side; this returns immediately if it has already run
            RegisterConfig.init(false);
            //Ask the registry for an instance. Nacos already load-balances here; you can also do it yourself (round-robin or hash):
            // List<Instance> instances = RegisterConfig.getNamingService().selectInstances(rpcRequest.getServiceName() , true);
            // int one = Math.floorMod(rpcRequest.hashCode() , instances.size());
            // Instance ins = instances.get(one);
            Instance ins = RegisterConfig.getNamingService().selectOneHealthyInstance(rpcRequest.getServiceName());
            Assert.notNull(ins,"service not found: "+rpcRequest.getServiceName());
            String key = ins.getIp() + ":" + ins.getPort();
            Channel c = ClientRepository.getChannel(key);
            if ( c == null ) {
                //No cached connection for this instance, so create one
                c = ServiceConnectionFactory.createConnection(ins.getIp() , ins.getPort());
            }
            Long id = rpcRequest.getId();
            RpcFuture rpcFuture = new RpcFuture(id , rpcRequest);
            RpcFuture.putFuture(id , rpcFuture);
            c.writeAndFlush(rpcRequest);
            return rpcFuture.getResponse();
        } catch (NacosException e) {
            throw new RuntimeException("register connection error" , e);
        } catch (InterruptedException e) {
            //Restore the interrupt flag before rethrowing
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while connecting to the service" , e);
        }
    }
}
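For readers who want to do the load balancing themselves, as the commented-out lines suggest, here is a minimal sketch of the round-robin and hash strategies. SimpleLoadBalancer is a hypothetical class; it works on any list, so the Nacos Instance type is not needed to run it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side load balancer over a list of candidate instances
class SimpleLoadBalancer {

    private final AtomicInteger counter = new AtomicInteger();

    // Round-robin: each call moves to the next instance; floorMod keeps the index valid after int overflow
    <T> T roundRobin(List<T> instances) {
        requireNonEmpty(instances);
        return instances.get(Math.floorMod(counter.getAndIncrement(), instances.size()));
    }

    // Hash-based: the same key maps to the same instance as long as the list does not change
    static <T> T byHash(List<T> instances, Object key) {
        requireNonEmpty(instances);
        // hashCode() may be negative, so the plain % operator could produce an invalid index
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }

    private static void requireNonEmpty(List<?> instances) {
        if (instances == null || instances.isEmpty()) {
            throw new IllegalArgumentException("no instances available");
        }
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        SimpleLoadBalancer lb = new SimpleLoadBalancer();
        System.out.println(lb.roundRobin(instances));
        System.out.println(lb.roundRobin(instances));
    }
}
```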
The Server class is used on the server side to start the service and listen for requests
package com.rpc.client; import com.rpc.config.RegisterConfig;
import com.rpc.netty.MyServerChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @Description: RPC server
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 10:41
 */
public class Server {

    private static ServerBootstrap server;

    private static volatile boolean start = false;

    public static void init() throws InterruptedException {
        if ( start ) {
            return;
        }
        //Register the services with the registry
        RegisterConfig.init(true);
        System.err.println("Starting server on port: " + RegisterConfig.getPort());
        if ( server == null ) {
            server = new ServerBootstrap();
        }
        start = true;
Channel channel = server.group(parentGroup() , workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new MyServerChannelInitializer())
.option(ChannelOption.SO_BACKLOG , 100)
.childOption(ChannelOption.SO_KEEPALIVE , true)
.bind(RegisterConfig.getPort())
.sync()
.channel(); channel.closeFuture().sync(); } private static EventLoopGroup parentGroup() {
return new NioEventLoopGroup(1);
} private static EventLoopGroup workerGroup() {
return new NioEventLoopGroup(10);
} }
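The MyServerChannelInitializer class used here is explained later
7. Create the netty package. It holds the serialization and deserialization of request and response data, the encoder and decoder, and the handlers that process requests and return results
Under netty, create the serilaze package with the serialization interface and its implementation, which serialize the transported data. The project currently uses JSON serialization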
package com.rpc.netty.serilaze; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 11:21
*/
public interface Serilazier {
byte[] serialize(Object msg); <T> T deserialize(byte[] bytes , Class<T> clz);
}
package com.rpc.netty.serilaze;

import com.alibaba.fastjson.JSONObject;

import java.nio.charset.StandardCharsets;

/**
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-28
 * @Time: 11:22
 */
public class JsonSerilizer implements Serilazier {

    @Override
    public byte[] serialize(Object msg) {
        //Use a fixed charset so client and server agree regardless of their platform defaults
        return JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public <T> T deserialize(byte[] bytes , Class<T> clz) {
        return JSONObject.parseObject(new String(bytes , StandardCharsets.UTF_8) , clz);
    }
}
* Other serialization formats work too. For example, to use Hessian serialization, first add the hessian dependency
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.63</version>
</dependency>
Then create a Hessian serializer class that implements the serialization interface
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HessianSerlizer implements Serilazier {

    @Override
    public byte[] serialize(Object msg) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(out);
        try {
            hessian2Output.writeObject(msg);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                //close() flushes the buffered data into out before the bytes are read below
                hessian2Output.close();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return out.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes , Class<T> clz) {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        Hessian2Input hessian2Input = new Hessian2Input(in);
        try {
            return clz.cast(hessian2Input.readObject(clz));
        } catch (IOException e) {
            //Fail loudly instead of returning null, which would hide the decoding error
            throw new RuntimeException(e);
        } finally {
            try {
                hessian2Input.close();
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
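To show that the interface really is pluggable, here is a sketch of a third implementation based on the JDK's built-in object serialization. DemoSerializer mirrors the shape of the article's interface under a hypothetical name so that the example compiles on its own; JdkSerializer is likewise hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical copy of the serializer contract used in this article
interface DemoSerializer {
    byte[] serialize(Object msg);

    <T> T deserialize(byte[] bytes, Class<T> clz);
}

// Serializer backed by java.io object streams; the message must implement Serializable
class JdkSerializer implements DemoSerializer {

    @Override
    public byte[] serialize(Object msg) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources closes and flushes the stream before the bytes are read
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(msg);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clz) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // cast() throws ClassCastException if the payload is not of the expected type
            return clz.cast(ois.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        JdkSerializer serializer = new JdkSerializer();
        byte[] bytes = serializer.serialize("ping");
        System.out.println(serializer.deserialize(bytes, String.class));
    }
}
```

JDK deserialization of untrusted input is a known security risk, so this variant is only suitable between services that trust each other.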
Under the netty package, create the codec package, and in it the encoder and decoder for the transported messages
package com.rpc.netty.codec; import com.rpc.netty.serilaze.Serilazier;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 11:45
*/
public class RpcEncoder extends MessageToByteEncoder<Object> { private Serilazier serilazier; public RpcEncoder(Serilazier serilazier) {
this.serilazier = serilazier;
} @Override
protected void encode(ChannelHandlerContext ctx , Object msg , ByteBuf out) {
byte[] serialize = serilazier.serialize(msg);
out.writeInt(serialize.length);
out.writeBytes(serialize);
}
}
package com.rpc.netty.codec; import com.rpc.netty.serilaze.Serilazier;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 11:47
*/
public class RpcDecoder extends ByteToMessageDecoder { private Serilazier serilazier; private Class<?> c; public RpcDecoder(Serilazier serilazier , Class<?> c) {
this.serilazier = serilazier;
this.c = c;
} @Override
protected void decode(ChannelHandlerContext ctx , ByteBuf in , List<Object> out) throws Exception {
if ( in.readableBytes() < 4 ) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if ( dataLength < 0 ) {
//A negative length means a corrupt frame: close the connection and stop decoding
ctx.close();
return;
}
if ( in.readableBytes() < dataLength ) {
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[ dataLength ];
in.readBytes(bytes);
Object obj = serilazier.deserialize(bytes , c);
out.add(obj);
}
}
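The encoder and decoder above implement length-prefixed framing: a 4-byte length followed by the payload. The sketch below reproduces that wire format with java.nio.ByteBuffer so it can be tried without Netty; LengthPrefixFraming and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that frames messages as [4-byte big-endian length][payload]
class LengthPrefixFraming {

    // Prepends the payload length to the payload
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Reads one frame if it is complete; otherwise returns null and leaves the read position unchanged
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            // Not enough bytes yet: rewind so the length is read again on the next attempt
            in.reset();
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hi".getBytes(StandardCharsets.UTF_8));
        byte[] payload = tryDecode(ByteBuffer.wrap(frame));
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```

The mark and reset pair plays the same role as markReaderIndex and resetReaderIndex in the decoder: TCP may deliver a frame in pieces, so an incomplete frame has to be left in the buffer until the rest arrives.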
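Under the netty package, create the RpcInvoker message-handling interface. Then create the invoke package under netty, with a handler for RpcRequest messages and a handler for RpcResponse messages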
package com.rpc.netty; import io.netty.channel.Channel; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-28
* @Time: 17:46
*/
public interface RpcInvoker { void handle(Channel channel , Object object);
}
package com.rpc.netty.invoke; import com.rpc.netty.RpcInvoker;
import com.rpc.support.RpcRequest;
import com.rpc.support.RpcResponse; import com.rpc.repository.ServiceRepository;
import io.netty.channel.Channel; import java.lang.reflect.Method; /**
* Handler for RpcRequest messages
*
* @author: zhoum
* @Date: 2019-11-26
* @Time: 11:50
*/
public class RpcRequestInvoker implements RpcInvoker { @Override
public void handle(Channel channel , Object object) {
RpcResponse res = new RpcResponse();
try {
RpcRequest request;
if ( !(object instanceof RpcRequest) ) {
res.setError(new Exception("params must be instance of com.rpc.support.RpcRequest"));
channel.writeAndFlush(res);
return;
}
request = (RpcRequest) object;
//Copy the request id onto the response
res.setId(request.getId());
//Look up the implementation registered for this service
Object service = ServiceRepository.getService(request.getServiceName());
if ( service == null ) {
res.setError(new Exception("can't find service for " + request.getServiceName()));
channel.writeAndFlush(res);
return;
}
//Invoke the method
Method method = service.getClass().getMethod(request.getMethodName() , request.getParamsTypes());
Object invoke = method.invoke(service , request.getParams());
res.setResponse(invoke);
channel.writeAndFlush(res);
} catch (Exception e) {
res.setError(e);
channel.writeAndFlush(res);
}
} }
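The core of the request handler is a reflective call: find the implementation by service name, find the method by name and parameter types, then invoke it. The following sketch isolates that step; ReflectiveDispatcher, EchoService and EchoServiceImpl are hypothetical names, and errors are reported as unchecked exceptions to keep the example short.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service used by the demo in main
interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return "echo: " + text;
    }
}

// Hypothetical dispatcher: service name -> implementation, invoked through reflection
class ReflectiveDispatcher {

    private final Map<String, Object> services = new HashMap<>();

    void register(String serviceName, Object impl) {
        services.put(serviceName, impl);
    }

    Object dispatch(String serviceName, String methodName, Class<?>[] paramTypes, Object[] args) {
        Object service = services.get(serviceName);
        if (service == null) {
            throw new IllegalArgumentException("no service registered for " + serviceName);
        }
        try {
            // getMethod only finds public methods and matches the parameter types exactly
            Method method = service.getClass().getMethod(methodName, paramTypes);
            return method.invoke(service, args);
        } catch (InvocationTargetException e) {
            // The service method itself threw; report its exception rather than the reflection wrapper
            throw new IllegalStateException("service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register(EchoService.class.getName(), new EchoServiceImpl());
        System.out.println(dispatcher.dispatch(EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"}));
    }
}
```

Unwrapping InvocationTargetException is a choice made in this sketch so that the caller sees the real cause of a failure.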
package com.rpc.netty.invoke; import com.rpc.netty.RpcInvoker;
import com.rpc.support.RpcResponse;
import com.rpc.support.RpcFuture;
import io.netty.channel.Channel; /**
* Handler for RpcResponse messages
* @author: zhoum
* @Date: 2019-11-26
* @Time: 11:50
*/
public class RpcResponseInvoker implements RpcInvoker { @Override
public void handle(Channel channel , Object object) {
if ( object instanceof RpcResponse ) {
RpcResponse resp = (RpcResponse) object;
//Hand the response to the waiting future
RpcFuture.receive(resp);
}
} }
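Under the netty package, create AbstractHandleAdapter, which extends ChannelInboundHandlerAdapter so that a message handler must be supplied. Then create the Netty request handler and response handler, which delegate to the message handler passed to the constructor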
package com.rpc.netty; import io.netty.channel.ChannelInboundHandlerAdapter; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-29
* @Time: 9:39
*/
public abstract class AbstractHandleAdapter extends ChannelInboundHandlerAdapter { protected RpcInvoker rpcInvoker; public AbstractHandleAdapter(RpcInvoker rpcInvoker) {
this.rpcInvoker = rpcInvoker;
}
}
package com.rpc.netty;

import io.netty.channel.ChannelHandlerContext;

/**
 * @Description: Netty handler adapter for requests
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:40
 */
public class ChannelServerMessageHandler extends AbstractHandleAdapter {

    public ChannelServerMessageHandler(RpcInvoker rpcInvoker) {
        super(rpcInvoker);
    }

    /**
     * A new connection was established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String address = ctx.channel().remoteAddress().toString();
        System.out.println("New connection from: " + address);
        super.channelActive(ctx);
    }

    /**
     * The connection was closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("Connection closed: " + ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

    /**
     * A message was read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {
        System.out.println("Server received message: " + msg);
        //Delegate to the message handler
        rpcInvoker.handle(ctx.channel() , msg);
    }

    /**
     * An exception occurred
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception {
        System.err.println("Exception on connection: " + ctx.channel().remoteAddress().toString());
        super.exceptionCaught(ctx , cause);
    }
}
package com.rpc.netty;

import com.rpc.repository.ClientRepository;
import io.netty.channel.ChannelHandlerContext;

import java.net.InetSocketAddress;

/**
 * Netty handler adapter for responses
 * @Description:
 * @author: zhoum
 * @Date: 2019-11-14
 * @Time: 11:40
 */
public class ChannelClientMessageHandler extends AbstractHandleAdapter {

    public ChannelClientMessageHandler(RpcInvoker rpcInvoker) {
        super(rpcInvoker);
    }

    /**
     * A new connection was established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String address = ctx.channel().remoteAddress().toString();
        System.out.println("New connection: " + address);
        super.channelActive(ctx);
    }

    /**
     * The connection was closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        //Remove the cached connection
        ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort());
        System.out.println("Connection closed");
        super.channelInactive(ctx);
    }

    /**
     * A message was read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg) {
        //Delegate to the message handler
        rpcInvoker.handle(ctx.channel() , msg);
    }

    /**
     * An exception occurred
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception {
        System.err.println("Exception on connection: " + ctx.channel().remoteAddress().toString());
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        //Remove the cached connection
        ClientRepository.remove(ipSocket.getAddress().getHostAddress()+":"+ipSocket.getPort());
        super.exceptionCaught(ctx , cause);
    }
}
Finally, under the netty package, create MyClientChannelInitializer and MyServerChannelInitializer (the two classes promised earlier). Netty uses them when initializing the client or the server
package com.rpc.netty; import com.rpc.netty.serilaze.JsonSerilizer;
import com.rpc.netty.codec.RpcDecoder;
import com.rpc.netty.codec.RpcEncoder;
import com.rpc.netty.invoke.RpcRequestInvoker;
import com.rpc.support.RpcRequest;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; /**
* @Description: Server-side channel initializer
* @author: zhoum
* @Date: 2019-11-14
* @Time: 11:25
*/
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        //Add the decoder and encoder
        pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcRequest.class));
        pipeline.addLast(new RpcEncoder(new JsonSerilizer()));
        //Add the message handler
        pipeline.addLast(new ChannelServerMessageHandler(new RpcRequestInvoker()));
    }
}
package com.rpc.netty; import com.rpc.netty.serilaze.JsonSerilizer;
import com.rpc.netty.codec.RpcDecoder;
import com.rpc.netty.codec.RpcEncoder;
import com.rpc.netty.invoke.RpcResponseInvoker;
import com.rpc.support.RpcResponse;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; /**
* @Description: Client-side channel initializer
* @author: zhoum
* @Date: 2019-11-14
* @Time: 11:25
*/
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        //Add the encoder and decoder
        pipeline.addLast(new RpcEncoder(new JsonSerilizer()));
        pipeline.addLast(new RpcDecoder(new JsonSerilizer() , RpcResponse.class));
        //Add the message handler
        pipeline.addLast(new ChannelClientMessageHandler(new RpcResponseInvoker()));
    }
}
8. In the util package, create ProxyUtil to build the proxy objects
package com.rpc.util; import com.alibaba.fastjson.JSONObject;
import com.rpc.client.RpcClient;
import com.rpc.support.RpcRequest; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:09
*/
public class ProxyUtil implements InvocationHandler {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> t) {
        return (T) Proxy.newProxyInstance(t.getClassLoader() , new Class<?>[]{t} , this);
    }

    @Override
    public Object invoke(Object proxy , Method method , Object[] args) throws Throwable {
        //Build the request
        RpcRequest request = new RpcRequest();
        //Each call gets a new id
        request.newId();
        request.setServiceName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParams(args);
        request.setParamsTypes(method.getParameterTypes());
        Object resp = RpcClient.transfer(request);
        Assert.on(resp == null,"failed to deserialize the returned result");
        //fastjson decodes a nested object as JSONObject, so convert it to the declared return type
        if ( resp instanceof JSONObject ) {
            Class<?> returnType = method.getReturnType();
            JSONObject jobj = (JSONObject) resp;
            return jobj.toJavaObject(returnType);
        }
        return resp;
    }
}
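The mechanism that makes a remote call look like a local one is the JDK dynamic proxy. The sketch below shows it in isolation: instead of sending a request, the handler simply describes the intercepted call. EchoApi and RecordingProxyDemo are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.StringJoiner;

// Hypothetical interface to be proxied; every method returns String to match the handler below
interface EchoApi {
    String echo(String text, int times);
}

class RecordingProxyDemo {

    // Builds a proxy whose method calls return a description of the call, e.g. "EchoApi.echo(hi, 2)"
    static <T> T describeCalls(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            StringJoiner joined = new StringJoiner(", ", "(", ")");
            // args is null for methods without parameters
            if (args != null) {
                for (Object arg : args) {
                    joined.add(String.valueOf(arg));
                }
            }
            // An RPC framework would build and send a request here instead of a string
            return method.getDeclaringClass().getSimpleName() + "." + method.getName() + joined;
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    public static void main(String[] args) {
        EchoApi api = describeCalls(EchoApi.class);
        System.out.println(api.echo("hi", 2));
    }
}
```

The handler receives exactly the pieces an RPC request needs: the declaring interface, the method name, the parameter types and the arguments.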
That completes the framework code. Testing follows
Testing
Still in the common module, create the test package, then the test interface and the test entity
package com.rpc.test; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:38
*/
public interface TestService { TestEntity getTest(String username , String password); TestEntity getTest1(String username , String password);
}
package com.rpc.test; import java.io.Serializable; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:38
*/
public class TestEntity implements Serializable { private String username; private String password; public String getUsername() {
return username;
} public void setUsername(String username) {
this.username = username;
} public String getPassword() {
return password;
} public void setPassword(String password) {
this.password = password;
} @Override
public String toString() {
return "TestEntity{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}
Server
Open the pom.xml of the rpc-server project and add the common module as a dependency, as follows
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc-parent</artifactId>
<groupId>com.maglith</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion> <artifactId>rpc-server</artifactId>
<dependencies>
<dependency>
<artifactId>rpc-common</artifactId>
<groupId>com.maglith</groupId>
<version>1.0-SNAPSHOT</version>
</dependency> </dependencies> </project>
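In the rpc-server project, create register.properties under resource with the following content
#Registry address (required)
register.host=127.0.0.1:8848
#Server only (required): packages to scan, separated by ","
scanpkgs=com.rpc
#Server only (required): local service address
server.host=127.0.0.1
#Server only (required): local service port
server.port=8080
Create an implementation of the test interface above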
package com.rpc.netty.invoke; import com.rpc.anno.RpcService;
import com.rpc.test.TestEntity;
import com.rpc.test.TestService; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:40
*/
@RpcService(TestService.class)
public class TestServiceImpl implements TestService {
@Override
public TestEntity getTest(String username , String password) {
TestEntity testEntity = new TestEntity();
testEntity.setUsername(username);
testEntity.setPassword(password);
return testEntity;
} @Override
public TestEntity getTest1(String username , String password) {
TestEntity testEntity = new TestEntity();
testEntity.setUsername(username);
testEntity.setPassword(password);
return testEntity;
}
}
Create the main class for the test
package com.rpc.util; import com.rpc.client.Server; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:44
*/
public class RpcServerMain { public static void main(String[] args) throws InterruptedException {
Server.init();
}
}
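Run it. Output like the following means the server initialized successfully: [screenshot: server console output]
Client
Open the rpc-client project and again add the common module as a dependency, as follows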
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc-parent</artifactId>
<groupId>com.maglith</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion> <artifactId>rpc-client</artifactId>
<dependencies>
<dependency>
<artifactId>rpc-common</artifactId>
<groupId>com.maglith</groupId>
<version>1.0-SNAPSHOT</version>
</dependency> </dependencies> </project>
In the rpc-client project, create register.properties under resource with the following content
#Registry address (required)
register.host=127.0.0.1:8848
Create the test class that makes the calls
package com.rpc; import com.rpc.util.ProxyUtil;
import com.rpc.test.TestEntity;
import com.rpc.test.TestService; /**
* @Description:
* @author: zhoum
* @Date: 2019-11-27
* @Time: 14:41
*/
public class Consumer {

    public static void main(String[] args) {
        ProxyUtil proxyUtil = new ProxyUtil();
        TestService proxy = proxyUtil.getProxy(TestService.class);
        int i = 1;
        while ( i < 500 ) {
            TestEntity test = proxy.getTest("hello" + i++ , "helloword" + i);
            TestEntity test1 = proxy.getTest1("hello" + i++ , "helloword" + i);
            System.out.println("Received: " + test);
            System.out.println("Received: " + test1);
        }
    }
}
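Then run it
Switch to the console of the server project, which shows output like the following: [screenshot: server console output]
At this point the whole framework is written and tested
The main idea is to build an RPC framework on Netty, with Nacos as the service registration and discovery center. ZooKeeper works as well, depending on preference. The project is a way to understand the ideas behind distributed microservices.
The data flow of the project is shown in the following figure: [figure: data flow of the project]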