kafka|使用Interceptors实现消息端到端跟踪

kafka|使用Interceptors实现消息端到端跟踪

大哥你先走 浪尖聊大数据

现在,Kafka指标的采集都仅包括客户端或broker,这使得用户跟踪消息在集群内的传递路径,构建系统端到端的性能和行为画像变的困难。从技术上讲,通过修改应用以收集或跟踪额外的信息来测量系统端到端的性能是可行的,但对于关键的基础设施应用来说,这种方案并不一定是切实可行的。在生产环境中,能够快速部署工具来观察,测量和监控Kafka客户端行为(粒度直至消息级别),是非常有用的。同时,不同应用的度量指标需要的上下文元数据各异。无需重新编写代码或重新编译即可实现监控客户端的能力十分重要(在某些场景下,这种能力有助于连接到正在运行的应用程序)。
为了实现这个功能,kafka 更加倾向于增加生产者和消费者拦截器,拦截器可以在生产者和消费者处理消息的不同阶段拦截消息。在Apache Flume 拦截器接口的启发下,kafka开发了现在的机制。虽然,有很多功能都可以使用拦截器实现(例如,异常检测,数据加密,字段过滤等),但是每个功能都需要仔细的评估是否应该使用拦截器还是其他机制来完成。当这些场景有明确的使用动机时,提供明确的API是一种良好的实践。因此,kafka提供了最小化的生产者和消费者拦截器接口,旨在仅支持测量和监控。

尽管增加更多的指标或改进kafka的监控是可能的,但是基于以下原因我们认为提供灵活的,用户可定制的接口更加有益:

  1. 构建通用监控工具。在一家大公司,不同的团队合作构建系统。通常来说,随着时间的推移,不同的团队开发部署不同的组件。此外,组织对于通用的指标、数据格式和数据系统希望实现标准化。对于一个组织,我们认为开发部署通用的Kafka客户端监控工具并在所有使用Kafka的应用中部署该工具是非常有价值的。

  2. 高昂的监控代价。向kafka添加其他指标可能会影响kafka的性能。不幸的是,有时候需要在系统性能和数据收集之间进行权衡。举个例子,考虑检测消息大小的场景。代价最低,最简单,最直接的方法是计算消息的平均大小。计算分布式系统中的百分比要比计算简单的平均值代价更高,更复杂,但是在很多应用中这是非常有用的。我们希望能够让客户使用不同的算法收集指标数据,或者不收集。

  3. 应用对指标的要求不同。例如,一个用户可能认为监控kafka中不同key的消息数非常的重要。在kafka内部提供所有的指标是不切实际的。插件化的拦截系统为指标的定制化提供了简单可行的能力。

  4. 在一个组织中kafka通常是大型基础设施的一部分,在基础设施中实现端到端的跟踪是非常有用的。拦截器提供了在相同基础设施中跟踪kafka客户端的能力。

为了支持拦截器功能,Kafka在0.10.0.0版本增加了两个全新的接口:ProducerInterceptor和ConsumerInterceptor并支持实现和配置拦截器链。拦截器API允许修改消息以支持给消息增加额外元数据实现端到端跟踪的能力。

生产者拦截器 ProducerInterceptor


public interface ProducerInterceptor<K, V> extends Configurable {

 public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

 public void onAcknowledgement(RecordMetadata metadata, Exception exception);

 public void close();
}

消费者拦截器 ConsumerInterceptor


public interface ConsumerInterceptor<K, V> extends Configurable {

 ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);

 void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

 void close();
}

下面以实现一个简单的kafka指标采集小功能为例,进一步了解kafka拦截器的功能和使用方法。采集指标包括:

  1. 生产和消费消费的线程名

  2. 生产者生产消息成功失败次数统计

3.1 修改消息,增加处理线程名
在生产端,实现ProducerInterceptor接口并覆写onSend方法,修改ProducerRecord,在Heads中增加生产者线程名:


public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {

 @Override
 public ProducerRecord<String,String> onSend(ProducerRecord<String,String > record) {
 Header producerThread = new RecordHeader("producerThread",Thread.currentThread().getName().getBytes());
 record.headers().add(producerThread);
 return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),record.headers());
 }
}

在消费端,实现ConsumerInterceptor接口并覆写onConsume方法,修改ConsumerRecord,在Heads中增加消费者线程名:


public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> {

 @Override
 public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
 byte[] currentThreadName = Thread.currentThread().getName().getBytes();
 Header header = new RecordHeader("consumer Thread", currentThreadName);
 records.forEach(record -> record.headers().add(header));
 return records;
 }
}

3.2 实现生产者消息成功失败统计
在生产端,实现ProducerInterceptor接口并覆写onAcknowledgement方法,对发送成功和失败的消息进行统计,并在拦截器关闭时将数据打印到控制台:.


public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {
 private AtomicLong successCounts = new AtomicLong(0);
 private AtomicLong failedCounts = new AtomicLong(0);

 @Override
 public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 if (null == exception) {
 successCounts.getAndIncrement();
 } else {
 failedCounts.getAndIncrement();
 }
 }

 @Override
 public void close() {
 System.out.println("success counts " + successCounts.get());
 System.out.println("failed counts " + failedCounts);
 }
}

3.3 . 拦截器配置:
生产者和消费者可以通过interceptor.classes属性配置拦截器,属性的值为一个字符串集合,集合中的元素为拦截器类的全路径名(包括包名)。

生产者只包含拦截器的配置如下:


Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.*.kafka.TraceConsumerInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

消费者只包含拦截器的配置如下:


Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.*.kafka.TraceConsumerInterceptor");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

3.4. 测试
生产者使用三个线程,每个线程发送一个消息到kafka,在主线程启动消费者消费kafka的消息,收到的每条消息打印消息的Heads信息。

  • 生产者
    创建发送消息的线程池:

private static ExecutorService executor = Executors.newFixedThreadPool(3);

为了避免主线程退出导致发送消息失败,在添加任务时,将返回的Future对象保存到队列中,然后逐个检查任务是否完成,详细的代码如下:


Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("delivery.timeout.ms", 300000);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 List<String> interceptors = new ArrayList<>();
 interceptors.add("io.github.ctlove0523.*.kafka.TraceProducerInterceptor");
 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

 Producer<String, String> producer = new KafkaProducer<>(props);
 List<Future> futures = new ArrayList<>(3);
 for (int i = 0; i < 3; i++) {
 futures.add(executor.submit(() -> {
 producer.send(new ProducerRecord<>("TEST", "hello world "));
 }));
 }

 futures.forEach(future -> {
 try {
 future.get();
 } catch (Exception e) {
 System.out.println(e.getMessage());
 }
 });

 producer.close();

代码的输出结果如下:


success counts 3
failed counts 0
  • 消费者
    消费者拉取消息,打印收到消息的Heads信息以验证拦截器是否生效。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "chentong");
 props.put("enable.auto.commit", "false");
 props.put("key.deserializer",         "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 List<String> interceptors = new ArrayList<>();
 interceptors.add("io.github.ctlove0523.*.kafka.TraceConsumerInterceptor");
 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

 Consumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.assign(Arrays.asList(new TopicPartition("TEST", 0)));
 consumer.seek(new TopicPartition("TEST", 0), 0L);
 while (true) {
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
 records.forEach(record -> {
 record.headers().headers("producer thread")
 .forEach(header -> System.out.print("producer thread = " + new String(header.value())));
 record.headers().headers("consumer thread")
 .forEach(header -> System.out.println("\t consumer thread = " + new String(header.value())));
 });
 }

代码输出结果如下:


producer thread = pool-1-thread-2  consumer thread = main
producer thread = pool-1-thread-1  consumer thread = main
producer thread = pool-1-thread-3  consumer thread = main

本文首先介绍了kafka拦截器引入的动机,主要为了解决当前kafka指标采集和监控的痛点问题;接着简单介绍了ProducerInterceptor和ConsumerInterceptor两个接口,最后以一个实际修改kafka消息Heads的例子进一步阐述了如何使用kafka提供的拦截器功能。
转自:https://www.jianshu.com/p/a344b3bba8f0

上一篇:okhttp3.9.1源码初探


下一篇:极客时间——设计模式之美 职责链模式(下):框架中常用的过滤器、拦截器是如何实现的?