1. 写在前面
flume-ng常作为大数据处理流程中的第一个环节出现,对日志文件的采集和处理有很好的作用。本篇博客将详细介绍flume-ng高可用与负载均衡集群的搭建。
2. flume-ng高可用负载均衡描述
在一般情况下,Flume-ng高可用采用server和client模式:client负责采集数据(source),并通过avro sink把数据发送给server;server通过avro source接收client发来的数据,再由自己的sink写入下游(本文中为Kafka)。client需要统一配置所有server的信息,每个agent内部source和sink之间的数据通过channel传递。
3. 配置server,这里配置三个server
flume-server1.properties
# Flume collector ("server") agent for host ynjz003.
# Data flow: Avro source r1 -> memory channel c1 -> Kafka sink k1.
# Start it with "--name agent" so that the "agent." prefix below matches.
#
# --- component names ---
agent.sources = r1
agent.channels = c1
agent.sinks = k1
#
# --- source r1: Avro listener that receives events from the client agents ---
agent.sources.r1.type = avro
agent.sources.r1.bind = ynjz003
agent.sources.r1.port = 52020
agent.sources.r1.channels = c1
# Static interceptor: stamps every event with the header Collector=<this host>
# so downstream consumers can tell which collector handled the event.
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = static
agent.sources.r1.interceptors.i1.key = Collector
agent.sources.r1.interceptors.i1.value = ynjz003
#
# --- channel c1: in-memory buffer (events are lost if the agent dies) ---
agent.channels.c1.type = memory
# Maximum number of events held in the channel.
agent.channels.c1.capacity = 1024000
# Maximum number of events per put/take transaction.
agent.channels.c1.transactionCapacity = 10000
# Upper bound on the total size of event bodies: 134217728 bytes = 128 MiB.
agent.channels.c1.byteCapacity = 134217728
# Percentage of byteCapacity reserved as headroom for event headers.
agent.channels.c1.byteCapacityBufferPercentage = 80
#
# --- sink k1: publishes events to Kafka (not HDFS) ---
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.brokerList = ynjz003:9092,ynjz004:9092,ynjz005:9092,ynjz006:9092,ynjz007:9092,ynjz008:9092,ynjz009:9092
agent.sinks.k1.topic = flume-kafka-meijs33
# NOTE(review): legacy Kafka producer setting; confirm the Flume/Kafka sink
# version in use still honors it.
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
flume-server2.properties
# Flume collector ("server") agent for host ynjz004.
# Data flow: Avro source r1 -> memory channel c1 -> Kafka sink k1.
# Start it with "--name agent" so that the "agent." prefix below matches.
#
# --- component names ---
agent.sources = r1
agent.channels = c1
agent.sinks = k1
#
# --- source r1: Avro listener that receives events from the client agents ---
agent.sources.r1.type = avro
agent.sources.r1.bind = ynjz004
agent.sources.r1.port = 52020
agent.sources.r1.channels = c1
# Static interceptor: stamps every event with the header Collector=<this host>
# so downstream consumers can tell which collector handled the event.
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = static
agent.sources.r1.interceptors.i1.key = Collector
agent.sources.r1.interceptors.i1.value = ynjz004
#
# --- channel c1: in-memory buffer (events are lost if the agent dies) ---
agent.channels.c1.type = memory
# Maximum number of events held in the channel.
agent.channels.c1.capacity = 1024000
# Maximum number of events per put/take transaction.
agent.channels.c1.transactionCapacity = 10000
# Upper bound on the total size of event bodies: 134217728 bytes = 128 MiB.
agent.channels.c1.byteCapacity = 134217728
# Percentage of byteCapacity reserved as headroom for event headers.
agent.channels.c1.byteCapacityBufferPercentage = 80
#
# --- sink k1: publishes events to Kafka (not HDFS) ---
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.brokerList = ynjz003:9092,ynjz004:9092,ynjz005:9092,ynjz006:9092,ynjz007:9092,ynjz008:9092,ynjz009:9092
agent.sinks.k1.topic = flume-kafka-meijs33
# NOTE(review): legacy Kafka producer setting; confirm the Flume/Kafka sink
# version in use still honors it.
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
flume-server3.properties
# Flume collector ("server") agent for host ynjz005.
# Data flow: Avro source r1 -> memory channel c1 -> Kafka sink k1.
# Start it with "--name agent" so that the "agent." prefix below matches.
#
# --- component names ---
agent.sources = r1
agent.channels = c1
agent.sinks = k1
#
# --- source r1: Avro listener that receives events from the client agents ---
agent.sources.r1.type = avro
agent.sources.r1.bind = ynjz005
agent.sources.r1.port = 52020
agent.sources.r1.channels = c1
# Static interceptor: stamps every event with the header Collector=<this host>
# so downstream consumers can tell which collector handled the event.
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = static
agent.sources.r1.interceptors.i1.key = Collector
agent.sources.r1.interceptors.i1.value = ynjz005
#
# --- channel c1: in-memory buffer (events are lost if the agent dies) ---
agent.channels.c1.type = memory
# Maximum number of events held in the channel.
agent.channels.c1.capacity = 1024000
# Maximum number of events per put/take transaction.
agent.channels.c1.transactionCapacity = 10000
# Upper bound on the total size of event bodies: 134217728 bytes = 128 MiB.
agent.channels.c1.byteCapacity = 134217728
# Percentage of byteCapacity reserved as headroom for event headers.
agent.channels.c1.byteCapacityBufferPercentage = 80
#
# --- sink k1: publishes events to Kafka (not HDFS) ---
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.brokerList = ynjz003:9092,ynjz004:9092,ynjz005:9092,ynjz006:9092,ynjz007:9092,ynjz008:9092,ynjz009:9092
agent.sinks.k1.topic = flume-kafka-meijs33
# NOTE(review): legacy Kafka producer setting; confirm the Flume/Kafka sink
# version in use still honors it.
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
可以看出多个server配置的规律:各server的配置基本相同,只有avro source绑定的主机名(bind)和拦截器中标识采集节点的value不同。
3. 配置client,这里也配置一个client示例
flume-client.properties
# Flume client agent, failover variant.
# Data flow: spooling-directory source r1 -> memory channel c1 -> sink group g1
# containing seven Avro sinks (k1..k7), one per collector host ynjz003..ynjz009.
# Start it with "--name agent" so that the "agent." prefix below matches.
#
# --- component names ---
agent.channels = c1
agent.sources = r1
agent.sinks = k1 k2 k3 k4 k5 k6 k7
agent.sinkgroups = g1
#
# --- channel c1: in-memory buffer ---
agent.channels.c1.type = memory
# Maximum number of events held in the channel.
agent.channels.c1.capacity = 102400
# Maximum number of events per transaction; must be >= the source batch size.
agent.channels.c1.transactionCapacity = 1000
# Upper bound on the total size of event bodies: 134217728 bytes = 128 MiB.
agent.channels.c1.byteCapacity = 134217728
# Percentage of byteCapacity reserved as headroom for event headers.
agent.channels.c1.byteCapacityBufferPercentage = 80
#
# --- source r1: project-specific spooling-directory source for zip files ---
# NOTE(review): com.cbo.flume.source.zip.SpoolDirectorySource is a custom class;
# which of the properties below it actually reads must be confirmed against its code.
agent.sources.r1.type = com.cbo.flume.source.zip.SpoolDirectorySource
# (the original set "channels" three times; once is enough)
agent.sources.r1.channels = c1
agent.sources.r1.spoolDir = /data/ynjz/workspace/zip
agent.sources.r1.fileHeader = true
agent.sources.r1.flumeBatchSize = 1000
agent.sources.r1.useFlumeEventFormat = false
agent.sources.r1.restart = true
agent.sources.r1.batchSize = 1000
agent.sources.r1.batchTimeout = 3000
#
# --- sinks k1..k7: one Avro sink per collector ---
# Each collector needs its OWN sink name. The original reused k1/k2/k3 for
# sinks 4-7, which overwrote the first three hostnames and left k4..k7 undefined.
# sink1
agent.sinks.k1.channel = c1
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = ynjz003
agent.sinks.k1.port = 52020
# sink2
agent.sinks.k2.channel = c1
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = ynjz004
agent.sinks.k2.port = 52020
# sink3
agent.sinks.k3.channel = c1
agent.sinks.k3.type = avro
agent.sinks.k3.hostname = ynjz005
agent.sinks.k3.port = 52020
# sink4
agent.sinks.k4.channel = c1
agent.sinks.k4.type = avro
agent.sinks.k4.hostname = ynjz006
agent.sinks.k4.port = 52020
# sink5
agent.sinks.k5.channel = c1
agent.sinks.k5.type = avro
agent.sinks.k5.hostname = ynjz007
agent.sinks.k5.port = 52020
# sink6
agent.sinks.k6.channel = c1
agent.sinks.k6.type = avro
agent.sinks.k6.hostname = ynjz008
agent.sinks.k6.port = 52020
# sink7
agent.sinks.k7.channel = c1
agent.sinks.k7.type = avro
agent.sinks.k7.hostname = ynjz009
agent.sinks.k7.port = 52020
#
# --- sink group g1: failover across the seven sinks ---
agent.sinkgroups.g1.sinks = k1 k2 k3 k4 k5 k6 k7
agent.sinkgroups.g1.processor.type = failover
# Priorities must be distinct: the failover processor indexes sinks by priority,
# so sinks that share a value are dropped and never used as a fallback.
# The highest value is tried first (k1), then k2, and so on.
agent.sinkgroups.g1.processor.priority.k1 = 70
agent.sinkgroups.g1.processor.priority.k2 = 60
agent.sinkgroups.g1.processor.priority.k3 = 50
agent.sinkgroups.g1.processor.priority.k4 = 40
agent.sinkgroups.g1.processor.priority.k5 = 30
agent.sinkgroups.g1.processor.priority.k6 = 20
agent.sinkgroups.g1.processor.priority.k7 = 10
# Maximum back-off for a failed sink, in milliseconds.
agent.sinkgroups.g1.processor.maxpenalty = 10000
这里需要注意sinkgroups的配置。flume sinkgroups在常见应用中有两种处理方式:failover和load_balance。failover可以理解为容错机制:在上面的配置中,同一时刻只会有一个sink(即只向一个server)写入数据,一旦这个server挂了,failover机制会立刻切换到另一个可用的sink,所以容错机制很完善;但在大数据量场景下,单个sink会限制数据写入的能力,因此建议在大数据量时采用load_balance配置,下面是配置示例:
# Flume client agent, load-balancing variant.
# Data flow: spooling-directory source r1 -> memory channel c1 -> sink group g1
# containing seven Avro sinks (k1..k7), one per collector host ynjz003..ynjz009.
# Start it with "--name agent" so that the "agent." prefix below matches.
#
# --- component names ---
agent.channels = c1
agent.sources = r1
agent.sinks = k1 k2 k3 k4 k5 k6 k7
agent.sinkgroups = g1
#
# --- channel c1: in-memory buffer ---
agent.channels.c1.type = memory
# Maximum number of events held in the channel.
agent.channels.c1.capacity = 102400
# Maximum number of events per transaction; must be >= the source batch size.
agent.channels.c1.transactionCapacity = 24000
# Upper bound on the total size of event bodies: 134217728 bytes = 128 MiB.
agent.channels.c1.byteCapacity = 134217728
# Percentage of byteCapacity reserved as headroom for event headers.
agent.channels.c1.byteCapacityBufferPercentage = 80
#
# --- source r1: project-specific spooling-directory source for zip files ---
# NOTE(review): com.cbo.flume.source.zip.SpoolDirectorySource is a custom class;
# which of the properties below it actually reads must be confirmed against its code.
agent.sources.r1.type = com.cbo.flume.source.zip.SpoolDirectorySource
# (the original set "channels" three times; once is enough)
agent.sources.r1.channels = c1
agent.sources.r1.spoolDir = /data/4G
# Only pick up files whose name ends in ".zip". The backslash is doubled because
# the .properties loader treats "\" as an escape character: a single "\." is
# read as "." (any character), which silently loosens the pattern.
agent.sources.r1.includePattern = ([^ ]*\\.zip$)
agent.sources.r1.fileHeader = true
agent.sources.r1.flumeBatchSize = 10000
agent.sources.r1.useFlumeEventFormat = false
agent.sources.r1.restart = true
agent.sources.r1.batchSize = 10000
agent.sources.r1.batchTimeout = 3000
#
# --- sinks k1..k7: one Avro sink per collector ---
# sink1
agent.sinks.k1.channel = c1
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = ynjz003
agent.sinks.k1.port = 52020
# sink2
agent.sinks.k2.channel = c1
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = ynjz004
agent.sinks.k2.port = 52020
# sink3
agent.sinks.k3.channel = c1
agent.sinks.k3.type = avro
agent.sinks.k3.hostname = ynjz005
agent.sinks.k3.port = 52020
# sink4
agent.sinks.k4.channel = c1
agent.sinks.k4.type = avro
agent.sinks.k4.hostname = ynjz006
agent.sinks.k4.port = 52020
# sink5
agent.sinks.k5.channel = c1
agent.sinks.k5.type = avro
agent.sinks.k5.hostname = ynjz007
agent.sinks.k5.port = 52020
# sink6
agent.sinks.k6.channel = c1
agent.sinks.k6.type = avro
agent.sinks.k6.hostname = ynjz008
agent.sinks.k6.port = 52020
# sink7
agent.sinks.k7.channel = c1
agent.sinks.k7.type = avro
agent.sinks.k7.hostname = ynjz009
agent.sinks.k7.port = 52020
#
# --- sink group g1: spread events across all seven sinks ---
agent.sinkgroups.g1.sinks = k1 k2 k3 k4 k5 k6 k7
agent.sinkgroups.g1.processor.type = load_balance
# Temporarily blacklist a sink that fails, with back-off, instead of retrying it at once.
agent.sinkgroups.g1.processor.backoff = true
# Pick the next sink at random (the alternative is round_robin).
agent.sinkgroups.g1.processor.selector = random
在实际应用中,多个client的配置基本上一致,只有监控文件目录的配置不同,例如:agent.sources.r1.spoolDir = /data/4G
4. 启动flume-ng高可用集群
首先启动每个server,各server的启动命令相同,只是使用的配置文件(这里以flume-server-data.properties为例)不同:
./bin/flume-ng agent --name agent --conf conf --conf-file conf/flume-server-data.properties -Dflume.root.logger=INFO,console > /data/ynjz/workspace/flume-server-data.log 2>&1 &
然后启动每个client,各client的启动命令相同,只是使用的配置文件(这里以flume-client-data.properties为例)不同:
./bin/flume-ng agent --name agent --conf conf --conf-file conf/flume-client-data.properties -Dflume.root.logger=INFO,console > /data/ynjz/workspace/flume-client-data.log 2>&1 &
在平时应用中,可以随时停止client;但如果server已经停止或尚未启动,此时启动client会导致报错,因此应先启动server,再启动client。