使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

1.前言

E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式

2.准备工作

创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群

3.配置Flume

3.1 配置source

配置项 说明
type org.apache.flume.source.loghub.LogHubSource
endpoint Lohub的endpoint 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP
project Lohub的project
logstore Lohub的logstore
accessKeyId Aliyun的access key id
accessKey Aliyun的access key
useRecordTime true
consumerGroup consumer_1 消费组名称,默认值为consumer_1

配置项说明如下

  1. useRecordTime
    默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
    但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
  2. consumerPosition
    消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
    可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
    首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
  3. heartbeatInterval和fetchInOrder
    heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
  4. batchSize和batchDurationMillis
    通用的source batch配置,表示触发event写入channel的阈值。
  5. backoffSleepIncrement和maxBackoffSleep
    通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。

3.2配置channel和sink

此处使用memory channel和hdfs sink,hdfs sink配置如下

type hdfs
hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
hdfs.fileType DataStream
hdfs.rollInterval 3600
hdfs.round true
hdfs.roundValue 60
hdfs.roundUnit minute
hdfs.rollSize 0
hdfs.rollCount 0

memory channel配置如下

type memory
capacity 2000
transactionCapacity 2000

4.运行Flume agent

在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。

使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

查看Log Service上的消费组状态

使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

上一篇:用WEB技术栈开发NATIVE应用(一):WEEX SDK原理详解


下一篇:w3c系列CSS之路(二):错误解析和基本数据类型