Flume
1. What is Flume
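Flume:
** Flume is a highly available, highly reliable, distributed system for collecting, transporting and aggregating large volumes of log data. It was originally developed by Cloudera and is now an Apache project.
** These notes run Flume on Linux only.
** Documentation: flume.apache.org (Documentation -- Flume User Guide)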
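Flume architecture:
Source: collects data. The Source is where the data flow originates, and it passes the events it produces to the Channel.
Channel: the data transfer buffer that connects the Source and the Sink.
Sink: takes events from the Channel and writes them to the destination, which can be the Source of the next agent, HDFS, or HBase.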
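The Source -> Channel -> Sink flow described above can be pictured with a small Python model. This is only a sketch of the idea, not Flume's implementation: the `MemoryChannel` class and `run_pipeline` function are names invented here, and real Flume channels are transactional, which this toy version leaves out.

```python
from collections import deque


class MemoryChannel:
    """Toy stand-in for a Flume channel: a bounded FIFO buffer of events."""

    def __init__(self, capacity):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        # A full channel rejects the event; the source has to retry later.
        if len(self.events) >= self.capacity:
            return False
        self.events.append(event)
        return True

    def take(self, max_events):
        # Hand over at most max_events events, oldest first.
        batch = []
        while self.events and len(batch) < max_events:
            batch.append(self.events.popleft())
        return batch


def run_pipeline(lines, capacity=1000, batch_size=100):
    """Push every line through source -> channel -> sink; return what the sink wrote."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    channel = MemoryChannel(capacity)
    written = []  # stands in for the destination (logger, HDFS, ...)
    pending = deque(lines)
    while pending or channel.events:
        # Source side: produce events until the channel is full.
        while pending and channel.put(pending[0]):
            pending.popleft()
        # Sink side: drain one batch and write it to the destination.
        written.extend(channel.take(batch_size))
    return written
```

Even with a channel smaller than the input, the events arrive at the sink complete and in order, because the source simply waits whenever the channel is full.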
2. Installing Flume
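----Installing Flume-----------------------------
1. Unpack the archive (installing it under the cdh directory is recommended).
2. Rename the template in conf/ and edit flume-env.sh:
$ mv flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67
3. Use the flume-ng command:
$ bin/flume-ng agent --conf <configuration directory> --name <agent name> --conf-file <configuration file>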
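When several agents are started from scripts, it can help to assemble the flume-ng command line in one place. The sketch below only builds the argument list from the flags used in these notes (`--conf`, `--name`, `--conf-file`, `-Dflume.root.logger`); the function name `flume_agent_command` is made up for illustration, and the relative `bin/flume-ng` path assumes the command is run from the Flume installation directory.

```python
import shlex


def flume_agent_command(agent_name, conf_file, conf_dir="conf/", log_to_console=False):
    """Build the argv for starting one Flume agent (the command is not executed here)."""
    argv = [
        "bin/flume-ng", "agent",
        "--conf", conf_dir,        # directory that holds flume-env.sh
        "--name", agent_name,      # must match the agent name used in the config file
        "--conf-file", conf_file,  # the agent's properties file
    ]
    if log_to_console:
        # Send log output to the console at INFO level.
        argv.append("-Dflume.root.logger=INFO,console")
    return argv


if __name__ == "__main__":
    # Print the command so it can be checked before it is run.
    print(shlex.join(flume_agent_command("a1", "conf/flume-telnet.conf", log_to_console=True)))
```

The resulting list can be passed to `subprocess.run` unchanged if the agent should be launched from Python.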
3. Examples:
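Requirement: use Flume to monitor a port and send whatever is written to that port to the logger.

1. Copy the template
$ cp -a flume-conf.properties.template flume-telnet.conf

2. Edit flume-telnet.conf
# Name the components on this agent
# a1 is the agent instance name (any name works); an agent has three parts
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# The netcat source behaves like "nc -k -l [host] [port]": it listens on a port and
# turns each line of text it receives into an event. The netcat tool itself does not
# need to be installed.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# See "Flume Sinks -- Logger Sink" in the User Guide
# Writes events to the Flume log at INFO level
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
# Maximum number of events per transaction
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Note: the source property is "channels" (with an 's'), the sink property is "channel"
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

*** How a configuration file is put together:
a) name the components
b) configure the source, sink and channel
c) bind them together
---------------------
Test:
*** Install telnet
$ su -
# yum -y install telnet

*** Start Flume; '-D' sets the log level and the log destination (here: print the log to the console)
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger=INFO,console

*** Open another window
$ netstat -an|grep 44444     -- check that a program (Flume) is listening on port 44444
$ telnet localhost 44444     -- connect to port 44444 on this machine; telnet is the client for this port
Then type any strings...

PS:
a) To leave telnet: press 'ctrl+]', then type quit.
b) If flume-ng will not exit, open a new window, find the pid with jps (or netstat -antp|grep 44444), and run kill -9 <pid>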
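The "name, configure, bind" pattern above is easy to get wrong in the last step, in particular `channels` versus `channel`. The following Python sketch checks the bindings of an agent definition before Flume is started. It is a hypothetical helper written for these notes (`parse_properties` and `check_wiring` are not part of Flume), and it only understands plain `key = value` lines.

```python
EXAMPLE = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""


def parse_properties(text):
    """Parse 'key = value' lines into a dict, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(text, agent):
    """Return a list of binding problems for one agent (empty list means OK)."""
    props = parse_properties(text)
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    # A source may feed several channels, hence the plural property name.
    for source in props.get(f"{agent}.sources", "").split():
        bound = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not bound:
            problems.append(f"source {source}: no 'channels' binding")
        for channel in bound:
            if channel not in declared:
                problems.append(f"source {source}: unknown channel {channel}")
    # A sink drains exactly one channel, hence the singular property name.
    for sink in props.get(f"{agent}.sinks", "").split():
        bound = props.get(f"{agent}.sinks.{sink}.channel", "")
        if not bound:
            problems.append(f"sink {sink}: no 'channel' binding")
        elif bound not in declared:
            problems.append(f"sink {sink}: unknown channel {bound}")
    return problems
```

Calling `check_wiring(open("conf/flume-telnet.conf").read(), "a1")` on a real file would list the same kinds of problems; an empty list means every source and sink is attached to a declared channel.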
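Requirement: extract newly written log content in real time --> append it to the corresponding file on HDFS
This example uses Flume to monitor a file and ship the newly appended content somewhere else, such as HDFS.
The file monitored here is the Apache log /var/log/httpd/access_log

----Install the Apache server-------
$ su -
# yum -y install httpd
# service httpd start
# service httpd status
** Edit the home page; /var/www/html is the root directory of the Apache web server
# vi /var/www/html/index.html
Type any content...
** Open a browser and visit http://192.168.2.200
** Grant access to the log directory
# chmod 755 /var/log/httpd/
** Watch the log change; refreshing the page produces new log entries
# su - tom
$ tail -f /var/log/httpd/access_log     -- '-F' works like '-f' but also keeps following the file name across log rotation
----------------------------
$ cp -a flume-telnet.conf flume-apache.conf

a2.sources = r2
a2.channels = c2
a2.sinks = k2

# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /var/log/httpd/access_log
# '-c' makes bash read the command from the string that follows; it is required
a2.sources.r2.shell = /bin/bash -c

# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# define sinks
# Use a multi-level directory layout: two levels, "yyyymmdd/hour", one new directory per hour
a2.sinks.k2.type = hdfs
# The directories are created automatically
a2.sinks.k2.hdfs.path=hdfs://192.168.2.200:8020/flume/%Y%m%d/%H
# File name prefix
a2.sinks.k2.hdfs.filePrefix = accesslog
# Round the timestamp down so that directories are created per time period
a2.sinks.k2.hdfs.round=true
# Rounding value: 1, unit: hour
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour
# Use the local time (instead of a timestamp from the event header) when expanding %Y%m%d/%H
a2.sinks.k2.hdfs.useLocalTimeStamp=true
# Number of events written to the file before it is flushed to HDFS
# Note: a batch larger than the channel's transactionCapacity (100 above) can make the sink fail on a busy memory channel
a2.sinks.k2.hdfs.batchSize=1000
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text
# Avoid many small files (the default settings produce a lot of them)
# Start a new file every 600 seconds
a2.sinks.k2.hdfs.rollInterval=600
# Start a new file when the current one reaches 128000000 bytes
# In practice, with a 128 MB block size this is usually set to 127 MB (127*1024*1024)
a2.sinks.k2.hdfs.rollSize=128000000
# Do not roll files based on the number of events
a2.sinks.k2.hdfs.rollCount=0
# Must be 1; otherwise the file is rolled whenever block replication is in progress, and the three roll settings above have no effect
a2.sinks.k2.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Test:
a) Start CDH Hadoop
$ sbin/start-dfs.sh ; sbin/start-yarn.sh ; sbin/mr-jobhistory-daemon.sh start historyserver
b) Start Apache
# service httpd start
c) Start Flume
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache.conf
d) Refresh http://192.168.2.200
Watch the web log: $ tail -f /var/log/httpd/access_log
Watch HDFS: $ bin/hdfs dfs -tail -f /flume/20170519/10/accesslog.1495161507253.tmp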
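Two parts of the HDFS sink configuration above are easier to reason about with a worked example: how the `%Y%m%d/%H` path is filled in after rounding, and when a file is rolled. The Python sketch below imitates both rules as described in these notes. The function names `resolve_path` and `should_roll` are made up, only the `hour`, `minute` and `second` units are handled, and the real sink has more settings than are modeled here.

```python
from datetime import datetime


def resolve_path(template, ts, round_value=1, round_unit="hour"):
    """Round ts down to a multiple of round_value units, then expand the % escapes."""
    if round_unit == "hour":
        ts = ts.replace(hour=ts.hour - ts.hour % round_value,
                        minute=0, second=0, microsecond=0)
    elif round_unit == "minute":
        ts = ts.replace(minute=ts.minute - ts.minute % round_value,
                        second=0, microsecond=0)
    elif round_unit == "second":
        ts = ts.replace(second=ts.second - ts.second % round_value, microsecond=0)
    else:
        raise ValueError(f"unsupported round unit: {round_unit}")
    return ts.strftime(template)


def should_roll(age_seconds, size_bytes, event_count,
                roll_interval=600, roll_size=128000000, roll_count=0):
    """Return True when any enabled trigger fires; a setting of 0 disables that trigger."""
    return ((roll_interval > 0 and age_seconds >= roll_interval)
            or (roll_size > 0 and size_bytes >= roll_size)
            or (roll_count > 0 and event_count >= roll_count))


if __name__ == "__main__":
    when = datetime(2017, 5, 19, 10, 38, 27)
    print(resolve_path("/flume/%Y%m%d/%H", when))   # directory for this event
    print(should_roll(age_seconds=30, size_bytes=4096, event_count=5000))
    print(127 * 1024 * 1024)                        # the "127 MB" value in bytes
```

With `rollCount=0`, the event count never triggers a roll, so a file is closed only after 600 seconds or once it reaches the size limit, whichever comes first.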
Use Flume to monitor a directory (/home/tom/log) and ingest the rolled (finished) log files in it into HDFS in real time.
$ mkdir /home/tom/log
$ cd /home/tom/log
$ cp /var/log/httpd/access_log access_log.1
$ cp /var/log/httpd/access_log access_log.2
Requirement: ingest the files access_log.1 and access_log.2

$ mkdir /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
$ mkdir /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkdata
$ cp -a flume-apache.conf flume-dir.conf

a3.sources = r3
a3.channels = c3
a3.sinks = k3

# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/tom/log
# Regular expression for the files to ignore
# '.' matches any character except line terminators, '*' means zero or more times;
# the pattern below skips files whose names end in "_log", such as a live access_log
a3.sources.r3.ignorePattern = ^.*\_log$

# define channels
# The file channel persists events to files on disk: slower than a memory channel, but the data is safer
# A memory channel would also work here
a3.channels.c3.type = file
# Where the checkpoint is stored; the checkpoint holds the channel's metadata, i.e. which events in the data files have not been taken yet
a3.channels.c3.checkpointDir = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
# Where the data files are stored
a3.channels.c3.dataDirs = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkdata

# define sinks
# Use a multi-level directory layout: two levels, "yyyymmdd/hour", one new directory per hour
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://192.168.2.200:8020/flume2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = accesslog
# Round the timestamp down so that directories are created per time period
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour
# Use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp=true
a3.sinks.k3.hdfs.batchSize=1000
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text
# Avoid many small files
# Start a new file every 600 seconds
a3.sinks.k3.hdfs.rollInterval=600
a3.sinks.k3.hdfs.rollSize=128000000
# Do not roll files based on the number of events
a3.sinks.k3.hdfs.rollCount=0
# Set to 1; otherwise the file is rolled whenever block replication is in progress, and the three roll settings above have no effect
a3.sinks.k3.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Test:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-dir.conf
Check the result at http://192.168.2.200:50070
** In log/, files with the .COMPLETED suffix have been fully ingested
$ ls
access_log.1.COMPLETED access_log.2.COMPLETED
Create one more log file and it is ingested right away
$ cp access_log.1.COMPLETED access_log.3
$ ls
access_log.1.COMPLETED access_log.2.COMPLETED access_log.3.COMPLETED
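To see which files the `ignorePattern` above lets through, the rule can be tried out in a few lines of Python. This is a sketch of the selection logic only, assuming that a name is skipped when the whole name matches the pattern and that files already marked `.COMPLETED` are not read again; the function name `files_to_ingest` is invented for illustration, and Python's `re` module stands in for the Java regular expressions Flume uses.

```python
import re

# Same expression as a3.sources.r3.ignorePattern.
IGNORE_PATTERN = re.compile(r"^.*\_log$")
COMPLETED_SUFFIX = ".COMPLETED"


def files_to_ingest(names):
    """Return the file names a spooling directory source would still pick up."""
    picked = []
    for name in names:
        if name.endswith(COMPLETED_SUFFIX):
            continue  # already ingested and renamed
        if IGNORE_PATTERN.fullmatch(name):
            continue  # excluded by ignorePattern, e.g. the live access_log
        picked.append(name)
    return picked


if __name__ == "__main__":
    listing = ["access_log", "access_log.1", "access_log.2.COMPLETED", "error_log"]
    print(files_to_ingest(listing))
```

Only rolled files such as `access_log.1` pass the filter. That matters because files placed in a spooling directory should no longer change, whereas the live `access_log` is still being appended to.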
Start three agents on the same server:

agent1: monitors /var/log/httpd/access_log in real time
** flume-apache.conf
# configure agent1
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# define sources
agent1.sources.r1.type = exec
# Note: the user running the flume command must have read permission on /var/log/httpd/access_log
agent1.sources.r1.command = tail -F /var/log/httpd/access_log
agent1.sources.r1.shell = /bin/bash -c

# define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# define sinks
# Avro is a serialization technique
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 192.168.2.200
agent1.sinks.k1.port = 4545

# bind the sources and sinks to the channels
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Test:
Start Apache
Start agent1:
$ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
$ tail -F /var/log/httpd/access_log
Refresh the web page and watch the changes
------------------
agent2: monitors /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log in real time
$ mkdir logs
$ vi conf/hive-log4j.properties
hive.log.dir=/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs

** flume-hive.conf
# configure agent2
agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2

# define sources
agent2.sources.r2.type = exec
agent2.sources.r2.command = tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
agent2.sources.r2.shell = /bin/bash -c

# define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

# define sinks
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = 192.168.2.200
agent2.sinks.k2.port = 4545

# bind the sources and sinks to the channels
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Test:
Start agent2:
$ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf
$ tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
Enter hive, run a few statements, and watch the log change
hive> show databases;
...
-------------------
agent3: collects in real time the data passed on by agent1 and agent2
** flume-collector.conf
# configure agent3
agent3.sources = r3
agent3.channels = c3
agent3.sinks = k3

# define sources
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.2.200
agent3.sources.r3.port = 4545

# define channels
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100

# define sinks
# Use a multi-level directory layout: two levels, "yyyymmdd/hour", one new directory per hour
agent3.sinks.k3.type = hdfs
agent3.sinks.k3.hdfs.path=hdfs://192.168.2.200:8020/flume3/%Y%m%d/%H
agent3.sinks.k3.hdfs.filePrefix = accesslog
# Round the timestamp down so that one directory is created per hour
agent3.sinks.k3.hdfs.round=true
agent3.sinks.k3.hdfs.roundValue=1
agent3.sinks.k3.hdfs.roundUnit=hour
agent3.sinks.k3.hdfs.useLocalTimeStamp=true
# Note: a batch larger than the channel's transactionCapacity (100 above) can make the sink fail on a busy memory channel
agent3.sinks.k3.hdfs.batchSize=1000
agent3.sinks.k3.hdfs.fileType=DataStream
agent3.sinks.k3.hdfs.writeFormat=Text
# Avoid many small files
# Start a new file every 600 seconds
agent3.sinks.k3.hdfs.rollInterval=600
agent3.sinks.k3.hdfs.rollSize=128000000
# Do not roll files based on the number of events
agent3.sinks.k3.hdfs.rollCount=0
# Set to 1; otherwise the file is rolled whenever block replication is in progress, and the three roll settings above have no effect
agent3.sinks.k3.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
agent3.sources.r3.channels = c3
agent3.sinks.k3.channel = c3

Test:
Start agent3:
$ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
Go to CDH Hadoop and watch the log change. Note: adjust the path (the effect is easier to see on the .tmp file)
$ bin/hdfs dfs -tail -f /flume3/20161220/11/accesslog.1482203839459
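In this three-agent layout, the avro sinks of agent1 and agent2 only deliver data if their `hostname` and `port` point at the address the avro source of agent3 listens on. A small Python check along the following lines can catch a mismatch before the agents are started. It is a hypothetical helper (`avro_endpoints` and `unreachable_sinks` are not Flume functions), it compares addresses literally, and it therefore does not understand a wildcard bind such as 0.0.0.0.

```python
SENDERS = """
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 192.168.2.200
agent1.sinks.k1.port = 4545
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = 192.168.2.200
agent2.sinks.k2.port = 4545
"""

COLLECTOR = """
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.2.200
agent3.sources.r3.port = 4545
"""


def parse_properties(text):
    """Parse 'key = value' lines into a dict, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def avro_endpoints(props, kind, host_key):
    """Map 'agent.kind.name' to (host, port) for every avro component of that kind."""
    endpoints = {}
    for key, value in props.items():
        parts = key.split(".")
        # Matches keys such as "agent1.sinks.k1.type" with the value "avro".
        if len(parts) == 4 and parts[1] == kind and parts[3] == "type" and value == "avro":
            prefix = ".".join(parts[:3])
            endpoints[prefix] = (props[f"{prefix}.{host_key}"], int(props[f"{prefix}.port"]))
    return endpoints


def unreachable_sinks(sender_text, collector_text):
    """Return the avro sinks whose target is not an avro source address of the collector."""
    sinks = avro_endpoints(parse_properties(sender_text), "sinks", "hostname")
    listeners = set(avro_endpoints(parse_properties(collector_text), "sources", "bind").values())
    return {name: target for name, target in sinks.items() if target not in listeners}
```

An empty result from `unreachable_sinks(SENDERS, COLLECTOR)` means both upstream agents send to the port on which the collector listens.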