1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:
1)、按照某个规则过滤掉不符合要求的消息。
2)、修改消息的内容。
3)、统计类需求。
1 package com.demo.kafka.listener; 2 3 import java.util.Map; 4 5 import org.apache.kafka.clients.producer.ProducerInterceptor; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 import org.apache.kafka.clients.producer.RecordMetadata; 8 9 /** 10 * 生产者拦截器 11 * 12 * @author 生产者拦截器 13 * 14 */ 15 16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> { 17 18 // 发送成功计数 19 private volatile long sendSuccess = 0; 20 21 // 发送失败计数 22 private volatile long sendFailure = 0; 23 24 /** 25 * 26 */ 27 @Override 28 public void configure(Map<String, ?> configs) { 29 30 } 31 32 /** 33 * 发送消息已经操作消息的方法 34 */ 35 @Override 36 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { 37 String modifiedValue = "前缀prefix : " + record.value(); 38 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( 39 record.topic(), // 主题 40 record.partition(), // 分区 41 record.timestamp(), // 时间戳 42 record.key(), // key值 43 modifiedValue, // value值 44 record.headers()); // 消息头 45 return producerRecord; 46 } 47 48 /** 49 * ack确认的方法 50 */ 51 @Override 52 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { 53 if(exception == null) { 54 sendSuccess++; 55 }else { 56 sendFailure++; 57 } 58 } 59 60 /** 61 * 关闭的方法,发送成功之后会将拦截器关闭,调用此方法 62 */ 63 @Override 64 public void close() { 65 double succe***ation = (double)sendSuccess / (sendSuccess + sendFailure); 66 System.out.println("【INFO 】 发送成功率: " + String.format("%f", succe***ation * 100) + "%"); 67 } 68 69 }
生产者客户端要配置一下Producer的拦截器interceptor,如下所示:
1 package com.demo.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.ProducerConfig; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata; 10 import org.apache.kafka.common.serialization.StringSerializer; 11 12 import com.demo.kafka.listener.ProducerInterceptorPrefix; 13 14 public class KafkaProducerSimple { 15 16 // 设置服务器地址 17 private static final String brokerList = "192.168.110.142:9092"; 18 19 // 设置主题 20 private static final String topic = "topic-demo"; 21 22 public static void main(String[] args) { 23 Properties properties = new Properties(); 24 // 设置key的序列化器 25 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 26 27 // 设置重试次数 28 properties.put(ProducerConfig.RETRIES_CONFIG, 10); 29 30 // 设置值的序列化器 31 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 32 33 // 打印输出序列化器的路径信息 34 System.err.println(StringSerializer.class.getName()); 35 36 // 设置集群地址 37 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 38 39 // 自定义拦截器使用,可以计算发送成功率或者失败率,进行消息的拼接或者过滤操作 40 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName()); 41 42 // 将参数配置到生产者对象中 43 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); 44 45 for (int i = 0; i < 100000; i++) { 46 // 生产者消息记录 47 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i); 48 // 同步获取消息 49 // RecordMetadata recordMetadata = producer.send(record).get(); 50 producer.send(record); 51 } 52 53 // 关闭 54 producer.close(); 55 } 56 57 }
消费者代码,如下所示:
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Collections; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.ConsumerConfig; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 import org.apache.kafka.clients.producer.ProducerConfig; 12 import org.apache.kafka.common.serialization.StringDeserializer; 13 14 public class KafkaConsumerSimple { 15 16 // 设置服务器地址 17 private static final String bootstrapServer = "192.168.110.142:9092"; 18 19 // 设置主题 20 private static final String topic = "topic-demo"; 21 22 // 设置消费者组 23 private static final String groupId = "group.demo"; 24 25 public static void main(String[] args) { 26 Properties properties = new Properties(); 27 // 设置反序列化key参数信息 28 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 29 // 设置反序列化value参数信息 30 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 31 32 // 设置服务器列表信息 33 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 34 35 // 设置消费者组信息 36 properties.put("group.id", groupId); 37 38 // 将参数设置到消费者参数中 39 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 40 41 // 消息订阅 42 consumer.subscribe(Collections.singletonList(topic)); 43 44 while (true) { 45 // 每隔一秒监听一次 46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 47 // 获取到消息信息 48 for (ConsumerRecord<String, String> record : records) { 49 System.err.println(record.toString()); 50 } 51 } 52 53 } 54 55 }
2、生产者的acks参数,这个参数用来指定分区中必须有多少副本来收到这条消息,之后生产者才会认为这条消息写入成功的。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。
1)、ack等于0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了,不过因为生产者不需要等待服务器响应,所以他可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
2)、acks等于1,默认值为1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点,比如首领节点崩溃,新的首领节点还没有被选举出来,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没有来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
3)、acks等于-1,只有当所有参与复制的节点收到消息时候,生产者会收到一个来自服务器额成功响应,这种模式 最安全的,他可以保证不止一个服务器收到消息。
注意,acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常信息。
3、kafka消费者订阅主题和分区,创建完消费者后我们便可以订阅主题了,只需要调用subscribe方法即可,这个方法会接受一个主题列表,如下所示:
另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组立即对其进行消费。正则表达式在连接kafka与其他系统非常有用。比如订阅所有的测试主题。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Collections; 6 import java.util.Properties; 7 import java.util.regex.Pattern; 8 9 import org.apache.kafka.clients.consumer.ConsumerConfig; 10 import org.apache.kafka.clients.consumer.ConsumerRecord; 11 import org.apache.kafka.clients.consumer.ConsumerRecords; 12 import org.apache.kafka.clients.consumer.KafkaConsumer; 13 import org.apache.kafka.clients.producer.ProducerConfig; 14 import org.apache.kafka.common.TopicPartition; 15 import org.apache.kafka.common.serialization.StringDeserializer; 16 17 public class KafkaConsumerSimple { 18 19 // 设置服务器地址 20 private static final String bootstrapServer = "192.168.110.142:9092"; 21 22 // 设置主题 23 private static final String topic = "topic-demo"; 24 25 // 设置主题 26 private static final String topic2 = "topic-demo2"; 27 28 // 设置消费者组 29 private static final String groupId = "group.demo"; 30 31 public static void main(String[] args) { 32 Properties properties = new Properties(); 33 // 设置反序列化key参数信息 34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 35 // 设置反序列化value参数信息 36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 37 38 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个 39 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 40 41 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称 42 properties.put("group.id", groupId); 43 44 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。 45 properties.put("client.id", "consumer.client.id.demo"); 46 47 // 将参数设置到消费者参数中 48 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 49 50 // 消息订阅 51 consumer.subscribe(Collections.singletonList(topic)); 52 // 可以订阅多个主题 53 consumer.subscribe(Arrays.asList(topic, topic2)); 54 // 可以使用正则表达式进行订阅 55 consumer.subscribe(Pattern.compile("topic-demo*")); 56 57 // 指定订阅的分区 58 consumer.assign(Arrays.asList(new TopicPartition(topic, 0))); 59 60 while (true) { 61 // 每隔一秒监听一次 62 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 63 // 获取到消息信息 64 for (ConsumerRecord<String, String> record : records) { 65 System.err.println(record.toString()); 66 } 67 } 68 69 } 70 71 }