Getting Started with Flume

Table of Contents

1. Flume Basics

2. Flume Transactions

3. Flume Internal Flow

4. Flume Examples

4.1 Netcat+Logger

4.2 Exec+Logger

4.3 Exec+HdfsSink

5. Flume Cluster Examples

5.1 Sink failover

5.2 Flume load balancing

5.3 Interceptors: interception and filtering

5.3.1 Timestamp interceptor

5.3.2 Host interceptor

5.3.3 Static interceptor

5.3.4 RegEx Filtering Interceptor

5.3.5 RegEx Extractor Interceptor

5.4 Flume Channel Selectors

5.4.1 Replicating

5.4.2 Multiplexing

6. References

7. Issues encountered writing from Flume to Hive

7.1 Flume agent fails to start due to missing Hive jars

7.2 Flume fails to connect to the Hive metastore

7.3 Lock Exception after Flume connects to the Hive MetaStore


1. Flume Basics

Flume is a distributed, highly reliable, and highly available service for collecting, aggregating, and transporting large volumes of log data. Flume is organized into agents (each running as a single JVM process), and every agent contains three components: a source, a channel, and a sink.

An agent moves data from source to sink in the form of events. Each event has two parts: a set of headers and a body.
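
As a concrete sketch of this structure, here is how an event is built with Flume's own EventBuilder API (the header key below is purely illustrative, not something Flume sets for you):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventSketch {
    public static void main(String[] args) {
        // Headers: a string-to-string map of metadata about the event
        Map<String, String> headers = new HashMap<>();
        headers.put("hostname", "master"); // illustrative header
        // Body: an opaque byte array carrying the payload
        Event e = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8, headers);
        System.out.println(e.getHeaders());                                  // {hostname=master}
        System.out.println(new String(e.getBody(), StandardCharsets.UTF_8)); // hello flume
    }
}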

[Figure: Flume agent architecture (Source → Channel → Sink)]

2. Flume Transactions

To guarantee reliable delivery, Flume uses transactions internally to move data safely between source and channel (the put transaction) and between channel and sink (the take transaction).
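
In code terms, following the pattern in the Flume Developer Guide, the source side wraps its puts in a channel transaction and rolls back on failure; a minimal sketch, with the channel handed in from outside:

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

public class PutTransactionSketch {
    public static void putSafely(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = EventBuilder.withBody("hello".getBytes());
            channel.put(event); // staged inside the transaction
            tx.commit();        // now visible to the sink side
        } catch (Throwable t) {
            tx.rollback();      // nothing is left half-written on failure
            throw t;
        } finally {
            tx.close();
        }
    }
}

The sink side is symmetric: it takes events from the channel inside its own transaction and commits only after the destination has accepted the data.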

[Figures: the put transaction (source → channel) and the take transaction (channel → sink)]

3. Flume Internal Flow

[Figure: Flume internal event flow (Source → Interceptor chain → Channel Selector → Channel → Sink Processor → Sink)]

4. Flume Examples

4.1 Netcat+Logger

#cat ./conf/flume-netcat.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-netcat.conf --name a1 -Dflume.root.logger=INFO,console
 

[root@master ~]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello
OK
flume
OK
hadoop hdfs hive es
OK
elasticsearch


# Flume agent log output
2020-12-10 00:06:19,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
2020-12-10 00:06:21,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 66 6C 75 6D 65 0D                               flume. }
2020-12-10 00:06:43,534 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 61 64 6F 6F 70 20 68 64 66 73 20 68 69 76 65 hadoop hdfs hive }
2020-12-10 00:07:05,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 6C 61 73 74 69 63 73 65 61 72 63 68 0D       elasticsearch. }


4.2 Exec+Logger

#cat conf/flume-exec.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/messages

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#./bin/flume-ng agent --conf conf --conf-file ./conf/flume-exec.conf --name a1 -Dflume.root.logger=INFO,console

4.3 Exec+HdfsSink

#cat flume-hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/src-hdfs.log

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#./bin/flume-ng agent --conf conf --conf-file ./conf/flume-hdfs.conf  --name a1
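
To check that the sink is actually writing, append a line to the tailed file and list the output tree in HDFS (paths follow the config above):

# echo "hdfs sink test" >> /var/log/src-hdfs.log
# hdfs dfs -ls -R /flume/events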

5. Flume Cluster Examples

5.1 Sink failover

1. Start the agents on master, slave1, and slave2, then write data into /var/log/logserver.log. Events go to slave1 first, because slave1 has the higher priority (10 vs 1).

2. Stop the Flume process on slave1; events are then sent to slave2.

3. Restart Flume on slave1; events are sent to slave1 again.

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-master-failover.conf --name a1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-slave1-failover.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-slave2-failover.conf --name a1 -Dflume.root.logger=INFO,console

# master server
[root@master flume-1.6.0]# cat ./conf/flume-failover/flume-master-failover.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/logserver.log

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 52020

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 52020

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000

# slave1 server
[root@slave1 flume-1.6.0]# cat ./conf/flume-failover/flume-slave1-failover.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# slave2 server
[root@slave2 flume-1.6.0]# cat ./conf/flume-failover/flume-slave2-failover.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
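
A quick way to exercise the failover by hand, assuming all three agents are running (pkill -f matches against the agent's full command line):

# on master: traffic initially lands on slave1 (priority 10)
echo "failover test 1" >> /var/log/logserver.log
# on slave1: stop the agent
pkill -f flume-slave1-failover
# on master: new traffic should now show up on slave2
echo "failover test 2" >> /var/log/logserver.log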

5.2 Flume load balancing

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-master-loadbalance.conf --name a1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-slave1-loadbalance.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-slave2-loadbalance.conf --name a1 -Dflume.root.logger=INFO,console

[root@master flume-loadbalancer]# cat flume-master-loadbalance.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/logserver.log

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 52020

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 52020

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000

[root@slave1 flume-loadbalancer]# cat flume-slave1-loadbalance.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


[root@slave2 flume-loadbalancer]# cat flume-slave2-loadbalance.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
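
To watch the round_robin selector at work, push a burst of lines on master and watch the events alternate between the slave1 and slave2 consoles (a minimal sketch):

for i in $(seq 1 10); do echo "lb test $i" >> /var/log/logserver.log; done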

5.3 Interceptors: interception and filtering

5.3.1 Timestamp interceptor

The timestamp interceptor inserts into the event headers the time at which it processes the event. By default the header key is timestamp; a different key can be configured via headerName (the config below uses TimeHeader).

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-timestamp-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
# Send data
curl -X post -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:52020

[root@master flume-1.6.0]# cat ./conf/flume-interceptor/flume-timestamp-interceptor.conf
#cat flume-timestamp-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = TimeHeader
a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.3.2 Host interceptor

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-host-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
# Send data
echo "12345"|nc master 52020

[root@master flume-interceptor]# cat flume-host-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = TimeHeader
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false



a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
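
Because filePrefix is %{hostname} and useIP = false, the files landing in HDFS should carry the sending host's name as a prefix; a quick check (path follows the config above):

# hdfs dfs -ls -R /flume/events | grep 'master\.'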

5.3.3 Static interceptor

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-static-interceptor.conf --name a1 -Dflume.root.logger=INFO,console

#curl -X post -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:52020/

#log info
2020-12-14 09:35:21,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{hadoop2=hadoop2 is header, bigdata_flume=so_easy} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 21 21       hello world!!! }


[root@master flume-interceptor]# cat flume-static-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = bigdata_flume
a1.sources.r1.interceptors.i1.value = so_easy

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100




a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.3.4 RegEx Filtering Interceptor

Excludes or keeps events whose body matches a regular expression: with excludeEvents = true the matching events are dropped, with false only the matching events are let through.

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-regexp-filter-interceptor.conf --name a1 -Dflume.root.logger=INFO,console


# Exclude events whose body consists entirely of digits
[root@master flume-interceptor]# cat flume-regexp-filter-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
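
Two test requests against this config: the first (all-digit body) should be dropped by the filter, while the second should reach the logger sink:

curl -X post -d '[{"headers":{}, "body":"123456"}]' http://master:52020/
curl -X post -d '[{"headers":{}, "body":"hello123"}]' http://master:52020/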

5.3.5 RegEx Extractor Interceptor

Uses a regular expression to add key/value pairs to the event headers: the values are extracted from the body by the regex capture groups, and the keys come from the configured serializer names.

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-regexp-extractor-interceptor.conf --name a1 -Dflume.root.logger=INFO,console

[root@slave1 ~]# curl -X post -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"12.3.56h"}]' http://master:52020/
2020-12-15 00:02:05,477 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{hadoop2=hadoop2 is header} body: 31 32 2E 33 2E 35 36 68                         12.3.56h }
[root@slave1 ~]# curl -X post -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"12:3:56h"}]' http://master:52020/
2020-12-15 00:02:27,484 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{two=3, three=5, hadoop2=hadoop2 is header, one=2} body: 31 32 3A 33 3A 35 36 68                         12:3:56h }


[root@master flume-interceptor]# cat flume-regexp-extractor-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.4 Flume Channel Selectors

5.4.1 Replicating

./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-master-replicating.conf --name a2
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave1-replicating.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave2-replicating.conf --name a1 -Dflume.root.logger=INFO,console

# Send test data
[root@master ~]# echo "good"|nc master 50000
2020-12-15 08:25:58,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 67 6F 6F 64                                     good }



[root@master flume-1.6.0]# cat ./conf/flume-selector/flume-master-replicating.conf

a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

a2.sources.r1.type = syslogtcp
a2.sources.r1.port = 50000
a2.sources.r1.host = master
a2.sources.r1.selector.type = replicating

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave1
a2.sinks.k1.port = 50000

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = slave2
a2.sinks.k2.port = 50000

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

[root@slave1 flume-1.6.0]# cat ./conf/flume-selector/flume-slave1-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 50000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


[root@slave2 flume-1.6.0]# cat ./conf/flume-selector/flume-slave2-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 50000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

5.4.2 Multiplexing

[root@master flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-master-multiplexing.conf --name a2
[root@slave1 flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave1-replicating.conf --name a1 -Dflume.root.logger=INFO,console
[root@slave2 flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave2-replicating.conf --name a1 -Dflume.root.logger=INFO,console

# Test data
[root@master ~]# curl -X post -d '[{"headers":{"areyouok":"OK", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X post -d '[{"headers":{"areyouok":"NO", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X post -d '[{"headers":{"areyouok":"xyz", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X post -d '[{"headers":{"ABC":"xyz", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/

# Only the second message (areyouok=NO) is received by slave2; the other three map to channel c1 (OK explicitly, xyz and the missing header via the default) and therefore go to slave1
2020-12-15 08:36:10,054 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{areyouok=NO, hadoop2=hadoop2 is header} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 21 21       hello world!!! }



[root@master flume-1.6.0]# cat ./conf/flume-selector/flume-master-multiplexing.conf

a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

a2.sources.r1.type = org.apache.flume.source.http.HTTPSource
a2.sources.r1.port = 50000
a2.sources.r1.host = master
a2.sources.r1.selector.type = multiplexing

a2.sources.r1.selector.header = areyouok
a2.sources.r1.selector.mapping.OK = c1
a2.sources.r1.selector.mapping.NO = c2
a2.sources.r1.selector.default = c1

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave1
a2.sinks.k1.port = 50000

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = slave2
a2.sinks.k2.port = 50000

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

[root@slave1 flume-1.6.0]# cat ./conf/flume-selector/flume-slave1-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 50000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[root@slave2 flume-1.6.0]# cat ./conf/flume-selector/flume-slave2-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 50000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6. References

1. Flume User Guide: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

2. Flume Developer Guide: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html

3. Shang Silicon Valley (尚硅谷) Flume tutorial: https://www.bilibili.com/video/BV184411B7kU?p=17 and https://www.bilibili.com/video/BV184411B7kU?p=18

 

7. Issues encountered writing from Flume to Hive

7.1 Flume agent fails to start due to missing Hive jars

java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/streaming/RecordWriter

Fix: copy the Hive jars under $HIVE_HOME/hcatalog/share/hcatalog/ into Flume's lib directory.
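
A minimal sketch of the fix (assuming HIVE_HOME and FLUME_HOME point at the two installations):

cp $HIVE_HOME/hcatalog/share/hcatalog/*.jar $FLUME_HOME/lib/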

[Screenshot: NoClassDefFoundError stack trace in the Flume agent log]

7.2 Flume fails to connect to the Hive metastore

The HiveMetaStore service must be running; its log is written to /tmp/root/hive.log.

Fix: # hive --service metastore
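
To keep the metastore running in the background and confirm that it is listening (9083 is the metastore's default port):

nohup hive --service metastore > /tmp/metastore.out 2>&1 &
netstat -tlnp | grep 9083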

[Screenshots: the metastore connection failure in the Flume log, and the corresponding hive.log output]

7.3 Lock Exception after Flume connects to the Hive MetaStore

The Hive log shows that the table metastore.TXNS does not exist: the metastore database was never initialized when Hive was installed. Run the commands below to initialize Hive's metastore database.

From Hive's bin directory, run the schematool script: ./schematool -dbType mysql -initSchema

Check the initialization info: ./schematool -dbType mysql -info

Then inspect Hive's metastore database:

mysql> show tables;
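
In particular, the table the Lock Exception complained about should now exist (assuming the metastore database is named metastore):

mysql> use metastore;
mysql> show tables like 'TXNS';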

Flume log:

[Screenshot: Flume log showing the Lock Exception]

Hive log:

[Screenshots: Hive log reporting that table metastore.TXNS does not exist]
