如何在E-MapReduce上进行Kafka集群间数据复制

1. 问题背景

我们在使用Kafka的时候,有时候会遇到以下几种场景:

  • 原有Kafka集群机型配置过旧,需要升级换代,重新规划一个Kafka集群,将老集群的数据迁移到新集群上
  • 数据上云,云下Kafka集群数据迁移到云上Kafka集群/Kafka服务
  • 多个Kafka集群数据汇总到一个Kafka集群
  • 基于业务Kafka集群,构建一个Kafka灾备集群

总结一下,以上场景可以抽象成两类:

  • 数据迁移
  • 数据灾备

Kafka社区提供了一个工具,即MirrorMaker,它可以满足用户的数据迁移需求,同时一定程度的满足数据灾备需求。当然除了原生MirrorMaker工具,也存在着各种衍生版本的数据同步工具。下面就简单介绍一下社区版的MirrorMaker工具。

2. MirrorMaker工具介绍

如何在E-MapReduce上进行Kafka集群间数据复制

MirrorMaker工具不过是将Kafka Consumer和Producer打包在一起。一个MirrorMaker进程会起若干个Consumer和一个Producer,Consumer负责从源Kafka集群消费数据,Producer负责将数据写到目标Kafka集群。

The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same.

上面是Kafka官方文档原文。在目标Kafka集群中,Topic可以有不同的分区数,offset也可能不一样。

3. 如何创建镜像集群

在E-MapReduce上创建两个Kafka集群,这里使用EMR-3.18.1版本,分别命名为Kafka-A和Kafka-B。建议将这两个集群建在一个安全组内,避免网络配置问题。

在Kafka-A 集群创建一个Topic "test":

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 100 
--replication-factor 2 --create

并向其中发送数据:

kafka-producer-perf-test.sh --topic test --num-records 10000000000 --throughput 10000 
--producer-props bootstrap.servers=emr-worker-1:9092 --record-size 10

在Kafka-B集群同样创建一个Topic "test",改变一下分区数目:

kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 50 
--replication-factor 2 --create

同时在Kafka-B集群启动一个MirrorMaker进程,这个进程会启动4个消费线程消费数据:

kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf 
--new.consumer --num.streams 4 --whitelist test

这时,我们就可以在Kafka-B集群看到同步过来的数据。我们在Kafka-A发送一条有意义的数据“hello world!”,在Kafka-B消费看看:
如何在E-MapReduce上进行Kafka集群间数据复制

注意:

  • 在实际使用中,建议不要开启"auto.topic.create",虽然这样避免我们在目标集群手动创建topic,并且可以自动应对新增topic。但是默认创建的topic,分区数目,副本数都是固定的,不一定就合适。建议按照实际需求手动创建topic,或者至少手动创建那些核心topic,自动创建那些非核心topic。
  • 一个MirrorMaker进程的处理能力是有限的,我们可以在一台机器多启动几个MirrorMaker进程。
  • MirrorMaker所在的机器的带宽能力是有限的,我们可以在多台机器同时部署MirrorMaker进程。
  • 所有的MirrorMaker进程必选使用相同的配置。

4. MirrorMaker参数说明

简单介绍下工具的各个参数:

配置项 说明
abort.on.send.failure 当发送数据失败时,MirrorMaker进程退出,默认true。
blacklist 无需复制数据的Topic列表,支持Java正则表达。注意,只有使用旧的Consumer时才生效。
whitelist 需要复制数据的Topic列表,支持Java正则表达。
consumer.config Consumer客户端配置文件。注意,当配置了"zookeeper.connect"时表示使用旧的Consumer。
producer.config Producer客户端配置文件
consumer.rebalance.listener 自定义的consumer rebalance listener,一般无需配置
rebalance.listener.args 自定义的consumer rebalance listener参数
message.handler 自定义的消息处理器,源端数据经过处理后再写到目的端
message.handler.args 自定义的消息处理器参数
new.consumer 指定使用新的Consumer
num.streams Consumer并发数
offset.commit.interval.ms Offset commit间隔

注意:

  • 目标Kafka集群需要同步源Kakfa集群的配置,譬如最大消息Size等。
  • Consumer配置:

    • 根据最大消息,调整max.partition.fetch.bytes和fetch.max.bytes参数等等。
    • 根据网络环境,适当增大session.timeout.ms和request.timeout.ms。当网络环境恶劣时,适当增加heartbeat.interval.ms,避免不必要的rebalance,当网络环境比较好时,则适当减小heartbeat.interval.ms,以使得服务端更快响应Client的异常。
  • Producer配置:

    • 根据最大消息,调整max.request.size等参数。
    • reties=Int.MAX_VALUE,acks=-1(all)。
    • 根据网络环境,适当增大request.timeout.ms。

5. 小结

本文介绍了如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。E-MapReduce Kafka基于Kafka Conenct实现了新的一种数据同步工具,支持数据复制和灾备集群构建。Kafka社区当前正在进行MirrorMaker v2.0版本开发,同样也是基于Kafka Conenct来实施。后续将专门写一篇文章来介绍。

上一篇:SAP Spartacus 自定义 PageResolver 的用法


下一篇:如何在E-MapReduce上提交Storm作业处理Kafka数据