Offline Project Data Collection
Flume
Apache release download: http://archive.apache.org/dist/flume/
CDH release download: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
Set JAVA_HOME in flume-env.sh
Then configure the environment variables:
export FLUME_HOME=/hmaster/flume/apache-flume-1.8.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
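After reloading the shell profile, verify the installation; a minimal check, assuming the exports above were added to ~/.bash_profile:
source ~/.bash_profile
flume-ng version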
1. Exec source: tail a file and write to HDFS
Configuration
exec-hdfs-agent.sources = exec-source
exec-hdfs-agent.sinks = hdfs-sink
exec-hdfs-agent.channels = exec-memory-channel
# Describe/configure the source
exec-hdfs-agent.sources.exec-source.type = exec
exec-hdfs-agent.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c
# Use a channel which buffers events in memory
exec-hdfs-agent.channels.exec-memory-channel.type = memory
# Describe the sink
exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:8020/flume/
exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
exec-hdfs-agent.sinks.hdfs-sink.hdfs.batchSize = 10
# Bind the source and sink to the channel
exec-hdfs-agent.sources.exec-source.channels = exec-memory-channel
exec-hdfs-agent.sinks.hdfs-sink.channel = exec-memory-channel
Start
flume-ng agent \
--name exec-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console
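A quick smoke test, assuming the NameNode really listens on hadoop001:8020: append a line to the tailed file, then list the sink directory. Note the exec source offers no delivery guarantees (if the tail -F process or agent dies, data can be lost); the Taildir source in section 4 is the more reliable way to tail files.
echo "hello flume" >> /home/hadoop/data/data.log
hdfs dfs -ls /flume/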
2. Spooldir source: collect finished files from a local directory into HDFS
spooling-hdfs-agent.sources = spooling-source
spooling-hdfs-agent.sinks = hdfs-sink
spooling-hdfs-agent.channels = spooling-memory-channel
# Describe/configure the source
spooling-hdfs-agent.sources.spooling-source.type = spooldir
spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data
# Use a channel which buffers events in memory
spooling-hdfs-agent.channels.spooling-memory-channel.type = memory
# Describe the sink
spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/data/flume/spooling
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = org.apache.hadoop.io.compress.GzipCodec
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = events-
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
# Bind the source and sink to the channel
spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel
spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel
Start
flume-ng agent \
--name spooling-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-hdfs.conf \
-Dflume.root.logger=INFO,console
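To test, move a finished file into the spool directory (the file name here is only an illustration); Flume ingests it, renames it with the default .COMPLETED suffix, and writes gzipped output under the sink path:
cp /home/hadoop/data/page_views.log /home/hadoop/data/spool_data/
hdfs dfs -ls /data/flume/spooling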
Controlling small files
The HDFS sink rolls its output file on three triggers (defaults shown):
hdfs.rollInterval 30   (seconds)
hdfs.rollSize     1024 (bytes)
hdfs.rollCount    10   (events)
Whichever threshold is reached first triggers the roll; setting a value to 0 disables that trigger.
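For example, a sketch that rolls only on file size, at 128 MB (the agent name and the 128 MB figure are illustrative, not from the configs above):
agent.sinks.hdfs-sink.hdfs.rollInterval = 0
agent.sinks.hdfs-sink.hdfs.rollSize = 134217728
agent.sinks.hdfs-sink.hdfs.rollCount = 0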
3. Spooldir source to HDFS with time-based partitions
hdfs.useLocalTimeStamp = true makes the sink resolve the time escape sequences in hdfs.path (%m%d%H%M below) using the agent's local clock, instead of requiring a timestamp header on each event.
spooling-hdfs-agent.sources = spooling-source
spooling-hdfs-agent.sinks = hdfs-sink
spooling-hdfs-agent.channels = spooling-memory-channel
# Describe/configure the source
spooling-hdfs-agent.sources.spooling-source.type = spooldir
spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data
# Use a channel which buffers events in memory
spooling-hdfs-agent.channels.spooling-memory-channel.type = memory
# Describe the sink
spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/data/flume/logs/%m%d%H%M
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = org.apache.hadoop.io.compress.GzipCodec
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = page-views
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.roundUnit = minute
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.round = true
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.roundValue = 1
# Bind the source and sink to the channel
spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel
spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel
Start
flume-ng agent \
--name spooling-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-hdfspartition.conf \
-Dflume.root.logger=INFO,console
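Because round = true with roundValue = 1 and roundUnit = minute, the timestamp is rounded down to the minute, so each minute of data lands in its own directory; for example (directory names illustrative):
hdfs dfs -ls /data/flume/logs/
# e.g. /data/flume/logs/05241230, /data/flume/logs/05241231, ...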
4. Taildir source: tail multiple files and globs, with read offsets persisted to a position file
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/data/spool_data/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/data/taildir/test1/example.log
a1.sources.r1.filegroups.f2 = /home/hadoop/data/taildir/test2/.*log.*
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console
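A quick check with the paths configured above: append to a tailed file and watch the logger sink print the event; the position file records each file's offset as JSON, so the source resumes where it left off after a restart:
echo "taildir test" >> /home/hadoop/data/taildir/test1/example.log
cat /home/hadoop/data/spool_data/taildir_position.json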
5. Replicating channel selector: one source fanned out to multiple channels
#flume1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = replicating
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 44445
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 44446
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
#flume2
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 44445
a2.channels.c1.type = memory
a2.sinks.k1.type = logger
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#flume3
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 44446
a3.channels.c1.type = memory
a3.sinks.k1.type = logger
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
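Start the three agents; bring up a2 and a3 before a1, since a1's avro sinks need a listening avro source on the far end (otherwise a1 retries with connection errors until they are up):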
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume1.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a2 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume2.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a3 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/channelreplicating/flume3.conf \
-Dflume.root.logger=INFO,console
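To test, send a few lines to the netcat source (assuming telnet or nc is available); the same events should appear in both a2's and a3's logger output:
telnet hadoop001 44444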
6. Failover sink processor: high availability
#flume1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 44444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 44445
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop001
a1.sinks.k2.port = 44446
# Put both sinks in one group; the failover processor keeps only the
# highest-priority sink (k2) active and falls back to k1 if it fails
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Bind the source and both sinks to the single channel, so whichever
# sink is active can deliver every event
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#flume2
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop001
a2.sources.r1.port = 44445
a2.channels.c1.type = memory
a2.sinks.k1.type = logger
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#flume3
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop001
a3.sources.r1.port = 44446
a3.channels.c1.type = memory
a3.sinks.k1.type = logger
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
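As before, start a2 and a3 before a1 so the avro sinks can connect: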
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume1.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a2 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume2.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name a3 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/sinkfailover/flume3.conf \
-Dflume.root.logger=INFO,console
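To verify failover, send lines to the netcat source as in section 5; with all three agents running, only a3 (priority 10, port 44446) should print them. Kill a3 and new events fail over to a2; once a3 is back up and its penalty window expires, traffic returns to it, since the processor always prefers the highest-priority live sink.
telnet hadoop001 44444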
Summary
Flume => configure one or more agents, each wiring a source, a channel, and a sink
Pluggable: sources, channels, and sinks can be swapped independently per use case