Offline Project Data Collection: Flume

Apache release download: http://archive.apache.org/dist/flume/

CDH release download: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz

Set JAVA_HOME in conf/flume-env.sh, then configure the environment variables:

export FLUME_HOME=/hmaster/flume/apache-flume-1.8.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$PATH:$FLUME_HOME/bin
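
To verify the installation is on the PATH, a quick sanity check (exact output varies by build):

flume-ng version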

1. Exec source: tail a file and upload to HDFS

Configuration

exec-hdfs-agent.sources = exec-source
exec-hdfs-agent.sinks = hdfs-sink
exec-hdfs-agent.channels = exec-memory-channel

# Describe/configure the source
exec-hdfs-agent.sources.exec-source.type = exec
exec-hdfs-agent.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c

# Use a channel which buffers events in memory
exec-hdfs-agent.channels.exec-memory-channel.type = memory

# Describe the sink
exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:8020/flume/
exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
exec-hdfs-agent.sinks.hdfs-sink.hdfs.batchSize = 10

# Bind the source and sink to the channel
exec-hdfs-agent.sources.exec-source.channels = exec-memory-channel
exec-hdfs-agent.sinks.hdfs-sink.channel = exec-memory-channel

Start

flume-ng agent \
--name exec-hdfs-agent \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console
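
To test, append a line to the tailed file and list the HDFS target directory (paths as configured above):

echo "hello flume" >> /home/hadoop/data/data.log
hdfs dfs -ls /flume/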

2. Spooling directory source: collect files from a directory into HDFS

spooling-hdfs-agent.sources = spooling-source
spooling-hdfs-agent.sinks = hdfs-sink
spooling-hdfs-agent.channels = spooling-memory-channel

# Describe/configure the source
spooling-hdfs-agent.sources.spooling-source.type = spooldir
spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data

# Use a channel which buffers events in memory
spooling-hdfs-agent.channels.spooling-memory-channel.type = memory


# Describe the sink
spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/data/flume/spooling
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = org.apache.hadoop.io.compress.GzipCodec
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = events-
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30

# Bind the source and sink to the channel
spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel
spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel

Start

flume-ng agent \
--name spooling-hdfs-agent \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-hdfs.conf \
-Dflume.root.logger=INFO,console
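
To test, drop a file into the spool directory; Flume renames ingested files with a .COMPLETED suffix by default, and the gzipped output lands under the HDFS path (input.txt here is just a hypothetical sample file):

cp /home/hadoop/data/input.txt /home/hadoop/data/spool_data/
hdfs dfs -ls /data/flume/spooling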

Merging small files

Three roll settings on the HDFS sink control when the current output file is closed and a new one started: hdfs.rollInterval (seconds), hdfs.rollSize (bytes), and hdfs.rollCount (events). Whichever threshold is reached first triggers the roll, as in the sketch below.
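
A minimal sketch using the values from the note above; "agent" and "hdfs-sink" are placeholder names, and setting any value to 0 disables that condition:

# roll on whichever condition is met first: 30 s, 1024 bytes, or 10 events
agent.sinks.hdfs-sink.hdfs.rollInterval = 30
agent.sinks.hdfs-sink.hdfs.rollSize = 1024
agent.sinks.hdfs-sink.hdfs.rollCount = 10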

3. Spooling directory source to HDFS with time-partitioned paths

hdfs.useLocalTimeStamp = true makes the sink use the agent's local time, rather than a timestamp header on each event, to resolve the time escape sequences (%m%d%H%M) in the partition path.

spooling-hdfs-agent.sources = spooling-source
spooling-hdfs-agent.sinks = hdfs-sink
spooling-hdfs-agent.channels = spooling-memory-channel

# Describe/configure the source
spooling-hdfs-agent.sources.spooling-source.type = spooldir
spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data

# Use a channel which buffers events in memory
spooling-hdfs-agent.channels.spooling-memory-channel.type = memory


# Describe the sink
spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/data/flume/logs/%m%d%H%M
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = org.apache.hadoop.io.compress.GzipCodec
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = page-views
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30


spooling-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.roundUnit = minute
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.round = true
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.roundValue = 1


# Bind the source and sink to the channel
spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel
spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel

Start

flume-ng agent \
--name spooling-hdfs-agent \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-hdfspartition.conf \
-Dflume.root.logger=INFO,console
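
After dropping files into the spool directory you should see minute-granularity directories (named by the %m%d%H%M pattern) under the partition root:

hdfs dfs -ls /data/flume/logs/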

4. Taildir source: tail multiple files with position tracking

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/data/spool_data/taildir_position.json

a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/data/taildir/test1/example.logger
a1.sources.r1.filegroups.f2 = /home/hadoop/data/taildir/test2/.*log.*

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start

flume-ng agent \
--name a1 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console
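
To test, append to a file matched by one of the filegroups; because read offsets are tracked in the position file, restarting the agent resumes where it stopped (app.log is a hypothetical file name matching the f2 pattern):

echo "taildir test" >> /home/hadoop/data/taildir/test2/app.log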

5. Multiple channels: replicating one source to multiple sinks (fan-out)

#flume1


# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = replicating


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 44445

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 44446

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

#flume2

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 
a2.channels = c1 

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 44445

a2.channels.c1.type = memory

a2.sinks.k1.type = logger

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

#flume3

# Name the components on this agent
a3.sources = r1
a3.sinks = k1 
a3.channels = c1 

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 44446

a3.channels.c1.type = memory

a3.sinks.k1.type = logger

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Start

flume-ng agent \
--name a1 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume1.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name a2 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume2.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name a3 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume3.conf \
-Dflume.root.logger=INFO,console
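
To test, start a2 and a3 before a1, then type lines into the netcat source; each event should appear in both a2's and a3's logger output:

telnet hadoop001 44444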

6. Sink failover: high availability

#flume1


# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 44444

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 44445

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 44446

# Failover sink group: the highest-priority sink is active;
# on failure traffic switches to the next-highest
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and both sinks to the single channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#flume2

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 
a2.channels = c1 

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 44445

a2.channels.c1.type = memory

a2.sinks.k1.type = logger

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

#flume3

# Name the components on this agent
a3.sources = r1
a3.sinks = k1 
a3.channels = c1 

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 44446

a3.channels.c1.type = memory

a3.sinks.k1.type = logger

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Start

flume-ng agent \
--name a1 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume1.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name a2 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume2.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name a3 \
--conf  $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume3.conf \
-Dflume.root.logger=INFO,console
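
To test failover, start all three agents and type lines into the netcat source; events go to a3 (priority 10). Kill a3 and new events should fail over to a2's logger output:

telnet hadoop001 44444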

Summary

Flume comes down to configuring one or more agents, each assembled from pluggable sources, channels, and sinks.