1. Flume Basics
Flume is a distributed, highly reliable, and highly available service for collecting, aggregating, and transporting large volumes of log data. A Flume deployment consists of agents (each one a JVM process), and every agent contains three components: a source, a channel, and a sink.
An agent moves data from source to sink in the form of events. An event consists of two parts, a header and a body.
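Several examples below post events to an HTTP source with curl; with that source's default JSONHandler, the header/body structure of an event is represented as a JSON array, roughly like this (the field values here are illustrative):
[{"headers": {"key1": "value1"}, "body": "event body text"}]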
2. Flume Transactions
To guarantee reliable delivery, Flume uses a transaction mechanism for the transfers between source and channel and between channel and sink: each batch of events is put into or taken from the channel inside a transaction, and the transaction is rolled back on failure so that no events are lost in transit.
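One practical consequence of the transaction mechanism: a sink's batch size must not exceed the channel's transactionCapacity, which in turn must not exceed its capacity, because every put/take batch happens inside a single transaction. A sketch with illustrative values:
# capacity: max events buffered in the channel
a1.channels.c1.capacity = 1000
# transactionCapacity: max events per put/take transaction
a1.channels.c1.transactionCapacity = 100
# a sink batch should not exceed transactionCapacity
a1.sinks.k1.hdfs.batchSize = 100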
3. Flume Internal Flow
4. Flume Examples
4.1 Netcat+Logger
#cat ./conf/flume-netcat.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-netcat.conf --name a1 -Dflume.root.logger=INFO,console
[root@master ~]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello
OK
flume
OK
hadoop hdfs hive es
OK
elasticsearch
# Flume console output
2020-12-10 00:06:19,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2020-12-10 00:06:21,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 66 6C 75 6D 65 0D flume. }
2020-12-10 00:06:43,534 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 61 64 6F 6F 70 20 68 64 66 73 20 68 69 76 65 hadoop hdfs hive }
2020-12-10 00:07:05,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 6C 61 73 74 69 63 73 65 61 72 63 68 0D elasticsearch. }
4.2 Exec+Logger
#cat conf/flume-exec.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/messages
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#./bin/flume-ng agent --conf conf --conf-file ./conf/flume-exec.conf --name a1 -Dflume.root.logger=INFO,console
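To feed this agent some test data, append lines to /var/log/messages; on a typical CentOS box the logger command does this via syslog (assuming rsyslog routes to /var/log/messages):
logger "flume exec source test"
# or append directly:
echo "hello exec source" >> /var/log/messages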
4.3 Exec+HdfsSink
#cat flume-hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/src-hdfs.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#./bin/flume-ng agent --conf conf --conf-file ./conf/flume-hdfs.conf --name a1
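A quick way to exercise this agent and confirm the rolled files land in HDFS (the output directories follow the hdfs.path pattern configured above):
echo "hdfs sink test" >> /var/log/src-hdfs.log
hdfs dfs -ls -R /flume/events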
5. Flume Cluster Examples
5.1 Sink Failover
1. Start the agents on master, slave1, and slave2, then write data to /var/log/logserver.log. Data is first delivered to slave1, since slave1 has the higher priority.
2. Stop the Flume process on slave1; data is then delivered to slave2.
3. Restart Flume on slave1; data is routed back to slave1. To drive these steps, see the traffic-generator sketch after the startup commands below.
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-master-failover.conf --name a1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-slave1-failover.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-failover/flume-slave2-failover.conf --name a1 -Dflume.root.logger=INFO,console
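A simple way to keep test data flowing into the tailed file during the failover test (the message format here is just illustrative):
while true; do echo "failover test $(date +%s)" >> /var/log/logserver.log; sleep 1; done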
# master server
[root@master flume-1.6.0]# cat ./conf/flume-failover/flume-master-failover.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/logserver.log
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 52020
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 52020
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
# slave1 server
[root@slave1 flume-1.6.0]# cat ./conf/flume-failover/flume-slave1-failover.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# slave2 server
[root@slave2 flume-1.6.0]# cat ./conf/flume-failover/flume-slave2-failover.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.2 Flume Load Balancing
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-master-loadbalance.conf --name a1
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-slave1-loadbalance.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-loadbalancer/flume-slave2-loadbalance.conf --name a1 -Dflume.root.logger=INFO,console
[root@master flume-loadbalancer]# cat flume-master-loadbalance.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/logserver.log
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 52020
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 52020
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000
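Besides round_robin, the load_balance processor also supports a random selector, and backoff can be enabled so that failed sinks are temporarily blacklisted rather than retried immediately. A sketch of that variant:
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random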
[root@slave1 flume-loadbalancer]# cat flume-slave1-loadbalance.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 52020
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[root@slave2 flume-loadbalancer]# cat flume-slave2-loadbalance.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 52020
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.3 Interceptors
5.3.1 Timestamp Interceptor
The timestamp interceptor inserts into the event header the time at which the event was processed. The header key defaults to timestamp; a custom key can be configured with headerName.
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-timestamp-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
# Send data
curl -X POST -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:52020
[root@master flume-1.6.0]# cat ./conf/flume-interceptor/flume-timestamp-interceptor.conf
#cat flume-timestamp-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = TimeHeader
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
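One caution about this config: the HDFS sink resolves its %Y-%m-%d/%H%M path escapes from the timestamp header, so renaming the key to TimeHeader hides it from the sink. Two ways around that (sketch):
# Option 1: let the sink use local time for the path escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Option 2: keep the interceptor's default header key
# a1.sources.r1.interceptors.i1.headerName = timestamp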
5.3.2 Host Interceptor
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-host-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
# Send data
echo "12345"|nc master 52020
[root@master flume-interceptor]# cat flume-host-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = TimeHeader
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/events/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.3.3 Static Interceptor
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-static-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
#curl -X POST -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:52020/
# Log output
2020-12-14 09:35:21,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{hadoop2=hadoop2 is header, bigdata_flume=so_easy} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 21 21 hello world!!! }
[root@master flume-interceptor]# cat flume-static-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = bigdata_flume
a1.sources.r1.interceptors.i1.value = so_easy
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.3.4 RegEx Filtering Interceptor
Drops or keeps events whose body matches the configured regular expression.
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-regexp-filter-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
# Drop events whose body consists entirely of digits
[root@master flume-interceptor]# cat flume-regexp-filter-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
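With excludeEvents = true, an all-digit body is dropped while anything else reaches the logger sink; for example:
curl -X POST -d '[{"headers":{}, "body":"12345"}]' http://master:52020/
curl -X POST -d '[{"headers":{}, "body":"hello123"}]' http://master:52020/
# expected: only the second event appears in the logger output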
5.3.5 RegEx Extractor Interceptor
Uses a regular expression to extract values from the event body and add them to the header under the configured keys.
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-interceptor/flume-regexp-extractor-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
[root@slave1 ~]# curl -X post -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"12.3.56h"}]' http://master:52020/
2020-12-15 00:02:05,477 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{hadoop2=hadoop2 is header} body: 31 32 2E 33 2E 35 36 68 12.3.56h }
[root@slave1 ~]# curl -X POST -d '[{"headers":{"hadoop2":"hadoop2 is header"}, "body":"12:3:56h"}]' http://master:52020/
2020-12-15 00:02:27,484 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{two=3, three=5, hadoop2=hadoop2 is header, one=2} body: 31 32 3A 33 3A 35 36 68 12:3:56h }
[root@master flume-interceptor]# cat flume-regexp-extractor-interceptor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
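The serializers above pass the matched text through unchanged; Flume also ships a millis serializer that parses a matched date string into epoch milliseconds, which is useful for populating the timestamp header. A sketch (the pattern value is illustrative):
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm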
5.4 Flume Channel Selectors
5.4.1 replicating
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-master-replicating.conf --name a2
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave1-replicating.conf --name a1 -Dflume.root.logger=INFO,console
./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave2-replicating.conf --name a1 -Dflume.root.logger=INFO,console
# Send test data
[root@master ~]# echo "good"|nc master 50000
2020-12-15 08:25:58,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 67 6F 6F 64 good }
[root@master flume-1.6.0]# cat ./conf/flume-selector/flume-master-replicating.conf
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2
a2.sources.r1.type = syslogtcp
a2.sources.r1.port = 50000
a2.sources.r1.host = master
a2.sources.r1.selector.type = replicating
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave1
a2.sinks.k1.port = 50000
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = slave2
a2.sinks.k2.port = 50000
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
[root@slave1 flume-1.6.0]# cat ./conf/flume-selector/flume-slave1-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 50000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[root@slave2 flume-1.6.0]# cat ./conf/flume-selector/flume-slave2-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 50000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
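By default the replicating selector fails the whole transaction if the write to any required channel fails. A channel can be marked optional so that failures writing to it are ignored; a sketch against the master config above:
a2.sources.r1.selector.type = replicating
a2.sources.r1.selector.optional = c2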
5.4.2 multiplexing
[root@master flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-master-multiplexing.conf --name a2
[root@slave1 flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave1-replicating.conf --name a1 -Dflume.root.logger=INFO,console
[root@slave2 flume-1.6.0]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-selector/flume-slave2-replicating.conf --name a1 -Dflume.root.logger=INFO,console
# Test data
[root@master ~]# curl -X POST -d '[{"headers":{"areyouok":"OK", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X POST -d '[{"headers":{"areyouok":"NO", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X POST -d '[{"headers":{"areyouok":"xyz", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
[root@master ~]# curl -X POST -d '[{"headers":{"ABC":"xyz", "hadoop2":"hadoop2 is header"}, "body":"hello world!!!"}]' http://master:50000/
# Only the second message (areyouok=NO) maps to c2 and is received by slave2; the others fall back to c1 and go to slave1
2020-12-15 08:36:10,054 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{areyouok=NO, hadoop2=hadoop2 is header} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 21 21 hello world!!! }
[root@master flume-1.6.0]# cat ./conf/flume-selector/flume-master-multiplexing.conf
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2
a2.sources.r1.type = org.apache.flume.source.http.HTTPSource
a2.sources.r1.port = 50000
a2.sources.r1.host = master
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = areyouok
a2.sources.r1.selector.mapping.OK = c1
a2.sources.r1.selector.mapping.NO = c2
a2.sources.r1.selector.default = c1
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave1
a2.sinks.k1.port = 50000
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = slave2
a2.sinks.k2.port = 50000
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
[root@slave1 flume-1.6.0]# cat ./conf/flume-selector/flume-slave1-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 50000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[root@slave2 flume-1.6.0]# cat ./conf/flume-selector/flume-slave2-replicating.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 50000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6. References
1. Flume User Guide: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
2. Flume Developer Guide: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html
3. Shangguigu (尚硅谷) Flume tutorial: https://www.bilibili.com/video/BV184411B7kU?p=17 & https://www.bilibili.com/video/BV184411B7kU?p=18
7. Problems Encountered Writing from Flume into Hive
7.1 Starting the Flume agent fails because Hive jars are missing
java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/streaming/RecordWriter
Solution: copy the Hive jars under $HIVE_HOME/hcatalog/share/hcatalog/ into Flume's lib directory, as sketched below.
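A sketch of that copy (assuming FLUME_HOME points at the Flume installation):
cp $HIVE_HOME/hcatalog/share/hcatalog/*.jar $FLUME_HOME/lib/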
7.2 Flume fails to connect to the Hive metastore
The HiveMetaStore service must be running; its log is written to /tmp/root/hive.log.
Solution: # hive --service metastore
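To keep the metastore running in the background and watch its log (log path as noted above):
nohup hive --service metastore > /tmp/metastore.out 2>&1 &
tail -f /tmp/root/hive.log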
7.3 Lock exception after Flume connects to the HiveMetaStore
The Hive log shows that the table metastore.TXNS does not exist, because the metastore database was never initialized when Hive was installed. Initialize it with the following commands.
From Hive's bin directory, run the schematool script: ./schematool -dbType mysql -initSchema
Check the initialization info: ./schematool -dbType mysql -info
Then inspect Hive's metastore database:
mysql> show tables;
Flume log and Hive log: (screenshots not reproduced here)