Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)
Kafka生产者是整个Kafka架构中的一个角色,可以是不同集成了Kafka的组件,KafkaProducer是线程安全的,可以同时给多个线程使用。
1 如何构建一个KafkaProducer
构建一个KafkaProducer的构造方法有2种:
//首先配置Producer必要配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"fast practice producer");
//根据配置创建生产者实例,这是常用方式
KafkaProducer producer = new KafkaProducer<K,V>(properties)
//假如在properties中没有配置序列化器,也可以在构造器中指定,实际底层原理都一样
KafkaProducer producer = new KafkaProducer<K,V>(properties,new StringSerializer(),new StringSerializer())
2 创建生产者消息ProducerRecord并发送消息
ProducerRecord的构造方法有很多种,根据不同的需求可以选择不同的构造器。
//创建一个生产者消息对象
ProducerRecord<String, String> message = new ProducerRecord<String, String>(
"topic1",
"hello_" + System.currentTimeMillis());
//通过生产者发送消息
try{
//Future代表一个任务的生命周期,从Future中可以获取该消息的源数据信息,如partition,offset等
Future<RecordMetadata> future = producer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭生产者
producer.close();
}
3 发送消息的3种模式
Kafka生产者发送消息有3种模式
-
fire-and-forget 发送即忘
- 只管往Broker中发送消息,不管消息是否正确到达,这种模式在大多数场景下是没有问题;
- 在遇到不可重试异常时可能会造成数据丢失;
- 这是模式性能最高,可靠性最差。
实现方式:
try{ //Future代表一个任务的生命周期 Future<RecordMetadata> future = producer.send(message); }catch(ExecutionException | InterruptedException e)){ e.printStackTrace(); }
-
sync 同步
- 可以配合get()方式阻塞kafka的响应,实现消息的同步发送,可靠性很高,但是性能较低,因为需要阻塞等待上一条消息发送完;
- 该方法的调用要么消息发送成功,要么发送异常,当发送异常时需要使用外部逻辑进行处理。
实现方式:
try{ //通过get()阻塞来等待kafka的响应,要么发送成功,要么异常,异常的话交给外部逻辑处理 producer.send(message).get(); }catch(ExecutionException | InterruptedException e)){ e.printStackTrace(); }
try{ //这种方式实际上与上面的方式是一样的,如果你需要用到消息的元数据信息,就可以选择这种方式,否则使用上面这个方式更省事。 Future<RecordMetadata> future = producer.send(message); RecordMetaData metaData = future.get() metaData.offset() //获取元数据信息 metaData.partition() }catch(ExecutionException | InterruptedException e)){ e.printStackTrace(); }
-
async 异步
- send()方法本身就是异步的,该方法返回的Future对象代表发送一个消息的生命周期,它可以使调用方在消息发送之后获取发送结果;
- Future.get()方法可以获取被成功发送消息的元数据信息,如offset、partition
//实现消息的异步发送一般使用send(message,Callback cb)这种回调函数的重载方式,其中Callback是在kafka有响应之后调用,假如没有响应,那么不会掉用。 try{ //使用回调函数 producer.send(message, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { //处理 e.printStackTrace(); } else { System.out.println(recordMetadata.topic() + ":" + recordMetadata.partition() + ":" + recordMetadata.offset()); } } }); }catch{ //handle this }
注意:回调函数Callback{}中
onComplete()
方法中的2个参数是互斥的,如果Exception为Null,那么RecordMetadata就不为Null,反之亦然
。
4 KafkaProducer中的异常
4.1异常分类
KafkaProducer中一般会发生2种异常
- 可重试的异常
-
NetworkException
- 网络瞬间故障导致的异常,重试可以恢复
-
LeaderNotAvailableException
- 标示分区的leader不可用,通常是发生在旧的leader下线而新的leader被选举出来之前,重试可恢复。
- UnknownTopicOrPartitionException
- NotEnoughReplicasException
- NotCoordinatorException
-
NetworkException
- 不可重试的异常
-
RecordTooLargeException
- 代表生产者发送的消息太大,超过了配置中设置的最大900+kb,kafka对此不会进行任何重试,直接抛出异常!
-
RecordTooLargeException
4.2 可重试异常解决
//设置重试次数,但是重试10次之后没有恢复还是会抛出异常,此时需要外部逻辑处理
props.put(ProducerConfig.RETRIES_CONFIG, 10);
同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常 ,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条信息。
5 Key-Value的序列化器
因为Producer的数据按照序列化的方式传输到Kafka,所以需要指定KV的序列化方式。源码如下:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
* String类型的编码方式默认是UTF-8,编码方式可以通过在properties中指定:
* key.serializer.encoding,value.serializer.encoding 或者 serializer.encoding.
* 前2个编码方式比后者优先级高。
*/
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8"; //默认编码UTF-8s
@Override //configure方法确定key、value的编码方式
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override //将String类型按照编码方式转换成字节数组,如果为null,返回null
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
注意,假如Kafka自带的序列化方式都不能满足业务需求,那么可以Avro、JSON Thrift、 ProtoBuf、 Protostuff 等通用的序列化工具来实现,或者使用自定义序列化器的方式实现,在properties配置参数中指定就好。
6 分区器Partitioner
KafkaProducer的消息在send()之后,可能会经过序列化器、拦截器、分区器之后才会真正到达Kafka-Broker中的Partition中!~以下是send()之后数据的流向。
Interceptor => Serializer => Partitioner
6.1 具体源码如下
// step-1
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
//step-2
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// 此处拦截消息记录, 这个拦截之后将数据操作之后然后发送到下一层,onsend()方法就是将record修改之后生成拦截修改之后的interceptedRecord。
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback); //该方法doSend()也是kafkaProducer中的方法,下一次进入该方法
}
//step-3
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// 首先确认topic的元数据是可以用的
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey; //定义Key的序列化结果
try {
//将key进行序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
//将value进行序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
//该方法根据传的参数,按照分区器DefaultPartioner为每条Record指定所属分区。
int partition = partition(record, serializedKey, serializedValue, cluster);
........
//and so on
//step-4
//我们点进去partition()方法,发现这个方法来自于一个接口Partitioner,Partitioner implements Configuable,是可以配置的
//Kafka源码中该Partitioner的实现类只有一个DefaultPartitioner,点进该类,查看重写的partition()方法,如下:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//为每个序列化之后的key的字节数组按照Murmur2算法进行hash,然后找到对应的Record的分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
7 生产者-拦截器Interceptor
Kafka的拦截器在Kafka-0.10.0.0被引入,Kafka有2种拦截器
- 生产者拦截器
- 消费者拦截器
我们自定义拦截器只需要implements ProducerInterceptor就行,ProducerInterceptor与Partitioner一样,都继承自同一个父接口Configurable。
ProducerInterceptor中有3种方法,在Kafka中拦截器没有默认的实现类。
onSend(ProducerRecord<K, V> record); //
onAcknowledgement(RecordMetadata metadata, Exception exception);
close()
7.1 简单的拦截器示例
public class ProducerinterceptorPrefix implements
Producerinterceptor<String, String>{
//to do something that you aim to
}
7.2 拦截器部署加载
在自定义完拦截器之后,我们需要在Producer的配置参数中设置。
//这里可以指定多个类,类的全类名之间通过`,`进行分割
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.shufang.interceptor.ProducerinterceptorPrefix"+","+"com.shufang.interceptor.ProducerinterceptorPrefix2");