关系型数据库(RDS)
阿里云关系型数据库(Relational Database Service)简称RDS是一种稳定可靠、可弹性伸缩的在线数据库服务。基于阿里云分布式文件系统和高性能存储,RDS支持MySQL、SQL Server、PostgreSQL和PPAS(Postgre Plus Advanced Server,一种高度兼容 Oracle 的数据库)引擎,并且提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。
注意:
关系型数据库(RDS/DRDS)插件中的WITH参数一致,可以通用。
在使用关系型数据库(RDS/DRDS)作为结果表时,RDS或DRDS中必须要有真实的表存在。
DDL定义
实时计算支持使用RDS/DRDS作为结果输出(目前仅支持MySql数据存储类型)。示例代码如下。
create table rds_output(
id int,
len int,
content VARCHAR,
primary key(id,len)
) with (
type='rds',
url='jdbc:mysql:XXXXXXXXXX',
tableName='test4',
userName='test',
password='XXXXXX'
);
注意:
- 实时计算写入RDS/DRDS数据库结果表原理:针对实时计算每行结果数据,拼接成一行SQL向目标端数据库进行执行。如果使用批量写,需要在url后面加上参数
?rewriteBatchedStatements=true
,否则性能较差。- RDS/MySQL支持自增主键。如果需要让实时计算写入数据支持自增主键,在DDL中不声明该自增字段即可。
例如,ID是自增字段,实时计算DDL不写出该自增字段,则数据库在一行数据写入过程中会自动填补相关的自增字段。- 如果DRDS有分区表,拆分键必须在实时计算DDL里
primary key()
中声明,否则拆分的表无法写入。关于DRDS分库分表的概念可参见DRDS分库分表。- 建议使用数据存储,参见数据存储云数据库(RDS)。
WITH参数
参数 | 注释说明 | 备注 |
---|---|---|
url | 地址 | RDS的URL地址, DRDS的URL地址 |
tableName | 表名 | 无 |
userName | 用户名 | 无 |
password | 密码 | 无 |
maxRetryTimes | 最大尝试插入次数 | 可选,默认为3 |
batchSize | 每次写的批次大小 | 可选,默认值50,表示每次写多少条。 |
bufferSize | 去重的buffer大小,需要指定主键才生效 | 可选,默认值1(1.4.1默认值改为1000),表示输入的数据达到1条就开始输出。 |
flushIntervalMs | 写超时时间 | 可选,单位毫秒,默认值5000,表示数据超过了5秒,还没有写过,就会将缓存的数据都写一次。 |
excludeUpdateColumns | 对相同key的值更新时排除掉相应的column | 可选,默认为空(primary keys 字段默认会排除) |
ignoreDelete | 是否忽略delete操作 | 默认为false |
partitionBy | 写入Sink节点前,会根据该值做hash。数据会流向相应的hash节点。 | 可选,默认为空。 |
类型映射
RDS字段类型 | 实时计算字段类型 |
---|---|
text | varchar |
byte | varchar |
integer | int |
long | bigint |
double | double |
date | varchar |
datetime | varchar |
timestamp | varchar |
time | varchar |
year | varchar |
float | float |
decimal | decimal |
char | varchar |
JDBC 连接参数
参数名称 | 参数说明 | 缺省值 | 最低版本要求 |
---|---|---|---|
useUnicode | 是否使用Unicode字符集,如果参数characterEncoding设置为gb2312或gbk,本参数值必须设置为true。 | false | 1.1g |
characterEncoding | 当useUnicode设置为true时,指定字符编码。比如可设置为gb2312或gbk。 | false | 1.1g |
autoReconnect | 当数据库连接异常中断时,是否自动重新连接。 | false | 1.1 |
autoReconnectForPools | 是否使用针对数据库连接池的重连策略。 | false | 3.1.3 |
failOverReadOnly | 自动重连成功后,连接是否设置为只读。 | true | 3.0.12 |
maxReconnects | autoReconnect设置为true时,重试连接的次数。 | 3 | 1.1 |
initialTimeout | autoReconnect设置为true时,两次重连之间的时间间隔,单位:秒。 | 2 | 1.1 |
connectTimeout | 和数据库服务器建立socket连接时的超时,单位:毫秒。 0表示永不超时,适用于JDK 1.4及更高版本。 | 0 | 3.0.1 |
socketTimeout | socket操作(读写)超时,单位:毫秒。 0表示永不超时。 | 0 | 3.0.1 |
FAQ
Q:Flink的结果数据写入RDS表,是按主键更新的,还是新生成一条记录?
A:如果在DDL中定义了主键,会采用insert into on duplicate key update
的方式更新记录,也就意味着对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。
如果DDL中没有声明primary key,则会用insert into
方式插入记录,追加数据。
Q:使用RDS表中的唯一索引做GROUP BY需要注意什么?
A:RDS中只有一个自增主键,实时计算作业中不能声明为Primary Key;如果需要使用RDS表中的唯一索引做GROUP BY,需要在作业中的Primary Key中声明这些唯一索引。
本文转自实时计算——创建云数据库(RDS/DRDS)结果表