Dubbo编解码(六)-Dubbo协议解码器

Dubbo编解码系列文章目录

Dubbo编解码(一)-原理
Dubbo编解码(二)-Codec2和AbstractCodec
Dubbo编解码(三)-TransportCodec
Dubbo编解码(五)-Dubbo协议编码器
Dubbo编解码(六)-Dubbo协议解码器


文章目录

ExchangeCodec#decode-解码

最终返回的是Request 或 Response

/**
 * Entry point of decoding: reads up to one protocol header from the buffer
 * and delegates to the four-argument {@code decode} overload.
 *
 * @param channel channel the data arrived on
 * @param buffer  buffer holding the received bytes
 * @return whatever the four-argument overload returns
 * @throws IOException if decoding fails
 */
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
	// Snapshot how many bytes can be read before anything is consumed.
	final int available = buffer.readableBytes();
	// Never read more than a header's worth, nor more than what is available.
	final int headerSize = Math.min(available, HEADER_LENGTH);
	final byte[] headerBytes = new byte[headerSize];
	buffer.readBytes(headerBytes);
	return decode(channel, buffer, available, headerBytes);
}

继续往下看,这里处理 粘包和半包问题

// Decodes one message from the buffer, handling data that does not start with
// a Dubbo frame as well as incomplete (half) frames.
// readable - number of bytes that were readable when decoding started
// header   - bytes already read from the buffer (an attempt to read one full header)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
	// check magic number.
	if (readable > 0 && header[0] != MAGIC_HIGH
		|| readable > 1 && header[1] != MAGIC_LOW) { // the first/second byte read is not the Dubbo magic number -> the data does not start with a Dubbo frame
		// length of the bytes already read
		int length = header.length;
		// fewer bytes were read than are readable
		if (header.length < readable) {
			// grow the array to `readable` bytes, keeping the bytes already read
			header = Bytes.copyOf(header, readable);
			// read the remaining (readable - length) bytes from the buffer into header, starting at offset `length`;
			// i.e. drain everything that is left in the buffer into header
			buffer.readBytes(header, length, readable - length);
		}
		for (int i = 1; i < header.length - 1; i++) {
			// look for the Dubbo magic number at positions i and i + 1
			if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
				// rewind the reader index to the start of the Dubbo frame that was found;
				// buffer.readerIndex() - header.length is where reading started
				// (assuming all header.length bytes were consumed from this buffer — confirm against caller)
				buffer.readerIndex(buffer.readerIndex() - header.length + i);
				// keep only the first i bytes, i.e. the data that precedes the Dubbo frame
				header = Bytes.copyOf(header, i);
				break;
			}
		}
		// let the parent codec handle the non-Dubbo data (presumably TelnetCodec#decode — confirm)
		return super.decode(channel, buffer, readable, header);
	}
	// check length.
	// fewer readable bytes than a full protocol header
	if (readable < HEADER_LENGTH) {
		// more data is needed
		return DecodeResult.NEED_MORE_INPUT;
	}

	// get data length.
	// read the body length from the header bytes, starting at offset 12
	int len = Bytes.bytes2int(header, 12);

	// When receiving response, how to exceed the length, then directly construct a response to the client.
	// see more detail from https://github.com/apache/dubbo/issues/7021.
	Object obj = finishRespWhenOverPayload(channel, len, header);
	if (null != obj) {
		return obj;
	}

	// validate the body length
	checkPayload(channel, len);

	// total message size tt = body length (len) + header length
	// if fewer than tt bytes are readable, the frame is incomplete and more data is needed
	int tt = len + HEADER_LENGTH;
	if (readable < tt) {
		return DecodeResult.NEED_MORE_INPUT;
	}

	// limit input stream.
	// stream over the ChannelBuffer, constructed with the body length len
	ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

	try {
		// decode the message body; the check above guarantees the whole frame is readable
		return decodeBody(channel, is, header);
	} finally {
		// decodeBody left unread bytes in the body stream: log and skip them
		// (skipUnusedStream is expected to discard the remainder — confirm)
		if (is.available() > 0) {
			try {
				if (logger.isWarnEnabled()) {
					logger.warn("Skip input stream " + is.available());
				}
				StreamUtils.skipUnusedStream(is);
			} catch (IOException e) {
				logger.warn(e.getMessage(), e);
			}
		}
	}
}

DubboCodec#decodeBody

解码 请求报文体

// Mask selecting the low 5 bits (0x1f) of a byte; decodeBody applies it to
// header[2] to extract the serialization id (proto).
protected static final int SERIALIZATION_MASK = 0x1f;

// Decodes the message body. This excerpt shows the request branch only;
// the other branch is elided with "// ...".
// is     - stream over the message body
// header - the protocol header bytes that were already read
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
	// third header byte carries the flag bits plus the serialization id (low 5 bits)
	byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
	// get request id.
	// read the request id from the header, starting at offset 4
	long id = Bytes.bytes2long(header, 4);
	if ((flag & FLAG_REQUEST) == 0) {
		// ...
	} else {
		// decode request.
		// decode the request body
		Request req = new Request(id);
		req.setVersion(Version.getProtocolVersion());
		// two-way flag taken from the header
		req.setTwoWay((flag & FLAG_TWOWAY) != 0);
		if ((flag & FLAG_EVENT) != 0) {
			// event message (a heartbeat is one kind of event, see below)
			req.setEvent(true);
		}
		try {
			Object data;
			if (req.isEvent()) {
				// TODO getPayload
				byte[] eventPayload = CodecSupport.getPayload(is);
				// TODO isHeartBeat
				if (CodecSupport.isHeartBeat(eventPayload, proto)) {
					// heart beat response data is always null;
					data = null;
				} else {
					// non-heartbeat event: deserialize the event payload
					ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
					data = decodeEventData(channel, in, eventPayload);
				}
			} else {
				DecodeableRpcInvocation inv;
				// read the DECODE_IN_IO_THREAD_KEY URL parameter
				// (the article states this is decode.in.io, default false — confirm against the constants)
				if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
					// parameter is true: decode right here in the current thread
					// so far req only has id, version and flags set by this method
					// proto is passed as the serialization type
					inv = new DecodeableRpcInvocation(channel, req, is, proto);
					// see DecodeableRpcInvocation#decode
					inv.decode();
				} else {
					// decoding is deferred: the body bytes are copied and decode() is not called here
					// (presumably run later on a Dubbo business thread — confirm)
					inv = new DecodeableRpcInvocation(channel, req,
						new UnsafeByteArrayInputStream(readMessageData(is)), proto);
				}
				// do not miss this line: the invocation becomes the request data
				data = inv;
			}
			// set the decoded data (the invocation, or event data) as the request body
			req.setData(data);
		} catch (Throwable t) {
			if (log.isWarnEnabled()) {
				log.warn("Decode request failed: " + t.getMessage(), t);
			}
			// bad request
			req.setBroken(true);
			req.setData(t);
		}

		// finally return the request
		return req;
	}
}

解码 响应报文体

和 “解码 请求报文体”差不多

// Decodes the message body. This excerpt shows the response branch only;
// the request branch is elided with "// ...".
// is     - stream over the message body
// header - the protocol header bytes that were already read
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
	// third header byte carries the flag bits plus the serialization id (low 5 bits)
	byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
	// get request id.
	// read the request id from the header, starting at offset 4
	long id = Bytes.bytes2long(header, 4);
	if ((flag & FLAG_REQUEST) == 0) {
		// decode response.
		// decode the response body
		Response res = new Response(id);
		if ((flag & FLAG_EVENT) != 0) {
			res.setEvent(true);
		}
		// get status.
		// the status is the fourth header byte
		byte status = header[3];
		res.setStatus(status);
		try {
			if (status == Response.OK) {
				Object data;
				if (res.isEvent()) {
					byte[] eventPayload = CodecSupport.getPayload(is);
					if (CodecSupport.isHeartBeat(eventPayload, proto)) {
						// heart beat response data is always null;
						data = null;
					} else {
						// non-heartbeat event: deserialize the event payload
						ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
						data = decodeEventData(channel, in, eventPayload);
					}
				} else {
					DecodeableRpcResult result;
					if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
						// decode right here in the current thread;
						// getRequestData(id) supplies the Invocation passed to the result
						result = new DecodeableRpcResult(channel, res, is,
							(Invocation) getRequestData(id), proto);
						// see DecodeableRpcResult#decode
						result.decode();
					} else {
						// decoding is deferred: the body bytes are copied and decode() is not called here
						result = new DecodeableRpcResult(channel, res,
						new UnsafeByteArrayInputStream(readMessageData(is)),
							(Invocation) getRequestData(id), proto);
					}
					data = result;
				}
				res.setResult(data);
			} else {
				// non-OK status: the body holds an error message string
				ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
				res.setErrorMessage(in.readUTF());
			}
		} catch (Throwable t) {
			if (log.isWarnEnabled()) {
				log.warn("Decode response failed: " + t.getMessage(), t);
			}
			// decoding failed: report it on the response instead of propagating
			res.setStatus(Response.CLIENT_ERROR);
			res.setErrorMessage(StringUtils.toString(t));
		}
		return res;
	} else {
		// ...
	}
}	

DecodeableRpcInvocation

成员变量及构造方法

// Channel the request arrived on
private Channel channel;

// Serialization type id used to pick the deserializer
private byte serializationType;

// Input stream the invocation body is read from
private InputStream inputStream;

// Request this invocation belongs to
private Request request;

// Whether decode() has already run; set to true in decode()'s finally block
private volatile boolean hasDecoded;

/**
 * Creates an invocation whose body is still encoded in {@code is}.
 *
 * @param channel channel the request arrived on, must not be null
 * @param request request this invocation belongs to, must not be null
 * @param is      stream holding the encoded invocation body, must not be null
 * @param id      serialization type id used to decode the body
 */
public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id) {
	Assert.notNull(channel, "channel == null");
	Assert.notNull(request, "request == null");
	Assert.notNull(is, "inputStream == null");
	this.channel = channel;
	this.request = request;
	this.inputStream = is;
	this.serializationType = id;
}

decode

上面的inv.decode();走到了这里

/**
 * Decodes the invocation body once. Does nothing when decoding has already
 * run or when the channel or input stream is missing. A decoding failure is
 * not propagated; it is logged and recorded on the request as a broken request.
 */
@Override
public void decode() throws Exception {
	// Guard: already decoded, or nothing to decode from.
	if (hasDecoded || channel == null || inputStream == null) {
		return;
	}
	try {
		// The actual work happens in decode(Channel, InputStream).
		decode(channel, inputStream);
	} catch (Throwable failure) {
		if (log.isWarnEnabled()) {
			log.warn("Decode rpc invocation failed: " + failure.getMessage(), failure);
		}
		request.setBroken(true);
		request.setData(failure);
	} finally {
		// Mark as decoded whether or not decoding succeeded.
		hasDecoded = true;
	}
}

// Reads the invocation fields from the input stream and stores them on this invocation.
// input is the stream given to the constructor; per DubboCodec#decodeBody it wraps the
// ChannelBuffer when decoding in the I/O thread, otherwise it is an UnsafeByteArrayInputStream.
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
	// pick the serialization by url + serializationType and wrap input in an ObjectInput
	ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
		.deserialize(channel.getUrl(), input);
	// remember the serialization type on this invocation
	// key: SERIALIZATION_ID_KEY, value: serializationType
	this.put(SERIALIZATION_ID_KEY, serializationType);

	// the read order below has to match the write order on the encoding side
	// (per the article, DubboCodec#encodeRequestData — confirm)
	// dubbo version
	String dubboVersion = in.readUTF();
	request.setVersion(dubboVersion);
	// also expose dubboVersion as an attachment
	setAttachment(DUBBO_VERSION_KEY, dubboVersion);

	// service path
	String path = in.readUTF();
	setAttachment(PATH_KEY, path);
	// service version
	String version = in.readUTF();
	setAttachment(VERSION_KEY, version);

	// method name
	setMethodName(in.readUTF());

	// parameter types descriptor
	String desc = in.readUTF();
	setParameterTypesDesc(desc);

	try {
		// serialization security check switch from the system configuration, defaults to false
		if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
			// when enabled, check the requested service path, version and serialization type
			CodecSupport.checkSerialization(path, version, serializationType);
		}
		// argument values
		Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
		// argument types
		Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
		// the method has parameters
		if (desc.length() > 0) {
//                if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
//                    pts = ReflectUtils.desc2classArray(desc);
//                } else {
			// service repository
			ServiceRepository repository = ApplicationModel.getServiceRepository();
			// look up the descriptor of the service at this path
			ServiceDescriptor serviceDescriptor = repository.lookupService(path);
			if (serviceDescriptor != null) {
				// descriptor of the method being invoked
				MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
				if (methodDescriptor != null) {
					// parameter types from the method descriptor
					pts = methodDescriptor.getParameterClasses();
					// return types from the method descriptor
                        this.setReturnTypes(methodDescriptor.getReturnTypes());
				}
			}
			// no method descriptor was found: only generic/echo calls may proceed,
			// with parameter types derived from the descriptor string
			if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
				if (!RpcUtils.isGenericCall(desc, getMethodName()) && !RpcUtils.isEcho(desc, getMethodName())) {
					throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
				}
				pts = ReflectUtils.desc2classArray(desc);
			}
//                }

			args = new Object[pts.length];
			for (int i = 0; i < args.length; i++) {
				try {
					// read the argument values one by one
					// TODO understand how encodeInvocationArgument writes them
					args[i] = in.readObject(pts[i]);
				} catch (Exception e) {
					// a failed argument is only logged; args[i] stays null and decoding continues
					if (log.isWarnEnabled()) {
						log.warn("Decode argument failed: " + e.getMessage(), e);
					}
				}
			}
		}
		// set the parameter types
		setParameterTypes(pts);

		// read the attachments and merge them into the existing ones
		Map<String, Object> map = in.readAttachments();
		if (map != null && map.size() > 0) {
			Map<String, Object> attachment = getObjectAttachments();
			if (attachment == null) {
				attachment = new HashMap<>();
			}
			attachment.putAll(map);
			setObjectAttachments(attachment);
		}

		//decode argument ,may be callback
		for (int i = 0; i < args.length; i++) {
			// post-process each argument (TODO: callback handling, not covered here)
			args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
		}

		// set the arguments
		setArguments(args);
		// target service unique name built from the path, group and version attachments
		String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
				getAttachment(GROUP_KEY),
				getAttachment(VERSION_KEY));
		setTargetServiceUniqueName(targetServiceName);
	} catch (ClassNotFoundException e) {
		throw new IOException(StringUtils.toString("Read invocation data failed.", e));
	} finally {
		// always clean up the ObjectInput when it supports it
		if (in instanceof Cleanable) {
			((Cleanable) in).cleanup();
		}
	}
	return this;
}

DecodeableRpcResult

成员变量及构造方法

// Channel the response arrived on
private Channel channel;

// Serialization type id used to pick the deserializer
private byte serializationType;

// Input stream the result body is read from
private InputStream inputStream;

// Response this result belongs to
private Response response;

// Invocation of the originating request (per the article: Request.getData())
private Invocation invocation;

// Whether decode() has already run; set to true in decode()'s finally block
private volatile boolean hasDecoded;

/**
 * Creates a result whose body is still encoded in {@code is}.
 *
 * @param channel    channel the response arrived on, must not be null
 * @param response   response this result belongs to, must not be null
 * @param is         stream holding the encoded result body, must not be null
 * @param invocation invocation of the originating request
 * @param id         serialization type id used to decode the body
 */
public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
	// Validate the mandatory collaborators first, in declaration order.
	Assert.notNull(channel, "channel == null");
	Assert.notNull(response, "response == null");
	Assert.notNull(is, "inputStream == null");

	this.serializationType = id;
	this.invocation = invocation;
	this.inputStream = is;
	this.response = response;
	this.channel = channel;
}

decode

/**
 * Decodes the result body once. Does nothing when decoding has already run
 * or when the channel or input stream is missing. A failure is not
 * propagated; it is logged and recorded on the response as a client error.
 */
@Override
public void decode() throws Exception {
	// Guard: already decoded, or nothing to decode from.
	if (hasDecoded || channel == null || inputStream == null) {
		return;
	}
	try {
		boolean securityCheckEnabled = ConfigurationUtils.getSystemConfiguration()
			.getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false);
		if (securityCheckEnabled) {
			// Compare the serialization id stored on the invocation with the one received.
			Object expectedId = invocation.get(SERIALIZATION_ID_KEY);
			if (expectedId != null && (byte) expectedId != serializationType) {
				throw new IOException("Unexpected serialization id:" + serializationType + " received from network, please check if the peer send the right id.");
			}
		}
		decode(channel, inputStream);
	} catch (Throwable failure) {
		if (log.isWarnEnabled()) {
			log.warn("Decode rpc result failed: " + failure.getMessage(), failure);
		}
		response.setStatus(Response.CLIENT_ERROR);
		response.setErrorMessage(StringUtils.toString(failure));
	} finally {
		// Mark as decoded whether or not decoding succeeded.
		hasDecoded = true;
	}
}

/**
 * Reads the result flag byte from the input and dispatches to the matching
 * handlers (value, exception, attachments).
 *
 * <p>The {@link Cleanable} cleanup runs in a {@code finally} block so the
 * {@code ObjectInput} is also cleaned up when a handler throws or the flag is
 * unknown, matching {@code DecodeableRpcInvocation#decode}.
 *
 * @param channel channel whose URL selects the serialization
 * @param input   stream holding the encoded result body
 * @return this result
 * @throws IOException if reading fails or the flag byte is unknown
 */
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
	if (log.isDebugEnabled()) {
		Thread thread = Thread.currentThread();
		log.debug("Decoding in thread -- [" + thread.getName() + "#" + thread.getId() + "]");
	}

	ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
		.deserialize(channel.getUrl(), input);

	try {
		// First byte is the result flag (per the article, written by
		// encodeResponseData via out.writeByte); each flag selects a different handling.
		byte flag = in.readByte();
		switch (flag) {
			case DubboCodec.RESPONSE_NULL_VALUE: // result is null
				break;
			case DubboCodec.RESPONSE_VALUE: // result carries a return value
				handleValue(in);
				break;
			case DubboCodec.RESPONSE_WITH_EXCEPTION: // result carries an exception
				handleException(in);
				break;
			// the three cases below are the three above, plus attachments
			case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
				handleAttachment(in);
				break;
			case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
				handleValue(in);
				handleAttachment(in);
				break;
			case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
				handleException(in);
				handleAttachment(in);
				break;
			default:
				throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
		}
	} finally {
		// Clean up on the failure paths too.
		if (in instanceof Cleanable) {
			((Cleanable) in).cleanup();
		}
	}
	return this;
}

handleValue

/**
 * Reads the return value from the input and stores it via {@code setValue}.
 * How the value is read depends on how many return types are known for the
 * invocation.
 *
 * @param in object input positioned at the encoded return value
 * @throws IOException if reading fails
 */
private void handleValue(ObjectInput in) throws IOException {
	try {
		// Return types come from the invocation itself when it is an RpcInvocation.
		final Type[] types = invocation instanceof RpcInvocation
			? ((RpcInvocation) invocation).getReturnTypes()
			: RpcUtils.getReturnTypes(invocation);

		final Object result;
		if (ArrayUtils.isEmpty(types)) {
			// This almost never happens?
			result = in.readObject();
		} else if (types.length == 1) {
			// Only the raw class is known.
			result = in.readObject((Class<?>) types[0]);
		} else {
			// Raw class plus a second type passed along to readObject.
			result = in.readObject((Class<?>) types[0], types[1]);
		}
		setValue(result);
	} catch (ClassNotFoundException notFound) {
		rethrow(notFound);
	}
}

handleException

/**
 * Reads a throwable from the input and stores it via {@code setException}.
 *
 * @param in object input positioned at the encoded throwable
 * @throws IOException if reading fails
 */
private void handleException(ObjectInput in) throws IOException {
	try {
		Throwable remoteError = in.readThrowable();
		setException(remoteError);
	} catch (ClassNotFoundException notFound) {
		rethrow(notFound);
	}
}

handleAttachment

// Reads the attachments from the input and adds them to this result.
// A ClassNotFoundException raised while reading is passed to rethrow(e).
private void handleAttachment(ObjectInput in) throws IOException {
	try {
		// read the attachments (implicit parameters) and add them
		addObjectAttachments(in.readAttachments());
	} catch (ClassNotFoundException e) {
		rethrow(e);
	}
}
上一篇:8.2.1---Python入门之邮箱登录查询


下一篇:UIButton的titleLabe setAttributeSting 首次不起作用