什么是Flume
ApacheFlume是一个分布式的、可靠的、可用的系统,用于高效地收集、聚合和将大量来自不同来源的日志数据移动到一个集中的数据存储区。
系统要求
1. JDK 1.8 或以上版本
2. 内存、磁盘 空间充足
3. 代理使用的目录有读写权限
数据流动模型
数据源Source支持多种数据类型,采集到数据后经过Channel通道临时存储,包括 基于内存,Kafka,文件磁盘,然后通过Sink将数据进行落地存储;
Flume Source
主要支持以下几种类型
1. Kafka Source
可以消费kaka中topic中的消息,如果有多个kafka有多个源在运行,可以配置在以消费组的形式读取每一组分区中的topic信息;当前支持的kafak版本为
0.10.1.0或更高版本,配置参考:
tier1.sources.source1.type= = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
2. Avro Source
监听来自于Avro端口的事件流,比如另一个Flume作为数据源,配置参考:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
3. Exec Source
执行Unix上命令 生产数据做为Flume的数据源,配置参考:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
Flume Sinks
主要支持如下类型
SinkS
HDFS
Hive
Kafka
Avor
Flume Channels
主要支持如下类型
Channel
Memory Channel
JDBC Channel
Kafka Channel
File Channel
下载安装
安装非常简单:
Wget
http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
解压缩
Tar xvf apache-flume-1.9.0-bin.tar.gz -c /usr/flume
编写简单实例
需求说明:
模拟将服务器A上的磁盘日志复制采集到另一台服务器B的磁盘上
从需求上分析可知source 为 exec 类型 , channel 基于内存即可,sinks 为file_roll 类型.
操作步骤:
1. 在conf 里面新增配置文件 demo.conf
2. Vim demo.conf , 输入以下配置内容:
// 数据源配置
a1.sources = r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/website/logs/1.txt
a1.sources.r1.channels = c1
// 数据通道配置
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
//数据目标存储配置
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /usr/website/sinklogs
3. 启动flume 服务
bin/flume-ng agent --f conf/demo.conf --name a1 -Dflume.root.logger=INFO,console
4. 启动成功后在source 的目录下新建日志文件 1.txt ,然后输入字符串保存
5. 查看sinks 的磁盘目录 /usr/website/sinklogs 是否有生成的txt 日志文件,如果有说明数据已同步成功,同步的策略是source文件内容每变动一次都会全量的同步到sinks上.
当然,sinks 的类型也可以是kafka 消费者.
扫码或长按关注查看更多文章