Overview
Kafka is a message queue widely used in the open-source community. Although Confluent, the company behind Kafka, provides an official connector for importing data from Kafka into HDFS, there is no official support for Alibaba Cloud's object storage service, OSS. This article walks through a simple example of writing Kafka data to OSS. Because the Alibaba Cloud E-MapReduce service integrates a large number of open-source components together with adapters for Alibaba Cloud products, the example runs directly on an E-MapReduce cluster.
The example uses the open-source Flume tool as an intermediary to connect Kafka with OSS. The Flume component may also become available on the E-MapReduce platform in the future.
Example scenario
The walkthrough below uses the simplest possible setup. If you already have a Kafka cluster running, you can skip straight to step 4.
- In the Kafka home directory, start the Kafka broker, with the Zookeeper address in the configuration file set to the Zookeeper service that ships with E-MapReduce, emr-header-1:2181
bin/kafka-server-start.sh config/server.properties
- Create a Kafka topic named test
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
- Write data into the test topic; here the data is the local machine's performance metrics produced by vmstat
vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
- In the Flume home directory, configure and start the Flume service
Create a new configuration file, conf/kafka-example.conf. The source is set to the corresponding Kafka topic, and the sink is the HDFS sinker with its path pointing at OSS. Because the E-MapReduce service implements an efficient OSS FileSystem (compatible with the Hadoop FileSystem interface), an OSS path can be specified directly and the HDFS sinker writes the data to OSS automatically.
# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1
# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100
# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.source1.channels = c1
a1.sinks.oss1.channel = c1
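The hdfs.path above uses Flume escape sequences: %{topic} is filled in from the event's topic header, and %y-%m-%d comes from the timestamp header that the timestamp interceptor adds. The following is a simplified Python sketch of that substitution, not Flume's actual implementation (which supports many more escapes and uses the agent's local timezone; this sketch uses UTC for determinism):

```python
from datetime import datetime, timezone

def expand_hdfs_path(template, headers, ts_millis):
    """Expand the subset of Flume escape sequences used in the config above:
    %{key} from event headers, and %y/%m/%d from the timestamp header."""
    ts = datetime.fromtimestamp(ts_millis / 1000.0, tz=timezone.utc)
    out = template
    for key, value in headers.items():
        out = out.replace("%{" + key + "}", value)
    out = out.replace("%y", ts.strftime("%y"))
    out = out.replace("%m", ts.strftime("%m"))
    out = out.replace("%d", ts.strftime("%d"))
    return out

# Epoch milliseconds taken from one of the FlumeData file names below.
path = expand_hdfs_path("oss://emr-examples/kafka/%{topic}/%y-%m-%d",
                        {"topic": "test"}, 1480934454657)
print(path)  # oss://emr-examples/kafka/test/16-12-05
```

This is why events from the test topic end up under oss://emr-examples/kafka/test/16-12-05/ in the listing further below.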
Start the Flume agent:
bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'
The log shows the Flume HDFS sinker writing data to OSS and rolling to a new file every 10 seconds.
2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
Note: if you hit the following exception, it is caused by a conflict between the httpclient jar bundled with Flume and the one in EMR:
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE
Deleting Flume's bundled httpclient jar resolves the conflict (so the httpclient shipped with EMR Hadoop is used everywhere): rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar
View the results on OSS
$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw- 1 162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw- 1 925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw- 1 1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw- 1 1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw- 1 1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw- 1 588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657
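The epoch-millisecond suffix in each FlumeData file name appears to reflect the file's creation time, so the gaps between consecutive suffixes show the 10-second rollInterval at work (plus a second or two of write-and-rename overhead). A quick sketch using the suffixes from the listing above:

```python
# File-name suffixes (epoch millis) from the hadoop fs -ls output above.
stamps = [1480934394566, 1480934407580, 1480934418597,
          1480934430613, 1480934443638, 1480934454657]

# Gap between consecutive file creations, in seconds.
gaps = [(b - a) / 1000.0 for a, b in zip(stamps, stamps[1:])]
print(gaps)  # each gap is a little over the configured rollInterval of 10s
```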
$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
0 0 0 1911216 50036 1343828 0 0 0 0 1341 2396 1 1 98 0 0
0 0 0 1896964 50052 1343824 0 0 0 112 1982 2511 15 1 84 0 0
1 0 0 1896552 50052 1343828 0 0 0 76 2314 3329 3 4 94 0 0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
5 0 0 1903016 50052 1343828 0 0 0 0 2277 3249 2 4 94 0 0
0 0 0 1902892 50052 1343828 0 0 0 0 1417 2366 5 0 95 0 0
0 0 0 1902892 50052 1343828 0 0 0 0 1072 2243 0 0 99 0 0
0 0 0 1902892 50068 1343824 0 0 0 144 1275 2283 1 0 99 0 0
1 0 0 1903024 50068 1343828 0 0 0 24 1099 2071 1 1 99 0 0
0 0 0 1903272 50068 1343832 0 0 0 0 1294 2238 1 1 99 0 0
1 0 0 1903412 50068 1343832 0 0 0 0 1024 2094 1 0 99 0 0
2 0 0 1903148 50076 1343836 0 0 0 68 1879 2766 1 1 98 0 0
1 0 0 1903288 50092 1343840 0 0 0 92 1147 2240 1 0 99 0 0
0 0 0 1902792 50092 1343844 0 0 0 28 1456 2388 1 1 98 0 0
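The files contain the raw vmstat lines, and because Flume rolls files mid-stream, a repeated vmstat header row can land anywhere inside a file, as seen above. A hedged Python sketch (not part of the pipeline) of how one might parse such a file back into numeric records, skipping the header rows; the field names follow vmstat's own column labels:

```python
# Column labels from vmstat's header row.
VMSTAT_FIELDS = ["r", "b", "swpd", "free", "buff", "cache", "si", "so",
                 "bi", "bo", "in", "cs", "us", "sy", "id", "wa", "st"]

def parse_vmstat(lines):
    """Parse raw `vmstat 1` output lines into dicts, skipping header rows."""
    records = []
    for line in lines:
        parts = line.split()
        # Data rows have exactly one all-numeric token per column; the
        # "procs ... cpu" banner and the field-name row do not.
        if len(parts) == len(VMSTAT_FIELDS) and all(p.isdigit() for p in parts):
            records.append(dict(zip(VMSTAT_FIELDS, map(int, parts))))
    return records

# Sample lines copied from the file contents above.
sample = [
    "procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----",
    " r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st",
    " 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0",
]
recs = parse_vmstat(sample)
print(recs[0]["free"])  # 1903016
```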