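SOFARPC source code analysis series:
6. Source analysis: how does SOFARPC implement load balancing compared with Dubbo?
8. Source analysis: the EventBus in SOFARPC from a design-pattern perspective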
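Before reading about link tracing, it helps to read the official article "SOFARPC 链路追踪剖析" (an analysis of SOFARPC link tracing) first.
I have borrowed a diagram from the official article. It shows the order in which events are posted over the course of an RPC call.
The events in the diagram are: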
- ClientStartInvokeEvent
- ClientBeforeSendEvent
- ClientAfterSendEvent
- ServerReceiveEvent
- ServerSendEvent
- ServerEndHandleEvent
- ClientSyncReceiveEvent
- ClientAsyncReceiveEvent
- ClientEndInvokeEvent
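ClientAfterSendEvent and ClientSyncReceiveEvent overlap with ClientAsyncReceiveEvent and ClientEndInvokeEvent, so they have been removed and the subscriber does not handle them.
Next, here is the subscriber's onEvent method, which shows how these events are handled.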
SofaTracerSubscriber#onEvent
public void onEvent(Event originEvent) {
if (!Tracers.isEnable()) {
return;
}
Class eventClass = originEvent.getClass();
if (eventClass == ClientStartInvokeEvent.class) {
ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
Tracers.startRpc(event.getRequest());
}
else if (eventClass == ClientBeforeSendEvent.class) {
ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
Tracers.clientBeforeSend(event.getRequest());
}
else if (eventClass == ClientAsyncReceiveEvent.class) {
ClientAsyncReceiveEvent event = (ClientAsyncReceiveEvent) originEvent;
// Take the tracer info out and put it into the Tracer's own context
Tracers.clientAsyncReceivedPrepare();
// Record that the response was received
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}
else if (eventClass == ClientEndInvokeEvent.class) {
ClientEndInvokeEvent event = (ClientEndInvokeEvent) originEvent;
if (!event.getRequest().isAsync()) {
// Because synchronous calls may retry, received can only be counted at the very end
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}
// Check the state
Tracers.checkState();
}
else if (eventClass == ServerReceiveEvent.class) {
ServerReceiveEvent event = (ServerReceiveEvent) originEvent;
// Request received
Tracers.serverReceived(event.getRequest());
}
else if (eventClass == ServerSendEvent.class) {
// Send the response
ServerSendEvent event = (ServerSendEvent) originEvent;
Tracers.serverSend(event.getRequest(), event.getResponse(), event.getThrowable());
}
else if (eventClass == ServerEndHandleEvent.class) {
// Check the state
Tracers.checkState();
}
}
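To make the dispatch pattern easier to see in isolation, here is a minimal sketch of a subscriber that branches on the exact event class. It is not the SOFARPC implementation: the event types, the `actions` list and the `enabled` flag are hypothetical stand-ins for the real events, the `Tracers` calls and `Tracers.isEnable()`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for SofaTracerSubscriber; every type here is hypothetical.
final class TracerSubscriberSketch {
    interface Event {}

    static final class ClientStartInvokeEvent implements Event {}

    static final class ClientEndInvokeEvent implements Event {
        final boolean async;

        ClientEndInvokeEvent(boolean async) {
            this.async = async;
        }
    }

    // Records which tracer actions ran, in order, so the dispatch can be inspected.
    final List<String> actions = new ArrayList<>();
    private final boolean enabled;

    TracerSubscriberSketch(boolean enabled) {
        this.enabled = enabled;
    }

    void onEvent(Event origin) {
        if (!enabled) {
            return; // tracing is switched off: ignore every event
        }
        Class<?> type = origin.getClass();
        if (type == ClientStartInvokeEvent.class) {
            actions.add("startRpc");
        } else if (type == ClientEndInvokeEvent.class) {
            if (!((ClientEndInvokeEvent) origin).async) {
                actions.add("clientReceived"); // sync calls are closed at end-of-invoke
            }
            actions.add("checkState");
        }
    }

    public static void main(String[] args) {
        TracerSubscriberSketch subscriber = new TracerSubscriberSketch(true);
        subscriber.onEvent(new ClientStartInvokeEvent());
        subscriber.onEvent(new ClientEndInvokeEvent(false));
        System.out.println(subscriber.actions);
    }
}
```

Comparing classes with `==` rather than `instanceof` means a subclass of an event would not match any branch, which is presumably acceptable when the event types are fixed and known in advance.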
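The following sections walk through the events in order.
ClientStartInvokeEvent
First, let's look at where ClientStartInvokeEvent is posted:
As mentioned in part 3 of this series, "Source analysis: SOFARPC client-side service invocation", a client call goes into ClientProxyInvoker#invoke, which posts the start-invoke event just before calling the cluster's invoke method.
Processing before the event is posted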
ClientProxyInvoker#invoke
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
Throwable throwable = null;
try {
RpcInternalContext.pushContext();
RpcInternalContext context = RpcInternalContext.getContext();
context.setProviderSide(false);
// Decorate the request
decorateRequest(request);
try {
// Post the start-invoke event
if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
EventBus.post(new ClientStartInvokeEvent(request));
}
// Get the result
response = cluster.invoke(request);
} catch (SofaRpcException e) {
throwable = e;
throw e;
} finally {
// Post the end-invoke event
if (!request.isAsync()) {
if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
}
}
}
// Decorate the response
decorateResponse(response);
return response;
} finally {
RpcInternalContext.removeContext();
RpcInternalContext.popContext();
}
}
As the code above shows, before posting the start-invoke event the method calls DefaultClientProxyInvoker#decorateRequest to populate the request.
protected void decorateRequest(SofaRequest request) {
// Common settings
super.decorateRequest(request);
// Cached to speed things up
request.setTargetServiceUniqueName(serviceName);
request.setSerializeType(serializeType == null ? 0 : serializeType);
if (!consumerConfig.isGeneric()) {
// Look up the invoke type; for generic calls the type is determined in a filter
request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName()));
}
RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
RpcInternalContext internalContext = RpcInternalContext.getContext();
if (invokeCtx != null) {
// If the user set an invocation-level callback
SofaResponseCallback responseCallback = invokeCtx.getResponseCallback();
if (responseCallback != null) {
request.setSofaResponseCallback(responseCallback);
invokeCtx.setResponseCallback(null); // one-shot: cleared after use
invokeCtx.put(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN,
isSendableResponseCallback(responseCallback));
}
// If the user set an invocation-level timeout
Integer timeout = invokeCtx.getTimeout();
if (timeout != null) {
request.setTimeout(timeout);
invokeCtx.setTimeout(null);// one-shot: cleared after use
}
// If the user specified the target URL
String targetURL = invokeCtx.getTargetURL();
if (targetURL != null) {
internalContext.setAttachment(HIDDEN_KEY_PINPOINT, targetURL);
invokeCtx.setTargetURL(null);// one-shot: cleared after use
}
// If the user specified baggage (pass-through data)
if (RpcInvokeContext.isBaggageEnable()) {
// Baggage needs to be passed through
BaggageResolver.carryWithRequest(invokeCtx, request);
internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
}
}
if (RpcInternalContext.isAttachmentEnable()) {
internalContext.setAttachment(INTERNAL_KEY_APP_NAME, consumerConfig.getAppName());
internalContext.setAttachment(INTERNAL_KEY_PROTOCOL_NAME, consumerConfig.getProtocol());
}
// Extra properties are passed to the server through HEAD
request.addRequestProp(RemotingConstants.HEAD_APP_NAME, consumerConfig.getAppName());
request.addRequestProp(RemotingConstants.HEAD_PROTOCOL, consumerConfig.getProtocol());
}
In decorateRequest, only setTargetServiceUniqueName, INTERNAL_KEY_APP_NAME and INTERNAL_KEY_PROTOCOL_NAME matter for handling ClientStartInvokeEvent. The method stores the appName and protocol in internalContext.
Handling the event
The handling logic for ClientStartInvokeEvent is:
if (eventClass == ClientStartInvokeEvent.class) {
ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
Tracers.startRpc(event.getRequest());
}
//Tracers#startRpc
public static void startRpc(SofaRequest request) {
if (openTrace) {
try {
// Currently there is only one subclass, RpcSofaTracer
tracer.startRpc(request);
} catch (Exception e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("", e);
}
}
}
}
Let's first look at the RpcSofaTracer constructor.
public RpcSofaTracer() {
// Build the client-side log printing instances
SpanEncoder<SofaTracerSpan> clientEncoder = getClientSpanEncoder();
SofaTracerStatisticReporter clientStats = generateClientStatReporter(RpcTracerLogEnum.RPC_CLIENT_STAT);
// A DiskReporterImpl instance is built by default
Reporter clientReporter = generateReporter(clientStats, RpcTracerLogEnum.RPC_CLIENT_DIGEST, clientEncoder);
// Build the server-side log printing instances
SpanEncoder<SofaTracerSpan> serverEncoder = getServerSpanEncoder();
SofaTracerStatisticReporter serverStats = generateServerStatReporter(RpcTracerLogEnum.RPC_SERVER_STAT);
// A DiskReporterImpl instance is built by default
Reporter serverReporter = generateReporter(serverStats, RpcTracerLogEnum.RPC_SERVER_DIGEST, serverEncoder);
// Build the RPC tracer instance
// Set the tracer type to RPC_TRACER
sofaTracer = new SofaTracer.Builder(RPC_TRACER_TYPE)
// Attach the client reporter and the server reporter to sofaTracer
.withClientReporter(clientReporter).withServerReporter(serverReporter)
// Create the sofaTracer instance
.build();
}
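The RpcSofaTracer constructor mainly builds a sofaTracer instance, sets its tracer type to RPC_TRACER, and attaches a client reporter and a server reporter. Both reporters are DiskReporterImpl instances, so the logs are written to disk, where they are kept for 7 days by default.
Next, let's step into the startRpc method.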
RpcSofaTracer#startRpc
public void startRpc(SofaRequest request) {
// Client-side start
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
SofaTracerSpan serverSpan = sofaTraceContext.pop();
// Create a new clientSpan
// buildSpan sets the operationName
SofaTracerSpan clientSpan = (SofaTracerSpan) this.sofaTracer.buildSpan(request.getInterfaceName())
// Sets the references parameter
.asChildOf(serverSpan)
// Set span.kind to client
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
// Create the span
.start();
// If the thread context is allowed to carry custom attachments
if (RpcInternalContext.isAttachmentEnable()) {
RpcInternalContext context = RpcInternalContext.getContext();
// Set local.app
clientSpan
.setTag(RpcSpanTags.LOCAL_APP, (String) context.getAttachment(RpcConstants.INTERNAL_KEY_APP_NAME));
// Set protocol
clientSpan.setTag(RpcSpanTags.PROTOCOL,
(String) context.getAttachment(RpcConstants.INTERNAL_KEY_PROTOCOL_NAME));
SofaTracerSpanContext spanContext = clientSpan.getSofaTracerSpanContext();
// Set trace_id and span_id
if (spanContext != null) {
context.setAttachment(RpcConstants.INTERNAL_KEY_TRACE_ID, spanContext.getTraceId());
context.setAttachment(RpcConstants.INTERNAL_KEY_SPAN_ID, spanContext.getSpanId());
}
}
// Set service, method and threadName
clientSpan.setTag(RpcSpanTags.SERVICE, request.getTargetServiceUniqueName());
clientSpan.setTag(RpcSpanTags.METHOD, request.getMethodName());
clientSpan.setTag(RpcSpanTags.CURRENT_THREAD_NAME, Thread.currentThread().getName());
// The serverSpan has to be cached explicitly, because asChildOf only cares about the spanContext
clientSpan.setParentSofaTracerSpan(serverSpan);
//push
sofaTraceContext.push(clientSpan);
}
startRpc does the following:
- Creates a SofaTracerSpan instance
- Sets values on the newly created clientSpan
- Pushes clientSpan into sofaTraceContext
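The pop, create, push sequence can be modelled with a single thread-local slot. The sketch below is only an illustration under that assumption: `Span`, `push`, `pop` and `current` are hypothetical simplifications of SofaTracerSpan and SofaTraceContext, and `clientReceived` is a reduced version of the restore-parent step that appears later in the real tracer.

```java
// Hypothetical single-slot span context, modelled on the flow of startRpc.
final class SpanContextSketch {
    static final class Span {
        final String name;
        Span parent; // cached explicitly, like setParentSofaTracerSpan

        Span(String name) {
            this.name = name;
        }
    }

    // One slot per thread: push overwrites it, pop clears it.
    private static final ThreadLocal<Span> CURRENT = new ThreadLocal<>();

    static void push(Span span) {
        if (span != null) {
            CURRENT.set(span);
        }
    }

    static Span pop() {
        Span span = CURRENT.get();
        CURRENT.remove();
        return span;
    }

    static Span current() {
        return CURRENT.get();
    }

    // The three steps: take the enclosing span out, create the client span, push it.
    static Span startRpc(String interfaceName) {
        Span serverSpan = pop(); // may be null when there is no enclosing server span
        Span clientSpan = new Span(interfaceName);
        clientSpan.parent = serverSpan;
        push(clientSpan);
        return clientSpan;
    }

    // When the call ends, the client span is removed and the parent restored.
    static void clientReceived() {
        Span clientSpan = pop();
        if (clientSpan != null && clientSpan.parent != null) {
            push(clientSpan.parent);
        }
    }

    public static void main(String[] args) {
        Span client = startRpc("com.example.HelloService");
        System.out.println(current() == client);
        clientReceived();
        System.out.println(current() == null);
    }
}
```

Because the slot holds only one span, the parent has to be remembered on the child itself, otherwise it would be lost the moment the client span is pushed.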
Let's first look at the code that creates the SofaTracerSpan instance:
// this.sofaTracer is the SofaTracer instance created in the constructor
SofaTracerSpan clientSpan = (SofaTracerSpan)
this.sofaTracer.buildSpan(request.getInterfaceName())
// Sets the references parameter
.asChildOf(serverSpan)
// Set span.kind to client
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
// Create the span
.start();
The SofaTracer#buildSpan method:
public SpanBuilder buildSpan(String operationName) {
return new SofaTracerSpanBuilder(operationName);
}
This code simply returns a SofaTracerSpanBuilder instance.
SofaTracerSpanBuilder is an inner class of SofaTracer, used to build Spans inside the Tracer.
SofaTracerSpanBuilder#asChildOf
public Tracer.SpanBuilder asChildOf(Span parentSpan) {
if (parentSpan == null) {
return this;
}
return addReference(References.CHILD_OF, parentSpan.context());
}
Because serverSpan is null here, asChildOf returns immediately without adding a reference.
SofaTracerSpanBuilder#withTag
public Tracer.SpanBuilder withTag(String key, String value) {
this.tags.put(key, value);
return this;
}
withTag simply stores the key/value pair in the SofaTracerSpanBuilder's tags map.
Finally, start is called to create the SofaTracerSpan instance:
SofaTracerSpanBuilder#start
public Span start() {
SofaTracerSpanContext sofaTracerSpanContext = null;
// references is empty because asChildOf returned early
if (this.references != null && this.references.size() > 0) {
// A parent context exists
sofaTracerSpanContext = this.createChildContext();
} else {
// Start a new root node
sofaTracerSpanContext = this.createRootSpanContext();
}
// startTime defaults to -1, so begin = current time
long begin = this.startTime > 0 ? this.startTime : System.currentTimeMillis();
// operationName is the name of the interface being called
SofaTracerSpan sofaTracerSpan = new SofaTracerSpan(SofaTracer.this, begin,
this.references, this.operationName, sofaTracerSpanContext, this.tags);
return sofaTracerSpan;
}
Now let's see how createRootSpanContext creates the sofaTracerSpanContext:
SofaTracerSpanBuilder#createRootSpanContext
private SofaTracerSpanContext createRootSpanContext() {
// Generate the traceId
String traceId = TraceIdGenerator.generate();
// Not sampled by default
boolean isSampled = false;
// sampler is null here
if (sampler != null) {
SamplingStatus samplingStatus = sampler.sample(this.operationName, traceId);
if (samplingStatus.isSampled()) {
isSampled = true;
// Once sampled, record the related attributes
this.tags.putAll(samplingStatus.getTags());
}
}
// traceId is the one generated above; spanId is ROOT_SPAN_ID, which defaults to "0"
return new SofaTracerSpanContext(traceId, ROOT_SPAN_ID, StringUtils.EMPTY_STRING,
isSampled);
}
At this point the SofaTracerSpan has been created.
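The root-versus-child decision made in start can be sketched as follows. This is a simplified model and not SOFATracer code: `Ctx` is a hypothetical stand-in for SofaTracerSpanContext, the UUID-based trace id is a placeholder for whatever TraceIdGenerator produces, and the child id scheme (parent id plus a dot and a counter) is an assumption made only for illustration, since createChildContext is not shown in this article.

```java
import java.util.UUID;

// Hypothetical model of choosing between a root context and a child context.
final class SpanIdSketch {
    static final String ROOT_SPAN_ID = "0";

    static final class Ctx {
        final String traceId;
        final String spanId;
        final String parentId;
        private int childCount = 0;

        Ctx(String traceId, String spanId, String parentId) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
        }
    }

    // No parent: new trace id, span id "0", empty parent id.
    static Ctx createRoot() {
        String traceId = UUID.randomUUID().toString().replace("-", ""); // placeholder id
        return new Ctx(traceId, ROOT_SPAN_ID, "");
    }

    // Parent present: same trace id, span id derived from the parent (assumed scheme).
    static Ctx createChild(Ctx parent) {
        parent.childCount++;
        return new Ctx(parent.traceId, parent.spanId + "." + parent.childCount, parent.spanId);
    }

    static Ctx start(Ctx parentOrNull) {
        return parentOrNull == null ? createRoot() : createChild(parentOrNull);
    }

    public static void main(String[] args) {
        Ctx root = start(null);
        Ctx child = start(root);
        System.out.println(root.spanId + " -> " + child.spanId);
    }
}
```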
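Summary
startRpc creates a SofaTracerSpan instance, clientSpan, pushes it into SofaTraceContext, and sets the following values:
- LOCAL_APP: the APP_NAME set in the client config
- PROTOCOL: the PROTOCOL set in the client config
- INTERNAL_KEY_TRACE_ID: the TRACE_ID generated when the SofaTracerSpanContext is created along with clientSpan
- INTERNAL_KEY_SPAN_ID: the SPAN_ID in SofaTracerSpanContext, "0" by default
- SERVICE: the request's target service unique name
- METHOD: the request's method name
- CURRENT_THREAD_NAME: the name of the current thread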
ClientBeforeSendEvent
ClientBeforeSendEvent is reached through the following call chain:
ConsumerInvoker#invoke -> AbstractCluster#doSendMsg -> BoltClientTransport#send -> BoltClientTransport#beforeSend
While running the filter chain, the client first calls ConsumerTracerFilter#invoke.
ConsumerTracerFilter#invoke
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
SofaTracerSpan clientSpan = sofaTraceContext.getCurrentSpan();
clientSpan.setTag(RpcSpanTags.INVOKE_TYPE, request.getInvokeType());
RpcInternalContext context = RpcInternalContext.getContext();
clientSpan.setTag(RpcSpanTags.ROUTE_RECORD,
(String) context.getAttachment(RpcConstants.INTERNAL_KEY_ROUTER_RECORD));
ProviderInfo providerInfo = context.getProviderInfo();
if (providerInfo != null) {
clientSpan.setTag(RpcSpanTags.REMOTE_APP, providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME));
clientSpan.setTag(RpcSpanTags.REMOTE_IP, providerInfo.getHost() + ":" + providerInfo.getPort());
}
return invoker.invoke(request);
// Because of the async case, received is not recorded here
}
This filter fetches the clientSpan and sets tags on it.
The ClientBeforeSendEvent event is then posted.
ClientBeforeSendEvent is handled as follows:
SofaTracerSubscriber#onEvent
if (eventClass == ClientBeforeSendEvent.class) {
ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
Tracers.clientBeforeSend(event.getRequest());
}
Execution eventually reaches RpcSofaTracer#clientBeforeSend.
RpcSofaTracer#clientBeforeSend
public void clientBeforeSend(SofaRequest request) {
// Client-side start
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
// Get the span without popping it
SofaTracerSpan clientSpan = sofaTraceContext.getCurrentSpan();
if (clientSpan == null) {
SelfLog.warn("ClientSpan is null.Before call interface=" + request.getInterfaceName() + ",method=" +
request.getMethodName());
return;
}
SofaTracerSpanContext sofaTracerSpanContext = clientSpan.getSofaTracerSpanContext();
// Get the RPC context
RpcInternalContext rpcInternalContext = RpcInternalContext.getContext();
ProviderInfo providerInfo;
if ((providerInfo = rpcInternalContext.getProviderInfo()) != null &&
providerInfo.getRpcVersion() >= 50100) { // version >= 5.1.0
// New client calling a new server: cache the context in the request
String serializedSpanContext = sofaTracerSpanContext.serializeSpanContext();
request.addRequestProp(RemotingConstants.NEW_RPC_TRACE_NAME, serializedSpanContext);
} else {
// New client calling an old server
Map<String, String> oldTracerContext = new HashMap<String, String>();
oldTracerContext.put(TracerCompatibleConstants.TRACE_ID_KEY, sofaTracerSpanContext.getTraceId());
oldTracerContext.put(TracerCompatibleConstants.RPC_ID_KEY, sofaTracerSpanContext.getSpanId());
// Resolve the sampling flag and pass it along
oldTracerContext.put(TracerCompatibleConstants.SAMPLING_MARK,
String.valueOf(sofaTracerSpanContext.isSampled()));
// Business baggage
oldTracerContext.put(TracerCompatibleConstants.PEN_ATTRS_KEY,
sofaTracerSpanContext.getBizSerializedBaggage());
// System baggage
oldTracerContext.put(TracerCompatibleConstants.PEN_SYS_ATTRS_KEY,
sofaTracerSpanContext.getSysSerializedBaggage());
Map<String, Object> attachments = rpcInternalContext.getAttachments();
oldTracerContext.put(TracerCompatibleConstants.CALLER_APP_KEY,
getEmptyStringIfNull(attachments, RpcSpanTags.REMOTE_APP));
oldTracerContext.put(TracerCompatibleConstants.CALLER_ZONE_KEY,
getEmptyStringIfNull(attachments, RpcSpanTags.REMOTE_ZONE));
oldTracerContext.put(TracerCompatibleConstants.CALLER_IDC_KEY,
getEmptyStringIfNull(attachments, RpcSpanTags.REMOTE_IDC));
oldTracerContext.put(TracerCompatibleConstants.CALLER_IP_KEY,
getEmptyStringIfNull(attachments, RpcSpanTags.REMOTE_IP));
request.addRequestProp(RemotingConstants.RPC_TRACE_NAME, oldTracerContext);
}
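// Async/callback vs. sync
if (request.isAsync()) {
// Async: by now the spanContext has been cached and the RPC has already called clientBeforeSendRequest()
// The parent span also has to be restored at this point
// Pop the span; otherwise the current thread would keep the client span forever
clientSpan = sofaTraceContext.pop();
if (clientSpan != null) {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
// Cache the current span in the request context. Note: it is only cached and is not serialized to the server
rpcInternalContext.setAttachment(RpcConstants.INTERNAL_KEY_TRACER_SPAN, clientSpan);
// ParentSofaTracerSpan is null at this point, so for an async call sofaTraceContext now holds no span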
if (clientSpan != null && clientSpan.getParentSofaTracerSpan() != null) {
//restore parent
sofaTraceContext.push(clientSpan.getParentSofaTracerSpan());
}
} else {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
}
This method mainly does the following:
- Sets different request properties depending on the provider's version
- For an async call, pops clientSpan and stores it in rpcInternalContext under INTERNAL_KEY_TRACER_SPAN
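The version check in the first point can be sketched in isolation. In the sketch below, `Format`, `chooseFormat` and `encodeVersion` are hypothetical names, and the integer encoding (major * 10000 + minor * 100 + patch) is inferred from the pairing of the constant 50100 with version 5.1.0 in the code above rather than taken from SOFARPC documentation.

```java
// Hypothetical sketch of picking the trace header format from the provider version.
final class TraceHeaderSketch {
    enum Format { SERIALIZED_STRING, LEGACY_MAP }

    // Same threshold as in clientBeforeSend.
    static final int NEW_FORMAT_SINCE = 50100;

    // Assumed encoding: 5.1.0 becomes 50100.
    static int encodeVersion(int major, int minor, int patch) {
        return major * 10000 + minor * 100 + patch;
    }

    // A null version models "no provider info available", which falls back to the old format.
    static Format chooseFormat(Integer providerVersion) {
        if (providerVersion != null && providerVersion >= NEW_FORMAT_SINCE) {
            return Format.SERIALIZED_STRING;
        }
        return Format.LEGACY_MAP;
    }

    public static void main(String[] args) {
        System.out.println(chooseFormat(encodeVersion(5, 1, 0)));
        System.out.println(chooseFormat(encodeVersion(5, 0, 3)));
        System.out.println(chooseFormat(null));
    }
}
```

Falling back to the legacy map whenever the provider is unknown keeps older servers able to read the trace data, at the cost of a more verbose header.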
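Why does an async call need to pop clientSpan and store it in rpcInternalContext?
Because clientSpan actually lives in a ThreadLocal, and the async thread cannot read the main thread's ThreadLocal data, so clientSpan has to be stored in rpcInternalContext.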
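The ThreadLocal limitation is easy to reproduce with plain JDK classes. In this sketch the span is just a string, and the `attachments` map is a hypothetical stand-in for a per-call context that is assumed to travel with the call to the callback thread; how SOFARPC actually carries its context across threads is not covered here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: a ThreadLocal value is invisible to another thread,
// so it is parked in a per-call map before the async hand-over.
final class AsyncHandoverSketch {
    private static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    // Returns { value seen through the ThreadLocal, value seen through the attachment }.
    static String[] run() {
        Map<String, String> attachments = new ConcurrentHashMap<>();
        CURRENT_SPAN.set("clientSpan");

        // Before sending an async request: pop the span and park it in the call context.
        attachments.put("tracerSpan", CURRENT_SPAN.get());
        CURRENT_SPAN.remove();

        String[] seen = new String[2];
        Thread callback = new Thread(() -> {
            seen[0] = CURRENT_SPAN.get();            // the callback thread has its own empty slot
            seen[1] = attachments.get("tracerSpan"); // the parked span is still reachable
        });
        callback.start();
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[0] + " / " + seen[1]);
    }
}
```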
ServerReceiveEvent
ServerReceiveEvent is handled as follows:
if (eventClass == ServerReceiveEvent.class) {
ServerReceiveEvent event = (ServerReceiveEvent) originEvent;
// Request received
Tracers.serverReceived(event.getRequest());
}
Through Tracers, the call ends up in RpcSofaTracer#serverReceived.
RpcSofaTracer#serverReceived
public void serverReceived(SofaRequest request) {
Map<String, String> tags = new HashMap<String, String>();
// The server tag must be set
tags.put(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER);
// Read the trace property sent by the caller
String spanStrs = (String) request.getRequestProp(RemotingConstants.NEW_RPC_TRACE_NAME);
SofaTracerSpanContext spanContext = null;
// If it is blank, the caller is an old version
if (StringUtils.isBlank(spanStrs)) {
// Old format
Object oldInstanceMap = request.getRequestProp(RemotingConstants.RPC_TRACE_NAME);
// Fill tags from oldInstanceMap and create a SofaTracerSpanContext instance
spanContext = this.saveSpanContextAndTags(tags, oldInstanceMap);
} else {
// New format
spanContext = SofaTracerSpanContext.deserializeFromString(spanStrs);
}
if (spanContext == null) {
SelfLog.error("SpanContext created error when server received and root SpanContext created.");
// Create a root SofaTracerSpanContext instance
spanContext = SofaTracerSpanContext.rootStart();
}
SofaTracerSpan serverSpan = new SofaTracerSpan(this.sofaTracer, System.currentTimeMillis(),
request.getInterfaceName()
, spanContext, tags);
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
// Record server receive event
serverSpan.log(LogData.SERVER_RECV_EVENT_VALUE);
// Put it into the thread context
sofaTraceContext.push(serverSpan);
// RPC context
if (RpcInternalContext.isAttachmentEnable()) {
RpcInternalContext context = RpcInternalContext.getContext();
context.setAttachment(RpcConstants.INTERNAL_KEY_TRACE_ID, spanContext.getTraceId());
context.setAttachment(RpcConstants.INTERNAL_KEY_SPAN_ID, spanContext.getSpanId());
}
}
This method builds a SofaTracerSpan from the request sent by the client and pushes it into sofaTraceContext.
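The "restore the caller's context, or start a new root if that fails" logic can be sketched like this. The wire format used here (trace id and span id joined by `|`) is invented for the example and is not the real SofaTracerSpanContext serialization; the legacy-map branch is left out, and `Ctx`, `serialize`, `deserialize` and `onServerReceived` are hypothetical names.

```java
// Hypothetical codec showing the deserialize-or-fall-back-to-root pattern.
final class SpanContextCodecSketch {
    static final class Ctx {
        final String traceId;
        final String spanId;

        Ctx(String traceId, String spanId) {
            this.traceId = traceId;
            this.spanId = spanId;
        }
    }

    // Invented format for illustration only: "traceId|spanId".
    static String serialize(Ctx ctx) {
        return ctx.traceId + "|" + ctx.spanId;
    }

    // Returns null when the header is missing or malformed.
    static Ctx deserialize(String header) {
        if (header == null || header.trim().isEmpty()) {
            return null;
        }
        int sep = header.indexOf('|');
        if (sep <= 0 || sep == header.length() - 1) {
            return null;
        }
        return new Ctx(header.substring(0, sep), header.substring(sep + 1));
    }

    // Server side: reuse the caller's context, otherwise start a new root with span id "0".
    static Ctx onServerReceived(String header) {
        Ctx ctx = deserialize(header);
        return ctx != null ? ctx : new Ctx("local-" + System.nanoTime(), "0");
    }

    public static void main(String[] args) {
        Ctx restored = onServerReceived(serialize(new Ctx("abc123", "0.1")));
        System.out.println(restored.traceId + " " + restored.spanId);
        System.out.println(onServerReceived(null).spanId);
    }
}
```

Starting a fresh root instead of failing means a request without usable trace data is still traced on the server, just without a link to the caller.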
ServerSendEvent
Before ServerSendEvent is posted, the server runs its filter chain, which includes a filter called ProviderTracerFilter.
ProviderTracerFilter#invoke
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
SofaTracerSpan serverSpan = null;
try {
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
serverSpan = sofaTraceContext.getCurrentSpan();
if (serverSpan != null) {
RpcInternalContext context = RpcInternalContext.getContext();
serverSpan.setTag(RpcSpanTags.SERVICE, request.getTargetServiceUniqueName());
serverSpan.setTag(RpcSpanTags.METHOD, request.getMethodName());
serverSpan.setTag(RpcSpanTags.REMOTE_IP, context.getRemoteHostName()); // client address
// Extra information passed along by ConsumerTracerFilter, read from the request
serverSpan.setTag(RpcSpanTags.REMOTE_APP, (String) request.getRequestProp(HEAD_APP_NAME));
serverSpan.setTag(RpcSpanTags.PROTOCOL, (String) request.getRequestProp(HEAD_PROTOCOL));
serverSpan.setTag(RpcSpanTags.INVOKE_TYPE, (String) request.getRequestProp(HEAD_INVOKE_TYPE));
ProviderConfig providerConfig = (ProviderConfig) invoker.getConfig();
serverSpan.setTag(RpcSpanTags.LOCAL_APP, providerConfig.getAppName());
// Wait time in the RPC thread pool
serverSpan.setTag(RpcSpanTags.SERVER_THREAD_POOL_WAIT_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME));
}
return invoker.invoke(request);
} finally {
if (serverSpan != null) {
// Business processing time
serverSpan.setTag(RpcSpanTags.SERVER_BIZ_TIME,
(Number) RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE));
}
}
}
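ProviderTracerFilter copies a good deal of information from the request into serverSpan. The RPC thread-pool wait time is set when BoltServerProcessor#handleRequest is called; the business processing time is computed in ProviderInvoker#invoke after the service has been invoked via reflection.
After the response has been sent to the client, a ServerSendEvent is posted to the bus.
The subscriber handles ServerSendEvent as follows: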
if (eventClass == ServerSendEvent.class) {
// Send the response
ServerSendEvent event = (ServerSendEvent) originEvent;
Tracers.serverSend(event.getRequest(), event.getResponse(), event.getThrowable());
}
This eventually calls RpcSofaTracer#serverSend:
RpcSofaTracer#serverSend
public void serverSend(SofaRequest request, SofaResponse response, Throwable exception) {
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
SofaTracerSpan serverSpan = sofaTraceContext.pop();
if (serverSpan == null) {
return;
}
// Record server send event
serverSpan.log(LogData.SERVER_SEND_EVENT_VALUE);
RpcInternalContext context = RpcInternalContext.getContext();
// Response serialization time
serverSpan.setTag(RpcSpanTags.RESP_SERIALIZE_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SERIALIZE_TIME));
// Request deserialization time
serverSpan.setTag(RpcSpanTags.REQ_DESERIALIZE_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_DESERIALIZE_TIME));
// Serialized response size
serverSpan.setTag(RpcSpanTags.RESP_SIZE, (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE));
// Request size (before deserialization)
serverSpan.setTag(RpcSpanTags.REQ_SIZE, (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE));
// Current thread name
serverSpan.setTag(RpcSpanTags.CURRENT_THREAD_NAME, Thread.currentThread().getName());
Throwable throwableShow = exception;
String tracerErrorCode = StringUtils.EMPTY;
String errorSourceApp = StringUtils.EMPTY;
String resultCode = StringUtils.EMPTY;
if (throwableShow != null) {
// The current application, i.e. the server application
errorSourceApp = serverSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
// Result code (00 = success / 01 = business exception / 02 = RPC logic error)
// This cannot be a business exception
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_UNKNOWN_ERROR;
} else if (response != null) {
// Check whether it is a business exception
if (response.isError()) {
errorSourceApp = serverSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_UNKNOWN_ERROR;
// Replace the throwable that gets printed
throwableShow = new SofaRpcException(RpcErrorType.SERVER_UNDECLARED_ERROR, response.getErrorMsg());
} else {
Object ret = response.getAppResponse();
if (ret instanceof Throwable) {
throwableShow = (Throwable) ret;
errorSourceApp = serverSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
// Business exception
resultCode = TracerResultCode.RPC_RESULT_BIZ_FAILED;
tracerErrorCode = TracerResultCode.RPC_RESULT_BIZ_FAILED;
} else {
resultCode = TracerResultCode.RPC_RESULT_SUCCESS;
}
}
}
if (throwableShow != null) {
// Print the error
// result code
Map<String, String> errorContext = new HashMap<String, String>();
// Context information to record
this.generateServerErrorContext(errorContext, request, serverSpan);
//report
serverSpan.reportError(tracerErrorCode, errorContext, throwableShow,
errorSourceApp, ERROR_SOURCE);
}
// Result code (00 = success / 01 = business exception / 02 = RPC logic error)
serverSpan.setTag(RpcSpanTags.RESULT_CODE, resultCode);
// Asynchronously write the log to disk
serverSpan.finish();
}
Finally, this method collects the remaining statistics and calls serverSpan#finish to write the data to disk.
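The result-code branches in serverSend reduce to a small decision function. The sketch below mirrors only that classification; `Response` is a hypothetical stand-in for SofaResponse, and the literal codes come from the comment in the listing above (00 = success, 01 = business exception, 02 = RPC logic error).

```java
// Hypothetical reduction of the result-code decision made on the server side.
final class ServerResultCodeSketch {
    static final String SUCCESS = "00";
    static final String BIZ_FAILED = "01";
    static final String RPC_FAILED = "02";

    static final class Response {
        final boolean error;
        final Object appResponse;

        Response(boolean error, Object appResponse) {
            this.error = error;
            this.appResponse = appResponse;
        }
    }

    static String resultCode(Throwable thrown, Response response) {
        if (thrown != null) {
            return RPC_FAILED; // a framework-level exception, never a business one
        }
        if (response == null) {
            return "";         // nothing to classify, the code stays empty
        }
        if (response.error) {
            return RPC_FAILED; // the response itself is flagged as an RPC error
        }
        if (response.appResponse instanceof Throwable) {
            return BIZ_FAILED; // the service implementation returned an exception
        }
        return SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(resultCode(null, new Response(false, "hello")));
        System.out.println(resultCode(null, new Response(false, new IllegalArgumentException("bad input"))));
        System.out.println(resultCode(new RuntimeException("boom"), null));
    }
}
```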
ServerEndHandleEvent
After it has finished handling the response, the server posts a ServerEndHandleEvent to the bus.
ServerEndHandleEvent is handled as follows:
if (eventClass == ServerEndHandleEvent.class) {
// Check the state
Tracers.checkState();
}
This ends up calling RpcSofaTracer#checkState:
public void checkState() {
RpcInternalContext rpcInternalContext = RpcInternalContext.getContext();
// Tracer context
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
....
if (rpcInternalContext.isProviderSide()) {
// On the server side the tracer stack should hold 0 spans
if (sofaTraceContext.getThreadLocalSpanSize() > 0) {
SelfLog.error("Pay attention,stack size error.Tracer provider stack size more than zero.");
SelfLog.flush();
}
}
}
//SofaTracerThreadLocalTraceContext#getThreadLocalSpanSize
public int getThreadLocalSpanSize() {
SofaTracerSpan sofaTracerSpan = threadLocal.get();
return sofaTracerSpan == null ? 0 : 1;
}
Because the SofaTracerSpan was already popped while handling ServerSendEvent, ThreadLocalSpanSize should be 0 here; otherwise an error is logged.
ClientAsyncReceiveEvent
ClientAsyncReceiveEvent is handled as follows:
if (eventClass == ClientAsyncReceiveEvent.class) {
ClientAsyncReceiveEvent event = (ClientAsyncReceiveEvent) originEvent;
// Take the tracer info out and put it into the Tracer's own context
Tracers.clientAsyncReceivedPrepare();
// Record that the response was received
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}
Tracers#clientAsyncReceivedPrepare ends up in RpcSofaTracer#clientAsyncReceivedPrepare.
RpcSofaTracer#clientAsyncReceivedPrepare
public void clientAsyncReceivedPrepare() {
// A new thread
RpcInternalContext rpcInternalContext = RpcInternalContext.getContext();
SofaTracerSpan clientSpan = (SofaTracerSpan)
rpcInternalContext.getAttachment(RpcConstants.INTERNAL_KEY_TRACER_SPAN);
if (clientSpan == null) {
return;
}
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
sofaTraceContext.push(clientSpan);
}
ClientAsyncReceiveEvent is handled asynchronously, and sofaTraceContext stores its data in a ThreadLocal, so the SofaTracerSpan has to be fetched from the attachment and pushed into sofaTraceContext.
Then Tracers#clientReceived calls into RpcSofaTracer#clientReceived.
RpcSofaTracer#clientReceived
public void clientReceived(SofaRequest request, SofaResponse response, Throwable exceptionThrow) {
// Client-side start
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
SofaTracerSpan clientSpan = sofaTraceContext.pop();
if (clientSpan == null) {
return;
}
// Record client receive event
clientSpan.log(LogData.CLIENT_RECV_EVENT_VALUE);
// RPC context
RpcInternalContext context = null;
if (RpcInternalContext.isAttachmentEnable()) {
context = RpcInternalContext.getContext();
// Record the routing path
if (!clientSpan.getTagsWithStr().containsKey(RpcSpanTags.ROUTE_RECORD)) {
clientSpan.setTag(RpcSpanTags.ROUTE_RECORD,
(String) context.getAttachment(RpcConstants.INTERNAL_KEY_ROUTER_RECORD));
}
// Client request serialization time
clientSpan.setTag(RpcSpanTags.REQ_SERIALIZE_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SERIALIZE_TIME));
// Client response deserialization time
clientSpan.setTag(RpcSpanTags.RESP_DESERIALIZE_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_DESERIALIZE_TIME));
// Response size
clientSpan.setTag(RpcSpanTags.RESP_SIZE,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE));
// Request size
clientSpan.setTag(RpcSpanTags.REQ_SIZE, (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE));
// Time the client spent creating the long-lived connection
clientSpan.setTag(RpcSpanTags.CLIENT_CONN_TIME,
(Number) context.getAttachment(RpcConstants.INTERNAL_KEY_CONN_CREATE_TIME));
// Total client elapsed time: set in AbstractCluster#doSendMsg for sync calls, and in the callback for async calls
Long ce = (Long) context.getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE);
if (ce != null) {
clientSpan.setTag(RpcSpanTags.CLIENT_ELAPSE_TIME, ce);
}
InetSocketAddress address = context.getLocalAddress();
if (address != null) {
clientSpan.setTag(RpcSpanTags.LOCAL_IP, NetUtils.toIpString(address));
clientSpan.setTag(RpcSpanTags.LOCAL_PORT, address.getPort());
}
}
Throwable throwableShow = exceptionThrow;
// Distinguish the different kinds of exception
String resultCode = StringUtils.EMPTY;
// The current application or the target application
String errorSourceApp = StringUtils.EMPTY;
String tracerErrorCode = StringUtils.EMPTY;
if (throwableShow != null) {
// Client-side exception
if (throwableShow instanceof SofaRpcException) {
SofaRpcException exception = (SofaRpcException) throwableShow;
// Digest log
int errorType = exception.getErrorType();
switch (errorType) {
case RpcErrorType.CLIENT_TIMEOUT:
resultCode = TracerResultCode.RPC_RESULT_TIMEOUT_FAILED;
// Already stored by the filter
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_TIMEOUT_ERROR;
break;
case RpcErrorType.CLIENT_ROUTER:
resultCode = TracerResultCode.RPC_RESULT_ROUTE_FAILED;
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_ADDRESS_ROUTE_ERROR;
break;
case RpcErrorType.CLIENT_SERIALIZE:
case RpcErrorType.CLIENT_DESERIALIZE:
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_SERIALIZE_ERROR;
break;
default:
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
errorSourceApp = ExceptionUtils.isServerException(exception) ?
clientSpan.getTagsWithStr().get(RpcSpanTags.REMOTE_APP) : clientSpan.getTagsWithStr().get(
RpcSpanTags.LOCAL_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_UNKNOWN_ERROR;
break;
}
} else {
// An unknown client-side exception; currently this branch is never reached
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.LOCAL_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_UNKNOWN_ERROR;
}
} else if (response != null) {
// Server-side RPC exception
if (response.isError()) {
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.REMOTE_APP);
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_UNKNOWN_ERROR;
resultCode = TracerResultCode.RPC_RESULT_RPC_FAILED;
// Printed on both the client and the server
throwableShow = new SofaRpcException(RpcErrorType.SERVER_UNDECLARED_ERROR, response.getErrorMsg());
} else {
Object ret = response.getAppResponse();
// The server threw an exception whose class cannot be found on the client
if (ret instanceof Throwable ||
"true".equals(response.getResponseProp(RemotingConstants.HEAD_RESPONSE_ERROR))) {
errorSourceApp = clientSpan.getTagsWithStr().get(RpcSpanTags.REMOTE_APP);
// Business exception
resultCode = TracerResultCode.RPC_RESULT_BIZ_FAILED;
tracerErrorCode = TracerResultCode.RPC_ERROR_TYPE_BIZ_ERROR;
} else {
resultCode = TracerResultCode.RPC_RESULT_SUCCESS;
}
}
}
if (throwableShow != null) {
Map<String, String> contextMap = new HashMap<String, String>();
this.generateClientErrorContext(contextMap, request, clientSpan);
clientSpan.reportError(tracerErrorCode, contextMap,
throwableShow,
errorSourceApp,
ERROR_SOURCE);
}
clientSpan.setTag(RpcSpanTags.RESULT_CODE, resultCode);
//finish client
clientSpan.finish();
if (context != null) {
context.setAttachment(RpcConstants.INTERNAL_KEY_RESULT_CODE, resultCode);
}
//client span
if (clientSpan.getParentSofaTracerSpan() != null) {
//restore parent
sofaTraceContext.push(clientSpan.getParentSofaTracerSpan());
}
}
This method gathers the statistics collected on the client, stores them in clientSpan, and then calls clientSpan#finish to write them to disk asynchronously.
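One detail worth isolating from clientReceived is how it decides which application is recorded as the source of an error. The sketch below keeps only that decision for the exception branch; the `ErrorType` enum is a hypothetical simplification of the integer RpcErrorType constants, with `SERVER_SIDE` standing in for whatever ExceptionUtils.isServerException recognises.

```java
// Hypothetical reduction of the errorSourceApp decision on the client side.
final class ErrorSourceSketch {
    enum ErrorType {
        CLIENT_TIMEOUT, CLIENT_ROUTER, CLIENT_SERIALIZE, CLIENT_DESERIALIZE, SERVER_SIDE, CLIENT_OTHER
    }

    static String errorSourceApp(ErrorType type, String localApp, String remoteApp) {
        switch (type) {
            case CLIENT_TIMEOUT:
            case CLIENT_ROUTER:
            case CLIENT_SERIALIZE:
            case CLIENT_DESERIALIZE:
                return localApp; // attributed to the calling application
            default:
                // Only errors known to originate on the server are attributed to the remote app.
                return type == ErrorType.SERVER_SIDE ? remoteApp : localApp;
        }
    }

    public static void main(String[] args) {
        System.out.println(errorSourceApp(ErrorType.CLIENT_TIMEOUT, "order-app", "stock-app"));
        System.out.println(errorSourceApp(ErrorType.SERVER_SIDE, "order-app", "stock-app"));
    }
}
```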
ClientEndInvokeEvent
Finally we reach ClientEndInvokeEvent:
if (eventClass == ClientEndInvokeEvent.class) {
ClientEndInvokeEvent event = (ClientEndInvokeEvent) originEvent;
if (!event.getRequest().isAsync()) {
// Because synchronous calls may retry, received can only be counted at the very end
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}
// Check the state
Tracers.checkState();
}
A synchronous call never posts ClientAsyncReceiveEvent to the bus, so the ClientEndInvokeEvent handler checks for the sync case and calls clientReceived to record the statistics.
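The code comment gives retries as the reason for counting received only at the end of a synchronous call. A minimal sketch of that idea, with a hypothetical retry loop and counters in place of the real cluster and span, looks like this: several send attempts may happen, but the span is closed exactly once, in the finally block that corresponds to end-of-invoke.

```java
// Hypothetical model: many send attempts, one span completion.
final class SyncRetrySketch {
    int sendAttempts = 0;
    int finishedSpans = 0;

    // Stand-in for a cluster-level send that retries; the first
    // failuresBeforeSuccess attempts fail, later ones succeed.
    private String sendWithRetry(int retries, int failuresBeforeSuccess) {
        RuntimeException last = new IllegalStateException("no attempt was made");
        for (int attempt = 0; attempt <= retries; attempt++) {
            sendAttempts++;
            if (attempt >= failuresBeforeSuccess) {
                return "ok";
            }
            last = new RuntimeException("attempt " + attempt + " failed");
        }
        throw last;
    }

    String invoke(int retries, int failuresBeforeSuccess) {
        try {
            return sendWithRetry(retries, failuresBeforeSuccess);
        } finally {
            // End of invoke: the span is finished once, whatever happened above.
            finishedSpans++;
        }
    }

    public static void main(String[] args) {
        SyncRetrySketch call = new SyncRetrySketch();
        System.out.println(call.invoke(2, 2));
        System.out.println(call.sendAttempts + " attempts, " + call.finishedSpans + " finished span");
    }
}
```

If the span were finished after each attempt instead, a call that succeeds on its third try would be logged as three separate client spans.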
Finally, checkState is called to verify the state.
RpcSofaTracer#checkState
public void checkState() {
RpcInternalContext rpcInternalContext = RpcInternalContext.getContext();
// Tracer context
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
if (rpcInternalContext.isConsumerSide()) {
//TODO This check is meaningless: sofaTraceContext.getThreadLocalSpanSize can only return 0 or 1
// On the client side the tracer stack holds at most 1 span (the client span is finished and the server span is pushed back, so at most one)
if (sofaTraceContext.getThreadLocalSpanSize() > 1) {
SelfLog.error("Pay attention,stack size error.Tracer consumer stack size more than one.");
SelfLog.flush();
}
}
}
// SofaTracerThreadLocalTraceContext#getThreadLocalSpanSize
public int getThreadLocalSpanSize() {
SofaTracerSpan sofaTracerSpan = threadLocal.get();
return sofaTracerSpan == null ? 0 : 1;
}
This method is mainly a sanity check: the context may hold at most one sofaTracerSpan.
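The TODO in the listing can be checked with a small model. Assuming, as getThreadLocalSpanSize suggests, that the context is a single thread-local slot, the size is always 0 or 1, so the consumer-side "more than one" condition can never fire while the provider-side "more than zero" condition can. The names below are hypothetical.

```java
// Hypothetical single-slot context used to examine the two state checks.
final class SpanSizeCheckSketch {
    private static final ThreadLocal<String> SLOT = new ThreadLocal<>();

    static void push(String span) {
        SLOT.set(span); // a second push replaces the first, it does not stack
    }

    static String pop() {
        String span = SLOT.get();
        SLOT.remove();
        return span;
    }

    static int size() {
        return SLOT.get() == null ? 0 : 1;
    }

    // Provider side: anything left in the slot after the response is sent is an error.
    static boolean providerStateOk() {
        return size() == 0;
    }

    // Consumer side: the check is "size > 1", which a single slot can never reach.
    static boolean consumerStateOk() {
        return size() <= 1;
    }

    public static void main(String[] args) {
        push("a");
        push("b");
        System.out.println(size() + " " + consumerStateOk() + " " + providerStateOk());
        pop();
        System.out.println(size() + " " + providerStateOk());
    }
}
```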