本文介绍如何通过数据集成在PolarDB-X中进行数据导入和导出。
数据集成是阿里巴巴集团提供的数据同步平台。该平台具备可跨异构数据存储系统、可靠、安全、低成本、可弹性扩展等特点,可为20多种数据源提供不同网络环境下的离线(全量或增量)数据进出通道。
使用数据集成,您可以在PolarDB-X完成以下数据同步任务:
- 将PolarDB-X的数据同步到到其他的数据源里,并将数据进行相应的处理;
- 将处理好的其他数据源数据同步到PolarDB-X。
本文包含以下内容:
流程概述
数据同步流程主要包含以下几个步骤:
- 第一步:数据源端新建表
- 第二步:添加数据源
- 第三步:向导模式或脚本模式配置同步任务
- 第四步:运行同步任务,检查目标端的数据质量
准备工作
使用数据集成在PolarDB-X进行数据导入导出之前,请先注册阿里云账号并参考创建RAM子账号文档,完成以下准备工作:
- 开通阿里云官网实名认证账号,创建好账号的访问密钥,即 AccessKeys。
- 开通 MaxCompute,这样会自动产生一个默认的 ODPS 的数据源,并使用主账号登录大数据开发套件。
- 创建项目。您可以在项目中协作完成工作流,共同维护数据和任务等,因此使用大数据开发套件之前需要先创建一个项目。
- 如果想通过子账号创建数据集成任务,可以赋予其相应的权限。
新添加数据源
下面以添加PolarDB-X的数据源为例。
注意:只有项目管理员角色才能够新建数据源,其他角色的成员仅能查看数据源。
- 以项目管理员身份登录数加管理控制台。
- 在项目列表中对应项目的操作栏单击进入工作区。
- 进入顶部菜单栏中的数据集成页面,单击左侧导航栏中的数据源。
- 点击右上角的新增数据源,如下图所示:
- 在新增数据源弹出框中填写相关配置项,如下图所示:针对PolarDB-X数据源配置项的具体说明如下:
- 数据源名称:由英文字母、数字、下划线组成且需以字符或下划线开头,长度不超过 60 个字符 。
- 数据源描述:对数据源进行简单描述,不得超过 80 个字符 。
- 数据源类型:当前选择的数据源类型 DRDS。
- 网络类型:当前选择的网络类型。
- JDBCUrl:JDBC 连接信息,格式为
jdbc://mysql://serverIP:Port/database
。
- 用户名/密码:对应的用户名和密码。
- 数据源名称:由英文字母、数字、下划线组成且需以字符或下划线开头,长度不超过 60 个字符 。
- 完成上述信息项的配置后,单击测试连通性。
- 测试连通性通过后,单击确定。
通过数据集成导入数据
下文以通过数据集成的向导模式将 MaxCompute(原 ODPS)数据同步到PolarDB-X为例。
- 在数据集成页面,新建同步任务。
- 向导模式:向导模式是可视化界面配置同步任务, 一共涉及五步选择来源,选择目标,字段映射,通道控制,预览保存五个步骤。在每个不同的数据源之间,这几步的界面可能有不同的内容。向导模式可以转换成脚本模式。
- 脚本模式:进入脚本界面你可以选择相应的模板,此模板包含了同步任务的主要参数,将相关的信息填写完整,但是脚本模式不能转化成向导模式。
- 选择数据来源。选择 MaxCompute 数据源及源头表 mytest,数据浏览默认是收起的,选择后单击下一步:
- 选择目标。选择PolarDB-X数据源及目标表contact_infos,选择后单击下一步:
- preSql:执行数据同步任务之前率先执行的 SQL 语句。目前向导模式只允许执行一条 SQL 语句,脚本模式可以支持多条 SQL 语句,例如清除旧数据。
- postSql:执行数据同步任务之后执行的 SQL 语句。目前向导模式只允许执行一条 SQL 语句,脚本模式可以支持多条 SQL 语句,例如加上某一个时间戳。
- 选择字段的映射关系。左侧源头表字段和右侧目标表字段为一一对应的关系,如下图所示。
- 在通道控制页面单击下一步,配置作业速率上限和脏数据检查规则。
- 作业速率上限:是指数据同步作业可能达到的最高速率,其最终实际速率受网络环境、数据库配置等的影响。
- 作业并发数:作业速率上限=作业并发数 * 单并发的传输速率。
- 当作业速率上限已选定的情况下,可以根据以下原则选择并发数:
- 如果你的数据源是线上的业务库,建议您不要将并发数设置过大,以防对线上库造成影响;
- 如果您对数据同步速率特别在意,建议您选择最大作业速率上限和较大的作业并发数。
- 完成以上配置后,上下滚动鼠标可查看任务配置,确定无误后单击保存。
- 单击运行任务,直接运行同步任务结果。您可以将同步任务提交到调度系统中,调度系统会按照配置属性从第二天开始自动定时执行。
脚本模式配置同步任务
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "lzz_odps",//数据源的名称,建议都添加数据源后进行同步
"table": "mytest",//数据来源的表名
"partition": "",//分区信息
"column": [
"id"
]
}
},
"writer": {
"plugin": "drds",
"parameter": {
"datasource": "l_Drds_w",//数据源的名称,建议都添加数据源后进行同步
"table": "contact_infos",//目的表名
"preSql": [],//导入前准备语句
"postSql": [],//导入后准备语句
"column": [
"id"
]
}
},
"setting": {
"speed": {
"mbps": "1",//一个并发的速率上线是1MB/S
"concurrent": "1"//并发的数目
}
}
}
}
通过数据集成导出数据
下文以通过向导模式将PolarDB-X数据同步到 MaxCompute 为例。
- 在数据集成页面,新建同步任务。
- 选择数据来源。选择PolarDB-X数据源及源头表 bit_type_test。数据浏览默认是收起的,选择后单击下一步,如下图所示:
-
过滤条件:筛选条件,DrdsReader 根据指定的 column、table、where 条件拼接 SQL,并根据这个 SQL 进行数据抽取 。例如在做测试时,可以将 where 条件指定实际业务场景,往往会选择当天的数据进行同步,可以将 where 条件指定为
STRTODATE(‘${bdp.system.bizdate}’, ‘%Y%m%d’) <= taday AND taday < DATEADD(STRTODATE(‘${bdp.system.bizdate}’, ‘%Y%m%d’), interval 1 day)
。 - 切分键:您可以将源数据表中某一列作为切分键,切分之后可进行并发数据同步。目前仅支持整型字段;建议使用主键或有索引的列作为切分键。
-
过滤条件:筛选条件,DrdsReader 根据指定的 column、table、where 条件拼接 SQL,并根据这个 SQL 进行数据抽取 。例如在做测试时,可以将 where 条件指定实际业务场景,往往会选择当天的数据进行同步,可以将 where 条件指定为
- 选择 MaxCompute 数据源及目标表 mytest,选择后单击下一步。
- 单击下一步,选择字段的映射关系。左侧源头表字段和右侧目标表字段为一一对应的关系:您也可以单击“添加一行”增加映射关系:
- 可以输入常量,输入的值需要使用英文单引号包括,如’abc’、’123’等;
- 可以配合调度参数使用,如’${bdp.system.bizdate}’等;
- 可以输入关系数据库支持的函数,如
now()
、count(1)
等; - 如果您输入的值无法解析,则类型显示为’-‘。
- 在通道控制页面单击下一步,配置作业速率上限和脏数据检查规则。
- 作业速率上限:是指数据同步作业可能达到的最高速率,其最终实际速率受网络环境、数据库配置等的影响。
- 作业并发数:作业速率上限=作业并发数 * 单并发的传输速率。
- 当作业速率上限已选定的情况下,可以按以下原则选择并发数:
- 如果你的数据源是线上的业务库,建议您不要将并发数设置过大,以防对线上库造成影响;
- 如果您对数据同步速率特别在意,建议您选择最大作业速率上限和较大的作业并发数。
- 完成以上配置后,上下滚动鼠标可查看任务配置。确认无误后单击保存。
- 单击运行任务直接运行同步任务结果。您也可以将同步任务提交到调度系统中,调度系统会按照配置属性从第二天开始自动定时执行。
脚本模式配置同步任务
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "drds",
"parameter": {
"datasource": "l_Drds_w",//数据源的名称,建议都添加数据源后进行同步
"table": "bit_type_test",/数据来源的表名
"where": "",
"splitPk": "col2",//切分键
"column": [
"idbit"
]
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "lzz_odps",//数据源的名称,建议都添加数据源后进行同步
"table": "mytest",
"truncate": true,
"partition": "",//分区信息
"column": [
"id"
]
}
},
"setting": {
"speed": {
"mbps": "1",//作业速率上限
"concurrent": "1"//并发数
},
"errorLimit": {
"record": "234"//错误记录数
}
}
}
}