标签
PostgreSQL , 后台任务 , DBLINK 异步调用
背景
使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。
例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。
《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》
《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》
《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》
并行后台任务接口实现
接口效果:
select run_sqls_parallel (
参数1:并行度,
参数2:要执行的SQLs(数组呈现)
);
实现
1、创建dblink插件
create extension if not exists dblink;
2、创建一个建立连接函数,不报错
create or replace function conn(
name, -- dblink名字
text -- 连接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
3、创建跑多任务的接口函数
create or replace function run_sqls_parallel(
parallels int, -- 并行度
sqls text[], -- 需要执行的SQLs
conn text default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database()) -- 连接串
)
returns setof record as $$
declare
app_prefix_stat text := md5(random()::text); -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)
app_prefix text := md5(random()::text); -- application, dblink name prefix
i int := 1; -- 任务ID变量,1累加
app_conn_name text; -- application_name, dblink conn name = app_prefix+i
sql text; -- SQL 元素
current_conns int := 0; -- 当前活跃的异步调用
begin
-- 建立获取实时pg_stat_activity内容连接
perform conn(app_prefix_stat, conn||app_prefix_stat);
foreach sql in array sqls
loop
-- 当前是否有空闲异步连接
select application_name into app_conn_name from
dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))
as t(application_name text);
-- 有空闲异步连接
if found then
-- 消耗掉上一次异步连接的结果,否则会报错。
return query select a from dblink_get_result(app_conn_name, false) as t(a text);
return query select a from dblink_get_result(app_conn_name, false) as t(a text);
-- 发送异步DBLINK调用
perform dblink_send_query(app_conn_name, sql);
-- 无空闲异步连接
else
-- 当前已建立的异步连接数
select cn into current_conns from
dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))
as t(cn int);
loop
-- 达到并行度
if current_conns >= parallels then
-- 是否有空闲异步连接
select application_name into app_conn_name from
dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))
as t(application_name text);
-- 有
if found then
-- 消耗掉上一次异步连接的结果,否则会报错。
return query select a from dblink_get_result(app_conn_name, false) as t(a text);
return query select a from dblink_get_result(app_conn_name, false) as t(a text);
-- 发送异步DBLINK调用
perform dblink_send_query(app_conn_name, sql);
-- 退出循环
exit;
-- 没有,等
else
perform pg_sleep(1);
raise notice 'current running tasks: %, waiting idle conns.', current_conns;
end if;
-- 未达到并行度
else
-- 建立连接
perform conn(app_prefix||i, conn||app_prefix||i); -- 建立连接。
-- 发送异步DBLINK调用
perform dblink_send_query(app_prefix||i, sql);
-- 连接suffix序号 递增
i := i+1;
-- 退出循环
exit;
end if;
end loop;
end if;
end loop;
loop
-- 当前已建立的异步连接数
select cn into current_conns from
dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))
as t(cn int);
if current_conns=0 then
raise notice 'whole tasks done.';
for app_conn_name in
select application_name from
dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))
as t(application_name text)
loop
return query select a from dblink_get_result(app_conn_name, false) as t(a text);
end loop;
return;
else
raise notice 'the last % tasks running.', current_conns;
perform pg_sleep(1);
end if;
end loop;
end;
$$ language plpgsql strict;
试用
1、运行5条SQL,开2个并行任务
select * from run_sqls_parallel(
2, -- 并行度
-- 执行如下SQL数组
array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']
)
as t(a text);
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: current running tasks: 2, waiting idle conns.
NOTICE: the last 2 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: the last 1 tasks running.
NOTICE: whole tasks done.
run_sqls_parallel
-------------------
(1 row)
Time: 30070.275 ms (00:30.070)
2、运行10个并行任务,跑6条SQL
postgres=# select * from run_sqls_parallel(
10, -- 并行度
-- 执行如下SQL数组
array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']
)
as t(a text);
NOTICE: the last 6 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: the last 5 tasks running.
NOTICE: whole tasks done.
run_sqls_parallel
-------------------
(1 row)
Time: 10050.064 ms (00:10.050)
完全符合预期。
3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务
create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);
do language plpgsql $$
declare
tables name[] := array['t1','t2','t3']; -- 表名
n name; -- 表名
x name; -- 字段名
i int; -- LOOP值
sql text;
sqls text[];
tbs name := 'tbs1'; -- 索引表空间
begin
set maintenance_work_mem='2GB';
foreach n in array tables loop
i := 1;
for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped
loop
-- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引
sql := format('create index IF NOT EXISTS idx_%s__%s on %s (%s) tablespace %s', n, i, n, x, tbs); -- 封装创建索引的SQL
sqls := array_append(sqls, sql);
i:=i+1;
end loop;
end loop;
perform * from run_sqls_parallel(
10, -- 并行度
sqls -- 执行index SQL数组
) as t(a text);
foreach n in array tables loop
execute format('analyze %s', n);
end loop;
end;
$$;
完全符合预期。
小结
本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。
接口效果:
select * from run_sqls_parallel (
参数1:并行度,
参数2:要后台并行执行的SQLs(数组呈现)
)
as t(a text);
参考
《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》
《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》