阿里云Dataworks离线数据同步写入Kafka

Step By Step

1、kafka实例的创建&独享数据集成资源组的创建参考博客(资源创建部分):
Dataworks实时数据同步(Kafka -> maxcompute)

2、数据集成配置Kafka数据源&测试连通性
阿里云Dataworks离线数据同步写入Kafka

3、maxcompute创建测试数据表

CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);

INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");

SELECT * FROM odps_to_kafka1;

阿里云Dataworks离线数据同步写入Kafka

4、配置离线同步脚本(注意目前Kafka仅支持脚本模式,不支持想到模式)

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1"  // maxcompute中表的名称
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // 注意配置kafka内网地址
                "keyIndex": 0,   // key值对应maxcompute读取column的第一列
                "valueIndex": 1,  // value值对应maxcompute读取column的第二列
                "valueType": "BYTEARRAY",
                "topic": "from_odps1",  // kafka 中表的名称
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}
注意: 保存脚本的时候如果提示不满足json格式规范,将注释部分删除即可。

5、执行同步任务
阿里云Dataworks离线数据同步写入Kafka

6、Kafka控制台查看数据同步情况
阿里云Dataworks离线数据同步写入Kafka

更多参考

Kafka Writer
Dataworks实时数据同步(Kafka -> maxcompute)
新增和使用独享数据集成资源组

上一篇:提取MapInfo地图数据中的空间数据解决方案


下一篇:坚持学习WF(14):自定义持久化服务