Dubbo编解码系列文章目录
Dubbo编解码(一)-原理
Dubbo编解码(二)-Codec2和AbstractCodec
Dubbo编解码(三)-TransportCodec
Dubbo编解码(五)-Dubbo协议编码器
Dubbo编解码(六)-Dubbo协议解码器
文章目录
ExchangeCodec#decode-解码
最终返回的是Request 或 Response
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 可读字节数
int readable = buffer.readableBytes();
// 创建一个byte数组(已读字节数组),其长度为 头部长度 和 可读字节数 取最小值
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 读取指定字节到header中
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
继续往下看,这里处理 粘包和半包问题
// readable-可读字节数
// header-已读字节数组(尝试读取一个完整头部)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) { // 已读取字节数的第一个和第二个字节不是dubbo协议魔数 -> 没有遇到完整报文
// 已读字节数组的长度
int length = header.length;
// 已读字节数组的长度 < 可读字节数
if (header.length < readable) {
// 创建一个新数组,长度为可读字节数readable,把 已读取的字节 放在新数组中
header = Bytes.copyOf(header, readable);
// 从buffer中读取 (readable - length)长度个字节,放到 已读字节数组中,起始位置是length
// 其实就是将buffer剩余字节 都读到 header中
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
// 判断header[]的第n个和第n+1个字节不是dubbo协议魔数
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
// 将buffer的读索引位置 指向回 dubbo报文开头处
// 此时buffer.readerIndex()就是buffer末尾,buffer.readerIndex() - header.length=0
buffer.readerIndex(buffer.readerIndex() - header.length + i);
// 创建一个新数组,长度为i,截取header中开头 ~ 下一个Dubbo报文位置(i) 之间的数据 放到新数组中
header = Bytes.copyOf(header, i);
break;
}
}
// 解析header数据,例如TelnetCodec#decode
return super.decode(channel, buffer, readable, header);
}
// check length.
// 如果 可读字节的长度readable 小于 协议头部的长度
if (readable < HEADER_LENGTH) {
// 需要更多数据
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
// 从 已读字节数组 中提取 报文体长度
int len = Bytes.bytes2int(header, 12);
// When receiving response, how to exceed the length, then directly construct a response to the client.
// see more detail from https://github.com/apache/dubbo/issues/7021.
Object obj = finishRespWhenOverPayload(channel, len, header);
if (null != obj) {
return obj;
}
// 校验报文体长度
checkPayload(channel, len);
// 总消息大小tt = len + body_length
// 判断此次解码是否可以处理 整个报文,如果 可读取数据readable < tt, 说明数据不够,需要更多数据
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
// 这里面包含ChannelBuffer
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 解码消息体,is流 是 完整的 RPC调用报文
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
DubboCodec#decodeBody
解码 请求报文体
// 1
protected static final int SERIALIZATION_MASK = 0x1f;
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 从header中第3个字节获取 请求标识 + 序列化标识
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
// 从header中获取请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// ...
} else {
// decode request.
// 解码 请求消息体
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
// 双向传输
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
// 心跳事件
req.setEvent(true);
}
try {
Object data;
if (req.isEvent()) {
// TODO getPayload
byte[] eventPayload = CodecSupport.getPayload(is);
// TODO isHeartBeat
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
// 心跳的响应总是为null
data = null;
} else {
// 心跳事件的反序列化
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in, eventPayload);
}
} else {
DecodeableRpcInvocation inv;
// 获取url中decode.in.io参数值,默认为false
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
// 如果url中有decode.in.io=true,则在I/O线程中直接解码
// req在decode方法中只是设置了一下dubbo版本号
// proto-serializationType
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// 接下来看这里,DecodeableRpcInvocation#decode
inv.decode();
} else {
// 交给Dubbo业务线程池解码
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
// 不要漏掉这行
data = inv;
}
// 将 RpcInvocation 作为 request的对象体(body部分)
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
// 最后返回请求消息体
return req;
}
}
解码 响应报文体
和 “解码 请求报文体”差不多
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 从header中第3个字节获取 请求标识 + 序列化标识
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
// 从header中获取请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
// 解码响应消息体
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in, eventPayload);
}
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
// 接下来看这里
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
// DecodeableRpcResult#decode
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// ...
}
}
DecodeableRpcInvocation
成员变量及构造方法
// 通道
private Channel channel;
// 序列化类型
private byte serializationType;
// 包含ChannelBuffer的输入流
private InputStream inputStream;
// 请求对象
private Request request;
// 是否已解码
private volatile boolean hasDecoded;
public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id) {
Assert.notNull(channel, "channel == null");
Assert.notNull(request, "request == null");
Assert.notNull(is, "inputStream == null");
this.channel = channel;
this.request = request;
this.inputStream = is;、
this.serializationType = id;
}
decode
上面的inv.decode();
走到了这里
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
// 主要看这里
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
// input实际上是ChannelBufferInputStream
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
// 根据url获取序列化实现并反序列化(其实就是把input放到ObjectInput中)
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 放入本地缓存
// serialization_id-serializationType
this.put(SERIALIZATION_ID_KEY, serializationType);
// 这里能readUTF的顺序和 DubboCodec#encodeRequestData的顺序是一致的
// 获取dubbo版本号
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
// 设置 dubboVersion 到 隐式参数中
setAttachment(DUBBO_VERSION_KEY, dubboVersion);
// 获取请求服务
String path = in.readUTF();
setAttachment(PATH_KEY, path);
// 获取服务版本号
String version = in.readUTF();
setAttachment(VERSION_KEY, version);
// 获取方法名
setMethodName(in.readUTF());
// 获取参数类型
String desc = in.readUTF();
setParameterTypesDesc(desc);
try {
// 获取系统配置中 是否校验系列化 配置项,默认为false
if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
// 如果配置为true,则对 要请求的服务、本本、序列化类型进行校验
CodecSupport.checkSerialization(path, version, serializationType);
}
// 参数数组
Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
// 参数类型数组
Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
// 如果有参数
if (desc.length() > 0) {
// if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
// pts = ReflectUtils.desc2classArray(desc);
// } else {
// 获取 服务仓库
ServiceRepository repository = ApplicationModel.getServiceRepository();
// 从 服务仓库中 获取 对应服务path的描述
ServiceDescriptor serviceDescriptor = repository.lookupService(path);
if (serviceDescriptor != null) {
// 获取 要调用的方法 的描述
MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
if (methodDescriptor != null) {
// 获取参数类型
pts = methodDescriptor.getParameterClasses();
// 设置方法返回类型
this.setReturnTypes(methodDescriptor.getReturnTypes());
}
}
if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
if (!RpcUtils.isGenericCall(desc, getMethodName()) && !RpcUtils.isEcho(desc, getMethodName())) {
throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
}
pts = ReflectUtils.desc2classArray(desc);
}
// }
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 依次读取方法参数值 TODO 要弄清楚encodeInvocationArgument是怎么写入的
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
// 设置参数类型
setParameterTypes(pts);
// 读取隐式参数
Map<String, Object> map = in.readAttachments();
if (map != null && map.size() > 0) {
Map<String, Object> attachment = getObjectAttachments();
if (attachment == null) {
attachment = new HashMap<>();
}
attachment.putAll(map);
setObjectAttachments(attachment);
}
//decode argument ,may be callback
for (int i = 0; i < args.length; i++) {
// 处理异步参数回调 TODO
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
// 设置参数
setArguments(args);
// 目标服务名称
String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
getAttachment(GROUP_KEY),
getAttachment(VERSION_KEY));
setTargetServiceUniqueName(targetServiceName);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
DecodeableRpcResult
成员变量及构造方法
// 通道
private Channel channel;
// 序列化类型
private byte serializationType;
// 包含ChannelBuffer的输入流
private InputStream inputStream;
// 响应对象
private Response response;
// Request.getData()
private Invocation invocation;
// 是否已解码
private volatile boolean hasDecoded;
public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
Assert.notNull(channel, "channel == null");
Assert.notNull(response, "response == null");
Assert.notNull(is, "inputStream == null");
this.channel = channel;
this.response = response;
this.inputStream = is;
this.invocation = invocation;
this.serializationType = id;
}
decode
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
// 校验序列化类型
Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
if (serializationType_obj != null) {
if ((byte) serializationType_obj != serializationType) {
throw new IOException("Unexpected serialization id:" + serializationType + " received from network, please check if the peer send the right id.");
}
}
}
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc result failed: " + e.getMessage(), e);
}
response.setStatus(Response.CLIENT_ERROR);
response.setErrorMessage(StringUtils.toString(e));
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
if (log.isDebugEnabled()) {
Thread thread = Thread.currentThread();
log.debug("Decoding in thread -- [" + thread.getName() + "#" + thread.getId() + "]");
}
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 这个读出来的是encodeResponseData中的out.writeByte写入的字节标识
// 不同标识对应不同处理
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE: // 返回结果标记为NULL
break;
case DubboCodec.RESPONSE_VALUE: // 返回结果标记为有返回值
handleValue(in);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION: // 返回记过标记为异常
// 处理异常
handleException(in);
break;
// 下面三种情况分别是上面3中基础上 + 有隐式参数
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
handleAttachment(in);
break;
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
handleValue(in);
handleAttachment(in);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
handleException(in);
handleAttachment(in);
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
}
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
return this;
}
handleValue
private void handleValue(ObjectInput in) throws IOException {
try {
// 返回类型
Type[] returnTypes;
if (invocation instanceof RpcInvocation) {
returnTypes = ((RpcInvocation) invocation).getReturnTypes();
} else {
returnTypes = RpcUtils.getReturnTypes(invocation);
}
// 返回值
Object value = null;
if (ArrayUtils.isEmpty(returnTypes)) {
// This almost never happens?
value = in.readObject();
} else if (returnTypes.length == 1) {
// DubboCodec中out.writeObject(ret);
value = in.readObject((Class<?>) returnTypes[0]);
} else {
value = in.readObject((Class<?>) returnTypes[0], returnTypes[1]);
}
setValue(value);
} catch (ClassNotFoundException e) {
rethrow(e);
}
}
handleException
private void handleException(ObjectInput in) throws IOException {
try {
// 读取异常信息
setException(in.readThrowable());
} catch (ClassNotFoundException e) {
rethrow(e);
}
}
handleAttachment
private void handleAttachment(ObjectInput in) throws IOException {
try {
// 读取隐式参数
addObjectAttachments(in.readAttachments());
} catch (ClassNotFoundException e) {
rethrow(e);
}
}