使用EMR-Flume同步Kafka数据到HDFS
1. 背景
Flume是一个分布式、可靠和高效的数据汇聚系统,其source、channel和sink的结构设计,不仅实现了数据生产者与消费者的解耦,还提供了数据缓冲的功能。Flume支持多种source、channel和sink,也可以实现自定义source、channel和sink并以插件的方式加入Flume中。同时,Flume也支持数据处理、负载均衡、failover和数据高可靠等高级特性。E-MapReduce从3.19.0版本开始,提供了EMR-Flume集群管理、gateway扩展等多种特性方便Flume的使用,详见Flume使用说明
一个比较通用的使用场景是使用Flume将Kafka的数据按照时间分区同步至HDFS,进行实时的流式分析,或者使用Hive等工具进行离线的统计。下面就详细的介绍使用Flume同步Kafka的数据到HDFS。
2.准备工作
创建Kafka集群并创建topic test,详细步骤参考Kafka快速入门。
创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群。
3. Flume配置
首先对agent和绑定关系进行如下配置
agent.sources | source |
---|---|
agent.sinks | sink |
agent.channels | channel |
agent.sources.source1.channels | channel |
agent.sinks.k1.channel | channel |
3.1 Kafka source
因为写入HDFS的数据是按照时间分区的,如果在HDFS sink中配置useLocalTimeStamp将写入HDFS的时间作为分区时间,在Flume有数据滞后时,会将数据写入错误的分区。默认的,Kafka Source会在接收数据时将系统时间写入header中,可以使用该时间作为分区时间。Kafka source配置如下
agent.sources.source.type | org.apache.flume.source.kafka.KafkaSource |
---|---|
agent.sources.source.batchSize | 5000 |
agent.sources.source.kafka.bootstrap.servers | ip:port |
agent.sources.source.kafka.topics | test |
agent.sources.source.kafka.consumer.group.id | test-group |
其中,agent.sources.source.kafka.bootstrap.servers为Kafka broker的地址,根据实际配置。
在实际使用中,Kafka topic的数据量可能很大,超过一个Flume agent的负载,可以启动多个agent,使用相同的consumer group id来共同消费同一个topic的数据;同时,如果其中一个agent失败,其他agent也会继续消费topic的数据,达到容灾的效果。
3.2 channel
根据实际情况,需要在性能和可靠性做权衡。比如相比file channel,memory channel性能更高,但是在agent停止后channel中的数据会丢失;file channel虽然性能不如memory channel,但是持久化在磁盘的数据可以在agent停止后保证数据不丢失。此处使用file channel做说明
agent.channels.channel.transactionCapacity | 51200 |
---|---|
agent.channels.channel.checkpointDir | /mnt/disk1/flume/file-channel/checkpoint |
agent.channels.channel.dataDirs | /mnt/disk1/flume/file-channel/data |
agent.channels.channel.capacity | 51200 |
3.3 HDFS sink
将数据以时间为分区写入HDFS。考虑到配合Hive进行查询,可以在路径中添加列名。例如添加datetime和hour列,如下所示
agent.sinks.sink.hdfs.path | /tmp/flume-data/datetime=%y%m%d/hour=%H |
---|---|
agent.sinks.sink.hdfs.fileType | DataStream |
agent.sinks.sink.hdfs.rollSize | 0 |
agent.sinks.sink.hdfs.rollCount | 0 |
agent.sinks.sink.hdfs.rollInterval | 3600 |
agent.sinks.sink.hdfs.batchSize | 51200 |
agent.sinks.sink.hdfs.round | true |
agent.sinks.sink.hdfs.roundValue | 60 |
agent.sinks.sink.hdfs.roundUnit | minute |
其中,batchSize的设置需要在发送效率和延迟中做出选择,设置过大会数据滞后,设置过小会影响HDFS的吞吐。
为防止生成过多小文件,此处按照时间(1小时)来生成文件,也可根据实际情况根据event数或者文件大小来生成文件。
4.运行Flume agent
参考Flume使用说明
成功运行agent之后,可以查看HDFS中存储的数据。如下图所示查看2019年4月9日20点的数据
5.load balance
为了保证下游sink的可靠性,可以配置多个sink并使用相同的load balance sink组。这样,在其中一个sink失败时,其他sink可以从channel拉取数据并sink到HDFS中。如下设置了两个avro sink同属于一个sink组load-balancer-sink-group。
agent.sinks | avro-sink-1 avro-sink-2 |
---|---|
agent.sinkgroups | load-balancer-sink-group |
agent.sinkgroups.load-balancer-sink-group.sinks | avro-sink-1 avro-sink-2 |
agent.sinkgroups.load-balancer-sink-group.processor.type | load_balance |
agent.sinkgroups.load-balancer-sink-group.processor.selector | random |
agent.sinks.avro-sink-1.type | avro |
agent.sinks.avro-sink-1.hostname | emr-worker-1 |
agent.sinks.avro-sink-1.port | 19999 |
agent.sinks.avro-sink-2.type | avro |
agent.sinks.avro-sink-2.hostname | emr-worker-2 |
agent.sinks.avro-sink-2.port | 19999 |
使用sink组替代2.3 HDFS sink 中介绍的配置后,需要在emr-worker-1和emr-worker-2两个节点配置source为avro,sink为HDFS的Flume agent。如下所示
agent.sinks.sink.hdfs.path | /tmp/flume-data/datetime=%y%m%d/hour=%H |
---|---|
agent.sinks.sink.hdfs.fileType | DataStream |
agent.sinks.sink.hdfs.rollSize | 0 |
agent.sinks.sink.hdfs.rollCount | 0 |
agent.sinks.sink.hdfs.rollInterval | 3600 |
agent.sinks.sink.hdfs.batchSize | 51200 |
agent.sources.source.bind | 0.0.0.0 |
agent.sources.source.port | 19999 |
agent.channels.channel.transactionCapacity | 51200 |
agent.channels.channel.dataDirs | /mnt/disk1/flume/file-channel/data |
agent.channels.channel.checkpointDir | /mnt/disk1/flume/file-channel/checkpoint |
agent.channels.channel.capacity | 51200 |
agent.sinks.sink.type | hdfs |
agent.channels.channel.type | file |
agent.sources.source.type | avro |