问题描述&模拟
线上登录接口,通过监控查看,有类型转换异常,具体报错如下图
此报错信息是dubbo consumer端显示,且登录大部分是正常,有少量部分会报类型转换异常,同事通过更换方法名+显示指定序列化id解决此问题,但是产生这个问题的真正原因是什么呢?没有指定序列化id吗?还是dubbo方法重载问题?为什么服务端不显示此错误信息呢?,下面根据错误模拟下情况。
线上运行情况说明,报错的这台客户端部署在容器内,jdk版本
服务方是混跑,有虚拟机和容器,容器的jdk版本相同,虚拟机jdk版本
一开始认为是由于没有显示指定序列化id导致容器调用虚拟机的服务,由于jvm版本不一致导致的解码问题,但是分析和试验后,发现并非如此,模拟情况如下:
定义一个dubbo服务,方法重载且入参不显示指定序列化id,代码如下
//定义dubbo服务
public interface ProductService {
Result<ProductVO> findProduct(String data);
Result<ProductVO> findProduct(ProductDTO product);
}
//入参
@Data
public class ProductDTO implements Serializable {
//不显示指定序列化id
private Integer productId;
private String sn;
private String code;
}
//出参
@Data
public class ProductVO implements Serializable{
private static final long serialVersionUID = 4529782262922750326L;
private Integer productId;
private String productName;
}
dubbo客户端调用ProductService.findProduct(ProductDTO product)
,并使用jdk1.8.0_202版本,服务方使用jdk1.8.0_73版本,经过试验(jmeter压测),发现并未出现类型转换异常,现在通过代码分析来排除。
分析&dubbo provider处理请求流程
采用逆序方法,使用arthas进行反编译dubbo生成的代理类,ProductService生成的代理类是Wrapper2,内容如下
public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
throws InvocationTargetException {
ProductService productService;
try {
productService = (ProductService) object;
} catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("findProduct".equals(name) && classArray.length == 1
&& classArray[0].getName().equals("java.lang.String")) {
return productService.findProduct((String) objectArray[0]);
}
if ("findProduct".equals(name) && classArray.length == 1
&& classArray[0].getName().equals("org.pangu.dto.ProductDTO")) {
return productService.findProduct((ProductDTO) objectArray[0]);
}
} catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
.append("\" in class org.pangu.api.ProductService.").toString());
}
}
通过查看反编译后的代码,得知dubbo方法重载,会根据方法类型和参数个数找到对应的目标方法执行。对于我这个线上问题,参数是ProductDTO,如果调用的是findProduct(String data),说明classArray[0]即参数类型是String类型,那么参数类型是如何得来的呢?根据自己之前写的dubbo流程分析,查看源码,在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke(Invocation invocation)
,代码内容如下
方法名称+方法类型+方面参数都封装在Invocation内,接着查找Invocation的来源,在DubboProtocol的匿名内部类DubboProtocol$1内发现,具体是reply(ExchangeChannel channel, Object message)
方法内,参数message就是Invocation。
接着看哪里调用DubboProtocol$1.reply(ExchangeChannel channel, Object message)
方法,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest(ExchangeChannel channel, Request req)
方法内,com.alibaba.dubbo.remoting.exchange.Request.getData()
获取此Invocation,即DecodeableRpcInvocation,那么接着看Request 以及Request.mData的来源;
接着向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received(Channel channel, Object message)
的入参message就是Request ;
继续向上找,com.alibaba.dubbo.remoting.transport.DecodeHandler#received(Channel channel, Object message)
的入参就是Request ,其中会对Request.mData即Invocation进行解码(默认在IO线程已经解码过,这里实际并不会再执行解码DecodeableRpcInvocation#hasDecoded=true)。
继续向上找,com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run()
线程,message属性就是Request,那么接着只能找ChannelEventRunnable是如何创建并提交的
继续向上找,在com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received(Channel channel, Object message)
方法内创建ChannelEventRunnable并提交到线程池执行。
继续向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(Channel channel, Object message)
,入参message就是Request
继续向上找,com.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(Channel channel, Object message)
继续向上找,com.alibaba.dubbo.remoting.transport.AbstractPeer.received(Channel ch, Object msg)
继续向上找,com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler.channelRead(ChannelHandlerContext ctx, Object msg)
,看到这个就说明是netty的work线程,NettyServerHandler是个inbound & outbound事件
dubbo service netty启动添加的inbound&outbound即pipeline chain[HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext]
,说明前面肯定有执行InternalDecoder 的channelRead事件。此时入参message就是Request。
下面着重分析InternalDecoder 的channelRead事件,执行堆栈依次为:
InternalDecoder(io.netty.handler.codec.ByteToMessageDecoder).channelRead(ChannelHandlerContext ctx, Object msg)
InternalDecoder(io.netty.handler.codec.ByteToMessageDecoder).callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
DecodeableRpcInvocation.decode()
DecodeableRpcInvocation.decode(Channel channel, InputStream input)
InternalDecoder是netty pipeline的inboud事件,执行的是channelRead,具体逻辑在InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
内,代码如下
接着触发下一个inbound的channelRead动作,传入的就是Request了,代码说明如下
接着看DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
,这里进行解码
//com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode(Channel channel, ChannelBuffer buffer)
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();//获取读位置
MultiMessage result = MultiMessage.create();//MultiMessage是Request的集合
do {
Object obj = codec.decode(channel, buffer);//使用DubboCodec进行解码,下面根据解码结果进行不同处理
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//说明发生了tcp粘包,退出循环
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);//把obj即Request添加到集合MultiMessage
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();//设置新的buffer读位置,继续使用DubboCodec进行解码
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {//如果MultiMessage只有一个元素,则说明本次没有发生粘包
return result.get(0);//返回Request
}
return result;//返回MultiMessage,在后续的MultiMessagehandler内获取Request的集合遍历处理
}
接着看DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)解码过程,如何对dubbo协议解码的,先看下dubbo协议的报文结构
接着看代码,对着报文结构进行解码
//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);//把缓冲区字节存放到header
return decode(channel, buffer, readable, header);
}
//DubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {//非魔数,说明非dubbo报文的开头,说明发生了tcp拆包/粘包
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {//为什么是小于16呢?因为dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)+bodyLenght(4)就是16字节了,小于16字节,肯定发生了拆包,本次接收到的数据并没有body
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
int len = Bytes.bytes2int(header, 12);//12的原因是dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)等于12,从12位后取4位,转换为int,就是body的长度
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if (readable < tt) {//可读取数少于bodylen+16,说明tcp拆包,需要继续进网络读取
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);//解码body内容
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
接着看解码dubbo body,在com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody
//com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {//是响应,编码
//省略
} else {//请求,解码
// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {//心跳
data = decodeHeartbeatData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
} else if (req.isEvent()) {//事件
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//默认是在netty work线程进行解码
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();//解码dubbo body,解码结果保存在DecodeableRpcInvocation
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);//否则在业务线程ChannelEventRunnable进行解码
}
data = inv;
}
req.setData(data);//把Invocation保存到Request.mData
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
接着看DecodeableRpcInvocation解码dubbo body
//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode()
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);//解码
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;//解码后置位已经解码,这样在ChannelEventRunnable线程内就不会再进行解码
}
}
}
//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);//根据序列化标识获取反序列对象,dubbo spi的自适应
String dubboVersion = in.readUTF();//从输入流读取dubbo version
request.setVersion(dubboVersion);
setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
setAttachment(Constants.PATH_KEY, in.readUTF());//从输入流读path
setAttachment(Constants.VERSION_KEY, in.readUTF());//从输入流读版本
setMethodName(in.readUTF());//从输入流读 调用的目标方法名
try {
Object[] args;
Class<?>[] pts;
String desc = in.readUTF();//从输入流读 参数描述符,即参数的类型 比如[Ljava/lang/String
if (desc.length() == 0) {//dubbo调用方法不存在入参
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {//dubbo调用方法存在入参
pts = ReflectUtils.desc2classArray(desc);//类型描述符转换为类型,比如[Ljava/lang/String => Ljava.lang.String
args = new Object[pts.length];//参数长度
for (int i = 0; i < args.length; i++) {
try {
args[i] = in.readObject(pts[i]);//从输入流读取参数,这里是readObject,执行反序列化
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
setParameterTypes(pts);//把参数类型保存到Invocation对象,即parameterTypes属性上
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);//从输入流读取隐式参数并解码
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
attachment.putAll(map);
setAttachments(attachment);
}
//decode argument ,may be callback
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
从解码dubbo body看出,从输入流解码获取调用的目标方法名称、方法类型、方法入参、隐式参数都保存到Invocation对象(即DecodeableRpcInvocation),其中读取入参和隐式参数使用到了序列化解码(需要使用到序列化id),而从输入流获取方法名称+参数类型并没有使用对象的反序列化。
dubbo provider处理接收总结
dubbo prodiver端从网络到dubbo业务线程池调用以及如何解码流程分析完,现在总结下:
dubbo provider接收并处理consumer请求分两步
1.网络通信,在io线程上解码,解码结果保存到Request。
2.IO线程调起dubbo业务线程,传入解码结果Request,通过Invoker调用目标方法,传入要执行目标方法的对象、方法名、参数类型、参数进行调用目标方法。
该问题分析
解决2个问题
问题1:为什么在服务端报错ClassCastException,在服务端没有任何error日志呢?只有在客户端才有error日志
由于在dubbo代理类Wrapper2调用目标方法导致ClassCastException,异常被捕捉封装为InvocationTargetException向上抛,接着在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
内异常被捕捉,封装为RpcResult,继而在ExceptionFilter内异常信息被封装为RuntimeException返回客户端。这中间并没有日志打印,因此不产生error日志,所以服务端看不到。
问题2:dubbo方法重载会导致问题吗?
结论,基本不会,dubbo的动态代理类WrapperX会根据Invocation的methodName+参数类型+参数进行调用目标方法,因此不会。网上有个大佬说dubbo方法重载在某种情况会导致问题,但是他写的语句有些不通顺且凌乱,而且蓝绿是流量隔离的,不会调错,我认为他的举例不合适,感兴趣的可以参考dubbo同名方法的问题及思考。
问题3:是否是未显式指定序列化id导致的呢?
经过前面分析,是由于判断参数类型是String(本来应该是DTO类型),导致执行目标方法时候把参数转换为String导致的异常,参数类型来源于Invocation对象(即RpcInvocation.parameterTypes),而Invocation来源于Request.mData,而Request是网络通信解码得来,其中在com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)
内String desc = in.readUTF();
从输入流读取字节流并解码为参数类型描述符,这个地方并不涉及到对象的序列化和反序列化。
看客户端编码代码InternalEncoder,编码参数类型代码如下图
而客户端发送建立Request是在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
,而Invocation对象是在dubbo调用的入口InvokerInvocationHandler内(new RpcInvocation(method, args)
封装方法名+参数创建Invocation对象,继而参数类型就保存在了Invocation对象。
这样分析得来,不显示指定序列化id并不会导致这个问题。
排除了jdk版本、不显示指定序列化ID等原因,具体是什么原因导致的dubbo方法重载导致调用ClassCastException呢?线上预发环境和生产网络是互通,是否是预发环境同事手工部署的应用只有入参String的方法呢(未和生产同步版本)?同事也记不清了,也无法查,这个问题暂时是无法知道答案了。
据我猜测,问题可能出现是预发环境部署的服务没有和生产版本同步(缺少findProduct(ProductDTOdata)导致),我们预发和生成网络是互通的,应该是生产客户端调用到了预发环境服务,而预发环境部署的此服务没有findProduct(ProductDTOdata)。
为什么需要显示指定序列化id
rpc调用使用的tcp通信,需要把对象转换为二进制流进行发送(编码)和接收(解码),那么就需要有套规则需要把内存中的java对象转换为二进制流,序列化就是做这个事情的。
在使用原生序列化的时候,serialVersionUID起到了一个类似版本号的作用,在反序列化的时候判断serialVersionUID如果不相同,会抛出InvalidClassException。
如果在使用原生序列化方式的时候官方是强烈建议指定一个serialVersionUID的,如果没有指定,在序列化过程中,jvm会自动计算出一个值作为serialVersionUID,由于这种运行时计算serialVersionUID的方式依赖于jvm的实现方式,如果序列化和反序列化的jvm实现方式不一样可能会导致抛出异常InvalidClassException,所以强烈建议指定serialVersionUID。
不显示指定序列化ID实际会导致问题吗?
定义一个dubbo的入参,不显示指定序列化id,客户端运行不变更,服务端入参进行增加或删除字段(类结构发生变化),发现均能正常请求,并非像网上所说的不显示指定序列化id情况下rpc参数类结构变化,并没有导致什么问题,当然我只是在jdk8版本下进行了此测试(当然现在都是jdk8),这样情况下,实际使用过程中,不显示指定序列化id好像也不会影响什么呢。
网上有说法,不显示指定序列化id会导致一种情况出现问题:举个例子:比如该入参没有显示指定序列化id,后面有个需求需要在这个入参增加个字段,而且看没有显示指定序列化id,顺手就增加了个序列化id,这样线上运行的客户端应用由于引用的还是旧jar,新的服务部署上去,就会发送序列化失败(客户端jvm生成的序列化id和服务端显示指定的序列化id不同),好像这种情况是无法避免的。但是我经过测试,不显示指定序列化id情况下 对dubbo参数进行增加字段、删除字段、增加方法等都不会造成反序列化问题(jdk8, dubbo2.6.8下测试),请求均正常。验证结果说明jvm生成序列化id和类的结构没有关系。可以参考别人测试结果,和我测试结果相同。
那么是否就可以大胆的不指定序列化id呢?还是建议不要,鬼知道jvm生成序列化id的实现方式呢,不指定万一线上哪天出现幺蛾子。
验证了半天,得到一个不指定序列化id也没关系的实际验证结论,但是又不敢完全放心大胆不显示指定序列化id,抓狂。。。
最终结论
根据实际验证(jdk8, dubbo2.6.8下测试),不显示指定序列化id时,dubbo的传输对象在增加字段、删除字段、增加方法等都不会造成反序列化问题,但是还是强烈建议显示指定序列化id,万一jvm生成序列化id不兼容了呢
结尾
分析了这么长,最终也没找到这个问题的产生原因,但是对dubbo的通信层又加深了理解,下面一篇记录下总结的dubbo通信层