Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)

Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)

Kafka生产者是整个Kafka架构中的一个角色,可以是不同集成了Kafka的组件,KafkaProducer是线程安全的,可以同时给多个线程使用。

1 如何构建一个KafkaProducer

构建一个KafkaProducer的构造方法有2种:

//首先配置Producer必要配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"fast practice producer");

//根据配置创建生产者实例,这是常用方式
KafkaProducer producer = new KafkaProducer<K,V>(properties)
  
//假如在properties中没有配置序列化器,也可以在构造器中指定,实际底层原理都一样
KafkaProducer producer = new KafkaProducer<K,V>(properties,new StringSerializer(),new StringSerializer())

2 创建生产者消息ProducerRecord并发送消息

ProducerRecord的构造方法有很多种,根据不同的需求可以选择不同的构造器。

//创建一个生产者消息对象
ProducerRecord<String, String> message = new ProducerRecord<String, String>(
  "topic1",
  "hello_" + System.currentTimeMillis());

//通过生产者发送消息
try{
    //Future代表一个任务的生命周期,从Future中可以获取该消息的源数据信息,如partition,offset等
    Future<RecordMetadata> future = producer.send(message);
} catch (Exception e) {
  	e.printStackTrace();
} finally {
  	//关闭生产者
  	producer.close();
}

3 发送消息的3种模式

Kafka生产者发送消息有3种模式

  • fire-and-forget 发送即忘

    • 只管往Broker中发送消息,不管消息是否正确到达,这种模式在大多数场景下是没有问题;
    • 在遇到不可重试异常时可能会造成数据丢失;
    • 这是模式性能最高,可靠性最差。

    实现方式

    try{
    //Future代表一个任务的生命周期
    Future<RecordMetadata> future = producer.send(message);
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
  • sync 同步

    • 可以配合get()方式阻塞kafka的响应,实现消息的同步发送,可靠性很高,但是性能较低,因为需要阻塞等待上一条消息发送完;
    • 该方法的调用要么消息发送成功,要么发送异常,当发送异常时需要使用外部逻辑进行处理。

    实现方式

    try{
    //通过get()阻塞来等待kafka的响应,要么发送成功,要么异常,异常的话交给外部逻辑处理
    producer.send(message).get();
      
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
    try{
    //这种方式实际上与上面的方式是一样的,如果你需要用到消息的元数据信息,就可以选择这种方式,否则使用上面这个方式更省事。
    Future<RecordMetadata> future = producer.send(message);
    RecordMetaData metaData = future.get()
    metaData.offset() //获取元数据信息
    metaData.partition()
    }catch(ExecutionException | InterruptedException e)){
      e.printStackTrace();
    }
    
  • async 异步

    • send()方法本身就是异步的,该方法返回的Future对象代表发送一个消息的生命周期,它可以使调用方在消息发送之后获取发送结果;
    • Future.get()方法可以获取被成功发送消息的元数据信息,如offset、partition
    //实现消息的异步发送一般使用send(message,Callback cb)这种回调函数的重载方式,其中Callback是在kafka有响应之后调用,假如没有响应,那么不会掉用。
    
    try{
      	//使用回调函数
        producer.send(message, new Callback() {
          @Override
          public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
              //处理
              e.printStackTrace();
            } else {
              System.out.println(recordMetadata.topic()
                                 + ":" + recordMetadata.partition()
                                 + ":" + recordMetadata.offset());
            }
    
          }
        });
    }catch{
      //handle this
    }
    

    注意:回调函数Callback{}中onComplete()方法中的2个参数是互斥的,如果Exception为Null,那么RecordMetadata就不为Null,反之亦然

4 KafkaProducer中的异常

4.1异常分类

KafkaProducer中一般会发生2种异常

  • 可重试的异常
    • NetworkException
      • 网络瞬间故障导致的异常,重试可以恢复
    • LeaderNotAvailableException
      • 标示分区的leader不可用,通常是发生在旧的leader下线而新的leader被选举出来之前,重试可恢复。
    • UnknownTopicOrPartitionException
    • NotEnoughReplicasException
    • NotCoordinatorException
  • 不可重试的异常
    • RecordTooLargeException
      • 代表生产者发送的消息太大,超过了配置中设置的最大900+kb,kafka对此不会进行任何重试,直接抛出异常!

4.2 可重试异常解决

//设置重试次数,但是重试10次之后没有恢复还是会抛出异常,此时需要外部逻辑处理
props.put(ProducerConfig.RETRIES_CONFIG, 10);

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常 ,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条信息。

5 Key-Value的序列化器

因为Producer的数据按照序列化的方式传输到Kafka,所以需要指定KV的序列化方式。源码如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 *  String类型的编码方式默认是UTF-8,编码方式可以通过在properties中指定:
 *  key.serializer.encoding,value.serializer.encoding 或者 serializer.encoding. 
 *  前2个编码方式比后者优先级高。
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8"; //默认编码UTF-8s

    @Override //configure方法确定key、value的编码方式
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override //将String类型按照编码方式转换成字节数组,如果为null,返回null
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

注意,假如Kafka自带的序列化方式都不能满足业务需求,那么可以Avro、JSON Thrift、 ProtoBuf、 Protostuff 等通用的序列化工具来实现,或者使用自定义序列化器的方式实现,在properties配置参数中指定就好。

6 分区器Partitioner

KafkaProducer的消息在send()之后,可能会经过序列化器、拦截器、分区器之后才会真正到达Kafka-Broker中的Partition中!~以下是send()之后数据的流向。

Interceptor => Serializer => Partitioner

6.1 具体源码如下

// step-1
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
  return send(record, null);
}

//step-2
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  // 此处拦截消息记录, 这个拦截之后将数据操作之后然后发送到下一层,onsend()方法就是将record修改之后生成拦截修改之后的interceptedRecord。
  ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
  return doSend(interceptedRecord, callback); //该方法doSend()也是kafkaProducer中的方法,下一次进入该方法
}

//step-3
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // 首先确认topic的元数据是可以用的
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey; //定义Key的序列化结果
            try {
              	//将key进行序列化
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
              //将value进行序列化
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
          	//该方法根据传的参数,按照分区器DefaultPartioner为每条Record指定所属分区。
            int partition = partition(record, serializedKey, serializedValue, cluster);
						........
         		//and so on 
              
//step-4
//我们点进去partition()方法,发现这个方法来自于一个接口Partitioner,Partitioner implements Configuable,是可以配置的
//Kafka源码中该Partitioner的实现类只有一个DefaultPartitioner,点进该类,查看重写的partition()方法,如下:
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //为每个序列化之后的key的字节数组按照Murmur2算法进行hash,然后找到对应的Record的分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
            

7 生产者-拦截器Interceptor

Kafka的拦截器在Kafka-0.10.0.0被引入,Kafka有2种拦截器

  • 生产者拦截器
  • 消费者拦截器

我们自定义拦截器只需要implements ProducerInterceptor就行,ProducerInterceptor与Partitioner一样,都继承自同一个父接口Configurable。

ProducerInterceptor中有3种方法,在Kafka中拦截器没有默认的实现类。

onSend(ProducerRecord<K, V> record); //
onAcknowledgement(RecordMetadata metadata, Exception exception);
close()

7.1 简单的拦截器示例

public class ProducerinterceptorPrefix implements 
Producerinterceptor<String, String>{
  //to do something that you aim to 
}

7.2 拦截器部署加载

在自定义完拦截器之后,我们需要在Producer的配置参数中设置。

//这里可以指定多个类,类的全类名之间通过`,`进行分割
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.shufang.interceptor.ProducerinterceptorPrefix"+","+"com.shufang.interceptor.ProducerinterceptorPrefix2");

上一篇:python协程


下一篇:RocketMq核心概念