前言
Apache Kafka是一款优秀的开源消息中间件,主要应用于活动跟踪、消息穿透、日志、流处理等场景。我们使用该产品时,首先应当需要了解该产品的特性,以及产品的说明。
但是由于官方文档较多,实际在使用的过程中,quick start
往往是我们接触的第一步,但是quick start
的配置实在是太过简陋,从而在实际的使用过程甚至在生产环境中发生一些严重的问题。话不多说,以下内容为实践中遇到的问题及解决办法详解。
正文
1、Kafka集群无法在跨网络的环境中正常工作
kafka在实际使用的过程中,同样的配置,在不同的网络环境下,可能会导致无法正常工作,我们应该分清内网和外网的区别。
在Kafka中涉及到网络的核心配置参数主要为两个
参数名 | 配置参考 | 说明 | 注意 |
---|---|---|---|
listeners | PLAINTEXT://hostname :9092 |
主要用来定义Kafka Broker的Listener的配置项。hostname 如果设置为0.0.0.0 则绑定所有的网卡地址;如果hostname 为空则绑定默认的网卡。如果没有配置hostname 则默认为java.net.InetAddress.getCanonicalHostName() 。 |
如果只设置该参数,那么就无法进行跨网络的访问,只有内网中的服务可以用 |
advertised.listeners | PLAINTEXT://hostname :9092 |
作用主要是向生产者和消费者公布主机名和端口。如果不设置该值,那么将会默认使用listeners 的值 |
在docker中或者在虚拟机上部署kafka集群,这种情况下是需要用到 advertised.listeners ,这样可以确保外部的生产者和消费者能够正确的使用kafka服务。 |
核心配置示例
listeners: INSIDE://:9092,OUTSIDE://0.0.0.0:9094
advertised.listeners: INSIDE://:9092,OUTSIDE://<网卡ip或主机名>:端口
listener.security.protocol.map: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
inter.broker.listener.name: "INSIDE"
说明:
listener.security.protocol.map
和inter.broker.listener.name
分别表示的是协议的传输方式(以上为明文)和监听的borker。
2、Kafka集群数据节点不均衡
错误的创建topic,可能会导致生产者无法均衡或随机的往kafka集群中推送数据,消费者无法均衡的消费数据,kafka集群中部分节点数据不均衡,系统不稳定,无法有效的高可用。
创建topic主要有两种方式
- 手动创建
bin/kafka-topics.sh --create --zookeeper {hostname:port} --replication-factor 3 --partitions 6 --topic test
-
replication-factor
表示副本数量,建议至少设置为2,一般是3,最高设置为4。合理的设置该数量可以保证系统更稳定(允许N-1个broker宕机),但是更多的副本(如果acks=all,则会造成较高的延时),系统磁盘的使用率会更高(一般若是为3,则相对于为2时,会占据更多50% 的磁盘空间) -
partitions
表示分区,kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。一般来说 Kafka 不能有太多的 Partition,一个broker不应该承载超过2000 到 4000 个partitions(考虑此broker上所有来自不同topics的partitions)。
同时,一个Kafka集群上brokers中所有的partitions总数最多不应超过20,000个,集群节点数量低于6的时候,我们通常设置的值为2* broker数。此准则基于的原理是:在有broker宕机后,zookeeper需要重新做选举。若是partitions数目过多,则需要执行大量的选举策略。
- 自动创建
需要将auto.create.topics.enable
设置为true
,kafka发现该topic不存在的话,会按默认配置自动创建topic。
配置示例:
num.partitions=6 #自动创建的partitions值为6
default.replication.factor=3 #自动创建的replication_factor值为3
auto.create.topics.enable=true # 开启自动创建
具体的数值设置,可以参考手动创建的中的详细说明,然后通过测试结果进行调整。
3、Kafka集群日志数据过大堆积磁盘
生产者在推送数据后,kafka通常会在data目录下对应的topic数据目录中生成日志信息,随着时间的增长,如果没有对日志作清理动作,那么必定导致磁盘的不可用。
清理策略
-
log.cleanup.policy=delete
,kafka日志的清理策略,默认是delete,就是根据配置的时间空间来清理日志;还可以配置成compact,当旧数据的回收时间或者尺寸限制到达时,会进行日志压缩。
# 需要自己根据实际情况设置
log.retention.bytes=-1
# 默认的保留时间是7天
log.retention.hours=168
值得注意的是log.retention.bytes
表示的是每个topic下每个partition保存数据的总量;注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。这项设置可以由每个topic设置时进行覆盖。
4、Kafka客户端的注意事项
以上是kafka常见的问题及解决办法,想要用好kafka,还必须要了解相对应的客户端提供的各种参数含义。
以下以go作为示例,采用的是sarama客户端中值得关注的如下:
生产者配置Producer.Partitioner
分区器的选择策略,同步异步模块的实现方式差异。
生产者: 主要分为同步模式和异步模式
- 同步模式:需等待返回成功后,阻塞其他逻辑执行
config := sarama.NewConfig() //实例化sarama的Config
config.Producer.Return.Successes = true //开启消息发送成功后通知
config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
client,err := sarama.NewClient([]string{"127.0.0.1:9092"}, config) //初始化
defer client.Close()
if err != nil {panic(err)}
producer,err := sarama.NewSyncProducerFromClient(client) // 同步
if err!=nil {panic(err)}
partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go sync message")})
if err != nil {
panic(err)
}
- 异步模式:生产者不等待成功直接返回,不阻塞其他逻辑执行
config := sarama.NewConfig() //实例化sarama的Config
config.Producer.Return.Successes = true //开启消息发送成功后通知
config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器 可选择不同的策略,如轮询等
client, err := sarama.NewClient([]{"127.0.0.1:9092"}, config)
if err != nil {
panic(err)
}
producer, err := sarama.NewAsyncProducerFromClient //异步
if err != nil {
panic(err)
}
defer producer.Close()
producer.Input() <- &sarama.ProducerMessage{Topic: "test_topic", Key: nil, Value: sarama.StringEncoder("this is go async message")}
select {
case msg := <-producer.Successes(): // 如果config.Producer.Return.Successes 设置为false 那么无需获取报告状态,否则必须获取该状态
fmt.Printf("message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
fmt.Println("message failure: ", err)
default:
fmt.Println("message default",)
}
消费者
消费者配置
Offsets.Initial
偏移量的选择,从最旧还是最新的消息开始消费,只能是sarama.OffsetOldest
或sarama.OffsetNewest
。Offsets.ResetOffsets
:如果程序重启,true
表示不从上次中断的位置消费,false
表示从上次中断的位置消费。
总结
本文主要是记录在kafka开发过程中常见的问题解决办法,通过不断总结积累问题解决办法,在遇到同类问题时能够快速的复盘,以最快的速度解决问题。