Flume
Flume可以有效的从不同的源,收集、聚合移动大量日志数据到集中式数据存储
Flumed的优势
Flume可以将应用产生的数据存储到任何集中的存储器中,比如HDFS ,Hive,Hbase。
1.当收集数据的速度大于写入数据的时候 ,Flume也会在数据生产者和数据收容器间做出调增
2.支持各种接入资源数据的类型和接出数据类型
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JpPCMUSb-1610538847306)(C:\Users\Acer\Desktop\思维导图\flume简单结构图.png)]
WebSever为数据源,Source接受数据源,流向Channel作为临时缓冲,Sink不断地从Channel里面抽取数据,并将数据发送到存储、索引系统,或者发送到另一个Flume Agent
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a2uEInuC-1610538847310)(C:\Users\Acer\Desktop\思维导图\Flume结构图详解.png)]
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的地。Agent主要有三个部分组成:Source、Channel、Sink。
Event
Flume数据传输的基本单元,带有一个可选的消息头。如果是文本文件,通常是一行记录。Event从Source,流向Channel,再到Sink,Sink将数据写入目的地。
Source
Source是负责接收数据到Flume Agent的组件。Source可以接收各种类型和格式的日志数据。
Channel
Channel主要提供一个队列的功能。是位于Source和Sink之间的缓冲区。Source到Channel是完全事务性。
一旦事务中的所有事件全部都传递到Channel且提交成功,那么Source就将其标记为完成。如果事件传递失败,那么事务将会回滚。
Flume对于Channel提供了Memory Channel、 File Channel 、JDBC Channel、Kafka Channel 以及自定义Channel等。
Memory Channel 是内存中的队列,可以在不考虑数据丢失的情况下使用
File Channel 将所有事件都写入磁盘中 ,可以防止数据丢失。
Flume 支持事务,分为Put事务与Take事务。
Put事务:
从Source到Channel传输的事务 叫Put事务 。通过doPut将数据先写入临时缓冲区 putList;
再通过doCommit将批数据提交给Channel,会检查Channel内存队列是否足够合并,如果Channel内存队列空间不足,则回滚数据。
Take事务:
从Channel拉取事件数据到Sink的过程叫Take事务。通过doTake现将数据取到临时缓冲区taskList;再通过doCommit将事件数据发送Sink。如果数据全部发送数据成功,则清除临时缓冲区takeList。如果数据出现异常rollback将临时缓冲区takeList中的数据归还channel内存序列。
四 、拓扑结构
负责均衡
聚合模式 多用于落地HDFS
Flume优点:
1、可靠性
当节点出现故障时,日志能够传到其它节点上而不被丢失。
有三种保障:
end to end (收到数据agent首先将event落地到磁盘,当数据发送成功后,再删除,如果数据发送失败,可以重新发送。)
store on failure (将数据写入本地,待sink恢复,再次发送)
Best effort(发送接收都不保存)
2、可扩展性
Flume有三层架构
agent
collector
storage
3、可管理性
4、功能扩展性
Flume传输数据的基本单位是event。
只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
agent1.sources.source1.type=spooldir
agent1.sources.source1.spooldir=/usr/aboutyunlog
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader=false
sink滚动时间:是指sink要在规定的时间内落地的文件。
flume与Kafka整合
kafka做为数据源
kafka做数据源其实就是kafka消费者从kafka topic 读取文件。
//sources的类型是kafka
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
//管道选择了channel
tier1.sources.source1.channels = channel1
//可以一次读取的最大行数为5000
tier1.sources.source1.batchSize = 5000
//当batchSize没有达到最大值的时候,超过2000这个时间也会写入到channel
tier1.sources.source1.batchDurationMillis = 2000
//kafka作为数据源的broker的列表
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
//kafka主题用","分割
tier1.sources.source1.kafka.topics = test1, test2
//消费者id
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
//用正则订阅topic
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
//正则表达式要订阅的topics,这个配置的优先级高于kafka.topics,他会覆盖kafka.topics
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
Kafka作为sink
也就是说flume将数据发送到kafka的topic。
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
Kafka作为channel
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
nel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
并非原创 如有雷同 请联系笔者