[ Project ] Editing Flume.conf

文件编写

简单了解

一、介绍

  这一步主要是利用 flume 采集 HDFS 上的源数据并流向 kafka

二、简单要点

  1.Flume 是什么?
  简单了解一下。

  2.为什么 flumeKafka 要联合使用 ?
  简单了解一下。

  3.建议用谷歌浏览器打开 Flume 官方网站(地址:flume.apache.org),如果需要可在站内将网页转为中文。

  4.首先是查看用户指南,找到 Spooling Directory Source ,这主要是说把 文件夹 放到 目录 中来索取数据,说明这边监控的是文件夹,不监控文件,所以在编辑脚本时,目录写到文件夹即可,不需要写到文件。

三、操作步骤

(一)按图操作

[ Project ]  Editing Flume.conf
[ Project ]  Editing Flume.conf
[ Project ]  Editing Flume.conf

(二)修改 sources

找到 Spooling Directory Source

  • 把这一段代码考过来
a1.channels = ch-1										# 设置名称
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir =/var/log/apache/flumeSpool	# 需要flume处理的文件目录
a1.sources.src-1.fileHeader = true						# 是否添加存储绝对路径文件名的标头
  • 进行简单修改:
a1.channels = c1
a1.sources = s1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = ch-1
a1.sources.s1.spoolDir =/opt/data/event_attendees
  • 最大长度进行修改:
    注:因查询文件中最长的一行达10w个字符,故将下面最长行限制设置为12W
    查询代码:wc -L 文件名
a1.sources.s1.deserializer.maxLineLength=120000

(三)使用拦截器去除表头

  1. 先找到正则拦截器

[ Project ]  Editing Flume.conf

  1. 编写代码
    注:因为选择了正则拦截器,那 type 势必为 regex_filter

[ Project ]  Editing Flume.conf

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true
  • regex 后面跟的是正则表达式。

  • regex 后面跟的是正则表达式。

  • true 代表符合正则表达式的都不要,都过滤掉。

(四)修改 channels

找到 File Channels

  • 把这一段代码考过来
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint	# 这是检查点文件夹
a1.channels.c1.dataDirs = /mnt/flume/data				# 这是数据通道
  • 这里的 checkpointDirdataDirs 文件夹在代码运行的时候系统都会自动创建
  • 进行简单修改:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data

如果担心在传输的过程中日志会丢掉,就要加上一个备份检查点。

[ Project ]  Editing Flume.conf

(五)修改 sinks

找到 Kafka Sink

  • 把这一段代码考过来
a1.sinks.k1.channel  =  c1 
a1.sinks.k1.type  =  org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic  =  mytopic 
a1.sinks.k1.kafka.bootstrap.servers  = 本地主机:9092 
a1.sinks.k1.kafka.flumeBatchSize  =  20 					# 一批的尺寸是20条
a1.sinks.k1.kafka.producer.acks  =  1 
a1.sinks.k1.kafka.producer.linger.ms  =  1 					# 保留缓存时间
a1.sinks.k1.kafka .producer.compression.type  = snappy		# 压缩方式
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288

四、完整配置代码

a1.channels = c1
a1.sources = s1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/data/event_attendees
a1.sources.s1.deserializer.maxLineLength=120000

a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data


a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288
  • kafka 参数调优
  • 刚刚上边查出一行最长有 107513 个字符,所以 batch.size 要设置成 524288 ,也就是 512kb,才能保证实现批数据处理
  • kafka.topic 这里要先在 kafka 里创建同名 topic

五、执行命令

  • 执行
flume-ng agent -n a1 -c /opt/software/hadoop/flume160/conf -f /opt/flumecfg/event_attendees.conf -Dflume.root.logger=info,console
  • 查询行数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.59.200:9092 --topic event_attendees_raw --time -1
  • 后台监控
kafka-console-consumer.sh --bootstrap-server 192.168.59.200:9092 --topic event_attendees_raw --from-beginning
上一篇:python 字典练习题


下一篇:菜肴制作(洛谷P3243)