系统要求
- Java1.8或以上
- 内存要足够大
- 硬盘足够大
- Agent对源和目的要有读写权限
Flume部署
我这8G内存的电脑之前搭建Hadoop、Hive和HBase已经苟延残喘了,怀疑会卡死,硬着头皮上吧。先解压缩,大数据的这些产品都是一个部署套路。
我准备在server01上部署flume,单节点就可以了。在公司生产环境部署要考虑高可用。
[root@server01 home]# tar -xvf apache-flume-1.9.0-bin.tar.gz -C /usr [root@server01 home]# cd /usr [root@server01 usr]# chown -R hadoop:hadoop apache-flume-1.9.0-bin/ [root@server01 usr]# mv apache-flume-1.9.0-bin/ apache-flume-1.9.0
在profile文件中添加配置
FLUME_HOME=/usr/apache-flume-1.9.0/ PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HBASE_HOME/bin:$FLUME_HOME/bin
刷新配置文件
[root@server01 bin]# source /etc/profile
修改flume配置文件
[hadoop@server01 conf]$ pwd /usr/apache-flume-1.9.0/conf [hadoop@server01 conf]$ mv flume-env.sh.template flume-env.sh [hadoop@server01 conf]$ vi flume-env.sh
把flume-env.sh里的JAVA_HOME修改为绝对路径
export JAVA_HOME=/usr/java/jdk1.8.0
Flume启动
我们试一下通过网络端口写入数据。新建一个配置文件。
[hadoop@server01 conf]$ vi config1
数据流向:telent -> source -> channel -> sink -> logger
具体配置内容如下。
[hadoop@server01 conf]$ cat config1 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = server01 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动flume。注意flume1.0以后叫ng(next generation),之前叫og(original generation)。
[hadoop@server01 apache-flume-1.9.0]$ flume-ng agent --conf conf --conf-file conf/config1 --name a1 -Dflume.root.logger=INFO,console
启动之后,另开server02对44444端口发送数据。
[hadoop@server02 ~]$ telnet server01 44444 Trying 182.182.0.8... Connected to server01. Escape character is '^]'. hello OK thank you OK thank you very much OK how are you everyone OK
看看server01控制台输出了啥。
2021-01-07 11:17:14,198 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. } 2021-01-07 11:18:24,209 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 61 6E 6B 20 79 6F 75 0D thank you. } 2021-01-07 11:18:34,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 61 6E 6B 20 79 6F 75 20 76 65 72 79 20 6D thank you very m } 2021-01-07 11:18:51,602 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 6F 77 20 61 72 65 20 79 6F 75 20 65 76 65 72 how are you ever }
我们可以看到,控制台只会输出前面几个字节的内容,但是信息已经获取到了。
再来一个例子
上面是一个最简单的例子,从网络端口获取数据,输出到控制台。再来一个复杂一点的,从日志文件获取增量数据,写入HDFS。
做过开发的都清楚用tail -f filename来查看最新的请求日志,配置文件新建config2,内容如下。
[hadoop@server01 conf]$ cat config2 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/log.txt # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://mycluster/flume a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动hdfs,用上面的配置文件启动flume。
[hadoop@server01 apache-flume-1.9.0]$ flume-ng agent --name a1 --conf conf --conf-file conf/config2 -Dflume.root.logger=INFO,console
启动报错。
2021-01-07 19:19:51,905 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.hadoop.conf.Configuration.set(Configuration.java:1380) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1361) at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1703) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:748) Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.hadoop.conf.Configuration.set(Configuration.java:1380) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1361) at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1703) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:748)
这跟Hive启动错误是一样的,原因就是与Hadoop的guava包版本不一致。把Hadoop的jar包拷到Flume路径下,删除老的jar包。在Flume的lib目录执行如下命令。
[hadoop@server01 lib]$ cp /usr/hadoop-3.3.0/share/hadoop/common/lib/guava-27.0-jre.jar . [hadoop@server01 lib]$ ll|grep guava -rw-rw-r--. 1 hadoop hadoop 1648200 9月 13 2018 guava-11.0.2.jar -rw-r--r--. 1 hadoop hadoop 2747878 1月 12 11:42 guava-27.0-jre.jar [hadoop@server01 lib]$ rm guava-11.0.2.jar [hadoop@server01 lib]$ ll|grep guava -rw-r--r--. 1 hadoop hadoop 2747878 1月 12 11:42 guava-27.0-jre.jar
再次启动Flume。启动完毕后,模拟向/home/log.txt写入数据,中间间隔一段时间。
[root@server01 home]# echo "hello,thank you,thank you very much" >> log.txt [root@server01 home]# echo "How are you Indian Mi fans?" >> log.txt
再去看看HDFS生成的文件里有什么内容。
打开下面的两个文件,看看内容。原谅我不厚道地用了雷总歌词。
这样就把日志收集到HDFS了,后续可以通过MR任务来处理HDFS文件,提取需要的内容。