使用EMR-Flume同步HDFS audit日志到HDFS
1.前言
E-MapReduce从3.19.0版本开始对EMR-Flume提供集群管理的功能。通过集群管理功能,可以在Web页面方便的配置和管理Flume Agent。
本文将使用EMR-Flume实时同步HDFS audit日志至HDFS,便于对HDFS操作记录进行离线统计和实时分析。
2.部署方案
2.1方案一
在master实例启动Flume agent,收集本地磁盘中的audit日志并sink到HDFS。
这个方案架构和配置比较简单,但是master实例本身部署了比较重要且对资源占用比较敏感的服务,比如Zookeeper,在master实例中HDFS读写操作如果占用过多资源会对这些服务产生影响。
2.2方案二
选取core实例启动Flume agent做sink HDFS的操作,在master实例启动Flume agent,收集本地磁盘中的audit日志通过Avro协议发送数据至core实例。
使用这个方案时,考虑到core实例上Flume运行的稳定性,可以选取多个core实例构成failover sink processor。
本文以方案二对操作流程作说明。
3.操作流程
3.1准备工作
创建E-MapReduce Hadoop集群,在可选服务中选择Flume。具体操作可参考创建集群
3.2 core实例配置并启动Flume Agent
比如在emr-worker-1节点进行操作,选择核心实例组进行配置,如下入所示
在配置页面设置如下
default-agent.sinks.default-sink.type | hdfs |
---|---|
default-agent.channels.default-channel.type | file |
default-agent.sources.default-source.type | avro |
deploy_node_hostname | emr-worker-1 |
在配置页面通过自定义配置添加如下配置:
default-agent.sinks.default-sink.hdfs.path | 对于高可用集群,使用hdfs://emr-cluster/path形式的地址 |
---|---|
default-agent.sinks.default-sink.hdfs.fileType | DataStream |
default-agent.sinks.default-sink.hdfs.rollSize | 0 |
default-agent.sinks.default-sink.hdfs.rollCount | 0 |
default-agent.sinks.default-sink.hdfs.rollInterval | 86400 |
default-agent.sinks.default-sink.hdfs.batchSize | 51200 |
default-agent.sources.default-source.bind | 0.0.0.0 |
default-agent.sources.default-source.port | 根据实际设置 |
default-agent.channels.default-channel.transactionCapacity | 51200 |
default-agent.channels.default-channel.dataDirs | channel存储event数据的路径 |
default-agent.channels.default-channel.checkpointDir | 存储checkpoint的路径 |
default-agent.channels.default-channel.capacity | 根据hdfs roll进行设置 |
说明:为避免生成过多小文件,通常以GB为单位生成HDFS文件,或者按天生成一个文件。此处按照时间来生成文件,可根据实际情况进行设置.
保存配置后启动Flume agent
在查看操作历史里显示操作成功后,部署拓扑页面可以看到emr-worker-1节点的flume已经是started状态
emr-worker-1节点启动成功后,开始启动第二个worker节点。
同样的方式,比如在worker-2节点启动flume,修改配置项
deploy_node_hostname | 节点的hostname |
---|---|
default-agent.sinks.default-sink.hdfs.path | 对于高可用集群,使用hdfs://emr-cluster/path形式的地址 |
保存配置后,启动 All Components,指定机器为emr-worker-2。
3.3 master实例配置并启动Flume Agent
比如在emr-header-1节点进行操作,选择服务配置
配置agent如下
additional_sinks | k1 | |
---|---|---|
deploy_node_hostname | emr-header-1 | |
default-agent.sources.default-source.type | taildir | |
default-agent.sinks.default-sink.type | avro | |
default-agent.channels.default-channel.type | file |
新增配置如下:
配置项 | 值 |
---|---|
default-agent.sources.default-source.filegroups | f1 |
default-agent.sources.default-source.filegroups.f1 | /mnt/disk1/log/hadoop-hdfs/hdfs-audit.log.* |
default-agent.sources.default-source.positionFile | 存储position file的路径 |
default-agent.channels.default-channel.checkpointDir | 存储checkpoint的路径 |
default-agent.channels.default-channel.dataDirs | 存储event数据的路径 |
default-agent.channels.default-channel.capacity | 根据实际情况设置 |
default-agent.sources.default-source.batchSize | 2000 |
default-agent.channels.default-channel.transactionCapacity | 2000 |
default-agent.sources.default-source.ignoreRenameWhenMultiMatching | true |
default-agent.sinkgroups | g1 |
default-agent.sinkgroups.g1.sinks | default-sink k1 |
default-agent.sinkgroups.g1.processor.type | failover |
default-agent.sinkgroups.g1.processor.priority.default-sink | 10 |
default-agent.sinkgroups.g1.processor.priority.k1 | 5 |
default-agent.sinks.default-sink.hostname | emr-worker-1节点的IP |
default-agent.sinks.default-sink.port | emr-worker-1节点Flume Agent的port |
default-agent.sinks.k1.hostname | emr-worker-2节点的IP |
default-agent.sinks.k1.port | emr-worker-2节点Flume Agent的port |
default-agent.sinks.default-sink.batch-size | 2000 |
default-agent.sinks.k1.batch-size | 2000 |
default-agent.sinks.k1.type | avro |
default-agent.sinks.k1.channel | default-channel |
需要说明的是,Flume的taildir source在filegroups使用通配符匹配log4j的滚动日志时会有数据重复的问题,通过对EMR-Flume配置ignoreRenameWhenMultiMatching可以避免这种问题。
保存配置后,相同的方式指定机器为emr-header-1启动flume。
如果需要在emr-header-2节点启动Flume agent,只需对配置作如下修改
3.4查看同步结果
使用HDFS命令,可以看到同步的数据被写入FlumeData.${timestamp}形式的文件中,其中timestamp为文件创建的时间戳