从架构的角度看Kafka(四)

本文章内容皆出自作者阅读胡夕著Apache Kafka 实战一书的总结,可能有理解错误,仅作为参考。如有侵权,笔者将会删除它们。

  注:这篇文章是Kafka文章的第三篇,笔者建议从头看,如果读者感兴趣可以先看第三篇
从架构的角度看Kafka(三)
  注:这篇文章是基于Kafka 10.0.0,如果读者的版本不一致,请查看版本是否支持这些。

 1.1 自定义分区

     对于Java版本的producer,它会对消息的Key进行murmur2哈希运算,然后再对分区数取模。murmur是一个很优秀的hash算法,具有很好地散列分布,我们也就不展开讲了。如果我们想自定义分区的话我们需要自己写一个分区策略类然后实现partitioner接口,重写partition方法,然后在构建producer时,我们将我们写好的策略类的全限定名传入properties的partitioner.class属性中,具体用法我们在第一篇文章中讲过,我们在这里就不再赘述了。大致代码如下。xx为我们自定义类的全限定名,值得注意的是我们只传这个参数并不能使用这个producer,详情请看第一篇文章。

		Properties pro = new Properties();
        pro.put("partitioner.class","xx.xx.xx.xx");
        Producer kafkaProducer = new KafkaProducer(pro);

 1.2 拦截器

    我个人认为拦截器是一个很重要的功能,它可以在我们发消息时和收到相应时进行一些特殊化处理。而且我们可以定义多个拦截器,形成一条处理链。这里用了两种很重要的设计模式,策略模式和责任链模式,我个人感觉这两个设计模式是很常用很优雅的。
    如果我们想添加拦截器,我们需要自己建一个类并且实现ProducerInterceptor类,然后重写以下方法。
  onSend(ProducerRecord)它被调用在send方法中,在消息被序列化计算分区前执行,用户可以在方法中对消息进行任意修改,但是作者不建议修改topic信息和分区信息,否则可能会影响分区的计算。
  onAcknowledgement(RecordMetatedata,Exception)。该方法被调用在消息被响应之前调用,它执行在IO线程,所以用户尽量不要写太繁琐的逻辑以免影响生产端性能。
  close() 用来关闭资源。
作者这里给大家简单举个栗子,代码如下。xxx为我们自己定义的拦截器类的全限定名。

		Properties pro = new Properties();
        List<String> interceptors = new ArrayList<>();
        interceptors.add("xx.xx.xxx");
        pro.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
        Producer kafkaProducer = new KafkaProducer(pro);
        //生产消息

		//只有我们调用close时拦截器的close才会调用
        kafkaProducer.close();

 1.3 无消息丢失配置

    问题1:上篇文章我们讲了消息的生产和消息的发送时异步的,我们生产消息时,主线程会将消息先放入缓冲区,如果producer异常关闭了,这样就会导致缓冲区数据丢失了。
    这个问题也很好解决,我们直接同步执行,这样就不会出现上述问题的了,但这是极端的,会导致producer的吞吐变低。
    问题2:因为Kafka生产者需要通过网络给Kafka发送消息,这样难免会因为网络波动导致消息发送失败,再加上Kafka是有重试机制的,这样可能会使我们的消息乱序。在很多场景中都需要保证很强的有序性,所以这也是一个严重的问题。
    综上问题书中给出了一套安全且效率较高的配置如下。
从架构的角度看Kafka(四)
从架构的角度看Kafka(四)
    我们逐一来讲解这几个参数的意思,让读者更加容易理解。

1.3.1 producer参数

  1、block.on.buffer.full
    这是一个过时的配置,但是书中还是建议设置为true,这样当缓冲区数据满了,生产线程阻塞停止接收消息,而不会抛异常。在Kafka10.0.0以后可以用max.block.ms代替。
  2、asks和retries
    上篇文章我们已经详细讲过这个参数了,可以在顶部查看上一篇文章链接。这里retries设置了最大整数,可以理解为无限重试。值得注意的是Kafka只会对可恢复的异常,所以不会造成很大的不必要的浪费。
  3、max.in.flight.request.per.connection
    这个参数是为了防止topic的同一分区消息乱序的,可以理解为一个broker可以允许连接几个未响应的请求。这样使得当一个消息发送后无限重试不会有其他请求发送。
    值得注意的是再调用send时一定要调用带回调的方法,不然producer不会理会这些配置。

1.3.2 broker参数

  1、unclean.leader.election.enable
    这个是控制非ISR中的节点是否可以被选举为leader,显然非ISR集合的节点数据是不完整的,如果要保证消息的可靠性,最好关掉。
  2、replication.factor
    设置备份数据的份数,书中根据hadoop的三备份原则,所以设置了>=3
  3、min.insyc.replicas
    规定当消息写入成功几个备份才能返回成功,当然只有在ack=all或者-1时他才会生效。并且第二个参数必须大于这个参数。不然永远达不到这个设定,这个值一般设置为replication.factor-1这样可以容忍一个副本挂掉。不至于使集群可用性太差。

 2、消息压缩

    众所周知压缩消息可以磁盘和带宽占用,因为他们传输的数据大小变小了,肯定带来的是更快的速度。但是压缩需要消耗CPU,所以这个就像我们算法中空间换时间一样。它是用cpu换时间的。Kafka在producer端对消息进行压缩,然后broker端通常是不需要解压的,等consumer消费消息时再解压。不通常的情况笔者目前只知道两个。broker端的压缩算法和producer的压缩算法不统一,还有就是broker需要对消息格式进行转换兼容其他版本。所以尽量去配置好不要去消耗不必要的资源。
    Kafka支持三种压缩算法GZIP、Snappy和Lz4,他们通过conpression.type来配置。producer和broker都有这个参数,如果broker没有配置默认使用producer的压缩算法。对于三个算法的速度比较LZ4>Snappy>GZIP,当然这仅仅是在Kafka中是这样的,实际上这三种算法各有各的优点,我们就不一一赘述了。这里作者也贴出书中给出的batch大小对压缩时间的影响和吞吐量的比较
从架构的角度看Kafka(四)

从架构的角度看Kafka(四)
    图中可以看到LZ4无论从哪个角度看都是比较优秀的。但是对于是否开启压缩需要根据服务器的情况考虑,上面我们也说了压缩是需要消耗很多CPU时间片的。如果你的服务器CPU使用率很低,从而提升Kafka的吞吐量,防止CPU摸鱼。反之如果CPU使用率已经很高了,笔者不建议开始压缩,因为这样肯定会给CPU带来不必要的压力,而且也并不一定会提升吞吐量甚至可能降低。

 总结

    文章中总结了Kafka消息的分区策略、生产端拦截器的使用、解决消息可靠性问题以及Kafka的消息压缩功能。以上文章仅供读者参考,笔者技术一般只有一颗分享的心,不想给读者错误的引导,所以希望读者带着问题来看文章。谢谢

上一篇:rabbitMq完整通信(一)---producer


下一篇:Kafka 如果丢了消息,怎么处理的?