数据采集flume kafka

需求:采集8类日志数据,并且进入es展示:

方案制定:目前数据采集通用flume+kafka模式,所以采用原有模式,一类服务进入一个topic,然后通过logstash进行数据清洗,最后进入es进行展示。

flume采用tadir 读取数据源,memory 进行缓存,kafka进行sink

 

a1.sources = s1 s2 s3 s4 s5 s6 s7 s8 
a1.channels = c1 c2 c3 c4 c5 c6 c7 c8 
a1.sinks = k1 k2 k3 k4 k5 k6 k7 k8 

a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /home/es/.*.log
a1.sources.s1.channels = c1

a1.sources.s2.type = TAILDIR
a1.sources.s2.filegroups = f1 f2
a1.sources.s2.filegroups.f1 = /home/adm/.*.log
a1.sources.s2.channels = c2

a1.sources.s3.type = TAILDIR
a1.sources.s3.filegroups = f1 f2
a1.sources.s3.filegroups.f1 = /home/bas/.*.log
a1.sources.s3.channels = c3

a1.sources.s4.type = TAILDIR
a1.sources.s4.filegroups = f1 f2
a1.sources.s4.filegroups.f1 = /home/cha/.*.log
a1.sources.s4.channels = c2

a1.sources.s5.type = TAILDIR
a1.sources.s5.filegroups = f1 f2
a1.sources.s5.filegroups.f1 = /home/anog/.*.log
a1.sources.s5.channels = c5

a1.sources.s6.type = TAILDIR
a1.sources.s6.filegroups = f1 f2
a1.sources.s6.filegroups.f1 = /home/dip/es_okeano/3.27.20.38/config_log/.*.log
a1.sources.s6.channels = c6

a1.sources.s7.type = TAILDIR
a1.sources.s7.filegroups = f1 f2
a1.sources.s7.filegroups.f1 = /home/oau/.*.log
a1.sources.s7.channels = c7

a1.sources.s8.type = TAILDIR
a1.sources.s8.filegroups = f1 f2
a1.sources.s8.filegroups.f1 = /home/z/.*.log
a1.sources.s8.channels = c8

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 1000

a1.channels.c4.type = memory
a1.channels.c4.capacity = 10000
a1.channels.c4.transactionCapacity = 1000

a1.channels.c5.type = memory
a1.channels.c5.capacity = 10000
a1.channels.c5.transactionCapacity = 1000

a1.channels.c6.type = memory
a1.channels.c6.capacity = 10000
a1.channels.c6.transactionCapacity = 1000

a1.channels.c7.type = memory
a1.channels.c7.capacity = 10000
a1.channels.c7.transactionCapacity = 1000

a1.channels.c8.type = memory
a1.channels.c8.capacity = 10000
a1.channels.c8.transactionCapacity = 1000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ws_activity
a1.sinks.k1.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.channel = c2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = ws_admin
a1.sinks.k2.kafka.bootstrap.servers = 172.56.10.23:9092
a1.sinks.k2.kafka.flumeBatchSize = 5
a1.sinks.k2.kafka.producer.acks = 1

a1.sinks.k3.channel = c3
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k3.kafka.topic = ws_authorization_oauth
a1.sinks.k3.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k3.kafka.flumeBatchSize = 5
a1.sinks.k3.kafka.producer.acks = 1

a1.sinks.k4.channel = c4
a1.sinks.k4.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k4.kafka.topic = ws_base
a1.sinks.k4.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k4.kafka.flumeBatchSize = 5
a1.sinks.k4.kafka.producer.acks = 1

a1.sinks.k5.channel = c5
a1.sinks.k5.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k5.kafka.topic = ws_channel
a1.sinks.k5.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k5.kafka.flumeBatchSize = 5
a1.sinks.k5.kafka.producer.acks = 1

a1.sinks.k6.channel = c6
a1.sinks.k6.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k6.kafka.topic = ws_config
a1.sinks.k6.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k6.kafka.flumeBatchSize = 5
a1.sinks.k6.kafka.producer.acks = 1

a1.sinks.k7.channel = c7
a1.sinks.k7.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k7.kafka.topic = ws_material
a1.sinks.k7.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k7.kafka.flumeBatchSize = 5
a1.sinks.k7.kafka.producer.acks = 1

a1.sinks.k8.channel = c8
a1.sinks.k8.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k8.kafka.topic = ws_zuul
a1.sinks.k8.kafka.bootstrap.servers =172.56.10.23:9092
a1.sinks.k8.kafka.flumeBatchSize = 5
a1.sinks.k8.kafka.producer.acks = 1

 

数据采集flume kafka

上一篇:java代理使用 apache ant实现文件压缩/解压缩


下一篇:Eclipse Java注释模板设置详解