ADB PG版基于Flink 自定义conenctor支持读取(维表)和写入(结果表)。通过Flink SQL即可实现对ADB PG版的访问。
前提条件
版本要求
Flink 1.11及以上版本
ADBPG 6.0版本;
网络要求
ADBPG实例与Flink实例在同一VPC下;
ADBPG设置白名单,开放对Flink实例的网络访问。
操作步骤
设置ADBPG实例
1、购买6.0版本ADBPG实例,创建账号,并设置白名单:
2、连接数据库,创建待写入目标表、和待查询源数据表:
create table test6(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean);
insert into test6 values(0 ,'b0', 'c0', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(0 ,'b0', 'c0', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(1 ,'b1', 'c1', 41, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(1 ,'b1', 'c1', 41, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(2 ,'b2', 'c2', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(2 ,'b2', 'c2', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(3 ,'b3', 'c3', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(3 ,'b3', 'c3', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(4 ,'b4', 'c4', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(4 ,'b4', 'c4', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(5 ,'b5', 'c5', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(5 ,'b5', 'c5', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(6 ,'b6', 'c6', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(6 ,'b6', 'c6', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(7 ,'b7', 'c7', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(7 ,'b7', 'c7', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(8 ,'b8', 'c8', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(8 ,'b8', 'c8', 40, 50, 60, 70, 80.2, 90.2, 'false');
insert into test6 values(9 ,'b9', 'c9', 40, 50, 60, 70, 80.1, 90.1, 'true');
insert into test6 values(9 ,'b9', 'c9', 40, 50, 60, 70, 80.2, 90.2, 'false');
create table test7(a int,b text,c text,d int,e int, f int, g bigint, h float, i double precision, j boolean, k int,l text,m text,n int,o int, p int, q bigint, r float, s double precision, t boolean);
Flink作业开发
1、创建Flink vvp版实例,要保证Flink实例与ADBPG实例处于同一个VPC下;
2、创建SQL作业
3、作业开发
代码参考:
CREATE TEMPORARY TABLE datagen_source2(
a INT,
b VARCHAR,
c CHAR(15),
d TINYINT,
e SMALLINT,
f INT,
g BIGINT,
h FLOAT,
i DOUBLE,
j BOOLEAN,
`proctime` AS PROCTIME()
) with (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE adbpg_dim2 (
a INT,
b VARCHAR,
c CHAR(15),
d TINYINT,
e SMALLINT,
f INT,
g BIGINT,
h FLOAT,
i DOUBLE,
j BOOLEAN
) with (
'connector' = 'adbpg',
'password' = 'password',
'tablename' = 'tablename',
'username' = 'username',
'url' = 'jdbc:postgresql://url:port/databasename',
'maxretrytimes' = '2',
'connectionmaxactive' = '5',
'targetschema' = 'public',
'casesensitive' = '0',
'retrywaittime' = '200',
'cache' = 'lru',
'cacheSize'= '1000000',
'cacheTTLMs' = '2000000000');
CREATE TEMPORARY TABLE adbpg_sink2(
a INT,
b VARCHAR,
c CHAR(15),
d TINYINT,
e SMALLINT,
f INT,
g BIGINT,
h FLOAT,
i DOUBLE,
j BOOLEAN,
k INT,
l VARCHAR,
m CHAR(15),
n TINYINT,
o SMALLINT,
p INT,
q BIGINT,
r FLOAT,
s DOUBLE,
t BOOLEAN
) with (
'connector' = 'adbpg',
'password' = 'password',
'tablename' = 'tablename',
'username' = 'username',
'url' = 'jdbc:postgresql://url:port/databasename',
'maxretrytimes' = '2',
'batchsize' = '100',
'connectionmaxactive' = '5',
'conflictmode' = 'ignore',
'usecopy' = '0',
'targetschema' = 'public',
'exceptionmode' = 'ignore',
'casesensitive' = '0',
'writemode' = '0',
'retrywaittime' = '200'
);
insert into adbpg_sink2 select T.a, T.b, T.c, T.d, T.e, T.f, T.g, T.h, T.i, T.j, H.a, H.b, H.c, H.d, H.e, H.f, H.g, H.h, H.i, H.j FROM datagen_source2 AS T JOIN adbpg_dim2 FOR SYSTEM_TIME AS OF T.proctime AS H ON MOD(T.a, 10) = H.a;
4、上传jar包:
5、运行上线:
点击验证、运行、上线,观察日志和数据库判断是否有异常,是否成功写入数据库。
维表参数说明
参数名 |
参数含义 |
备注 |
url |
ADBPG连接地址 |
必填,需要填写内网连接地址。 |
tableName |
ADBPG源表名 |
必填,填写维表对应的ADBPG数据仓库中的表名。 |
userName |
ADBPG用户名 |
必填。 |
password |
ADBPG密码 |
必填。 |
joinMaxRows |
左表一条记录连接右表的最大记录数 |
非必填,表示在一对多连接时,左表一条记录连接右表的最大记录数(默认值为1024)。在一对多连接的记录数过多时,可能会极大的影响流任务的性能,因此您需要增大Cache的内存(cacheSize限制的是左表key的个数)。 |
maxRetryTimes |
单次SQL失败后重试次数 |
非必填,实际执行时,可能会因为各种因素造成执行失败,比如网络或者IO不稳定,超时等原因,ADBPG维表支持SQL执行失败后自动重试,用maxRetryTimes参数可以设定重试次数。默认值为3。 |
connectionMaxActive |
连接池最大连接数 |
非必填,ADBPG维表中内置连接池,设置合理的连接池最大连接数可以兼顾效率和安全性,默认值为5。 |
retryWaitTime |
重试休眠时间 |
非必填,每次SQL失败重试之间的sleep间隔,单位ms,默认值100 |
targetSchema |
查询的ADBPG schema |
非必填,默认值public |
caseSensitive |
是否大小写敏感 |
非必填,默认值0,即不敏感;填1可以设置为敏感; |
cache |
缓存策略 |
目前分析型数据库PostgreSQL版支持以下三种缓存策略:
|
cacheSize |
设置LRU缓存的最大行数 |
非必填,默认为10000行 |
cacheTTLMs |
缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。 |
非必填,单位为毫秒。默认不设置此参数,表示不重新加载维表中的新数据。 |
结果表参数说明
参数 |
注释说明 |
是否必选 |
备注 |
type |
类型 |
是 |
固定值,为adbpg |
url |
jdbc连接地址 |
是 |
分析型数据库PostgreSQL版数据库的jdbc连接地址 。 格式为:'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>' 其中<yourNetworkAddress>为目标分析型数据库PostgreSQL版数据库的主机地址,<PortId>为连接端口,<yourDatabaseName>为连接的数据库。 示例: |
tableName |
表名 |
是 |
无。 |
username |
账号 |
是 |
无。 |
password |
密码 |
是 |
无。 |
maxRetryTimes |
写入重试次数 |
否 |
默认为3。 |
useCopy |
是否采用copy API写入数据 |
否 |
默认为1,表示采用copy API方式写入; 当取值为0时,代表根据writeMode字段采用其他方式写入数据。 |
batchSize |
一次批量写入的条数 |
否 |
默认值为5000。 |
exceptionMode |
当存在写入过程中出现异常时的处理策略 |
否 |
支持以下两种取值: 1)"ignore": 忽略出现导致写入异常的数据; 2)"strict": 日志记录导致写入异常的数据,然后停止任务; 默认取值为"ignore" |
conflictMode |
当出现主键冲突或者唯一索引冲突时的处理策略 |
否 |
支持以下三种取值: 1)"ignore": 忽略出现导致主键冲突的数据; 2)"strict": 日志记录导致主键冲突的数据,然后停止任务; 3)"update":当出现主键冲突时更新为新值。 4) "upsert": 以insert on conflict方式处理主键冲突。 默认取值为"ignore" |
targetSchema |
schema名称 |
否 |
默认值为"public"。 |
writeMode |
在useCopy字段基础上,更细分的写入方式 |
否 |
默认值为1,代表采用copy API写入数据; 在useCopy字段为0的场景下,可以设定writeMode字段采用其他写入方式: writeMode=0 :采用insert方式写入数据; writeMode=2:采用upsert方式写入数据。 upsert含义见文档 注意采用upsert方式写入时需要设定主键字段,设定主键的方式参考示例语句。 |
/