Flume 1.9 Installation

Flume Overview

Features

(1) High reliability: Flume provides an end-to-end data reliability mechanism.

(2) Easy to scale: agents form a distributed architecture and can be scaled out horizontally.

(3) Easy to recover: the channel keeps the events received from the data source, so they can be replayed after a failure.

(4) Feature-rich: Flume ships with many built-in components covering a variety of data sources and storage backends.

 

Core components:

(1) Source: the entry point through which an agent ingests data.

(2) Channel: the pipe through which data flows and is buffered. A source must be attached to at least one channel.

(3) Sink: receives data from the channel, delivers it to the configured destination, and removes it from the channel once delivery succeeds.

 

Events

(1) The basic unit of data transfer in Flume.

(2) An event consists of headers plus a byte-array payload; the byte array holds the actual data.

(3) Maximum line size per event: deserializer.maxLineLength, default 2048 bytes; encoding: UTF-8.

A source can be bound to multiple channels.

A sink can be bound to only one channel.
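A minimal wiring sketch of these binding rules (hypothetical agent and component names): one source fans out into two channels, while each sink drains exactly one channel.

a0.sources = s1

a0.channels = c1 c2

a0.sinks = k1 k2

# a source may feed several channels

a0.sources.s1.channels = c1 c2

# each sink drains exactly one channel

a0.sinks.k1.channel = c1

a0.sinks.k2.channel = c2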

 

Flume Installation

Unpack

tar -xzf apache-flume-1.9.0-bin.tar.gz

mv apache-flume-1.9.0-bin /usr/local/flume

 

Edit /etc/profile

vim /etc/profile

export FLUME_HOME=/usr/local/flume

export PATH=$PATH:$FLUME_HOME/bin

source /etc/profile

 

Edit flume-env.sh

cd /usr/local/flume/conf

mv flume-env.sh.template flume-env.sh

vim flume-env.sh

export JAVA_HOME=/usr/local/jdk1.8.0_271

export PATH=$PATH:$JAVA_HOME/bin

 

Verify

cd /usr/local/flume/bin

flume-ng version

 

Create custom Flume working directories

mkdir -pv /usr/local/flume/{log,job,shell}

job         # holds the agent configuration files

log         # holds log files

shell       # holds startup scripts

 

Flume Examples

Case 1: Monitor port data (netcat source - memory channel - logger sink)

1. Edit the configuration:

vim /usr/local/flume/job/flume-netcat.conf

# Name the agent components

a1.sources = r1

a1.channels = c1

a1.sinks = k1

 

# Configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = cancer01

a1.sources.r1.port = 8888

 

# Configure the sink; logger prints events to the console

a1.sinks.k1.type = logger

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2. Start

flume-ng agent --conf /usr/local/flume/conf --name a1 --conf-file /usr/local/flume/job/flume-netcat.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console

3. Test

ss -ntl                                # check listening ports

telnet cancer01 8888
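If telnet is not available, nc can be used instead (assuming the nc package is installed):

echo "hello flume" | nc cancer01 8888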

4. Create a startup script

vim /usr/local/flume/shell/start-netcat.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-netcat.conf --name a1 -Dflume.monitoring.type=http  -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-netcat.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-netcat.sh

5. Run the script

ss -ntl

/usr/local/flume/shell/start-netcat.sh

tail -100f /usr/local/flume/log/flume-netcat.log

6. Check channel metrics

yum -y install epel-release        # install the EPEL repository

yum list jq          # check whether the jq package is available

yum -y install jq       # install jq to pretty-print JSON output

curl http://cancer01:10501/metrics | jq

{

  "CHANNEL.c1": {          # monitoring data for channel c1; the name c1 is defined in flume-netcat.conf.

    "ChannelCapacity": "1000",     # channel capacity; currently reported only for the File Channel and Memory Channel.

    "ChannelFillPercentage": "0.0",       # percentage of the channel that is currently filled.

    "Type": "CHANNEL",                # metric type: CHANNEL.

    "ChannelSize": "0",                 # number of events currently in the channel; currently reported only for the File Channel and Memory Channel.

    "EventTakeSuccessCount": "64",      # total number of events the sink successfully took from the channel.

    "EventTakeAttemptCount": "227",     # total number of take attempts; an attempt does not always return an event, because the channel may be empty.

    "StartTime": "1563520459221",       # channel start time in milliseconds.

    "EventPutAttemptCount": "64",       # total number of events the source attempted to put into the channel.

    "EventPutSuccessCount": "64",       # total number of events successfully put into the channel and committed.

    "StopTime": "0"                    # channel stop time in milliseconds; 0 means it is still running.

  }

}

 

7. Kill the process

ss -ntl

netstat -untalp  | grep 8888

jps

kill 3816

netstat -untalp  | grep 8888

ss -ntl

 

Case 2: Stream a local file to the HDFS cluster in real time (exec source - memory channel - hdfs sink)

Note: the Flume node must have the Hadoop client environment configured.

1. Edit the configuration:

vim /usr/local/flume/job/flume-hdfs.conf

# Name the agent components

a2.sources = file_source

a2.sinks = hdfs_sink

a2.channels = memory_channel

 

# Configure the source

a2.sources.file_source.type = exec

a2.sources.file_source.command = tail -F /var/log/messages

a2.sources.file_source.shell = /bin/bash -c

 

# Configure the sink

a2.sinks.hdfs_sink.type = hdfs

a2.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume/%Y%m%d/%H

# prefix for uploaded files

a2.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-

# whether to round down the timestamp used in the directory path

a2.sinks.hdfs_sink.hdfs.round = true

# how many time units before a new directory is created

a2.sinks.hdfs_sink.hdfs.roundValue = 1

# the time unit used for rounding

a2.sinks.hdfs_sink.hdfs.roundUnit = hour

# use the local timestamp instead of one from the event header

a2.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true

# number of events to accumulate before flushing to HDFS

a2.sinks.hdfs_sink.hdfs.batchSize = 1000

# file type; compression is supported

a2.sinks.hdfs_sink.hdfs.fileType = DataStream

# roll a new file after this many seconds

a2.sinks.hdfs_sink.hdfs.rollInterval = 600

# roll a new file once it reaches this size in bytes

a2.sinks.hdfs_sink.hdfs.rollSize = 134217700

# do not roll files based on event count

a2.sinks.hdfs_sink.hdfs.rollCount = 0

# minimum block replicas

a2.sinks.hdfs_sink.hdfs.minBlockReplicas = 1

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 1000

a2.channels.memory_channel.type = memory

a2.channels.memory_channel.capacity = 1000

a2.channels.memory_channel.transactionCapacity = 1000

 

# Bind the source and sink to the channel

a2.sources.file_source.channels = memory_channel

a2.sinks.hdfs_sink.channel = memory_channel

Note (production-style channel settings):

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

Channel parameter reference:

capacity: the maximum number of events the channel can hold

transactionCapacity: the maximum number of events taken from a source or delivered to a sink in a single transaction

keep-alive: how long (in seconds) an add or take operation waits for space or data before timing out

 

2. Create a startup script

vim /usr/local/flume/shell/start-hdfs.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-hdfs.conf --name a2 -Dflume.monitoring.type=http  -Dflume.monitoring.port=10502 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-hdfs.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-hdfs.sh

3. Run the script

ss -ntl

/usr/local/flume/shell/start-hdfs.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-hdfs.log

4. Check whether the corresponding log files appear in HDFS

hdfs dfs -ls /flume/20190719/17

5. Check channel metrics

curl http://cancer01:10502/metrics | jq

{

  "SOURCE.file_source": {

    "AppendBatchAcceptedCount": "0",              #成功提交到channel的批次的总数量。

    "GenericProcessingFail": "0",                  #常规处理失败的次数

    "EventAcceptedCount": "9",                     #成功写出到channel的事件总数量。

    "AppendReceivedCount": "0",                    #每批只有一个事件的事件总数量(与RPC调用的一个append调用相等)。

    "StartTime": "1563528439426",                  #SOURCE启动时的毫秒值时间。

    "AppendBatchReceivedCount": "0",              #接收到事件批次的总数量。

    "ChannelWriteFail": "0",                      #往CHANNEL写失败的次数

    "EventReceivedCount": "9",                     #目前为止source已经接收到的事件总数量。

    "EventReadFail": "0",                         #时间读取失败的次数

    "Type": "SOURCE",                              #当前类型为SOURRCE

    "AppendAcceptedCount": "0",                    #逐条录入的次数,单独传入的事件到Channel且成功返回的事件总数量。

    "OpenConnectionCount": "0",                    #目前与客户端或sink保持连接的总数量,目前仅支持avro source展现该度量。

    "StopTime": "0"                                #SOURCE停止时的毫秒值时间,0代表一直运行着

  },

  "CHANNEL.memory_channel": {

    "ChannelCapacity": "1000",                     #channel的容量,目前仅支持File Channel,Memory channel的统计数据。

    "ChannelFillPercentage": "0.0",                #channel已填入的百分比。

    "Type": "CHANNEL",                             #当前类型为CHANNEL

    "ChannelSize": "0",                            #目前channel中事件的总数量,目前仅支持File Channel,Memory channel的统计数据。

    "EventTakeSuccessCount": "9",                  #sink成功从channel读取事件的总数量。

    "EventTakeAttemptCount": "36",      #sink尝试从channel拉取事件的总次数。这不意味着每次时间都被返回,因为sink拉取的时候channel可能没有任何数据。

    "StartTime": "1563528438997",                  #CHANNEL启动时的毫秒值时间。

    "EventPutAttemptCount": "9",                   #Source尝试写入Channe的事件总次数。

    "EventPutSuccessCount": "9",                   #成功写入channel且提交的事件总次数。

    "StopTime": "0"                                #CHANNEL停止时的毫秒值时间。

  },

  "SINK.hdfs_sink": {

    "ConnectionCreatedCount": "1",                #下一个阶段(或存储系统)创建链接的数量(如HDFS创建一个文件)。

    "BatchCompleteCount": "0",                    #批量处理event的个数等于批处理大小的数量。

    "EventWriteFail": "0",                        #时间写失败的次数

    "BatchEmptyCount": "26",    #批量处理event的个数为0的数量(空的批量的数量),如果数量很大表示source写入数据的速度比sink处理数据的速度慢很多。

    "EventDrainAttemptCount": "9",                 #sink尝试写出到存储的事件总数量。

    "StartTime": "1563528439448",                  #SINK启动时的毫秒值时间。

    "BatchUnderflowCount": "1",  #批量处理event个数小于批处理大小数量(比sink配置使用最大批量尺寸更小批量数量),如果该值很高也表示sink比source快

    "ChannelReadFail": "0",                        #从CHANNEL读取失败的次数

    "ConnectionFailedCount": "0",                  #连接失败的次数

    "ConnectionClosedCount": "0",                  #连接关闭的次数

    "Type": "SINK",                                #当前类型为SINK

    "EventDrainSuccessCount": "9",                 #sink成功写出到存储的事件总数量。

    "StopTime": "0"                                #SINK停止时的毫秒值时间。

  }

}

6. Kill the process

ss -ntl

netstat -untalp  | grep ****

jps

kill ****

netstat -untalp  | grep ****

ss -ntl

 

Case 3: Stream files from a watched directory to the HDFS cluster (spooldir source - memory channel - hdfs sink)

Note: the Flume node must have the Hadoop client environment configured.

1. Edit the configuration:

vim /usr/local/flume/job/flume-dir.conf

# Name the agent components

a3.sources = spooldir_source

a3.sinks = hdfs_sink

a3.channels = memory_channel

 

# Configure the source

a3.sources.spooldir_source.type = spooldir

a3.sources.spooldir_source.spoolDir = /home/flume/upload

a3.sources.spooldir_source.fileSuffix = .COMPLETED

a3.sources.spooldir_source.fileHeader = true

# ignore files ending in .tmp; they are not uploaded

a3.sources.spooldir_source.ignorePattern = ([^ ]*\.tmp)

# capture the source file name so the sink can reference it through the fileName header

a3.sources.spooldir_source.basenameHeader = true

a3.sources.spooldir_source.basenameHeaderKey = fileName

 

# Configure the sink

a3.sinks.hdfs_sink.type = hdfs

a3.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume

# prefix for uploaded files

a3.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-upload-

# whether to round down the timestamp used in the directory path

a3.sinks.hdfs_sink.hdfs.round = true

# how many time units before a new directory is created

a3.sinks.hdfs_sink.hdfs.roundValue = 1

# the time unit used for rounding

a3.sinks.hdfs_sink.hdfs.roundUnit = hour

# use the local timestamp instead of one from the event header

a3.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true

# number of events to accumulate before flushing to HDFS

a3.sinks.hdfs_sink.hdfs.batchSize = 100

# file type; compression is supported

a3.sinks.hdfs_sink.hdfs.fileType = DataStream

# roll a new file after this many seconds

a3.sinks.hdfs_sink.hdfs.rollInterval = 60

# roll a new file at roughly 128 MB

a3.sinks.hdfs_sink.hdfs.rollSize = 134217700

# do not roll files based on event count

a3.sinks.hdfs_sink.hdfs.rollCount = 0

# minimum block replicas

a3.sinks.hdfs_sink.hdfs.minBlockReplicas = 1

# together with the source's basenameHeader and basenameHeaderKey properties, this keeps the original file name on upload (it overrides the filePrefix set above)

a3.sinks.hdfs_sink.hdfs.filePrefix = %{fileName}

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 1000

a3.channels.memory_channel.type = memory

a3.channels.memory_channel.capacity = 1000

a3.channels.memory_channel.transactionCapacity = 1000

 

# Bind the source and sink to the channel

a3.sources.spooldir_source.channels = memory_channel

a3.sinks.hdfs_sink.channel = memory_channel

2. Create a startup script

vim /usr/local/flume/shell/start-dir.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-dir.conf --name a3 -Dflume.monitoring.type=http  -Dflume.monitoring.port=10503 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-dir.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-dir.sh

3. Create test data

mkdir -pv  /home/flume/upload

echo  http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie.blog

echo  http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie2.tmp

echo  http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie3.txt

4. Run the script

ss -ntl

/usr/local/flume/shell/start-dir.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-dir.log

5. Check the local directory and the HDFS directory

ll /home/flume/upload

hdfs dfs -ls /flume/

6. Check channel metrics

curl http://cancer01:10503/metrics | jq

 

7. Kill the process

ss -ntl

netstat -untalp  | grep ****

jps

kill ****

netstat -untalp  | grep ****

ss -ntl

 

Case 4: Flume-to-Flume data transfer, aggregating data from multiple Flume agents into a single agent

Note: the Flume node must have the Hadoop client environment configured.

Scenario: one agent tails a local file and another listens on a network port; both send their data over avro to an aggregation agent, which writes the final data to HDFS.

 

1. Edit the aggregation configuration:

vim /usr/local/flume/job/flume-aggregation.conf

# Name the agent components

aggregation.sources = avro_source

aggregation.sinks = hdfs_sink

aggregation.channels = memory_channel

 

# Configure the source

aggregation.sources.avro_source.type = avro

aggregation.sources.avro_source.bind = cancer01

aggregation.sources.avro_source.port = 6666

 

# Configure the sink

aggregation.sinks.hdfs_sink.type = hdfs

aggregation.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume/%Y%m%d/%H

# prefix for uploaded files

aggregation.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-

# whether to round down the timestamp used in the directory path

aggregation.sinks.hdfs_sink.hdfs.round = true

# how many time units before a new directory is created

aggregation.sinks.hdfs_sink.hdfs.roundValue = 1

# the time unit used for rounding

aggregation.sinks.hdfs_sink.hdfs.roundUnit = hour

# use the local timestamp instead of one from the event header

aggregation.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true

# number of events to accumulate before flushing to HDFS

aggregation.sinks.hdfs_sink.hdfs.batchSize = 100

# file type; compression is supported

aggregation.sinks.hdfs_sink.hdfs.fileType = DataStream

# roll a new file after this many seconds

aggregation.sinks.hdfs_sink.hdfs.rollInterval = 600

# roll a new file at roughly 128 MB

aggregation.sinks.hdfs_sink.hdfs.rollSize = 134217700

# do not roll files based on event count

aggregation.sinks.hdfs_sink.hdfs.rollCount = 0

# minimum block replicas

aggregation.sinks.hdfs_sink.hdfs.minBlockReplicas = 1

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

aggregation.channels.memory_channel.type = memory

aggregation.channels.memory_channel.capacity = 1000

aggregation.channels.memory_channel.transactionCapacity = 100

 

# Bind the source and sink to the channel

aggregation.sources.avro_source.channels = memory_channel

aggregation.sinks.hdfs_sink.channel = memory_channel

2. Create the aggregation startup script

vim /usr/local/flume/shell/start-aggregation.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-aggregation.conf --name aggregation -Dflume.monitoring.type=http  -Dflume.monitoring.port=10511 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-aggregation.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-aggregation.sh

3. Run the aggregation script

ss -ntl

/usr/local/flume/shell/start-aggregation.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-aggregation.log

4. Edit the netcat configuration:

vim /usr/local/flume/job/flume-my_netcat.conf

# Name the agent components

my_netcat.sources = netcat_source

my_netcat.sinks = avro_sink

my_netcat.channels = memory_channel

 

# Configure the source

my_netcat.sources.netcat_source.type = netcat

my_netcat.sources.netcat_source.bind = cancer01

my_netcat.sources.netcat_source.port = 8888

 

# Configure the sink (forward to the aggregation agent over avro)

my_netcat.sinks.avro_sink.type = avro

my_netcat.sinks.avro_sink.hostname = cancer01

my_netcat.sinks.avro_sink.port = 6666

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

my_netcat.channels.memory_channel.type = memory

my_netcat.channels.memory_channel.capacity = 1000

my_netcat.channels.memory_channel.transactionCapacity = 100

 

# Bind the source and sink to the channel

my_netcat.sources.netcat_source.channels = memory_channel

my_netcat.sinks.avro_sink.channel = memory_channel

5. Create the netcat startup script

vim /usr/local/flume/shell/start-my_netcat.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_netcat.conf --name my_netcat -Dflume.monitoring.type=http  -Dflume.monitoring.port=10512 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_netcat.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-my_netcat.sh

6. Run the netcat script

ss -ntl

/usr/local/flume/shell/start-my_netcat.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_netcat.log

7. Edit the exec configuration:

vim /usr/local/flume/job/flume-my_exec.conf

# Name the agent components

my_exec.sources = exec_source

my_exec.sinks = avro_sink

my_exec.channels = memory_channel

 

# Configure the source

my_exec.sources.exec_source.type = exec

my_exec.sources.exec_source.command = tail -F /home/flume/blog.txt

my_exec.sources.exec_source.shell = /bin/bash -c

 

# Configure the sink (forward to the aggregation agent over avro)

my_exec.sinks.avro_sink.type = avro

my_exec.sinks.avro_sink.hostname = cancer01

my_exec.sinks.avro_sink.port = 6666

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

my_exec.channels.memory_channel.type = memory

my_exec.channels.memory_channel.capacity = 1000

my_exec.channels.memory_channel.transactionCapacity = 100

 

# Bind the source and sink to the channel

my_exec.sources.exec_source.channels = memory_channel

my_exec.sinks.avro_sink.channel = memory_channel

8. Create the exec startup script

vim /usr/local/flume/shell/start-my_exec.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_exec.conf --name my_exec -Dflume.monitoring.type=http  -Dflume.monitoring.port=10513 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_exec.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-my_exec.sh

9. Run the exec script

ss -ntl

/usr/local/flume/shell/start-my_exec.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_exec.log

10. Test

telnet cancer01 8888

echo "www.baidu.com" >> /home/flume/blog.txt

11. Check the HDFS directory

hdfs dfs -ls /flume/

hdfs dfs -cat /flume/*******

12. Check channel metrics

curl http://cancer01:10511/metrics | jq

curl http://cancer01:10512/metrics | jq

curl http://cancer01:10513/metrics | jq

 

13. Kill the processes

ss -ntl

netstat -untalp  | grep ****

jps

kill ****

netstat -untalp  | grep ****

ss -ntl

 

Case 5: Channel selectors

Channel selector: decides which channel(s) a given event is sent to.

(1) Replicating Channel Selector: the default; every event is replicated to all bound channels, i.e. as many copies as there are channels.

(2) Multiplexing Channel Selector: routes each event to a specific channel based on a header value.
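A quick preview of the two selector types (hypothetical agent/component names; the full working configurations follow below):

# replicating (the default): copy each event to every bound channel

a0.sources.s1.selector.type = replicating

# multiplexing: route by the value of a header key

a0.sources.s2.selector.type = multiplexing

a0.sources.s2.selector.header = language

a0.sources.s2.selector.mapping.JAVA = c_java

a0.sources.s2.selector.default = c_other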

 

1. Edit the avro configuration:

vim /usr/local/flume/job/flume-my_avro.conf

# Name the agent components

my_avro.sources = avro_source

my_avro.sinks = hdfs_sink

my_avro.channels = memory_channel

 

# Configure the source

my_avro.sources.avro_source.type = avro

my_avro.sources.avro_source.bind = cancer01

my_avro.sources.avro_source.port = 8888

 

# Configure the sink

my_avro.sinks.hdfs_sink.type = hdfs

my_avro.sinks.hdfs_sink.hdfs.path = hdfs://cancer01/flume/%Y%m%d/%H

# prefix for uploaded files

my_avro.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-

# whether to round down the timestamp used in the directory path

my_avro.sinks.hdfs_sink.hdfs.round = true

# how many time units before a new directory is created

my_avro.sinks.hdfs_sink.hdfs.roundValue = 1

# the time unit used for rounding

my_avro.sinks.hdfs_sink.hdfs.roundUnit = hour

# use the local timestamp instead of one from the event header

my_avro.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true

# number of events to accumulate before flushing to HDFS

my_avro.sinks.hdfs_sink.hdfs.batchSize = 100

# file type; compression is supported

my_avro.sinks.hdfs_sink.hdfs.fileType = DataStream

# roll a new file after this many seconds

my_avro.sinks.hdfs_sink.hdfs.rollInterval = 60

# roll a new file at roughly 128 MB

my_avro.sinks.hdfs_sink.hdfs.rollSize = 134210000

# do not roll files based on event count

my_avro.sinks.hdfs_sink.hdfs.rollCount = 0

# minimum block replicas

my_avro.sinks.hdfs_sink.hdfs.minBlockReplicas = 1

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

my_avro.channels.memory_channel.type = memory

my_avro.channels.memory_channel.capacity = 1000

my_avro.channels.memory_channel.transactionCapacity = 100

 

# Bind the source and sink to the channel

my_avro.sources.avro_source.channels = memory_channel

my_avro.sinks.hdfs_sink.channel = memory_channel

 

2. Create the avro startup script

vim /usr/local/flume/shell/start-my_avro.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_avro.conf --name my_avro -Dflume.monitoring.type=http  -Dflume.monitoring.port=10514 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_avro.log  2>&1 &

 

Make it executable:

chmod +x /usr/local/flume/shell/start-my_avro.sh

3. Run the avro script

ss -ntl

/usr/local/flume/shell/start-my_avro.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_avro.log

 

4. Edit the file_roll configuration:

vim /usr/local/flume/job/flume-my_file_roll.conf

# Name the agent components

my_file_roll.sources = avro_source

my_file_roll.sinks = file_roll_sink

my_file_roll.channels = memory_channel

 

# Configure the source

my_file_roll.sources.avro_source.type = avro

my_file_roll.sources.avro_source.bind = cancer01

my_file_roll.sources.avro_source.port = 9999

 

# Configure the sink

my_file_roll.sinks.file_roll_sink.type = file_roll

# the output directory must already exist; Flume will not create it

my_file_roll.sinks.file_roll_sink.sink.directory = /home/flume/output

 

# Use a memory channel with a capacity of 1000 events and a transaction capacity of 100

my_file_roll.channels.memory_channel.type = memory

my_file_roll.channels.memory_channel.capacity = 1000

my_file_roll.channels.memory_channel.transactionCapacity = 100

 

# Bind the source and sink to the channel

my_file_roll.sources.avro_source.channels = memory_channel

my_file_roll.sinks.file_roll_sink.channel = memory_channel

 

5. Create the file_roll startup script

vim /usr/local/flume/shell/start-my_file_roll.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_file_roll.conf --name my_file_roll -Dflume.monitoring.type=http  -Dflume.monitoring.port=10515 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_file_roll.log  2>&1 &

 

Make it executable:

chmod +x /usr/local/flume/shell/start-my_file_roll.sh

6. Run the file_roll script

ss -ntl

/usr/local/flume/shell/start-my_file_roll.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_file_roll.log

 

7. Edit the replica configuration:

vim /usr/local/flume/job/flume-my_replica.conf

# Name the agent components

replica.sources = exec_source

replica.sinks = hdfs_sink file_roll_sink

replica.channels = hdfs_channel file_roll_channel

 

# Replicate the data flow to both channels

replica.sources.exec_source.selector.type = replicating

 

# Configure the source

replica.sources.exec_source.type = exec

replica.sources.exec_source.command = tail -F /home/flume/blog.txt

replica.sources.exec_source.shell = /bin/bash -c

 

# Sink that forwards to the HDFS-writing agent (note the port number)

replica.sinks.hdfs_sink.type = avro

replica.sinks.hdfs_sink.hostname = cancer01

replica.sinks.hdfs_sink.port = 8888

 

# Sink that forwards to the local-filesystem agent

replica.sinks.file_roll_sink.type = avro

replica.sinks.file_roll_sink.hostname = cancer01

replica.sinks.file_roll_sink.port = 9999

 

# Use memory channels with a capacity of 1000 events and a transaction capacity of 100

replica.channels.hdfs_channel.type = memory

replica.channels.hdfs_channel.capacity = 1000

replica.channels.hdfs_channel.transactionCapacity = 100

replica.channels.file_roll_channel.type = memory

replica.channels.file_roll_channel.capacity = 1000

replica.channels.file_roll_channel.transactionCapacity = 100

 

# Bind the source and sinks to the channels

replica.sources.exec_source.channels = hdfs_channel file_roll_channel

replica.sinks.hdfs_sink.channel = hdfs_channel

replica.sinks.file_roll_sink.channel = file_roll_channel

 

8. Create the replica startup script

vim /usr/local/flume/shell/start-my_replica.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_replica.conf --name replica -Dflume.monitoring.type=http  -Dflume.monitoring.port=10516 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_replica.log  2>&1 &

 

Make it executable:

chmod +x /usr/local/flume/shell/start-my_replica.sh

9. Run the replica script

ss -ntl

/usr/local/flume/shell/start-my_replica.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_replica.log

10. Check the HDFS directory and the local output directory

hdfs dfs -ls /flume/

hdfs dfs -cat /flume/*******

ll /home/flume/output

 

11. Edit the multiplexing_selector configuration:

vim /usr/local/flume/job/flume-my_multiplexing_selector.conf

# Name the agent components

my_multiplexing_selector.sources = avro_source

my_multiplexing_selector.sinks = Java_sink Go_sink Python_sink

my_multiplexing_selector.channels = Java_channel Go_channel Python_channel

 

# Configure the source

my_multiplexing_selector.sources.avro_source.type = avro

my_multiplexing_selector.sources.avro_source.bind = cancer01

my_multiplexing_selector.sources.avro_source.port = 8888

# Use the multiplexing selector

my_multiplexing_selector.sources.avro_source.selector.type = multiplexing

# Header key to route on

my_multiplexing_selector.sources.avro_source.selector.header = language

# Map header values to channels (values are case-sensitive; the test below sends a header file containing: language JAVA)

my_multiplexing_selector.sources.avro_source.selector.mapping.JAVA = Java_channel

my_multiplexing_selector.sources.avro_source.selector.mapping.GOLANG = Go_channel

my_multiplexing_selector.sources.avro_source.selector.default = Python_channel

 

# Configure the sinks

my_multiplexing_selector.sinks.Java_sink.type = file_roll

my_multiplexing_selector.sinks.Java_sink.sink.directory= /home/language/java

my_multiplexing_selector.sinks.Java_sink.sink.rollInterval = 0

my_multiplexing_selector.sinks.Go_sink.type = file_roll

my_multiplexing_selector.sinks.Go_sink.sink.directory= /home/language/golang

my_multiplexing_selector.sinks.Go_sink.sink.rollInterval = 0

my_multiplexing_selector.sinks.Python_sink.type = file_roll

my_multiplexing_selector.sinks.Python_sink.sink.directory= /home/language/python

my_multiplexing_selector.sinks.Python_sink.sink.rollInterval = 0

 

# Use memory channels with a capacity of 100000 events and a transaction capacity of 10000

my_multiplexing_selector.channels.Java_channel.type = memory

my_multiplexing_selector.channels.Java_channel.capacity = 100000

my_multiplexing_selector.channels.Java_channel.transactionCapacity = 10000

my_multiplexing_selector.channels.Go_channel.type = memory

my_multiplexing_selector.channels.Go_channel.capacity = 100000

my_multiplexing_selector.channels.Go_channel.transactionCapacity = 10000

my_multiplexing_selector.channels.Python_channel.type = memory

my_multiplexing_selector.channels.Python_channel.capacity = 100000

my_multiplexing_selector.channels.Python_channel.transactionCapacity = 10000

 

# Bind the source and sinks to the channels

my_multiplexing_selector.sources.avro_source.channels = Java_channel Go_channel Python_channel

my_multiplexing_selector.sinks.Java_sink.channel = Java_channel

my_multiplexing_selector.sinks.Go_sink.channel = Go_channel

my_multiplexing_selector.sinks.Python_sink.channel = Python_channel

 

12. Create the multiplexing_selector startup script

vim /usr/local/flume/shell/start-my_multiplexing_selector.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-my_multiplexing_selector.conf --name my_multiplexing_selector -Dflume.monitoring.type=http  -Dflume.monitoring.port=10522 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-my_multiplexing_selector.log  2>&1 &

 

Make it executable:

chmod +x /usr/local/flume/shell/start-my_multiplexing_selector.sh

13. Run the multiplexing_selector script

ss -ntl

/usr/local/flume/shell/start-my_multiplexing_selector.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-my_multiplexing_selector.log

14. Test

cat a.txt

language    java

language    php

language    shell

language    golang

language    python

language    scanla

language    js

language    vbs

language    c++

language    linux

cat header.txt

language    JAVA

flume-ng avro-client -H cancer01 -p 8888 -F a.txt -R header.txt
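To exercise the other channels, the same payload can be sent with a different header file (hypothetical file name; header values must match the mapping keys exactly, and the file_roll output directories must already exist):

mkdir -pv /home/language/{java,golang,python}     # file_roll does not create missing directories

cat go-header.txt

language    GOLANG

flume-ng avro-client -H cancer01 -p 8888 -F a.txt -R go-header.txt

Events sent with language=GOLANG land under /home/language/golang; headers that match no mapping fall through to the default Python_channel and end up under /home/language/python.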

 

Case 6: Host and timestamp interceptors

Interceptor: a component on the source side that can modify or drop events while they are being processed. Common interceptors include:

(1) host interceptor: adds a header with the host name or IP to every event

(2) timestamp interceptor: adds a header with the event timestamp
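Interceptors can also be chained on a single source; a minimal sketch (hypothetical agent/source names) combining both, using the built-in type aliases:

a0.sources.s1.interceptors = i1 i2

a0.sources.s1.interceptors.i1.type = host

a0.sources.s1.interceptors.i2.type = timestamp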

1. Edit the host_interceptor configuration:

vim /usr/local/flume/job/flume-host_interceptor.conf

# Name the agent components

my_host_interceptor.sources = netcat_source

my_host_interceptor.sinks = logger_sink

my_host_interceptor.channels = memory_channel

 

# Configure the source

my_host_interceptor.sources.netcat_source.type = netcat

my_host_interceptor.sources.netcat_source.bind = cancer01

my_host_interceptor.sources.netcat_source.port = 8888

 

# Add the interceptor

my_host_interceptor.sources.netcat_source.interceptors = i1

my_host_interceptor.sources.netcat_source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder

my_host_interceptor.sources.netcat_source.interceptors.i1.preserveExisting = false

# Header key to set

my_host_interceptor.sources.netcat_source.interceptors.i1.hostHeader = hostname

# Use the host IP as the header value

my_host_interceptor.sources.netcat_source.interceptors.i1.useIP = true

 

# Configure the sink

my_host_interceptor.sinks.logger_sink.type = logger

 

# Use a memory channel with a capacity of 100000 events and a transaction capacity of 10000

my_host_interceptor.channels.memory_channel.type = memory

my_host_interceptor.channels.memory_channel.capacity = 100000

my_host_interceptor.channels.memory_channel.transactionCapacity = 10000

 

# Bind the source and sink to the channel

my_host_interceptor.sources.netcat_source.channels = memory_channel

my_host_interceptor.sinks.logger_sink.channel = memory_channel

 

2. Create the host_interceptor startup script

vim /usr/local/flume/shell/start-host_interceptor.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-host_interceptor.conf --name my_host_interceptor -Dflume.monitoring.type=http  -Dflume.monitoring.port=10520 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-host_interceptor.log  2>&1 &

 

Make it executable:

chmod +x /usr/local/flume/shell/start-host_interceptor.sh

3. Run the host_interceptor script

ss -ntl

/usr/local/flume/shell/start-host_interceptor.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-host_interceptor.log

4. Test

telnet cancer01 8888

tail -100f /usr/local/flume/log/flume-host_interceptor.log

 

1. Edit the timestamp_interceptor configuration:

vim /usr/local/flume/job/flume-timestamp_interceptor.conf

# Name the agent components

my_timestamp_interceptor.sources = netcat_source

my_timestamp_interceptor.sinks = logger_sink

my_timestamp_interceptor.channels = memory_channel

 

# Configure the source

my_timestamp_interceptor.sources.netcat_source.type = netcat

my_timestamp_interceptor.sources.netcat_source.bind = cancer01

my_timestamp_interceptor.sources.netcat_source.port = 8888

 

# Add the interceptor

my_timestamp_interceptor.sources.netcat_source.interceptors = i1

my_timestamp_interceptor.sources.netcat_source.interceptors.i1.type = timestamp

 

# Configure the sink

my_timestamp_interceptor.sinks.logger_sink.type = logger

 

# Use a memory channel with a capacity of 100000 events and a transaction capacity of 10000

my_timestamp_interceptor.channels.memory_channel.type = memory

my_timestamp_interceptor.channels.memory_channel.capacity = 100000

my_timestamp_interceptor.channels.memory_channel.transactionCapacity = 10000

 

# Bind the source and sink to the channel

my_timestamp_interceptor.sources.netcat_source.channels = memory_channel

my_timestamp_interceptor.sinks.logger_sink.channel = memory_channel

 

2. Create the timestamp_interceptor startup script

vim /usr/local/flume/shell/start-timestamp_interceptor.sh

#!/bin/bash

nohup flume-ng agent -c /usr/local/flume/job --conf-file=/usr/local/flume/job/flume-timestamp_interceptor.conf --name my_timestamp_interceptor -Dflume.monitoring.type=http  -Dflume.monitoring.port=10521 -Dflume.root.logger=INFO,console  >> /usr/local/flume/log/flume-timestamp_interceptor.log  2>&1 &

Make it executable:

chmod +x /usr/local/flume/shell/start-timestamp_interceptor.sh

3. Run the timestamp_interceptor script

ss -ntl

/usr/local/flume/shell/start-timestamp_interceptor.sh

ss -ntl

tail -100f /usr/local/flume/log/flume-timestamp_interceptor.log

4. Test

telnet cancer01 8888

tail -100f /usr/local/flume/log/flume-timestamp_interceptor.log

 

Case 7: Log aggregation

1. Scenario: servers A and B each produce three kinds of logs (access.log, nginx.log, web.log); all of them need to be collected into HDFS, separated by log type.

2. Approach: on A and B, one exec source per log file tags events with a static interceptor (header type = access/nginx/web) and forwards them over avro to server C; on C, an avro source receives the events and an HDFS sink writes them into per-type, per-day directories by referencing %{type} in the path.

3. On servers A and B, create the configuration file exec_source_avro_sink.conf

# Name the components on this agent

a1.sources = r1 r2 r3

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/data/access.log

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

## the static interceptor inserts a user-defined key-value pair into the headers of every collected event

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = access

 

a1.sources.r2.type = exec

a1.sources.r2.command = tail -F /root/data/nginx.log

a1.sources.r2.interceptors = i2

a1.sources.r2.interceptors.i2.type = static

a1.sources.r2.interceptors.i2.key = type

a1.sources.r2.interceptors.i2.value = nginx

 

a1.sources.r3.type = exec

a1.sources.r3.command = tail -F /root/data/web.log

a1.sources.r3.interceptors = i3

a1.sources.r3.interceptors.i3.type = static

a1.sources.r3.interceptors.i3.key = type

a1.sources.r3.interceptors.i3.value = web

 

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = cancer01

a1.sinks.k1.port = 41414

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sources.r2.channels = c1

a1.sources.r3.channels = c1

a1.sinks.k1.channel = c1

 

4. On server C, create the configuration file avro_source_hdfs_sink.conf

# Name the agent's source, channel and sink

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = cancer01

a1.sources.r1.port = 41414

 

# Add a timestamp interceptor

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

 

# Configure the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

 

# Configure the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path=hdfs://cancer/flume/logs/%{type}/%Y%m%d

a1.sinks.k1.hdfs.filePrefix =events

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text

# use the local timestamp

a1.sinks.k1.hdfs.useLocalTimeStamp = true

# do not roll files by event count

a1.sinks.k1.hdfs.rollCount = 0

# roll files by time (seconds)

a1.sinks.k1.hdfs.rollInterval = 30

# roll files by size (bytes)

a1.sinks.k1.hdfs.rollSize = 10485760

# number of events written to HDFS per batch

a1.sinks.k1.hdfs.batchSize = 10000

# number of threads Flume uses for HDFS operations (open, write, etc.)

a1.sinks.k1.hdfs.threadsPoolSize=10

# timeout (ms) for HDFS operations

a1.sinks.k1.hdfs.callTimeout=30000

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

5. Start

Start Flume on server C first:

flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf --name a1 -Dflume.root.logger=DEBUG,console

Then start Flume on servers A and B:

flume-ng agent -c conf -f conf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=DEBUG,console
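To verify the flow, append a test line to each monitored log on A or B and then list the per-type directories on HDFS (the date component is simply the current day):

echo "test access line" >> /root/data/access.log

echo "test nginx line" >> /root/data/nginx.log

echo "test web line" >> /root/data/web.log

hdfs dfs -ls /flume/logs/access/$(date +%Y%m%d)

hdfs dfs -ls /flume/logs/nginx/$(date +%Y%m%d)

hdfs dfs -ls /flume/logs/web/$(date +%Y%m%d)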

Case 8: Sink group (load balancing)

# Name the components on this agent

a1.sources = r1

a1.channels = c1

a1.sinkgroups = g1

a1.sinks = k1 k2

 

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# Configure the sink group (load-balancing processor)

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = round_robin

a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

 

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = hadoop102

a1.sinks.k1.port = 4141

 

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = hadoop102

a1.sinks.k2.port = 4142

 

# Describe the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c1
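The original gives no file name or start command for this agent. Assuming the configuration is saved as /usr/local/flume/job/flume-sinkgroup.conf (name assumed), it can be started the same way as the earlier cases; note that the two avro sinks expect avro sources to already be listening on hadoop102:4141 and hadoop102:4142.

flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/job/flume-sinkgroup.conf --name a1 -Dflume.root.logger=INFO,console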

 

Case 9: MySQL source

cd /var/lib/flume

touch sql-source.status  # status file that records the latest id value

chmod -R 777 /usr/local/flume/

cp flume-ng-sql-source-1.3.7.jar /usr/local/flume/lib/

cp mysql-connector-java-5.1.17.jar /usr/local/flume/lib/mysql-connector-java.jar

agent.sources = sql-source

agent.channels = ch1

agent.sinks = HDFS

 

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123456

agent.sources.sql-source.table = wlslog

agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id

agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /var/lib/flume

agent.sources.sql-source.status.file.name = sql-source.status

 

agent.sinks.HDFS.type = hdfs

agent.sinks.HDFS.hdfs.path = hdfs://cancer/flume/mysql

agent.sinks.HDFS.hdfs.fileType = DataStream

agent.sinks.HDFS.hdfs.writeFormat = Text

agent.sinks.HDFS.hdfs.rollSize = 268435456

agent.sinks.HDFS.hdfs.rollInterval = 0

agent.sinks.HDFS.hdfs.rollCount = 0

 

agent.channels.ch1.type = memory

agent.channels.ch1.capacity = 100000

agent.channels.ch1.transactionCapacity = 1000

 

agent.sources.sql-source.channels = ch1

agent.sinks.HDFS.channel = ch1
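The original does not include a start command for this agent. Assuming the configuration above is saved as /usr/local/flume/job/flume-mysql.conf (file name assumed), it could be started and checked like this:

nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/job/flume-mysql.conf --name agent >> /usr/local/flume/log/flume-mysql.log 2>&1 &

hdfs dfs -ls /flume/mysql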

Notes:

The biggest advantage of using Flume to pull data from a relational database table is that it is simple to configure and requires no coding. Compared with the complexity of tungsten-replicator, Flume only needs the source, channel and sink properties in the configuration file. Compared with the currently popular canal it is less flexible, but not a single line of code has to be written. Because the approach is based on plain SQL polling, it is generic and works with any relational data source. Its drawbacks are just as prominent as its advantages, mainly the following:

It runs queries against the source database, so it is intrusive.

Increments are captured by polling, which is only near-real-time; the shorter the polling interval, the bigger the load on the source database.

It only detects newly inserted rows; deletes and updates are not captured.

The source table must have a column that can be used to track the increment.

Despite these limitations, pulling relational data with Flume still has value, especially where quick deployment and minimal coding are required and the constraints are acceptable; it is also a useful complement to the traditional Sqoop approach.

Custom MySQL source:

https://blog.csdn.net/lianghecai52171314/article/details/104919999

https://www.cnblogs.com/jhxxb/p/11589851.html

 

Case 10: MySQL source, Kafka sink

agent.sources = sql-source

agent.sinks = k1

agent.channels = ch

 

# Put flume-ng-sql-source-1.x.x.jar into FLUME_HOME/lib;

# for a CDH (Cloudera Manager) Flume, put it into /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib

# Put mysql-connector-java-X-bin.jar into FLUME_HOME/lib;

# for a CDH (Cloudera Manager) Flume, put it into /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib

 

agent.sources.sql-source.type= org.keedio.flume.source.SQLSource

agent.sources.sql-source.hibernate.connection.url=jdbc:mysql://hostname:3306/yinqing?useUnicode=true&characterEncoding=utf-8&useSSL=false

agent.sources.sql-source.hibernate.connection.user=root

agent.sources.sql-source.hibernate.connection.password =password

agent.sources.sql-source.hibernate.connection.autocommit = true

agent.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver

agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect

agent.sources.sql-source.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider

agent.sources.sql-source.run.query.delay=10000

# Incremental-load settings based on a table (commented out; a custom query is used below instead)

#agent.sources.sql-source.table =table_name

#agent.sources.sql-source.columns.to.select = *

#agent.sources.sql-source.incremental.column.name=id

#agent.sources.sql-source.incremental.value=0

agent.sources.sql-source.custom.query=select * from lt_api_getallstops order by id desc

# initial value of the incremental column

agent.sources.sql-source.start.from = 0

agent.sources.sql-source.status.file.path = /var/lib/flume-ng

agent.sources.sql-source.status.file.name = sql-source.status

 

# Kafka sink configuration (cluster broker list)

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.k1.topic = topic_test

agent.sinks.k1.brokerList = cancer01:9092,cancer02:9092,cancer03:9092

agent.sinks.k1.batchsize = 200

agent.sinks.k1.requiredAcks = 1

agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder

agent.sinks.k1.zookeeperConnect = cancer01:2181,cancer02:2181,cancer03:2181

 

agent.channels.ch.type = memory

agent.channels.ch.capacity = 100000

agent.channels.ch.transactionCapacity = 1000

agent.channels.ch.keep-alive = 20

 

agent.sources.sql-source.channels = ch

agent.sinks.k1.channel = ch
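As with the previous case, no start command is given. Assuming the configuration is saved as /usr/local/flume/job/flume-mysql-kafka.conf (file name assumed), a start plus a quick consumer check might look like this:

nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/job/flume-mysql-kafka.conf --name agent >> /usr/local/flume/log/flume-mysql-kafka.log 2>&1 &

kafka-console-consumer.sh --bootstrap-server cancer01:9092 --topic topic_test --from-beginning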

 

 

Case 11: Kafka source, HDFS sink

agent.sources = kafkaSource

agent.channels = memoryChannel

agent.sinks = hdfsSink

 

agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource

agent.sources.kafkaSource.zookeeperConnect=cancer01:2181,cancer02:2181,cancer03:2181

agent.sources.kafkaSource.topic=flume-data

agent.sources.kafkaSource.kafka.consumer.timeout.ms=100

 

agent.sinks.hdfsSink.type=hdfs

agent.sinks.hdfsSink.hdfs.path=hdfs://cancer/kafka

agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.fileType=DataStream

 

agent.channels.memoryChannel.type=memory

agent.channels.memoryChannel.capacity=1000

agent.channels.memoryChannel.transactionCapacity=100

 

agent.sources.kafkaSource.channels = memoryChannel

agent.sinks.hdfsSink.channel = memoryChannel
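To test (assuming the configuration is saved as /usr/local/flume/job/flume-kafka-hdfs.conf, file name assumed), start the agent, publish a message to the flume-data topic, then check the HDFS directory:

nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/job/flume-kafka-hdfs.conf --name agent >> /usr/local/flume/log/flume-kafka-hdfs.log 2>&1 &

echo "hello from kafka" | kafka-console-producer.sh --broker-list cancer01:9092 --topic flume-data

hdfs dfs -ls /kafka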

 

 

Additional notes

If a Flume sink writes to Hadoop, the Hadoop configuration and libraries must be copied to the Flume node:

Copy the Hadoop configuration: put Hadoop's core-site.xml and hdfs-site.xml into Flume's conf directory.

Copy the Hadoop libraries: add the required Hadoop jars to Flume's lib directory.

 

 

If a Flume sink writes to Hive, the Hive configuration and libraries must be copied as well:

Copy the Hive libraries: copy the required Hive jars into Flume's lib directory.
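A sketch of the copy commands, assuming Hadoop is installed under /usr/local/hadoop and Hive under /usr/local/hive (paths assumed; adjust to your installation and watch for jar conflicts):

cp /usr/local/hadoop/etc/hadoop/core-site.xml /usr/local/hadoop/etc/hadoop/hdfs-site.xml /usr/local/flume/conf/

cp /usr/local/hadoop/share/hadoop/common/*.jar /usr/local/hadoop/share/hadoop/common/lib/*.jar /usr/local/hadoop/share/hadoop/hdfs/*.jar /usr/local/flume/lib/

cp /usr/local/hive/hcatalog/share/hcatalog/*.jar /usr/local/hive/lib/hive-*.jar /usr/local/flume/lib/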

 

 

Edit flume.conf

vim /usr/local/flume/conf/flume.conf

# Name the agent components

a1.sources = r1

a1.channels = c1

a1.sinks = k1

 

# Flume source (example 1)

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/hadoop/log

 

# source (example 2)

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# source (example 3)

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

 

# source (example 4)

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log

 

# source (example 5)

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true

 

# source (example 6)

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444

 

# source (example 7)

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt

 

# Flume sink (default: logger)

a1.sinks.k1.type = logger

 

# sink (example 4)

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://cancer/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

 

# sink (example 5)

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://cancer/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text

 

# Flume channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# channel (example 5)

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs

 

# sink (example 6: Hive sink)

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://cancer01:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\t'

 

# sink (example 7: Hive sink)

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\t'

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

Start

cd /usr/local/flume/bin

flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/flume.conf -n a1 -Dflume.root.logger=INFO,console

Notes:

--conf or -c        the configuration directory, containing flume-env.sh and the log4j config, e.g. --conf conf

--conf-file or -f   the agent configuration file, e.g. --conf-file conf/flume.conf

--name or -n        the agent name, e.g. --name a1

-z                  the ZooKeeper connection string, e.g. -z zkhost:2181,zkhost1:2181

-p                  the base path in ZooKeeper under which agent configurations are stored, e.g. -p /flume

 

Send a file with avro-client

flume-ng avro-client -H localhost -p 4141 -F ./log.00

flume-ng avro-client --conf conf --host host --port 8080

 

Notes on the example 5 configuration:

1. source: spooldir (for finished, fully written data files)

(1) recursiveDirectorySearch: whether to monitor subdirectories for new files to read

(2) includePattern: regular expression specifying which files to include (e.g. only .csv data files)

(3) ignorePattern: regular expression specifying which files to ignore (e.g. skip everything that is not a .csv data file)

(4) Limitation: files in the directory must not be modified; files that are still being appended to are not allowed (they may be skipped or cause errors)

2. To monitor a directory while still allowing files to be appended to, and to track per-file read state, use the taildir source (see the sketch after this list):

(1) source: taildir (roughly a combination of exec and spooldir)

(2) filegroups: defines the source's file groups; several can be configured, e.g. filegroups = f1

(3) filegroups.<groupName>: the absolute path (or pattern) of the files in that group

(4) positionFile: where to store the position file, which records in JSON the last read position of every tailed file

3. The memory channel is not durable: it keeps all events in memory, so data cannot be recovered if the process stops unexpectedly, and it is limited by available memory.

4. The file channel is durable: data is safe and can be kept on disk for as long as there is space.

5. checkpointDir: the checkpoint directory, used to verify data integrity and track which data has already been collected and which has not.

6. dataDirs: the data directories; several comma-separated directories can be given, and placing them on independent disks improves file channel performance.

7. By default, files on HDFS are written in a binary format; inspect them with: bin/hdfs dfs -text /<path>

8. Set hdfs.fileType = DataStream and hdfs.writeFormat = Text to write plain text instead.

9. With DataStream the files are not compressed and hdfs.codeC must not be set; with CompressedStream a valid hdfs.codeC (e.g. snappy) is required.

10. batchSize (default 100): the number of events flushed to HDFS per batch.
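A minimal taildir source sketch (hypothetical paths and names), using the properties listed in item 2 above:

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /data/logs/app.*.log

a1.sources.r1.positionFile = /usr/local/flume/log/taildir_position.json

a1.sources.r1.channels = c1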

 

Notes on the example 6 configuration:

1. serializer: parses the fields of each event and maps them to the columns of the Hive table

(1) DELIMITED: plain delimited text

(2) JSON: JSON input (no extra configuration needed; JSON field names map directly to Hive columns of the same name, using org.apache.hive.hcatalog.data.JsonSerDe internally)

2. DELIMITED:

serializer.delimiter: the field delimiter of the incoming data, in double quotes, e.g. "\t"

serializer.fieldnames: the mapping from input fields to Hive columns, given as a comma-separated list of Hive column names

serializer.serdeSeparator: the output field separator, in single quotes, e.g. '\t'

Hive must be configured accordingly:

<!-- The following settings are required for the Flume Hive sink -->

<property>

       <name>hive.metastore.uris</name>

       <value>thrift://cancer01:9083</value>

</property>

<property>

       <name>hive.txn.manager</name>

       <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>

</property>

<property>

       <name>hive.compactor.initiator.on</name>

       <value>true</value>

</property>

<property>

       <name>hive.compactor.worker.threads</name>

       <value>1</value>

</property>

<property>

       <name>hive.support.concurrency</name>

       <value>true</value>

</property>

<property>

       <name>hive.enforce.bucketing</name>

       <value>true</value>

</property>

<property>

       <name>hive.exec.dynamic.partition.mode</name>

       <value>nonstrict</value>

</property>

<property>

       <name>hive.in.test</name>

       <value>true</value>

</property>

Notes on these settings:

(1) Error: Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager -> enables the transaction manager required for ACID writes

hive.compactor.initiator.on = true -> runs the initiator and cleaner threads required for transactions

hive.compactor.worker.threads = 1 -> more worker threads reduce the time compactions take

hive.support.concurrency = true -> whether concurrency is supported (default false)

hive.enforce.bucketing = true -> enables bucketing, so writes into the table are bucketed

hive.exec.dynamic.partition.mode = nonstrict -> non-strict dynamic partition mode

(2) Error when starting the metastore:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

Set the hive.in.test property shown above; it is what creates the COMPACTION_QUEUE table.

(3) Another error when starting the metastore:

Error rolling back: Can't call rollback when autocommit=true

Remove the property added in step (2) once the table has been created. If nc is not installed yet, install it first: yum install -y nc

Run:

Connect with nc and type in data, with fields separated by tabs.
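For example, against the netcat source from example 6 (host and field names as configured above; the sample values are made up):

echo -e "1001\tzhangsan\t20" | nc masterhbase 44444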

 

The data then shows up in Hive.

 

 

 

 

 
