文件编写
简单了解
一、介绍
这一步主要是利用 flume 采集 HDFS 上的源数据并流向 kafka。
二、简单要点
1.Flume 是什么?
简单了解一下。
2.为什么 flume 和 Kafka 要联合使用 ?
简单了解一下。
3.建议用谷歌浏览器打开 Flume 官方网站(地址:flume.apache.org),如果需要可在站内将网页转为中文。
4.首先是查看用户指南,找到 Spooling Directory Source ,这主要是说把 文件夹 放到 目录 中来索取数据,说明这边监控的是文件夹,不监控文件,所以在编辑脚本时,目录写到文件夹即可,不需要写到文件。
三、操作步骤
(一)按图操作
(二)修改 sources
找到 Spooling Directory Source
- 把这一段代码考过来
a1.channels = ch-1 # 设置名称
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir =/var/log/apache/flumeSpool # 需要flume处理的文件目录
a1.sources.src-1.fileHeader = true # 是否添加存储绝对路径文件名的标头
- 进行简单修改:
a1.channels = c1
a1.sources = s1
a1.sources.s1.type = spooldir
a1.sources.s1.channels = ch-1
a1.sources.s1.spoolDir =/opt/data/event_attendees
- 对 行 最大长度进行修改:
注:因查询文件中最长的一行达10w个字符,故将下面最长行限制设置为12W
查询代码:wc -L 文件名
a1.sources.s1.deserializer.maxLineLength=120000
(三)使用拦截器去除表头
- 先找到正则拦截器
- 编写代码
注:因为选择了正则拦截器,那 type 势必为 regex_filter
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true
-
regex 后面跟的是正则表达式。
-
regex 后面跟的是正则表达式。
-
true 代表符合正则表达式的都不要,都过滤掉。
(四)修改 channels
找到 File Channels
- 把这一段代码考过来
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint # 这是检查点文件夹
a1.channels.c1.dataDirs = /mnt/flume/data # 这是数据通道
- 这里的 checkpointDir 、 dataDirs 文件夹在代码运行的时候系统都会自动创建
- 进行简单修改:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data
如果担心在传输的过程中日志会丢掉,就要加上一个备份检查点。
(五)修改 sinks
找到 Kafka Sink
- 把这一段代码考过来
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = 本地主机:9092
a1.sinks.k1.kafka.flumeBatchSize = 20 # 一批的尺寸是20条
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1 # 保留缓存时间
a1.sinks.k1.kafka .producer.compression.type = snappy # 压缩方式
-
如果 snappy 压缩方式报错,可点此链接获取一些帮助
-
进行简单修改:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288
四、完整配置代码
a1.channels = c1
a1.sources = s1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /opt/data/event_attendees
a1.sources.s1.deserializer.maxLineLength=120000
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type = regex_filter
a1.sources.s1.interceptors.i1.regex = \s*event.*
a1.sources.s1.interceptors.i1.excludeEvents=true
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flumekfk/checkpoint
a1.channels.c1.dataDirs = /opt/flumekfk/data
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = event_attendees_raw
a1.sinks.k1.kafka.bootstrap.servers = 192.168.59.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 10
a1.sinks.k1.kafka.producer.batch.size=524288
- kafka 参数调优
- 刚刚上边查出一行最长有 107513 个字符,所以 batch.size 要设置成 524288 ,也就是 512kb,才能保证实现批数据处理
- kafka.topic 这里要先在 kafka 里创建同名 topic
五、执行命令
- 执行
flume-ng agent -n a1 -c /opt/software/hadoop/flume160/conf -f /opt/flumecfg/event_attendees.conf -Dflume.root.logger=info,console
- 查询行数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.59.200:9092 --topic event_attendees_raw --time -1
- 后台监控
kafka-console-consumer.sh --bootstrap-server 192.168.59.200:9092 --topic event_attendees_raw --from-beginning