1. 问题背景
我们在使用Kafka的时候,有时候会遇到以下几种场景:
- 原有Kafka集群机型配置过旧,需要升级换代,重新规划一个Kafka集群,将老集群的数据迁移到新集群上
- 数据上云,云下Kafka集群数据迁移到云上Kafka集群/Kafka服务
- 多个Kafka集群数据汇总到一个Kafka集群
- 基于业务Kafka集群,构建一个Kafka灾备集群
总结一下,以上场景可以抽象成两类:
- 数据迁移
- 数据灾备
Kafka社区提供了一个工具,即MirrorMaker,它可以满足用户的数据迁移需求,同时一定程度的满足数据灾备需求。当然除了原生MirrorMaker工具,也存在着各种衍生版本的数据同步工具。下面就简单介绍一下社区版的MirrorMaker工具。
2. MirrorMaker工具介绍
MirrorMaker工具不过是将Kafka Consumer和Producer打包在一起。一个MirrorMaker进程会起若干个Consumer和一个Producer,Consumer负责从源Kafka集群消费数据,Producer负责将数据写到目标Kafka集群。
The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same.
上面是Kafka官方文档原文。在目标Kafka集群中,Topic可以有不同的分区数,offset也可能不一样。
3. 如何创建镜像集群
在E-MapReduce上创建两个Kafka集群,这里使用EMR-3.18.1版本,分别命名为Kafka-A和Kafka-B。建议将这两个集群建在一个安全组内,避免网络配置问题。
在Kafka-A 集群创建一个Topic "test":
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 100
--replication-factor 2 --create
并向其中发送数据:
kafka-producer-perf-test.sh --topic test --num-records 10000000000 --throughput 10000
--producer-props bootstrap.servers=emr-worker-1:9092 --record-size 10
在Kafka-B集群同样创建一个Topic "test",改变一下分区数目:
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.1.1 --topic test --partitions 50
--replication-factor 2 --create
同时在Kafka-B集群启动一个MirrorMaker进程,这个进程会启动4个消费线程消费数据:
kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf
--new.consumer --num.streams 4 --whitelist test
这时,我们就可以在Kafka-B集群看到同步过来的数据。我们在Kafka-A发送一条有意义的数据“hello world!”,在Kafka-B消费看看:
注意:
- 在实际使用中,建议不要开启"auto.topic.create",虽然这样避免我们在目标集群手动创建topic,并且可以自动应对新增topic。但是默认创建的topic,分区数目,副本数都是固定的,不一定就合适。建议按照实际需求手动创建topic,或者至少手动创建那些核心topic,自动创建那些非核心topic。
- 一个MirrorMaker进程的处理能力是有限的,我们可以在一台机器多启动几个MirrorMaker进程。
- MirrorMaker所在的机器的带宽能力是有限的,我们可以在多台机器同时部署MirrorMaker进程。
- 所有的MirrorMaker进程必选使用相同的配置。
4. MirrorMaker参数说明
简单介绍下工具的各个参数:
配置项 | 说明 |
---|---|
abort.on.send.failure | 当发送数据失败时,MirrorMaker进程退出,默认true。 |
blacklist | 无需复制数据的Topic列表,支持Java正则表达。注意,只有使用旧的Consumer时才生效。 |
whitelist | 需要复制数据的Topic列表,支持Java正则表达。 |
consumer.config | Consumer客户端配置文件。注意,当配置了"zookeeper.connect"时表示使用旧的Consumer。 |
producer.config | Producer客户端配置文件 |
consumer.rebalance.listener | 自定义的consumer rebalance listener,一般无需配置 |
rebalance.listener.args | 自定义的consumer rebalance listener参数 |
message.handler | 自定义的消息处理器,源端数据经过处理后再写到目的端 |
message.handler.args | 自定义的消息处理器参数 |
new.consumer | 指定使用新的Consumer |
num.streams | Consumer并发数 |
offset.commit.interval.ms | Offset commit间隔 |
注意:
- 目标Kafka集群需要同步源Kakfa集群的配置,譬如最大消息Size等。
-
Consumer配置:
- 根据最大消息,调整max.partition.fetch.bytes和fetch.max.bytes参数等等。
- 根据网络环境,适当增大session.timeout.ms和request.timeout.ms。当网络环境恶劣时,适当增加heartbeat.interval.ms,避免不必要的rebalance,当网络环境比较好时,则适当减小heartbeat.interval.ms,以使得服务端更快响应Client的异常。
-
Producer配置:
- 根据最大消息,调整max.request.size等参数。
- reties=Int.MAX_VALUE,acks=-1(all)。
- 根据网络环境,适当增大request.timeout.ms。
5. 小结
本文介绍了如何使用社区的Kafka MirrorMaker工具进行集群间的数据复制。E-MapReduce Kafka基于Kafka Conenct实现了新的一种数据同步工具,支持数据复制和灾备集群构建。Kafka社区当前正在进行MirrorMaker v2.0版本开发,同样也是基于Kafka Conenct来实施。后续将专门写一篇文章来介绍。