2021实训-Flume安装以及应用

  1. 上传flume到/root目录下

  2. 解压安装

    tar -zvxf apache-flume-1.9.0-bin.tar.gz
    
  3. 配置环境变量,并让环境变量生效

    export FLUME_HOME=/root/apache-flume-1.9.0-bin
    export PATH=$PATH:$FLUME_HOME/bin
    
  4. 将hadoop-2.7.3安装路径下的依赖的jar导入到/apache-flume-1.9.0-bin/lib下:

    share/hadoop/common/hadoop-common-2.7.3.jar
    share/hadoop/common/lib/commons-configuration-1.6.jar
    share/hadoop/common/lib/hadoop-auth-2.7.3.jar
    share/hadoop/hdfs/hadoop-hdfs-2.7.3.jar
    share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar
    share/hadoop/common/lib/commons-io-2.4.jar
    
  5. 验证

    bin/flume-ng version 
    
  6. 配置Flume HDFS Sink:
    在/root/apache-flume-1.9.0-bin/conf/新建一个flume-hdfs.conf
    添加如下内容:

    # define the agent
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    # define the source
    #上传目录类型
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/var/log/nginx/logs/flumeLogs
    #定义自滚动日志完成后的后缀名
    a1.sources.r1.fileSuffix=.FINISHED
    #根据每行文本内容的大小自定义最大长度4096=4k
    a1.sources.r1.deserializer.maxLineLength=4096
    
    # define the sink
    a1.sinks.k1.type = hdfs
    #上传的文件保存在hdfs的/flumeLogs目录下
    a1.sinks.k1.hdfs.path = hdfs://niit01:9000/flumeLogs/%y-%m-%d/%H/%M/%S
    a1.sinks.k1.hdfs.filePrefix=access_log
    a1.sinks.k1.hdfs.fileSufix=.log
    a1.sinks.k1.hdfs.batchSize=1000
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat= Text 
    # roll 滚动规则:按照数据块128M大小来控制文件的写入,与滚动相关其他的都设置成0
    #为了演示,这里设置成500k写入一次
    a1.sinks.k1.hdfs.rollSize= 512000
    a1.sinks.k1.hdfs.rollCount=0
    a1.sinks.k1.hdfs.rollInteval=0
    #控制生成目录的规则:一般是一天或者一周或者一个月一次,这里为了演示设置10秒 
    a1.sinks.k1.hdfs.round=true
    a1.sinks.k1.hdfs.roundValue=10
    a1.sinks.k1.hdfs.roundUnit= second    
    #是否使用本地时间
    a1.sinks.k1.hdfs.useLocalTimeStamp=true   
    
    #define the channel
    a1.channels.c1.type = memory
    #自定义event的条数
    a1.channels.c1.capacity = 500000
    #flume事务控制所需要的缓存容量1000条event
    a1.channels.c1.transactionCapacity = 1000
    
    #source channel sink cooperation
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    注意:- 需要先在/var/log/nginx/logs/创建flumeLogs
    - 需要在hdfs的根目录/下创建flumeLogs

  7. 修改conf/flume-env.sh(该文件事先是不存在的,需要复制一份)
    复制:

    cp flume-env.template.sh flume-env.sh
    

    编辑文件,并设置如下内容:

    #设置JAVA_HOME:
    export JAVA_HOME = /root/jdk1.8.0_171          
    #修改默认的内存:  
    export JAVA_OPTS="-Xms1024m -Xmx1024m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"   
    
  8. 启动flume

    • 测试数据:把 /var/log/nginx/logs/access.log 复制到
      /var/log/nginx/logs/flumeLogs/access_201904251200.log
    • 启动
      在/root/apache-flume-1.9.0-bin目录下,执行如下命令进行启动:
      bin/flume-ng agent --conf ./conf/ -f ./conf/flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
      
    • 到Hadoop的控制台http://niit01:50070/flumeLogs 查看有没有数据
  9. 编写Linux脚本rollingLog.sh,实现/var/log/nginx/logs/access.log日志的自动滚动到flumeLogs目录下

    • 在~目录下新建rollingLog.sh,并添加如下内容:
      	#!/bin/bash
      	#定义日期格式
      	dataformat=`date +%Y-%m-%d-%H-%M-%S`
      	
      	#复制access.log并重命名
      	cp /var/log/nginx/logs/access.log /var/log/nginx/logs/access_$dataformat.log
      	
      	host=`hostname`
      	sed -i 's/^/'${host}',&/g' /var/log/nginx/logs/access_$dataformat.log
      	#统计日志文件行数
      	lines=`wc -l < /var/log/nginx/logs/access_$dataformat.log`
      	
      	#将格式化的日志移动到flumeLogs目录下
      	mv /var/log/nginx/logs/access_$dataformat.log /var/log/nginx/logs/flumeLogs
      	
      	#清空access.log的内容
      	sed -i '1,'${lines}'d' /var/log/nginx/logs/access.log
      	
      	#重启nginx , 否则 log can not roll.
      	kill -USR1 `cat /var/log/nginx/logs/nginx.pid`
      	
      	##返回给服务器信息
      	ls -al /var/log/nginx/logs/flumeLogs/
      
  10. 编写启动Flume脚本 flume_start.sh,启动Flume

    #!/bin/bash
    /root/apache-flume-1.9.0-bin/bin/flume-ng agent -c /root/apache-flume-1.9.0-bin/conf/ -f /root/apache-flume-1.9.0-bin/conf/flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console &
    
  11. 编写停止Flume脚本 flume_stop.sh,停止Flume

    #!/bin/bash
    
    JAR="flume"
    
    #停止flume函数
    echo "begin stop flume process.."
    num=`ps -ef|grep java|grep $JAR|wc -l`
    echo "当前已经启动的flume进程数:$num"
    if [ "$num" != "0" ];then
    #正常停止flume
    ps -ef|grep java|grep $JAR|awk '{print $2;}'|xargs kill
    echo "进程已经关闭..."
    else
    echo "服务未启动,无须停止..."
    fi
    
  12. 编写重启Flume脚本 flume_to_hdfs.sh,综合了前两个脚本

    #!/bin/bash
    
    #先停止正在启动的flume
    ./flume_stop.sh
    
    #用法:nohup ./start-dishi.sh >output 2>&1 &
    nohup ./flume_start.sh > nohup_output.log 2>&1 &
    echo "启动flume成功……"
上一篇:Flume面试知识总结


下一篇:1.1-1.5 flume架构概述及安装使用