文章目录
前言
这是作为sofarpc学习的子篇,主要学习netty在rpc里面的使用。
netty
很重要的三个类NettyByteBuffer(直接跳过没啥好讲),NettyChannel,NettyHelper(也直接跳过,比较简单)
NettyHelper
简单介绍:保存服务端boss,work线程池,还有客户端对应io线程池 EventLoopGroup
我们来看下在哪里set进去
com.alipay.sofa.rpc.transport.http.AbstractHttp2ServerTransport#start
com.alipay.sofa.rpc.client.AbstractCluster
很多功能
- 连接init
- 调用的时候进行负载均衡
- 调用的方式,同步异步,或者future
- 心跳,重试等等
调用的实现
com.alipay.sofa.rpc.client.AbstractCluster#doSendMsg
同步
Callback调用
request.setSofaResponseCallback(methodResponseCallback)一个重点关注点
Future调用
调用实现方式
有很多种协议,主要看下http
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport
基本参数
异步调用
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#asyncSend
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#doInvokeAsync
会分有回调callback还是future调用,最终通过doSend(request, callback, timeoutMillis);去调用
过程是这样,会去生成一个requestId,然后放到map里头,搞个超时机制去remove,如果超时的时候还在里头就会塞入异常信息
requestId是AtomicInteger去递增,channel.write(httpRequest)输出到netty服务端。
responseChannelHandler.put(requestId,channel.write(httpRequest),callback);
把streamId跟future,AbstractHttpClientHandler塞到map里头
这里处理链读到数据之后怎么一个处理过程
com.alipay.sofa.rpc.transport.http.Http2ClientChannelHandler#channelRead0
会根据header streamId去拿到对应的AbstractHttpClientHandler,处理器
callback.receiveHttpResponse(msg);
序列化,然后去setData
在这里实现前面callback或者future等等
com.alipay.sofa.rpc.transport.http.FutureInvokeClientHandler#doOnResponse
com.alipay.sofa.rpc.message.AbstractResponseFuture#setSuccess
往future里头塞result
整个过程流程图
- 客户端----future,callback调用,会保存requestId,跟对应的future或者callback
- 在服务端发送消息之后,客户端----进行解析,从header里头拿到streamId,还有future进行塞入数据
- 这时也是会从private final Map<Integer, Entry<ChannelFuture, AbstractHttpClientHandler>> streamIdPromiseMap;去踢出这个请求,不会造成超时报错
同步调用
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#syncSend
同步调用有个很骚的操作,就是它实际还是一个future去调用,作为一个callback
跟dubbo一样骚操作
就是同步帮你get一个结果,异步丢给你个future,你自己去get结果,哈哈哈,你好骚~
同步异步对比
最后还是调用doSend方法
同步方法是框架请求完直接帮你调用DefaultFuture的get方法阻塞获取结果,
异步方法是把DefaultFuture返回给用户,让用户决定获取调用get()方法的时机
优雅关闭(计算器作用)
这里不是线程池优雅关闭,是调用线程,它会一直等到计数器为0,再关闭
负载均衡
com.alipay.sofa.rpc.client.AbstractCluster#select(com.alipay.sofa.rpc.core.request.SofaRequest, java.util.List<com.alipay.sofa.rpc.client.ProviderInfo>)
com.alipay.sofa.rpc.client.AbstractLoadBalancer#select