Mongo在DataWorks中的使用

一、query语句,制定数据同步规则。

Mongo在DataWorks中的使用

 

 

同步2021-07-05(北京时间,UTC+8)一天的数据。createTime为毫秒级时间戳,使用左闭右开区间[2021-07-05 00:00:00, 2021-07-06 00:00:00),避免漏掉恰好落在零点的记录:
"{'createTime':{'$gte':NumberLong('1625414400000'),'$lt':NumberLong('1625500800000')}}"

二、query参数说明

您可以通过该配置项来限制返回的MongoDB数据范围,仅支持时间类型。例如您可以配置"query":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}",限制返回operationTime大于等于${last_day}零点的数据。此处${last_day}为DataWorks调度参数,其中last_day的格式为yyyy-mm-dd。您可以根据需要使用其它MongoDB支持的条件操作符($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)和函数(max、min、sum、avg和ISODate等)。

三、同步JSON脚本

1.官方案例

#官方案例
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //数据源名称。
                "collectionName": "tag_data", //集合名称。
                "query": "", // 数据查询过滤。
                "column": [
                    {
                        "name": "unique_id", //字段名称。
                        "type": "string" //字段类型。
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

 

2.同步MongoDB表,按时间增量同步,并做分区表

同步MongoDB表,按时间增量同步,并写入分区表。
同步脚本如下:

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "mongo_member",
                "envType": 0,
                "cursorTimeoutInMs": "600000",
                "query": "{'reportTimeStr':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}",
                "column": [
                    {
                        "name": "_id",
                        "type": "string"
                    },
                    {
                        "name": "_class",
                        "type": "string"
                    },
                    {
                        "name": "did",
                        "type": "string"
                    },
                    {
                        "name": "stage",
                        "type": "string"
                    },
                    {
                        "name": "platform",
                        "type": "string"
                    },
                    {
                        "name": "channel",
                        "type": "string"
                    },
                    {
                        "name": "appVersion",
                        "type": "string"
                    },
                    {
                        "name": "reportWay",
                        "type": "string"
                    },
                    {
                        "name": "reportTime",
                        "type": "long"
                    },
                    {
                        "name": "reportTimeStr",
                        "type": "string"
                    },
                    {
                        "name": "reportDate",
                        "type": "string"
                    }
                ],
                "tableComment": "This kind of datasource dosen't support get table comment. This is a comment produced by di.",
                "batchSize": "1000",
                "collectionName": "device_install_app_info"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "dt=${bizdate}",
                "truncate": true,
                "datasource": "odps_first",
                "envType": 0,
                "column": [
                    "id",
                    "class",
                    "did",
                    "stage",
                    "platform",
                    "chanel",
                    "appversion",
                    "reportway",
                    "reporttime",
                    "reporttimestr",
                    "reportdate"
                ],
                "emptyAsNull": false,
                "table": "ods_lx_mg_member_device_install_app_info"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

 

3.脚本2:按createTime(毫秒时间戳)过滤,覆盖写入非分区表

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "mongo_member",
                "envType": 0,
                "cursorTimeoutInMs": "600000",
                "query": "{'createTime':{'$gt':NumberLong('1625414400000')}}",
                "column": [
                    {
                        "name": "_id",
                        "type": "string"
                    },
                    {
                        "name": "userId",
                        "type": "string"
                    },
                    {
                        "name": "summaryDate",
                        "type": "string"
                    },
                    {
                        "name": "year",
                        "type": "string"
                    },
                    {
                        "name": "month",
                        "type": "string"
                    },
                    {
                        "name": "day",
                        "type": "string"
                    },
                    {
                        "name": "stage",
                        "type": "string"
                    },
                    {
                        "name": "platform",
                        "type": "string"
                    },
                    {
                        "name": "channel",
                        "type": "string"
                    },
                    {
                        "name": "isPaidUser",
                        "type": "string"
                    },
                    {
                        "name": "isOldUser",
                        "type": "string"
                    },
                    {
                        "name": "createTime",
                        "type": "string"
                    },
                    {
                        "name": "_class",
                        "type": "string"
                    }
                ],
                "tableComment": "This kind of datasource dosen't support get table comment. This is a comment produced by di.",
                "batchSize": "1000",
                "collectionName": "user_request_summary"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": true,
                "datasource": "odps_first",
                "envType": 0,
                "column": [
                    "id",
                    "userid",
                    "summarydate",
                    "year",
                    "month",
                    "day",
                    "stage",
                    "platform",
                    "channel",
                    "ispaiduser",
                    "isolduser",
                    "createtime",
                    "class"
                ],
                "emptyAsNull": false,
                "table": "ods_lx_mg_member_user_request_summary"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "executeMode": null,
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

 

上一篇:为什么在Java中object.wait()/notify()方法必须在持有锁的情况下才能执行?


下一篇:Linux电源管理(7)_Wakeup events framework【转】