问题:
当kafkasource和kafkasink一起使用时, 传输到kafkasink的数据,被传到了kafkasource。
即陷入死循环,从 kafkasource 读取的数据被flume 重新传到kafkasource。
原因
当flume从kafka里读取的数据时,消息上会event headers结构。在events header中会自动带上,topic:‘topic名称’。
如:
而kafka sink的 中 allowTopicOveride参数默认为true,即会使用topicheader中的topic覆盖我们在kafka sink 配置的的topic。
所以flume在输出的时候,会优先读取 从event header中读取到了topic,其次才是读取 Sink端的配置的topic。
导致flume在输出的时候kafka sink的topic被覆盖,数据被输出到kafka source的topic中了。
解决
方式1
如果kafka sink的topic是固定的,已知的。可以将allowTopicOverride参数设置为false,让header中的topic不覆盖sink配置的topic值
sink 配置设置:
a1.sinks.kafkaSink.allowTopicOverride = false
配置文件完整代码
#agent 的名称
#指定source组件,channel组件和sink组件的名称
# Name the components on this agent
agent.sources = kafkaSource
agent.channels = fileChannl
agent.sinks = kafkaSink
# 把组件连接起来
agent.sources.kafkaSource.channels = fileChannl
agent.sinks.kafkaSink.channel = fileChannl
#配置source组件
agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.batchSize= 1000
agent.sources.kafkaSource.batchDurationMillis = 1000
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2
agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1
#配置channel组件
agent.channels.fileChannl.type = memory
agent.channels.fileChannl.capacity=2048
agent.channels.fileChannl.transactionCapacity=1000
# 配置sink组件
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 指定topic名称
agent.sinks.kafkaSink.kafka.topic = test
# 指定kafka地址,多个节点地址使用逗号分割
agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sinks.kafkaSink.allowTopicOverride = false
方式2
当kafka sink中的topic是未知的,动态的。即可能需要根据消息的内容,将消息分发到不同的topic中时,可以使用拦截器覆盖 header中的topic值。
使用正则表达式拦截器,抽取消息正文内容的值,将其覆盖到header 的topic中。
#配置拦截器
# 定义拦截器,抽取数据
agent.sources.kafkaSource.interceptors = i2 i1
# 设置拦截器类型,i1为正则表达式,
agent.sources.kafkaSource.interceptors.i1.type = regex_extractor
# 配置指定的数据,这样设置会在数据的header中增加topic=test
agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\\w+)"
agent.sources.kafkaSource.interceptors.i1.serializers = s1
agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic
# 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】
agent.sources.kafkaSource.interceptors.i2.type = static
agent.sources.kafkaSource.interceptors.i2.key = topic
agent.sources.kafkaSource.interceptors.i2.preserveExisting = false
agent.sources.kafkaSource.interceptors.i2.value = test
完整配置文件
#agent 的名称
#指定source组件,channel组件和sink组件的名称
# Name the components on this agent
agent.sources = kafkaSource
agent.channels = fileChannl
agent.sinks = kafkaSink
# 把组件连接起来
agent.sources.kafkaSource.channels = fileChannl
agent.sinks.kafkaSink.channel = fileChannl
#配置source组件
agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.batchSize= 1000
agent.sources.kafkaSource.batchDurationMillis = 1000
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2
agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1
#配置拦截器
# 定义拦截器,抽取数据
agent.sources.kafkaSource.interceptors = i2 i1
# 设置拦截器类型,i1为正则表达式,
agent.sources.kafkaSource.interceptors.i1.type = regex_extractor
# 配置指定的数据,这样设置会在数据的header中增加topic=test
agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\\w+)"
agent.sources.kafkaSource.interceptors.i1.serializers = s1
agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic
# 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】
agent.sources.kafkaSource.interceptors.i2.type = static
agent.sources.kafkaSource.interceptors.i2.key = topic
agent.sources.kafkaSource.interceptors.i2.preserveExisting = false
agent.sources.kafkaSource.interceptors.i2.value = test
#配置channel组件
agent.channels.fileChannl.type = file
agent.channels.fileChannl.checkpointDir = /data/filechannle_data/all_type_data/checkpoint
agent.channels.kafka2HdfsShow.dataDirs = /data/filechannle_data/all_type_data/data
# 配置sink组件
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 指定topic名称
agent.sinks.kafkaSink.kafka.topic = default
# 指定kafka地址,多个节点地址使用逗号分割
agent.sinks.kafkaSink.kafka.bootstrap.servers =bigdata01:9092,bigdata02:9092,bigdata03:9092
若配置不生效,请注意参数是否写错,如参数大小写不对等。