首先分析hbase中对于master协议的调用:
在ConnectionImplementation的方法getKeepAliveMasterService被调用时,会通过MasterServiceStubMaker.makeStub()方法构建成员变量masterServiceState.stub。
在MasterServiceStubMaker.makeStub()方法中,通过间接调用rpcClient.createBlockingRpcChannel来构建protobuf的中BlockingStub所需要的channel。该channel的实际类型是BlockingRpcChannelImplementation,他实现了BlockingRpcChannel接口的callBlockingMethod。在具体方法中的调用是通过rpcClient.callBlockingMethod来实现的。
AbstractRpcClient.callMethod
该方法由AbstractRpcClient.callBlockingMethod间接调用。
在其内部,首先构造了一个Call对象,实现对发送数据的封装以及返回时对回调函数的调用。
然后构建ConnectionId。
通过getConnection获取一个已存在的或新构建的NettyRpcConnection(默认实现)。
最后调用NettyRpcConnection中的sendRequest。
通过调用HBaseRpcController中的notifyOnCancel实现对内部构建的CancellationCallback.run的调用。如果channel为null,会对netty客户端的重新构建。
然后调用wirte方法实现对信息的发送。
hbase中对于Admin,Client协议的调用方式也都与此类似。
接下来,我们来介绍hbase服务端对于客户端请求的接受:
在NettyRpcServer的初始化过程中,实现了对netty服务端的构建。其中,会将NettyRpcServerRequestDecoder加入到pipeline。
而NettyRpcServerRequestDecoder正是实现对客户端请求的处理。
在NettyRpcServerRequestDecoder的channelRead方法,依次调用了NettyServerRpcConnection.process,ServerRpcConnection.processOneRpc,ServerRpcConnection.processRequest。
在ServerRpcConnection.processRequest方法中,主要构建了一个ServerCall,用于接下来对远程方法的服务端调用。
接下来调用SimpleRpcScheduler.dispatch,将CallRunner.callTask放入相应的队列中。
然后,在RpcExecutor中一开始创建的一批Handler中,会不断地从队列中取值。也就是调用Handler.run。接着回调CallRunner.run,最后调用RpcServer.call实现对
远程方法的本地调用Message result = call.getService().callBlockingMethod(md, controller, param)。
相比hadoop RPC的调用,hbase融入了netty的调用,实现上比hadoop的稍微复杂一点。
本人原创,使用请标明源链接。
小编以后每周会更新hbase的源码信息,大家可以把需要的方向私信我的邮箱15935152719@163.com。