第1节 flume:13、14、更多flume案例一,通过拦截器实现不同类型的数据区分

1.6、flume案例一

1. 案例场景

A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log

现在要求:

把A、B 机器中的access.log、nginx.log、web.log 采集汇总到C机器上然后统一收集到hdfs中。

但是在hdfs中要求的目录为:

/source/logs/access/20180101/**

/source/logs/nginx/20180101/**

/source/logs/web/20180101/**

2. 场景分析

 第1节 flume:13、14、更多flume案例一,通过拦截器实现不同类型的数据区分

3. 数据流程处理分析

 第1节 flume:13、14、更多flume案例一,通过拦截器实现不同类型的数据区分

第1节 flume:13、14、更多flume案例一,通过拦截器实现不同类型的数据区分

 

4、实现

服务器A对应的IP为 192.168.52.100

服务器B对应的IP为 192.168.52.110

服务器C对应的IP为 192.168.52.120

 

采集端配置文件开发

node01与node02服务器开发flume的配置文件

 

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim exec_source_avro_sink.conf

 

# Name the components on this agent

a1.sources = r1 r2 r3

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /export/servers/taillogs/access.log

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = access

 

a1.sources.r2.type = exec

a1.sources.r2.command = tail -F /export/servers/taillogs/nginx.log

a1.sources.r2.interceptors = i2

a1.sources.r2.interceptors.i2.type = static

a1.sources.r2.interceptors.i2.key = type

a1.sources.r2.interceptors.i2.value = nginx

 

a1.sources.r3.type = exec

a1.sources.r3.command = tail -F /export/servers/taillogs/web.log

a1.sources.r3.interceptors = i3

a1.sources.r3.interceptors.i3.type = static

a1.sources.r3.interceptors.i3.key = type

a1.sources.r3.interceptors.i3.value = web

 

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node03

a1.sinks.k1.port = 41414

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sources.r2.channels = c1

a1.sources.r3.channels = c1

a1.sinks.k1.channel = c1

 

 

服务端配置文件开发

在node03上面开发flume配置文件

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim avro_source_hdfs_sink.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#定义source

a1.sources.r1.type = avro

a1.sources.r1.bind = 192.168.52.120

a1.sources.r1.port =41414

 

#添加时间拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

 

 

#定义channels

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

 

#定义sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path=hdfs://192.168.52.100:8020/source/logs/%{type}/%Y%m%d

a1.sinks.k1.hdfs.filePrefix =events

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text

#时间类型

a1.sinks.k1.hdfs.useLocalTimeStamp = true

#生成的文件不按条数生成

a1.sinks.k1.hdfs.rollCount = 0

#生成的文件按时间生成

a1.sinks.k1.hdfs.rollInterval = 30

#生成的文件按大小生成

a1.sinks.k1.hdfs.rollSize  = 10485760

#批量写入hdfs的个数

a1.sinks.k1.hdfs.batchSize = 10000

#flume操作hdfs的线程数(包括新建,写入等)

a1.sinks.k1.hdfs.threadsPoolSize=10

#操作hdfs超时时间

a1.sinks.k1.hdfs.callTimeout=30000

 

#组装source、channel、sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

 

 

 

 

 

采集端文件生成脚本

在node01与node02上面开发shell脚本,模拟数据生成

cd /export/servers/shells

vim server.sh

 

#!/bin/bash

while true

do 

 date >> /export/servers/taillogs/access.log;

 date >> /export/servers/taillogs/web.log;

 date >> /export/servers/taillogs/nginx.log;

  sleep 0.5;

done

 

顺序启动服务

node03启动flume实现数据收集

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

 

bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

 

node01与node02启动flume实现数据监控

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

 

bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

 

node01与node02启动生成文件脚本

cd /export/servers/shells

sh server.sh

 

5、项目实现截图

 

上一篇:Flume日志采集框架的使用


下一篇:MySQL一次插入多行数据