DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

背景:数据集成无法同步MongoDB时间戳字段类型实现增量同步。
场景:定时获取10分钟的增量数据,MongoDB增量字段为时间戳格式数据。

设置任务依赖实现参数传递:

设置节点依赖关系,调度配置都设置10分钟调度
DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

1、使用两个赋值节点定义时间戳格式的时间

开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

2、配置MongoDB同步节点

添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

MongoDB原始数据:

脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",

脚本配置示例

{
    "type": "job",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "ds1",
                                "query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
                "column": [
                    {
                        "name": "doc_id",
                        "type": "STRING"
                    },
                    {
                        "name": "create_time",
                        "type": "DOUBLE"
                    },
                    {
                        "name": "date_time",
                        "type": "DATE"
                    }
                ],
                "collectionName": "test1"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": false,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "doc_id",
                    "create_time",
                    "date_time"
                ],
                "emptyAsNull": false,
                "table": "tablename"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}

DataWorks百问百答历史记录 请点击这里查看>>

更多DataWorks技术和产品信息,欢迎加入【DataWorks钉钉交流群】

上一篇:DataWorks百问百答33:数据同步怎么配置ODPS分区?


下一篇:【Docker】Dockerfile 之 EXPOSE