flume

第一节:简介

一、概念

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

做数据收集的工具,主要用于日志文件的收集。是一个单独的数据的采集的系统的,不在web项目controller,service,dao的三层中的。

二、flume的架构

1、NG架构(flume1.X

source:数据源,数据的来源,相当于对应用系统的日志文件进行读取。

channel:通道相当于一个缓冲区,作用就是缓冲source和sink的数据的速度不一致。

sink:数据的目的地,相当于将收集的数据写出到最终的目的地。

flume

2、OG架构(flume0.9

Agent->collector->master

第二节:flume的核心概念

一、event

数据收集,流的基本单位,flume进行数据读写的基本单元。

包含header(用以描述数据),body收集的数据(用以存储数据的,里面包含键值对,以及16进制的数字)

二、agent

数据收集代理,这里可以理解为一个数据收集服务器,一个代理包含flume的一套的组件的source、channel、sink。

三、三大组件

1、source

数据源,需要进行收集的日志信息的来源。

(1)avro source

监听avro协议的端口,当监听的端口中有数据流入的时候会被avro source监听到。

节点之间的传输一般也用avro协议。

(2)exec source

监听一个linux命令,指定一个需要监听的命令,当这个命令产生的数据就会被exec source监听到---进行采集。

(3)Spooling Directory Source

监听的是一个目录,这个目录是不能有子目录的。监听当前目录下的文件的数据的变化,一旦当前目录下文件的数据有变化就会被Spooling Directory Source监听到,就可以进行数据读取,一旦数据读取完成,当前的这个文件的名字就会加一个后缀.complete代表当前文件的数据读取完成。

(4)NetCat TCP Source

监听TCP协议的端口,一旦端口中有数据就可以被采集。

2、channel

数据通道,临时存储读取(source)的数据信息,给sink使用。可以使用内存、磁盘。

(1)Memory Channel

将数据存储在内存中,内存队列。

(2)FileChannel

event存储在文件中

3、sink

数据收集的目的地,将采集的数据进行最终的处理。

(1)hdfs sink

将采集的数据,写入到hdfs下

(2)Logger sink

将我们采集的数据,放在控制台打印,一般用于测试。

(3)avro sink

将采集的数据写入到制定的端口中

第三节:flume的搭建

flume

第四节:flume的案例

一、简介

Source、channel、sink依赖于配置文件,使用flume必须写配置文件,这个配置文件必须是以 .conf ,.properties这两种类型的文件。

二、运行命令

flume-ng agent --conf conf --conf-file /home/hadoop/case_one_flume.conf --name a1 -Dflume.root.logger=INFO,console

--conf 指定运行的方式

--conf-file 指定配置文件的路径

--name指定agent的名字,和配置文件中对应的

三、案例

# example.conf: A single-node Flume configuration

# 这里的a1指的是agent的名字  标识agent的  自定义的  但是注意:同一个机器上的两个agent一定不能重复  不同节点上的agent是可以一样的

# Name the components on this agent

# 定义的是当前的agent的  source  sink  channel的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# 设定source的类型和相关参数

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/home/hadoop/flumetest

 

# 设定channel

a1.channels.c1.type=memory

 

# 设定sink

a1.sinks.k1.type=logger

 

# Bind the source and sink to the channel

# 设定source的通道

a1.sources.r1.channels = c1

# 设定sink的通道

a1.sinks.k1.channel = c1

第五节:flume的拦截器

一、简介

数据流: source---channel(缓冲)---sink

拦截器就是用于拦截source--sink的数据,对数据进行包装,对数据添加一个数据头信息 headers中封装的,sink在接受到数据的时候经过拦截器处理的数据,有头信息数据的,sink就可以根据不同数据的头信息进行筛选,分类。

二、类型

1、时间戳

Timestamp Interceptor

添加的表头信息

headers:{timestamp=1558759325176}

 

# 指定当前agent a1的 sources sinks  channels 的别名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# agent的数据源的

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# 指定agent的通道

a1.channels.c1.type = memory

 

# 指定agent的sink的

a1.sinks.k1.type = logger

 

# 指定拦截器

# 指定拦截器的别名

a1.sources.r1.interceptors = i1

# 指定类型的

a1.sources.r1.interceptors.i1.type = host

 

# 绑定agent的  r1   c1   k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2、主机名拦截器

Host Interceptor

添加的表头信息

headers:{timestamp=1558759325176}

 

# 指定当前agent a1的 sources sinks  channels 的别名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# agent的数据源的

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# 指定agent的通道

a1.channels.c1.type = memory

 

# 指定agent的sink的

a1.sinks.k1.type = logger

 

# 指定拦截器

# 指定拦截器的别名

a1.sources.r1.interceptors = i1

# 指定类型的

a1.sources.r1.interceptors.i1.type = host

 

# 绑定agent的  r1   c1   k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3、静态拦截器

Static Interceptor

静态拦截器,自己静态指定,自己静态指定拦截器的 key value的值

# 指定当前agent a1的 sources sinks  channels 的别名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# agent的数据源的

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# 指定agent的通道

a1.channels.c1.type = memory

 

# 指定agent的sink的

a1.sinks.k1.type = logger

 

# 指定拦截器

# 指定拦截器的别名

a1.sources.r1.interceptors = i1

# 指定类型的

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = country

a1.sources.r1.interceptors.i1.value = CN

 

# 绑定agent的  r1   c1   k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

上一篇:java8 Stream之原理


下一篇:通过系统日志采集大数据