Flume中从kafkasource分配数据到kafkaSink时,topic被覆盖问题

问题:

当kafkasource和kafkasink一起使用时, 传输到kafkasink的数据,被传到了kafkasource。
即陷入死循环,从 kafkasource 读取的数据被flume 重新传到kafkasource。

原因

当flume从kafka里读取的数据时,消息上会event headers结构。在events header中会自动带上,topic:‘topic名称’。
如:
Flume中从kafkasource分配数据到kafkaSink时,topic被覆盖问题
而kafka sink的 中 allowTopicOveride参数默认为true,即会使用topicheader中的topic覆盖我们在kafka sink 配置的的topic。

所以flume在输出的时候,会优先读取 从event header中读取到了topic,其次才是读取 Sink端的配置的topic。
导致flume在输出的时候kafka sink的topic被覆盖,数据被输出到kafka source的topic中了。

解决

方式1

如果kafka sink的topic是固定的,已知的。可以将allowTopicOverride参数设置为false,让header中的topic不覆盖sink配置的topic值

sink 配置设置:

a1.sinks.kafkaSink.allowTopicOverride = false

配置文件完整代码


#agent 的名称

#指定source组件,channel组件和sink组件的名称
# Name the components on this agent
agent.sources = kafkaSource
agent.channels = fileChannl
agent.sinks = kafkaSink

# 把组件连接起来
agent.sources.kafkaSource.channels = fileChannl
agent.sinks.kafkaSink.channel = fileChannl

#配置source组件
agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.batchSize= 1000
agent.sources.kafkaSource.batchDurationMillis = 1000
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2
agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1

#配置channel组件
agent.channels.fileChannl.type = memory
agent.channels.fileChannl.capacity=2048
agent.channels.fileChannl.transactionCapacity=1000

# 配置sink组件
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 指定topic名称
agent.sinks.kafkaSink.kafka.topic = test
# 指定kafka地址,多个节点地址使用逗号分割
agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sinks.kafkaSink.allowTopicOverride = false

方式2

当kafka sink中的topic是未知的,动态的。即可能需要根据消息的内容,将消息分发到不同的topic中时,可以使用拦截器覆盖 header中的topic值。

使用正则表达式拦截器,抽取消息正文内容的值,将其覆盖到header 的topic中。

#配置拦截器
# 定义拦截器,抽取数据
agent.sources.kafkaSource.interceptors = i2 i1
# 设置拦截器类型,i1为正则表达式,
agent.sources.kafkaSource.interceptors.i1.type = regex_extractor
# 配置指定的数据,这样设置会在数据的header中增加topic=test
agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\\w+)"
agent.sources.kafkaSource.interceptors.i1.serializers = s1
agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic

# 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】
agent.sources.kafkaSource.interceptors.i2.type = static
agent.sources.kafkaSource.interceptors.i2.key = topic
agent.sources.kafkaSource.interceptors.i2.preserveExisting = false
agent.sources.kafkaSource.interceptors.i2.value = test

完整配置文件

#agent 的名称

#指定source组件,channel组件和sink组件的名称
# Name the components on this agent
agent.sources = kafkaSource
agent.channels = fileChannl
agent.sinks = kafkaSink

# 把组件连接起来
agent.sources.kafkaSource.channels = fileChannl
agent.sinks.kafkaSink.channel = fileChannl

#配置source组件
agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.batchSize= 1000
agent.sources.kafkaSource.batchDurationMillis = 1000
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2
agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1

#配置拦截器
# 定义拦截器,抽取数据
agent.sources.kafkaSource.interceptors = i2 i1
# 设置拦截器类型,i1为正则表达式,
agent.sources.kafkaSource.interceptors.i1.type = regex_extractor
# 配置指定的数据,这样设置会在数据的header中增加topic=test
agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\\w+)"
agent.sources.kafkaSource.interceptors.i1.serializers = s1
agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic

# 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】
agent.sources.kafkaSource.interceptors.i2.type = static
agent.sources.kafkaSource.interceptors.i2.key = topic
agent.sources.kafkaSource.interceptors.i2.preserveExisting = false
agent.sources.kafkaSource.interceptors.i2.value = test

#配置channel组件
agent.channels.fileChannl.type = file
agent.channels.fileChannl.checkpointDir = /data/filechannle_data/all_type_data/checkpoint
agent.channels.kafka2HdfsShow.dataDirs = /data/filechannle_data/all_type_data/data



# 配置sink组件
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 指定topic名称
agent.sinks.kafkaSink.kafka.topic = default
# 指定kafka地址,多个节点地址使用逗号分割
agent.sinks.kafkaSink.kafka.bootstrap.servers =bigdata01:9092,bigdata02:9092,bigdata03:9092

若配置不生效,请注意参数是否写错,如参数大小写不对等。

上一篇:SpringMVC配置了拦截器(interceptors)却显示不出css、js样式的解决办法


下一篇:Vue的axios封装