云原生监控系列—源码级剖析SkyWalking(应用层数据采集)

1 核心概念

1.1 span

span是链路跟踪的基本单元,记录了每个节点的跟踪信息,每经过一个interceptor都会去创建span,通过span把跟踪信息串联起来。整个跟踪流程类似一个有向无环图:
云原生监控系列—源码级剖析SkyWalking(应用层数据采集)
span包括三种类型:
EntrySpan:在同一个TraceSegment中,EntrySpan只有一个,作为链路的根节点。EntrySpan代表服务提供方的链路节点,比如Tomcat的入口,在TomcatInvokeInterceptor中会创建EntrySpan。
ExitSpan:代表服务消费方的链路节点,比如Feign、HTTP Client等。在远程调用服务之前,会拦截Feign(或其他rpc组件)调用并封装ExitSpan链路节点信息(其实在Feign调用前、调用后以及异常时都会对span进行相应的处理,比如记录日志、关闭span等)。
LocalSpan:对于像本地方法这种的跟踪,用LocalSpan来记录链路节点信息。
span的两种应用类型:
CHILD_OF:在同一个TraceSegment中,父span依赖子span,因此用CHILD_OF来标识span之间的关系。
FOLLOWS_OF:在跨进程的MQ、多线程下异步/批处理任务时,父span并不依赖子span的结果,所以用FOLLOWS_OF来描述span之间的关系。

1.2 TraceSegment

由于分布式链路追踪是跨多进程、多线程的,因此把链路分割成了一个一个的小片段,每个线程里只有一个TraceSegment。traceId、segment、span三者之间的关系图如下:
云原生监控系列—源码级剖析SkyWalking(应用层数据采集)
图中的ref表示引用,即依赖关系。

1.3 TracingContext

链路追踪核心控制器,保存了一些关键信息,比如segment、active spans等。创建span以及传播contextCarrier。

1.4 ContextCarrier

在跨进程、快线程的追踪中,需要将上游的信息传播到下游,比如上游的segment、endpoint、service instance、address等信息,ContextCarrier提供了这样的能力。

2 跨服务间的链路追踪流程图

基于SpringBoot+SpringCloud+Feign
云原生监控系列—源码级剖析SkyWalking(应用层数据采集)
每进入到一个服务之前会被TomcatInvokeInterceptor拦截,在这里会去解析请求里携带的carrier信息,如果有carrier会将carrier里的跟踪信息与当前服务的跟踪进行关联,比如上游的segment会作为当前服务的父segment。
如果当前服务需要调用远程服务,在Feign调用之前会被DefaultHttpClientInterceptor拦截,此处会将当前的跟踪信息封装进carrier里,然后将carrier存入请求头里。
如果segment里的所有span都完成了,该segment会被存入到队列里,由一个消费者去负责把segment里的信息推送到服务端。

3 源码剖析

3.1 数据埋点

以TomcatInvokeInterceptor举例,其他插件可根据分析的步骤自行阅读源码。与spring中的拦截器类似,有方法 执行前、方法执行后、抛异常三个拦截动作。

3.1.1 方法执行前拦截

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    Request request = (Request) allArguments[0];
    ContextCarrier contextCarrier = new ContextCarrier();

    CarrierItem next = contextCarrier.items();
    //1)解析请求头里携带的carrier信息
    while (next.hasNext()) {
        next = next.next();
        next.setHeadValue(request.getHeader(next.getHeadKey()));
    }

    //2)创建span
    AbstractSpan span = ContextManager.createEntrySpan(request.getRequestURI(), contextCarrier);
    //3)设置标签,如:url->http://www.baidu.com;request.method->GET
    Tags.URL.set(span, request.getRequestURL().toString());
    Tags.HTTP.METHOD.set(span, request.getMethod());
    //4)设置被拦截的组件,此处为Tomcat
    span.setComponent(ComponentsDefine.TOMCAT);
    //5)设置被拦截的层级,此处为HTTP层
    SpanLayer.asHttp(span);

    if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {
        collectHttpParam(request, span);
    }
}

1)解析请求头里携带的carrier信息
CarrierItem有三种:
SW8CarrierItem:解析请求体里封装的carrier信息;
SW8CorrelationCarrierItem:解析用户自定义的数据,以K->V的形式存储;
SW8ExtensionCarrierItem:解析skipAnalysis,该字段表示是否跳过所有span,即是否需要下架对当前服务的跟踪。
2)创建span

public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
    AbstractSpan span;
    AbstractTracerContext context;
    operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
    //如果有carrier并且合法
    if (carrier != null && carrier.isValid()) {
        SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
        //TracerContext有两种,一种是TracingContext,另一种是IgnoredTracerContext,后者创建的span是NoopSpan,该span没有任何操作行为。
        //此处强制创建TracingContext
        samplingService.forceSampled();
        context = getOrCreate(operationName, true);
        span = context.createEntrySpan(operationName);
        //建立当前segment与上游segment的关联关系
        context.extract(carrier);
    } 
    //如果没有直接通过TracingContext创建span
    else {
        context = getOrCreate(operationName, false);
        span = context.createEntrySpan(operationName);
    }
    return span;
}

3.1.2 方法执行后拦截

此处会调用ContextManager.stopSpan()来停止span,而改方法最终会调用TracingContext里的stopSpan方法,我们主要看这个方法

public boolean stopSpan(AbstractSpan span) {
    //获取当前span
    AbstractSpan lastSpan = peek();
    if (lastSpan == span) {
        if (lastSpan instanceof AbstractTracingSpan) {
            AbstractTracingSpan toFinishSpan = (AbstractTracingSpan) lastSpan;
            //将当前span归档,存入segment里
            if (toFinishSpan.finish(segment)) {
                //将当前span在活动span的列表里移除
                pop();
            }
        } else {
            //将当前span在活动span的列表里移除
            pop();
        }
    } else {
        throw new IllegalStateException("Stopping the unexpected span = " + span);
    }
	//在该方法里会去判断当前segment下的所有span是否都已完成,如果是会将segment放入队列,该队列的数据会被消费者消费并上传至服务端。
    finish();

    return activeSpanStack.isEmpty();
}

3.1.3 抛异常

此处比较简单,只将异常日志记录到span里,日志的数据模型是string key和a string value的键值对。

3.2 数据推送

所有的消费者服务都继承BootService,在启动类SkyWalkingAgent里会调用ServiceManager.INSTANCE.boot()来启动所有的消费者服务。
本文主要讲的是应用程序的追踪监控,也就是上面提到的TraceSegment追踪,所以下面主要分析TraceSegmentServiceClient,至于对Jvm指标数据的采集不在本文介绍。
启动程序

public void boot() {
    lastLogTime = System.currentTimeMillis();
    segmentUplinkedCounter = 0;
    segmentAbandonedCounter = 0;
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
    carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
    //创建demon线程,用于消费队列中的segment,并将当前实例作为消费者传入,在线程获取到数据时会调用当前实例的consume方法,参见下面的代码段。
    carrier.consume(this, 1);
}
public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            //通过GRPC调用远程服务端接口上传数据
            StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
                Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
            ).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class)
                                           .receiveCommand(commands);
                }
            });

            try {
                for (TraceSegment segment : data) {
                    SegmentObject upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }
    }
上一篇:SylixOS中的动态内存分配【3】--- 内存堆操作库接口及实现原理


下一篇:shell用sh执行报错而bash执行不会报错