提示:阅读本文前最好先阅读:
- 《Spark2.1.0之内置RPC框架》
- 《spark2.1.0之源码分析——RPC配置TransportConf》
- 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
- 《spark2.1.0之源码分析——RPC服务器TransportServer》
- 《spark2.1.0之源码分析——RPC管道初始化》
TransportChannelHandler实现了Netty的ChannelInboundHandler[1],以便对Netty管道中的消息进行处理。《spark2.1.0之源码分析——RPC管道初始化》一文所展示的图1中的Handler(除了MessageEncoder)由于都实现了ChannelInboundHandler接口,作为自定义的ChannelInboundHandler,因而都要重写channelRead方法。Netty框架使用工作链模式来对每个ChannelInboundHandler的实现类的channelRead方法进行链式调用。TransportChannelHandler实现的channelRead方法见代码清单1。
代码清单1 TransportChannelHandler的channelRead实现
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
从代码清单1看到,当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。
MessageHandler的继承体系
TransportRequestHandler与TransportResponseHandler都继承自抽象类MessageHandler,MessageHandler定义了子类的规范,详细定义见代码清单2。
代码清单2 MessageHandler规范
public abstract class MessageHandler<T extends Message> {
public abstract void handle(T message) throws Exception;
public abstract void channelActive();
public abstract void exceptionCaught(Throwable cause);
public abstract void channelInactive();
}
MessageHandler中定义的各个方法的作用分别为:
- handle:用于对接收到的单个消息进行处理;
- channelActive:当channel激活时调用;
- exceptionCaught:当捕获到channel发生异常时调用;
- channelInactive:当channel非激活时调用;
Spark中MessageHandler类的继承体系如图1所示。
Message的继承体系
根据代码清单2,我们知道MessageHandler同时也是一个Java泛型类,其子类能处理的消息都派生自接口Message。Message的定义见代码清单3。
代码清单3 Message的定义
public interface Message extends Encodable {
Type type();
ManagedBuffer body();
boolean isBodyInFrame();
Message中定义的三个接口方法的作用分别为:
- type:返回消息的类型;
- body:返回消息中可选的内容体;
- isBodyInFrame:用于判断消息的主体是否包含在消息的同一帧中。
Message接口继承了Encodable接口,Encodable的定义见代码清单4。
代码清单4 Encodable的定义
public interface Encodable {
int encodedLength();
void encode(ByteBuf buf);
}
实现Encodable接口的类将可以转换到一个ByteBuf中,多个对象将被存储到预先分配的单个ByteBuf,所以这里的encodedLength用于返回转换的对象数量。下面一起来看看Message的类继承体系,如图2所示。
从图2看到最终的消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口,其中RequestMessage的具体实现有四种,分别是:
- ChunkFetchRequest:请求获取流的单个块的序列。
- RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
- OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是不需要服务端向客户端回复。
- StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。
由于OneWayMessage 不需要响应,所以ResponseMessage的对于成功或失败状态的实现各有三种,分别是:
- ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息;
- ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息;
- RpcResponse:处理RpcRequest成功后返回的消息;
- RpcFailure:处理RpcRequest失败后返回的消息;
- StreamResponse:处理StreamRequest成功后返回的消息;
- StreamFailure:处理StreamRequest失败后返回的消息;
ManagedBuffer的继承体系
回头再看看代码清单3中对body接口的定义,可以看到其返回内容体的类型为ManagedBuffer。ManagedBuffer提供了由字节构成数据的不可变视图(也就是说ManagedBuffer并不存储数据,也不是数据的实际来源,这同关系型数据库的视图类似)。我们先来看看抽象类ManagedBuffer中对行为的定义,见代码清单5。
代码清单5 ManagedBuffer的定义
public abstract class ManagedBuffer {
public abstract long size();
public abstract ByteBuffer nioByteBuffer() throws IOException;
public abstract InputStream createInputStream() throws IOException;
public abstract ManagedBuffer retain();
public abstract ManagedBuffer release();
public abstract Object convertToNetty() throws IOException;
}
ManagedBuffer中定义了六个方法,分别为:
- size:返回数据的字节数。
- nioByteBuffer:将数据按照Nio的ByteBuffer类型返回。
- createInputStream:将数据按照InputStream返回。
- retain:当有新的使用者使用此视图时,增加引用此视图的引用数。
- release:当有使用者不再使用此视图时,减少引用此视图的引用数。当引用数为0时释放缓冲区。
- convertToNetty:将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是io.netty.buffer.ByteBuf,要么是io.netty.channel.FileRegion。
ManagedBuffer的具体实现有很多,我们可以通过图3来了解。
图3中列出了ManagedBuffer的五个实现类,其中TestManagedBuffer和RecordingManagedBuffer用于测试。NettyManagedBuffer中的缓冲为io.netty.buffer.ByteBuf,NioManagedBuffer中的缓冲为java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的实现都非常简单,留给读者自行阅读。本节挑选FileSegmentManagedBuffer作为ManagedBuffer具体实现的例子进行介绍。
FileSegmentManagedBuffer的作用为获取一个文件中的一段,它一共有四个由final修饰的属性,全部都通过FileSegmentManagedBuffer的构造器传入属性值,这四个属性为:
- conf:即TransportConf。
- file:所要读取的文件。
- offset:所要读取文件的偏移量。
- length:所要读取的长度。
下面将逐个介绍FileSegmentManagedBuffer对于ManagedBuffer的实现。
NIO方式读取文件
FileSegmentManagedBuffer实现的nioByteBuffer方法见代码清单6。
代码清单6 nioByteBuffer方法的实现
@Override
public ByteBuffer nioByteBuffer() throws IOException {
FileChannel channel = null;
try {
channel = new RandomAccessFile(file, "r").getChannel();
if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException(String.format("Reached EOF before filling buffer\n" +
"offset=%s\nfile=%s\nbuf.remaining=%s",
offset, file.getAbsoluteFile(), buf.remaining()));
}
}
buf.flip();
return buf;
} else {
return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
}
} catch (IOException e) {
try {
if (channel != null) {
long size = channel.size();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
e);
}
} catch (IOException ignored) {
// ignore
}
throw new IOException("Error in opening " + this, e);
} finally {
JavaUtils.closeQuietly(channel);
}
}
nioByteBuffer的实现还是很简单的,主要利用RandomAccessFile获取FileChannel,然后使用java.nio.ByteBuffer和FileChannel的API将数据写入缓冲区java.nio.ByteBuffer中。
文件流方式读取文件
FileSegmentManagedBuffer实现的createInputStream方法见代码清单7。
代码清单7 createInputStream的实现
@Override
public InputStream createInputStream() throws IOException {
FileInputStream is = null;
try {
is = new FileInputStream(file);
ByteStreams.skipFully(is, offset);
return new LimitedInputStream(is, length);
} catch (IOException e) {
try {
if (is != null) {
long size = file.length();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
e);
}
} catch (IOException ignored) {
// ignore
} finally {
JavaUtils.closeQuietly(is);
}
throw new IOException("Error in opening " + this, e);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(is);
throw e;
}
}
createInputStream的实现还是很简单的,这里不多作介绍。
将数据转换为Netty对象
FileSegmentManagedBuffer实现的convertToNetty方法见代码清单8。
代码清单8 convertToNetty的实现
@Override
public Object convertToNetty() throws IOException {
if (conf.lazyFileDescriptor()) {
return new DefaultFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
}
}
其他方法的实现
其他方法由于实现非常简单,所以这里就不一一列出了,感兴趣的读者可以自行查阅。
[1] ChannelInboundHandler接口的实现及原理不属于本书要分析的内容,感兴趣的同学可以阅读Netty的官方文档或者研究Netty的源码。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下: