kafka08-生产者

1.1 消息发送

1.1.1 数据生产流程解析

kafka08-生产者

1. Producer创建时,会创建一个Sender线程并设置为守护线程。
2. 生产消息时,内部时异步的。消息经过拦截器=》序列化器=》分区器=》缓冲区
3. 缓冲区批次发送消息条件为batch.size或者linger.ms。满足其一就会批次发送数据
4. 批次消息发送后,发往指定分区,落入broker磁盘。如果生产者配置了retries大于0,则当消息发送失败,还会重试批次,重试批次的数据会在缓冲区后追加(会导致消息先后顺序错乱)
5. 落盘broker时,是否成功判断有三种形式。不管消息成功失败,返回;二,写入leader分区视为成功;3,leader和副本分区都写成功才视为成功
6. 落盘broker磁盘后,返回生产元数据给生产者,元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

1.1.2 参数

属性 解释
bootstrap.servers 生产者客户端与broker集群建立初始连接需要的broker地址列表, 由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需 要写全部的Kafka集群中broker的地址,但也不要写一个,以防该节 点宕机的时候不可用。
key.serializer 实现了接口 org.apache.kafka.common.serialization.Serializer的key 序列化类。
value.serializer 实现了接口 org.apache.kafka.common.serialization.Serializer的 value序列化类。
acks 该选项控制着已发送消息的持久性。 acks=0:生产者不等待broker的任何消息确认。只要将消息放到了 socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该 消息,retries设置也不起作用,因为客户端不关心消息是否发送 失败。客户端收到的消息偏移量永远是-1。 acks=1:leader将记录写到它本地日志,就响应客户端确认消息, 而不等待follower副本的确认。如果leader确认了消息就宕机,则可 能会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all:leader等待所有同步的副本确认该消息。保证了只要有 一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等 价于acks=-1。默认值为1,字符串。可选值:[all, -1, 0, 1]
compression.type 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的 值:none,gzip,snappy和lz4。压缩是对整个消息批次来讲 的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越 好。字符串类型的值。默认是none。
retries 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消 息。该重试与客户端收到异常重新发送并无二至。允许重试但是不 设置max.in.flight.requests.per.connection为1,存在消息 乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了 重试,第二个成功了,则第一个消息批在第二个消息批后。int类型 的值,默认:0,可选值:[0,...,2147483647]

1.1.3 序列化器

数据的序列化一般生产中使用avro。 自定义序列化器需要实org.apache.kafka.common.serialization.Serializer接口,并实现其 中的 serialize 方法。

实体

public class InfoEntity {
    int id;
    String msg;

    public void setId(int id) {
        this.id = id;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public int getId() {
        return id;
    }

    public String getMsg() {
        return msg;
    }
}

序列化实现

 @Override
    public void configure(Map<String, ?> map, boolean b) {
        //获取配置
    }

    @Override
    public byte[] serialize(String topic, InfoEntity infoEntity) {
        try {
            if (infoEntity == null) {
                return null;
            } else {
                Integer id = infoEntity.getId();
                String msg = infoEntity.getMsg();
                int length = 0;
                byte[] bytes = null;
                if (msg != null) {
                    bytes = msg.getBytes("utf-8");
                    length = bytes.length;
                }
                ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
                buffer.putInt(id);
                buffer.putInt(length);
                buffer.put(bytes);
                return buffer.array();
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new SerializationException("序列化InfoEntity失败");
        }
    }

    @Override
    public void close() {

    }

调用

 public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lew1:9092");
        //key的反序列化器
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //value的反序列化器
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, InfoSerializer.class);
        KafkaProducer<String, InfoEntity> producer = new KafkaProducer<>(config);
        InfoEntity infoEntity = new InfoEntity();
        infoEntity.setId(111);
        infoEntity.setMsg("张三");
        ProducerRecord<String, InfoEntity> record = new ProducerRecord<>("gc_info", infoEntity.getMsg(), infoEntity);
        producer.send(record, (recordMetadata, exception) -> {
            System.out.println(recordMetadata.topic() +
                    "\t" + recordMetadata.partition() +
                    "\t" + recordMetadata.offset());
        });
        producer.close();
    }

1.1.4 自定义分区器

如果要自定义分区器,则需要 1. 首先开发Partitioner接口的实现类 2. 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")

默认分区代码 org.apache.kafka.clients.producer.internals.DefaultPartitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

分析:

  • 若有key,经过murmur2 算法算出hash再取绝对值,在对全部分区numPartitions取余得到分区数
  • 若无key
    • 可用分区availablePartitions大于0,再根据自增nextValue取绝对值再对可用分区取余得到分区
    • 若无可用分区,则根据自增nextValue取绝对值再对全部分区取余得到分区

1.1.5 拦截器

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要 用于实现Client端的定制化控制逻辑。 对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。

同时,Producer允许用户指定多个Interceptor按序作用于同一条 消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线 程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息 做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发 送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在 Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发 送效率。
  • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。 另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个 Interceptor可能抛出的异常记录到错误日志中而非在向上传递。

自定义拦截器:

  1. 实现ProducerInterceptor接口
    1. 在KafkaProducer的设置中设置自定义的拦截器

1.1.6 原理图

kafka08-生产者

kafka08-生产者

上一篇:Codeforces Round #499 (Div. 1) F. Tree


下一篇:[Codeforces 553E]Kyoya and Train(期望DP+Floyd+分治FFT)