Kafka常见问题及解决方法

前言

Apache Kafka是一款优秀的开源消息中间件,主要应用于活动跟踪、消息穿透、日志、流处理等场景。我们使用该产品时,首先应当需要了解该产品的特性,以及产品的说明
但是由于官方文档较多,实际在使用的过程中,quick start往往是我们接触的第一步,但是quick start的配置实在是太过简陋,从而在实际的使用过程甚至在生产环境中发生一些严重的问题。话不多说,以下内容为实践中遇到的问题及解决办法详解。

正文

1、Kafka集群无法在跨网络的环境中正常工作

kafka在实际使用的过程中,同样的配置,在不同的网络环境下,可能会导致无法正常工作,我们应该分清内网和外网的区别。

在Kafka中涉及到网络的核心配置参数主要为两个

参数名 配置参考 说明 注意
listeners PLAINTEXT://hostname:9092 主要用来定义Kafka Broker的Listener的配置项。hostname如果设置为0.0.0.0则绑定所有的网卡地址;如果hostname为空则绑定默认的网卡。如果没有配置hostname则默认为java.net.InetAddress.getCanonicalHostName() 如果只设置该参数,那么就无法进行跨网络的访问,只有内网中的服务可以用
advertised.listeners PLAINTEXT://hostname:9092 作用主要是向生产者和消费者公布主机名和端口。如果不设置该值,那么将会默认使用listeners的值 在docker中或者在虚拟机上部署kafka集群,这种情况下是需要用到 advertised.listeners,这样可以确保外部的生产者和消费者能够正确的使用kafka服务。

核心配置示例

listeners: INSIDE://:9092,OUTSIDE://0.0.0.0:9094  
advertised.listeners: INSIDE://:9092,OUTSIDE://<网卡ip或主机名>:端口   
listener.security.protocol.map: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"  
inter.broker.listener.name: "INSIDE"

说明:listener.security.protocol.mapinter.broker.listener.name分别表示的是协议的传输方式(以上为明文)和监听的borker。

2、Kafka集群数据节点不均衡

错误的创建topic,可能会导致生产者无法均衡或随机的往kafka集群中推送数据,消费者无法均衡的消费数据,kafka集群中部分节点数据不均衡,系统不稳定,无法有效的高可用。

创建topic主要有两种方式

  • 手动创建
    bin/kafka-topics.sh --create --zookeeper {hostname:port} --replication-factor 3 --partitions 6 --topic test
  1. replication-factor表示副本数量,建议至少设置为2,一般是3,最高设置为4。合理的设置该数量可以保证系统更稳定(允许N-1个broker宕机),但是更多的副本(如果acks=all,则会造成较高的延时),系统磁盘的使用率会更高(一般若是为3,则相对于为2时,会占据更多50% 的磁盘空间)
  2. partitions表示分区,kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。一般来说 Kafka 不能有太多的 Partition,一个broker不应该承载超过2000 到 4000 个partitions(考虑此broker上所有来自不同topics的partitions)。
    同时,一个Kafka集群上brokers中所有的partitions总数最多不应超过20,000个,集群节点数量低于6的时候,我们通常设置的值为2* broker数。此准则基于的原理是:在有broker宕机后,zookeeper需要重新做选举。若是partitions数目过多,则需要执行大量的选举策略。
  • 自动创建
    需要将auto.create.topics.enable设置为true,kafka发现该topic不存在的话,会按默认配置自动创建topic。
    配置示例:
num.partitions=6 #自动创建的partitions值为6
default.replication.factor=3 #自动创建的replication_factor值为3
auto.create.topics.enable=true # 开启自动创建

具体的数值设置,可以参考手动创建的中的详细说明,然后通过测试结果进行调整。

3、Kafka集群日志数据过大堆积磁盘

生产者在推送数据后,kafka通常会在data目录下对应的topic数据目录中生成日志信息,随着时间的增长,如果没有对日志作清理动作,那么必定导致磁盘的不可用。

清理策略

  • log.cleanup.policy=delete,kafka日志的清理策略,默认是delete,就是根据配置的时间空间来清理日志;还可以配置成compact,当旧数据的回收时间或者尺寸限制到达时,会进行日志压缩。
# 需要自己根据实际情况设置
log.retention.bytes=-1
# 默认的保留时间是7天
log.retention.hours=168

值得注意的是log.retention.bytes表示的是每个topic下每个partition保存数据的总量;注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。这项设置可以由每个topic设置时进行覆盖。

4、Kafka客户端的注意事项

以上是kafka常见的问题及解决办法,想要用好kafka,还必须要了解相对应的客户端提供的各种参数含义。
以下以go作为示例,采用的是sarama客户端中值得关注的如下:
生产者配置Producer.Partitioner分区器的选择策略,同步异步模块的实现方式差异。

生产者: 主要分为同步模式和异步模式

  1. 同步模式:需等待返回成功后,阻塞其他逻辑执行
  config := sarama.NewConfig()  //实例化sarama的Config
  config.Producer.Return.Successes = true  //开启消息发送成功后通知
  config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
  client,err := sarama.NewClient([]string{"127.0.0.1:9092"}, config) //初始化
  defer client.Close()
  if err != nil {panic(err)}
  producer,err := sarama.NewSyncProducerFromClient(client) // 同步
  if err!=nil {panic(err)}
  partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go sync message")})
  if err != nil {
     panic(err)
  }
  1. 异步模式:生产者不等待成功直接返回,不阻塞其他逻辑执行
   config := sarama.NewConfig() //实例化sarama的Config
   config.Producer.Return.Successes = true //开启消息发送成功后通知
   config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
   client, err := sarama.NewClient([]{"127.0.0.1:9092"}, config)
   if err != nil {
       panic(err)
   }
   producer, err := sarama.NewAsyncProducerFromClient //异步
   if err != nil {
       panic(err)
   }
   defer producer.Close()
   producer.Input() <- &sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go async message")}
   select {
           case msg := <-producer.Successes(): // 如果config.Producer.Return.Successes 设置为false 那么无需获取报告状态,否则必须获取该状态
               fmt.Printf("message successes: [%s]\n",msg.Value)
           case err := <-producer.Errors():
               fmt.Println("message failure: ", err)
           default:
               fmt.Println("message default",)
   }

消费者

消费者配置Offsets.Initial偏移量的选择,从最旧还是最新的消息开始消费,只能是sarama.OffsetOldestsarama.OffsetNewest
Offsets.ResetOffsets :如果程序重启,true表示不从上次中断的位置消费,false表示从上次中断的位置消费。

总结

本文主要是记录在kafka开发过程中常见的问题解决办法,通过不断总结积累问题解决办法,在遇到同类问题时能够快速的复盘,以最快的速度解决问题。

上一篇:[Golang] kafka集群搭建和golang版生产者和消费者


下一篇:【解决了一个问题】腾讯云中使用ckafka生产消息时出现“kafka server: Message contents does not match its CRC.”错误