1 核心概念
1.1 span
span是链路跟踪的基本单元,记录了每个节点的跟踪信息,每经过一个interceptor都会去创建span,通过span把跟踪信息串联起来。整个跟踪流程类似一个有向无环图:
span包括三种类型:
EntrySpan:在同一个TraceSegment中,EntrySpan只有一个,作为链路的根节点。EntrySpan代表服务提供方的链路节点,比如Tomcat的入口,在TomcatInvokeInterceptor中会创建EntrySpan。
ExitSpan:代表服务消费方的链路节点,比如Feign、HTTP Client等。在远程调用服务之前,会拦截Feign(或其他rpc组件)调用并封装ExitSpan链路节点信息(其实在Feign调用前、调用后以及异常时都会对span进行相应的处理,比如记录日志、关闭span等)。
LocalSpan:对于像本地方法这种的跟踪,用LocalSpan来记录链路节点信息。
span的两种应用类型:
CHILD_OF:在同一个TraceSegment中,父span依赖子span,因此用CHILD_OF来标识span之间的关系。
FOLLOWS_OF:在跨进程的MQ、多线程下异步/批处理任务时,父span并不依赖子span的结果,所以用FOLLOWS_OF来描述span之间的关系。
1.2 TraceSegment
由于分布式链路追踪是跨多进程、多线程的,因此把链路分割成了一个一个的小片段,每个线程里只有一个TraceSegment。traceId、segment、span三者之间的关系图如下:
图中的ref表示引用,即依赖关系。
1.3 TracingContext
链路追踪核心控制器,保存了一些关键信息,比如segment、active spans等。创建span以及传播contextCarrier。
1.4 ContextCarrier
在跨进程、快线程的追踪中,需要将上游的信息传播到下游,比如上游的segment、endpoint、service instance、address等信息,ContextCarrier提供了这样的能力。
2 跨服务间的链路追踪流程图
基于SpringBoot+SpringCloud+Feign
每进入到一个服务之前会被TomcatInvokeInterceptor拦截,在这里会去解析请求里携带的carrier信息,如果有carrier会将carrier里的跟踪信息与当前服务的跟踪进行关联,比如上游的segment会作为当前服务的父segment。
如果当前服务需要调用远程服务,在Feign调用之前会被DefaultHttpClientInterceptor拦截,此处会将当前的跟踪信息封装进carrier里,然后将carrier存入请求头里。
如果segment里的所有span都完成了,该segment会被存入到队列里,由一个消费者去负责把segment里的信息推送到服务端。
3 源码剖析
3.1 数据埋点
以TomcatInvokeInterceptor举例,其他插件可根据分析的步骤自行阅读源码。与spring中的拦截器类似,有方法 执行前、方法执行后、抛异常三个拦截动作。
3.1.1 方法执行前拦截
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Request request = (Request) allArguments[0];
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
//1)解析请求头里携带的carrier信息
while (next.hasNext()) {
next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
//2)创建span
AbstractSpan span = ContextManager.createEntrySpan(request.getRequestURI(), contextCarrier);
//3)设置标签,如:url->http://www.baidu.com;request.method->GET
Tags.URL.set(span, request.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, request.getMethod());
//4)设置被拦截的组件,此处为Tomcat
span.setComponent(ComponentsDefine.TOMCAT);
//5)设置被拦截的层级,此处为HTTP层
SpanLayer.asHttp(span);
if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {
collectHttpParam(request, span);
}
}
1)解析请求头里携带的carrier信息
CarrierItem有三种:
SW8CarrierItem:解析请求体里封装的carrier信息;
SW8CorrelationCarrierItem:解析用户自定义的数据,以K->V的形式存储;
SW8ExtensionCarrierItem:解析skipAnalysis,该字段表示是否跳过所有span,即是否需要下架对当前服务的跟踪。
2)创建span
public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
AbstractSpan span;
AbstractTracerContext context;
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
//如果有carrier并且合法
if (carrier != null && carrier.isValid()) {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
//TracerContext有两种,一种是TracingContext,另一种是IgnoredTracerContext,后者创建的span是NoopSpan,该span没有任何操作行为。
//此处强制创建TracingContext
samplingService.forceSampled();
context = getOrCreate(operationName, true);
span = context.createEntrySpan(operationName);
//建立当前segment与上游segment的关联关系
context.extract(carrier);
}
//如果没有直接通过TracingContext创建span
else {
context = getOrCreate(operationName, false);
span = context.createEntrySpan(operationName);
}
return span;
}
3.1.2 方法执行后拦截
此处会调用ContextManager.stopSpan()来停止span,而改方法最终会调用TracingContext里的stopSpan方法,我们主要看这个方法
public boolean stopSpan(AbstractSpan span) {
//获取当前span
AbstractSpan lastSpan = peek();
if (lastSpan == span) {
if (lastSpan instanceof AbstractTracingSpan) {
AbstractTracingSpan toFinishSpan = (AbstractTracingSpan) lastSpan;
//将当前span归档,存入segment里
if (toFinishSpan.finish(segment)) {
//将当前span在活动span的列表里移除
pop();
}
} else {
//将当前span在活动span的列表里移除
pop();
}
} else {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
//在该方法里会去判断当前segment下的所有span是否都已完成,如果是会将segment放入队列,该队列的数据会被消费者消费并上传至服务端。
finish();
return activeSpanStack.isEmpty();
}
3.1.3 抛异常
此处比较简单,只将异常日志记录到span里,日志的数据模型是string key和a string value的键值对。
3.2 数据推送
所有的消费者服务都继承BootService,在启动类SkyWalkingAgent里会调用ServiceManager.INSTANCE.boot()来启动所有的消费者服务。
本文主要讲的是应用程序的追踪监控,也就是上面提到的TraceSegment追踪,所以下面主要分析TraceSegmentServiceClient,至于对Jvm指标数据的采集不在本文介绍。
启动程序
public void boot() {
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
//创建demon线程,用于消费队列中的segment,并将当前实例作为消费者传入,在线程获取到数据时会调用当前实例的consume方法,参见下面的代码段。
carrier.consume(this, 1);
}
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
//通过GRPC调用远程服务端接口上传数据
StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class)
.receiveCommand(commands);
}
});
try {
for (TraceSegment segment : data) {
SegmentObject upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
} catch (Throwable t) {
LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
}
status.wait4Finish();
segmentUplinkedCounter += data.size();
} else {
segmentAbandonedCounter += data.size();
}
}