Flume调优

Flume

Flume可以有效的从不同的源,收集、聚合移动大量日志数据到集中式数据存储

Flumed的优势

Flume可以将应用产生的数据存储到任何集中的存储器中,比如HDFS ,Hive,Hbase。

1.当收集数据的速度大于写入数据的时候 ,Flume也会在数据生产者和数据收容器间做出调增

2.支持各种接入资源数据的类型和接出数据类型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JpPCMUSb-1610538847306)(C:\Users\Acer\Desktop\思维导图\flume简单结构图.png)]

WebSever为数据源,Source接受数据源,流向Channel作为临时缓冲,Sink不断地从Channel里面抽取数据,并将数据发送到存储、索引系统,或者发送到另一个Flume Agent

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a2uEInuC-1610538847310)(C:\Users\Acer\Desktop\思维导图\Flume结构图详解.png)]

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的地。Agent主要有三个部分组成:Source、Channel、Sink。

Event

Flume数据传输的基本单元,带有一个可选的消息头。如果是文本文件,通常是一行记录。Event从Source,流向Channel,再到Sink,Sink将数据写入目的地。

Source

Source是负责接收数据到Flume Agent的组件。Source可以接收各种类型和格式的日志数据。

Channel

Channel主要提供一个队列的功能。是位于Source和Sink之间的缓冲区。Source到Channel是完全事务性。

一旦事务中的所有事件全部都传递到Channel且提交成功,那么Source就将其标记为完成。如果事件传递失败,那么事务将会回滚。

Flume对于Channel提供了Memory Channel、 File Channel 、JDBC Channel、Kafka Channel 以及自定义Channel等。

Memory Channel 是内存中的队列,可以在不考虑数据丢失的情况下使用

File Channel 将所有事件都写入磁盘中 ,可以防止数据丢失。

Flume 支持事务,分为Put事务与Take事务。

Put事务:

从Source到Channel传输的事务 叫Put事务 。通过doPut将数据先写入临时缓冲区 putList;

再通过doCommit将批数据提交给Channel,会检查Channel内存队列是否足够合并,如果Channel内存队列空间不足,则回滚数据。

Take事务:

从Channel拉取事件数据到Sink的过程叫Take事务。通过doTake现将数据取到临时缓冲区taskList;再通过doCommit将事件数据发送Sink。如果数据全部发送数据成功,则清除临时缓冲区takeList。如果数据出现异常rollback将临时缓冲区takeList中的数据归还channel内存序列。

四 、拓扑结构

负责均衡

聚合模式 多用于落地HDFS

Flume优点:

1、可靠性

当节点出现故障时,日志能够传到其它节点上而不被丢失。

有三种保障:

end to end (收到数据agent首先将event落地到磁盘,当数据发送成功后,再删除,如果数据发送失败,可以重新发送。)

store on failure (将数据写入本地,待sink恢复,再次发送)

Best effort(发送接收都不保存)

2、可扩展性

Flume有三层架构

agent

collector

storage

3、可管理性

4、功能扩展性

Flume传输数据的基本单位是event。

只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

agent1.sources.source1.type=spooldir

agent1.sources.source1.spooldir=/usr/aboutyunlog

agent1.sources.source1.channels=channel1

agent1.sources.source1.fileHeader=false

sink滚动时间:是指sink要在规定的时间内落地的文件。

flume与Kafka整合

kafka做为数据源

kafka做数据源其实就是kafka消费者从kafka topic 读取文件。

//sources的类型是kafka
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
//管道选择了channel
tier1.sources.source1.channels = channel1
//可以一次读取的最大行数为5000
tier1.sources.source1.batchSize = 5000
//当batchSize没有达到最大值的时候,超过2000这个时间也会写入到channel
tier1.sources.source1.batchDurationMillis = 2000
//kafka作为数据源的broker的列表
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
//kafka主题用","分割
tier1.sources.source1.kafka.topics = test1, test2
//消费者id
tier1.sources.source1.kafka.consumer.group.id = custom.g.id


//用正则订阅topic

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
//正则表达式要订阅的topics,这个配置的优先级高于kafka.topics,他会覆盖kafka.topics
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used




Kafka作为sink

也就是说flume将数据发送到kafka的topic。

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

Kafka作为channel

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

nel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer



并非原创 如有雷同 请联系笔者


上一篇:Flume的学习和使用


下一篇:kali安装arpspoof及“E:无法定位软件包问题”、GPG错误:由于没有公钥,无法验证下列签名的问题