OpenTracing的使用实例(Java)

构件组织

OpenTracing API的Java构件如下:

  • opentracing-api:主要的API,无其他依赖。
  • opentracing-noop:为主要API提供无意义实现(NoopTracer),依赖于opentracing-api。
  • opentracing-util:工具类,例如GlobalTracer和默认的基于ThreadLocal存储的ScopeManager实现,依赖于上面所有的构件。
  • opentracing-mock:用于测试的mock层。包含MockTracer,简单的将Span存储在内存中,依赖于opentracing-api和opentracing-noop。

安装(Maven)

<dependency>
    <groupId>io.opentracing</groupId>
    <artifactId>opentracing-api</artifactId>
    <version>VERSION</version>
</dependency>

也可以使用opentracing-noop,opentracing-mock,opentracing-util来安装其他的构件,如果安装多个构件,需要提供一致的VERSION。

主要API

主要的OpenTracing API将所有主要组件声明为接口以及辅助类,例如Tracer,Span,SpanContext,Scope,ScopeManager,Format(用映射定义通用的SpanContext注入和提取格式)。

OpenTracing 社区贡献

除了官方的API,也有一些苦在opentracing-contribe,保管通用的辅助类像TracerResolver和框架工具库,例如 Java Web Servlet Filter and Spring Cloud,可以用于在使用这些框架工具的项目中方便的集成OpenTracing。

Quick Start

下面使用opentracing-mock中的MockTracer来进行示例:

import java.util.Map;
import io.opentracing.mock.MockTracer;
import io.opentracing.mock.MockSpan;
import io.opentracing.tags.Tags;

// Initialize MockTracer with the default values.
MockTracer tracer = new MockTracer();

// Create a new Span, representing an operation.
MockSpan span = tracer.buildSpan("foo").start();

// Add a tag to the Span.
span.setTag(Tags.COMPONENT, "my-own-application");

// do something for business logic

// Finish the Span.
span.finish();

// Analyze the saved Span.
System.out.println("Operation name = " + span.operationName());
System.out.println("Start = " + span.startMicros());
System.out.println("Finish = " + span.finishMicros());

// Inspect the Span's tags.
Map<String, Object> tags = span.tags();

使用Span

在任何时间点,OpenTracing Java API仅允许同一个线程中只存在一个活跃的Span。但是在同一个线程中允许同时存在符合下述条件的Span:

  • Started,新建的Span,但是没有在任何作用域(Scope)中激活
  • Not Finished,调用finish方法之前均处于该状态
  • Not Active,未被激活

同一个线程上可能有多个Span,如果它们:

  • 正在等待I/O操作完成
  • 被子Span阻塞
  • 或被溢出关键路径

人工地将活跃的Span从一个函数传递到另一个函数是极为不便的,所以OpenTracing要求每个Tracer包含一个作用域管理器(ScopeManager)。ScopeManager可以通过Scope来方法激活的Span,Scope来管理Span的激活与失活。ScopeManager API运行将Span传到到另一个线程或回调,而不是传递Scope。

开发这在创建新的Span时,如果当前线程的Scope中已经存在活跃的Span,则该活跃Span则会成为新创建Span的父亲,除非开发者在buildSpan()时调用ignoreActiveSpan()或者明确指定父上下文(parent context)。

访问活跃的Span

开发者可以通Scope对象访问活跃的Span

io.opentracing.Tracer tracer = ...;
...
Scope scope = tracer.scopeManager().active();
if (scope != null) {
    scope.span().log("...");
}

在线程间移动Span

使用OpenTracing API,开发者可以在多个不同的线程间传输Span。一个Span的生命周期可以在一个线程中开始在另一个线程中结束。不支持传递Scope到另一个线程或回调。Span的内部时序细节看来如下:

[ ServiceHandlerSpan                                 ]
 | FunctionA |     waiting on an RPC      | FunctionB |

---------------------------------------------------------> time

当执行FunctionA和FunctionB时ServiceHandlerSpan是活跃的,但是在等待RPC调用的过程中是失活的。RPC可能有自己的Span,但我们现在只关注ServerHandlerSpan如何从FunctionA传播到FunctionB。使用ScopeManager API可以在FunctionA中获取Span,RPC结束后在FunctionB中重新获取Span。步骤如下:

  1. 通过startManager或startActive(false)方法创建一个Span以阻止Scope失活时令Span终止。
  2. 在回调代码(闭包/Runnable/Future)中调用tracer.scopeManager().active(span,false)来重新激活Span获取一个新的Scope,当Span不再活跃时关闭Scope(或者使用try-with-resources以简化代码)
  3. 在回调代码末尾,调用tracer.scopeManager().active(span,true)来重新激活Span并得到一个自动关闭的Scope。

代码如下:

io.opentracing.Tracer tracer = ...;//通过具体的实现来创建tracer对象
...
// STEP 1 ABOVE: 开启新的Span和Scope
try (Scope scope = tracer.buildSpan("ServiceHandlerSpan").startActive(false)) {
    // Span在Scope中被激活
    final Span span = scope.span();
    doAsyncWork(new Runnable() {
        @Override
        public void run() {

            // STEP 2 ABOVE: 重新激活Span
            // 如果需要自动终止激活的Span,传递true给active方法
            try (Scope scope = tracer.scopeManager().activate(span, true)) {
                ...
            }
        }
    });
}

通过框架的拦截器能力实现HTTP请求追踪

通过上文中的代码,我们知道了如何使用Tracer对象构建Span,如何在线程中激活Span,以及如何在异步环境的不同线程间传递Span。

在实际的业务开发中,我们很难使用这种侵入的方式来实现追踪,更多的是利用各种框架提供的拦截器机制,来对各种业务调用进行自动追踪,比如Spring AOP,Servlet Filter,等等。下面一段代码展示了如何通过Servlet Filter来进行服务端的HTTP请求追踪。

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain)
        throws IOException, ServletException {

    HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
    HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;

        // 从Http Headers中提取上下文
    SpanContext extractedContext = tracer.extract(Format.Builtin.HTTP_HEADERS,
            new HttpServletRequestExtractAdapter(httpRequest));

      // 创建并激活一个新的Span,如果前面提取到的上下文不为null,则作为父SpanContext
    final Scope scope = tracer.buildSpan(httpRequest.getMethod())
            .asChildOf(extractedContext)
            .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
            .startActive(false);
        final Span span = scope.span();
    
    // 追踪请求的地址
    span.setTag(Tags.HTTP_URL, httpRequest.getRequestURI());
    try {
        // 实际执行过滤器链处理请求
        chain.doFilter(servletRequest, servletResponse);
    } finally {
         if (httpRequest.isAsyncStarted()) {
             // 如果请求是异步的,那么需要将Span对象传递到异步回调中
             httpRequest.getAsyncContext()
                     .addListener(new AsyncListener() {
                 @Override
                 public void onComplete(AsyncEvent event) throws IOException {
                     // 回调是在异步线程中执行的
                     // 需要使用Scope在异步先线程中激活Span
                     try(Scope sc = tracer.scopeManager().activite(span, true)){
                         HttpServletResponse httpResponse = (HttpServletResponse) event.getSuppliedResponse();
                         // 追踪响应状态 
                         sc.span().setTag(Tags.HTTP_STATUS, httpResponse.getStatus());
                     }
                 }

                 @Override
                 public void onTimeout(AsyncEvent event) throws IOException {
                     try(Scope sc = tracer.scopeManager().activite(span, true)){
                         // 记录错误
                         sc.span().setTag(Tags.ERROR, true)
                         sc.span().log(Maps.of(Fields.EVENT, event,
                                      Fields.ERROR_KIND, "TIMEOUT"))
                     }
                 }

                 @Override
                 public void onError(AsyncEvent event) throws IOException {
                     try(Scope sc = tracer.scopeManager().activite(span, true)){
                         // 记录错误
                         sc.span().setTag(Tags.ERROR, true)
                         sc.span().log(
                             Maps.of(Fields.EVENT, event,
                                     Fields.ERROR_KIND, event.getThrowable().getClass()))
                     }
                 }

                 @Override
                 public void onStartAsync(AsyncEvent event) throws IOException {
                 }
             });
         } else {
             // 如果是同步请求,直接终止Span
             scope.span().finish();
         }
         // 释放当前线程中的Span
         scope.close();
    }
}

利用这个过滤器,在Servlet应用中,用于追踪请求代码与业务代码解耦,并且仅需要一次编写,下面来看客户端如何追踪请求并向处理请求的服务端传递上下文,以Spring RestTemplate为例:

可以通过RestTemplate.setInterceptors注册拦截器。

@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] body,
                                    ClientHttpRequestExecution execution) throws IOException {
    
    // 创建新的Span,以当前线程中的SpanContext为父,如没有则自己成为根Span
    try (Scope scope = tracer.buildSpan(httpRequest.getMethod().toString())
            .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT).startActive(true)) {
        // 追踪请求地址
        scope.span().setTag(Tags.HTTP_URL, httpRequest.getURI().toString())
        // 将SpanContext注入到请求头中
        // 看前文中的代码可以知道,服务端通过Tracer.extract可以从请求头中提取出SpanContext
        tracer.inject(scope.span().context(), Format.Builtin.HTTP_HEADERS,
                new HttpHeadersCarrier(httpRequest.getHeaders()));
        
        // 实际执行请求
        return execution.execute(httpRequest, body);
    }

}

使用opentracing-spring-cloud

上文中通过代码示例了,如何通过框架工具提供的拦截器能力来实现请求追踪,由于Spring MVC,RestTemplate,Servlet……这些开源工具是的用户相当广泛,所以在opentracing-contrib项目中提供了非常多针对这些被广泛使用的开源工具的集成支持包。

其中java-spring-cloud子项目,为spring-cloud项目提供了opentracing-spring-cloud-starter,这个starter通过依赖了很多其他的opentrcing集成支持库,来为基于spring-cloud架构的应用提供一站式opentracing集成方案,其中包括如下组件:

利用SpringBoot的AutoConfiguration机制为用户提供了几乎无须手动配置的集成方案。

  • Spring Web (RestControllers, RestTemplates, WebAsyncTask, WebClient, WebFlux)
  • @Async, @Scheduled, Executors
  • WebSocket STOMP
  • Feign, HystrixFeign
  • Hystrix
  • JMS
  • JDBC
  • Mongo
  • Zuul
  • Reactor
  • RxJava
  • Redis
  • Standard logging - logs are added to active span
  • Spring Messaging - trace messages being sent through Messaging Channels
  • RabbitMQ

使用SpringCloud的开发者,可以简单的将opentracing-spring-cloud-starter添加到自己项目的依赖中,来体验它带来的opentracing集成。

如果不使用SpringCloud也可以其为起点,按自己的需求从其依赖中挑选自己需要的部分,或者浏览opentracing-contrib项目来寻找自己需要的支持库。

使用Jaeger

前文中描述的API以及中间件集成方案,都是对OpenTracing API的集成,仔细看代码中缺少一个必要的构建Tracer对象的步骤。在实际场景中,我们需要一种具体的OpenTracing实现,来创建Tracer对象。

Jaeger是由Uber开源的OpenTracing实现项目,它提供了追踪数据上报服务以及数据的视图,来帮助开发者解决分布式系统中的如下问题:

  1. 分布式事务监控
  2. 性能和延迟优化
  3. 分析故障源头
  4. 服务以来分析
  5. 分布式上下文传播

以来jaeger-client-java可以利用如下代码创建一个Tracer对象:

Configuration config = new io.jaegertracing.Configuration("服务名称");
// 设置数据发送方式
Configuration.SenderConfiguration sender = new Configuration.SenderConfiguration();
sender.withEndpoint("<endpoint>"); // endpoint可以是在阿里云上购买的链路追踪服务或者自己使用Jaeger搭建的服务
// 设置采样方式
config.withSampler(new Configuration.SamplerConfiguration().withType("const").withParam(1));
// 设置数据上报方式
config.withReporter(new Configuration.ReporterConfiguration().withSender(sender).withMaxQueueSize(10000));
Tracer tracer = config.getTracer();

Configuration类提供了非常多的配置功能,有兴趣的开发者可以阅读其API文档来了解更多的自定义选项,甚至扩展Jaeger的功能。

上一篇:Java websocket 服务器和客户端实践


下一篇:微服务实战之春云与刀客(三)—— 面向接口调用代码结构实例