根据send方法的源码知道,消息先进入拦截器,这里的拦截器,跟web拦截器一样,在拦截器中可以进行逻辑判断处理,可以对消息进行再次,或者统一的包装处理。
也可以做一些公共行为逻辑实现:
比如:
1:统计消息次数,成功次数,失败次数:
2:可以实现消息的统一落地(如果需要);
3:可以实现类似的aop日志,成功日志,错误日志。
自定义拦截器使用步骤:
1: 实现ProducerInterceptor接口,
2:重写onSend()方法,
3:配置自己的拦截器:
properties.put("interceptor.classes", "全类名");
如果配置多个拦截器,使用逗号隔开。
properties.put("interceptor.classes", "com.Interceptor1,com.Interceptor2");
代码:
1:producer
package com.kafka.learn2.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 生产者
*
* @author
* @date 2019/6/15
*/
public class Producer {
public static void main(String[] args) {
KafkaProducer<String, String> producer = new KafkaProducer(ProducerConf.initProperties());
ProducerRecord<String, String> record = new ProducerRecord<>(ProducerConf.topic, "hellotest", "hellotest");
producer.send(record);
producer.flush();
producer.close();
}
}
2:生产者配置
package com.kafka.learn2.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 生产者配置
*
* @author
* @date 2019/6/15
*/
public class ProducerConf {
public static final String url = "39.100.104.199:9092";
public static final String topic = "learn2";
public static Properties initProperties() {
Properties properties = new Properties();
properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
//自定义拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Interceptor.class.getName());
return properties;
}
}
3:生产者拦截器
拦截器的onAcknowledgement方法先于callback被调用
见 博客5-1 2.2分析
package com.kafka.learn2.producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 生产者拦截器
*
* @author
* @date 2019/6/15
*/
public class Interceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord record) {
//改造消息
Headers headers = record.headers();
HashMap<String, String> map = new HashMap<>();
map.put("token", UUID.randomUUID().toString());
String time = String.valueOf(System.currentTimeMillis());
Header header = new RecordHeader("timeStamp",time.getBytes());
//两种方式添加header
headers.add(header);
headers.add("token", UUID.randomUUID().toString().getBytes());
//ProducerRecord 其他属性改造.....
//
System.out.println("消息异步落地...");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception!=null){
//发送消息失败
System.out.println("消息失败同一记录处理 >>"+exception.getMessage());
}else{
//发送消息成功
System.out.println("消息成功同一处理");
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
/**
* 拦截用户自定义的配置
*/
System.out.println("自定义配置");
System.out.println(configs);
}
}
消费者
package com.kafka.learn2.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.util.Collections;
import java.util.Iterator;
/**
* 消费者
*
* @author
* @date 2019/6/15
*/
public class Consumer {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer(ConsumerConf.initProperties());
consumer.subscribe(Collections.singletonList(ConsumerConf.topic));
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, key = %s, value = %s%n", record.topic(), record.key(), record.value());
/**
* 消息头
*/
RecordHeaders headers =(RecordHeaders)record.headers();
headers.forEach(header -> {
System.out.println("key="+header.key()+"value="+new String(header.value()));
});
}
}
;
}
消费者配置
package com.kafka.learn2.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.UUID;
/**
* 消费者配置
* @author
* @date 2019/6/15
*/
public class ConsumerConf {
public static final String url = "39.100.104.199:9092";
public static final String topic = "learn2";
public static Properties initProperties() {
Properties properties = new Properties();
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
//移动到offset到 earliest
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return properties;
}
}