在流式数据处理过程中,E-MapReduce经常需要在Kafka与其他系统间进行数据同步或者在Kafka集群间进行数据迁移。本节向您介绍如何在E-MapReduce上通过Kafka Connect快速的实现Kafka集群间的数据同步或者数据迁移。
前提条件
背景信息
Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速的进行流式数据传输的工具。例如,Kafka Connect可以获取数据库的binlog数据,将数据库数据同步至Kafka集群,从而达到迁移数据库数据的目的。由于Kafka集群可对接流式处理系统,所以还可以间接实现数据库对接下游流式处理系统的目的。同时,Kafka Connect还提供了REST API接口,方便您创建和管理Kafka Connect。
kafka Connect分为standalone和distributed两种运行模式。在standalone模式下,所有的worker都在一个进程中运行。相比于standalone模式,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。
本文介绍如何在E-MapReduce上使用Kafka Connect的REST API接口在Kafka集群间进行数据迁移,kafka Connect使用distributed模式。
步骤一 创建Kafka集群
在EMR上创建源Kafka集群和目的Kafka集群。Kafka Connect安装在Task节点上,所以目的Kafka集群必须创建Task节点。集群创建好后,Task节点上Kafka Connect服务会默认启动,端口号为8083。
推荐您将源Kafka集群和目的kafka集群创建在同一个安全组下。如果源Kafka集群和目的kafka集群不在同一个安全组下,则两者的网络默认是不互通的,您需要对两者的安全组分别进行相关配置,以使两者的网络互通。
- 登录阿里云 E-MapReduce 控制台。
- 创建源Kafka集群和目的Kafka集群,详情请参见创建集群。
说明 创建目的Kafka集群时,必须开启Task实例,即创建Task节点。
步骤二 准备待迁移数据Topic
在源Kafka集群上创建一个名称为connect的Topic。
- 以SSH方式登录到源Kafka集群的header节点(本例为emr-header-1)。
- 以root用户运行如下命令创建一个名称为connect的Topic。
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect
说明 完成上述操作后,请保留该登录窗口,后续仍将使用。
步骤三 创建Kafka Connect的connector
在目的Kafka集群的Task节点上,使用curl命令通过JSON数据创建一个Kafka Connect的connector。
- 以SSH方式登录到目的Kafka集群的Task节点(本节为emr-worker-3)。
- 可选: 自定义Kafka Connect配置。
进入目的Kafka集群Kafka服务的配置页面,在connect-distributed.properties中自定义offset.storage.topic、config.storage.topic和status.storage.topic三个配置项,详情请参见组件参数配置。
Kafka Connect会将offsets、configs和任务状态保存在Topic中,Topic名对应offset.storage.topic、config.storage.topic和status.storage.topic三个配置项。Kafka Connect会自动使用默认的partition和replication factor创建这三个Topic,其中partition和repication factor配置项保存在/etc/ecm/kafka-conf/connect-distributed.properties文件中。
- 以root用户运行如下命令创建一个Kafka Connect。
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors
在JSON数据中,name字段代表创建的Kafka Connect的名称,本例为connect-test;config字段需要根据实际情况进行配置,关键变量的说明如下:
说明 完成上述操作后,请保留该登录窗口,后续仍将使用。
步骤四 查看Kafka Connect和Task节点状态
查看Kafka Connect和Task节点信息,确保两者的状态正常。
- 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
- 以root用户运行如下命令查看所有的Kafka Connect。
curl emr-worker-3:8083/connectors
- 以root用户运行如下命令查看本例创建的Kafka Connect(本例为connect-test)的状态。
curl emr-worker-3:8083/connectors/connect-test/status
确保Kafka Connect(本例为connect-test)的状态为RUNNING。
- 以root用户运行如下命令查看Task节点信息。
curl emr-worker-3:8083/connectors/connect-test/tasks
确保Task节点的返回信息中无错误信息。
步骤五 生成待迁移数据
通过命令向源集群中的connect Topic发送待迁移的数据。
- 返回到源Kafka集群的header节点(本例为emr-header-1)的登录窗口。
- 以root用户运行如下命令向connect Topic发送数据。
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092
步骤六 查看数据迁移结果
生成待迁移数据后,Kafka Connect会自动将这些数据迁移到目的集群的相应文件(本例为connect.replica)中。
- 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
- 以root用户运行如下命令查看数据迁移是否成功。
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000
从上述返回结果可以看出,在源Kafka集群发送的100000条数据已经迁移到了目的kafka集群。
小结
本文介绍并演示了使用Kafka Connect在Kafka集群间进行数据迁移的方法。如果需要了解Kafka Connect更详细的使用方法,请参见Kafka官网资料和REST API。
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。