PostgreSQL Oracle 兼容性之 - PL/SQL FORALL, BULK COLLECT

Oracle PL/SQL 开发的童鞋,一定对O家的bulk批量处理的性能很是赞赏吧。
但是PostgreSQL用户请不要垂涎,作为学院派和工业界的一颗璀璨明珠。
开源数据库PostgreSQL,也有对应的批量处理策略哦,而且看起来性能完全不输Oracle。
下面是一组LOOP和BULK的性能测试数据
PostgreSQL Oracle 兼容性之 - PL/SQL FORALL, BULK COLLECT
PostgreSQL Oracle 兼容性之 - PL/SQL FORALL, BULK COLLECT
PostgreSQL Oracle 兼容性之 - PL/SQL FORALL, BULK COLLECT

一起来耍耍吧,先看看Oracle怎么耍的。

Oracle PL/SQL FORALL, BULK COLLECT

为什么Oracle的PL/SQL过程语言需要bulk处理SQL,看一张图你就明白了,因为过程语言handler和SQL之间需要切换,如果是一个较大的LOOP,切换一多,性能就会下降严重。
PostgreSQL Oracle 兼容性之 - PL/SQL FORALL, BULK COLLECT
因此对于在PL/SQL需要多次调用SQL的处理场景,Oracle想到了bulk collect的处理方法。
比如用户提交一个数组,要求PL/SQL将这个数组的元素一条条插入到表里面,或者拿来更新表里面的值,又或是删除表里的值。
Oracle官网的例子
http://www.oracle.com/technetwork/issue-archive/2012/12-sep/o52plsql-1709862.html
原始的处理方法,通过query loop来更新某个表的值,pl/sql引擎和sql引擎之间需要切换100次(即循环次数)

Code Listing 1: increase_salary procedure with FOR loop 

PROCEDURE increase_salary (
   department_id_in   IN employees.department_id%TYPE,
   increase_pct_in    IN NUMBER)
IS
BEGIN
   FOR employee_rec
      IN (SELECT employee_id
            FROM employees
           WHERE department_id =
                    increase_salary.department_id_in)
   LOOP
      UPDATE employees emp
         SET emp.salary = emp.salary + 
             emp.salary * increase_salary.increase_pct_in
       WHERE emp.employee_id = employee_rec.employee_id;
   END LOOP;
END increase_salary;
 

Suppose there are 100 employees in department 15. When I execute this block, 

BEGIN
   increase_salary (15, .10);
END;
 

the PL/SQL engine will “switch” over to the SQL engine 100 times, once for each row being updated. Tom Kyte, of AskTom (asktom.oracle.com), refers to row-by-row switching like this as “slow-by-slow processing,” and it is definitely something to be avoided.

把它改成bluk的模式

Code Listing 4: Bulk processing for the increase_salary procedure 

1  CREATE OR REPLACE PROCEDURE increase_salary (
 2     department_id_in   IN employees.department_id%TYPE,
 3     increase_pct_in    IN NUMBER)
 4  IS
 5     TYPE employee_ids_t IS TABLE OF employees.employee_id%TYPE
 6             INDEX BY PLS_INTEGER; -- 这里有点意思,用了某个字段作为IDX,PG里可用hstore或者一个数组来表示
 7     l_employee_ids   employee_ids_t;
 8     l_eligible_ids   employee_ids_t;
 9
10     l_eligible       BOOLEAN;
11  BEGIN
12     SELECT employee_id
13       BULK COLLECT INTO l_employee_ids  --  批量的插入到一个类似数组的对象中
14       FROM employees
15      WHERE department_id = increase_salary.department_id_in;
16
17     FOR indx IN 1 .. l_employee_ids.COUNT
18     LOOP
19        check_eligibility (l_employee_ids (indx),
20                           increase_pct_in,
21                           l_eligible);
22
23        IF l_eligible
24        THEN
25           l_eligible_ids (l_eligible_ids.COUNT + 1) :=
26              l_employee_ids (indx);
27        END IF;
28     END LOOP;
29
30     FORALL indx IN 1 .. l_eligible_ids.COUNT  -- 批处理,不需要在pl/sql和sql引擎之间多次切换了
31        UPDATE employees emp
32           SET emp.salary =
33                    emp.salary
34                  + emp.salary * increase_salary.increase_pct_in
35         WHERE emp.employee_id = l_eligible_ids (indx);
36  END increase_salary;

效果

作者的描述, 插入10万记录,从4.94秒提升到了0.12秒。不过他没有给出具体的代码。

On my laptop running Oracle Database 11g Release 2, it took 4.94 seconds to insert 100,000 rows, one at a time. With FORALL, those 100,000 were inserted in 0.12 seconds. Wow!   


PostgreSQL plpgsql BULK SQL

耍完Oracle,接下该是PostgreSQL登场了。
模拟的例子是批量插入,更新,删除,或查询. (下面是在一个虚拟机上进行的测试)
我这里用了两种方法来表示批量数据,标量数组以及hstore数组。

测试表

create table test(id int, info text);

类似Oracle FORALL的批量插入用法1
用一个数组表示条件,另一个数组表示VALUE
如果有多个条件或者value时,可以用record数组或者hstore(Key-Value类型)数组来表示。

CREATE OR REPLACE FUNCTION public.f_bulk_insert1(i_k integer[], i_v text[])
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare 
  i_length int := array_length(i_k,1);
  s timestamp;
  e timestamp;
begin 
  s := clock_timestamp(); 
  raise notice 'start: %', s;
  insert into test select i_k[i], i_v[i] from generate_series(1, i_length) t(i); 
  e := clock_timestamp(); 
  raise notice 'end: %, %', e, e-s; 
end;
$function$;

类似Oracle FORALL的批量插入用法2
使用hstore表示KV

create extension hstore;

CREATE OR REPLACE FUNCTION public.f_bulk_insert2(i_kv hstore[])
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare 
  i_length int := array_length(i_kv,1);
  s timestamp;
  e timestamp;
begin 
  s := clock_timestamp(); 
  raise notice 'start: %', s;
  insert into test select ((i_kv[i])->'k')::int, (i_kv[i])->'v' from generate_series(1,i_length) t(i); 
  e := clock_timestamp(); 
  raise notice 'end: %, %', e, e-s; 
end;
$function$;

为了对比性能,我这里也用了传统的LOOP方法

CREATE OR REPLACE FUNCTION public.f_loop_insert(i_k integer[], i_v text[])
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare 
  i_length int := array_length(i_k,1); 
  i int;
  s timestamp;
  e timestamp;
begin 
  s := clock_timestamp(); 
  raise notice 'start: %', s;
  for i in 1..i_length loop
    insert into test values (i_k[i], i_v[i]); 
  end loop;
  e := clock_timestamp(); 
  raise notice 'end: %, %', e, e-s; 
end;
$function$;

测试
插入10万

select f_bulk_insert1(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,100000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:23:09.240424
NOTICE:  end: 2016-06-15 14:23:09.322634, 00:00:00.08221

select f_bulk_insert2(kv) from (
  select array_agg(kv) as kv from (
    select hstore(t) as kv from (select k, md5(random()::text) v from generate_series(1,100000) t(k)) t
  ) t
) t;
NOTICE:  start: 2016-06-15 14:23:58.59405
NOTICE:  end: 2016-06-15 14:23:58.703137, 00:00:00.109087


select f_loop_insert(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,100000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:24:24.598243
NOTICE:  end: 2016-06-15 14:24:24.959381, 00:00:00.361138

插入100万

select f_bulk_insert1(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,1000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:24:53.105227
NOTICE:  end: 2016-06-15 14:24:53.866958, 00:00:00.761731

select f_bulk_insert2(kv) from (
  select array_agg(kv) as kv from (
    select hstore(t) as kv from (select k, md5(random()::text) v from generate_series(1,1000000) t(k)) t
  ) t
) t;
NOTICE:  start: 2016-06-15 14:25:04.706203
NOTICE:  end: 2016-06-15 14:25:05.788041, 00:00:01.081838

select f_loop_insert(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,1000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:25:15.812975
NOTICE:  end: 2016-06-15 14:25:19.391425, 00:00:03.57845

插入1000万

select f_bulk_insert1(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,10000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:25:46.647381
NOTICE:  end: 2016-06-15 14:25:54.362679, 00:00:07.715298

select f_bulk_insert2(kv) from (
  select array_agg(kv) as kv from (
    select hstore(t) as kv from (select k, md5(random()::text) v from generate_series(1,10000000) t(k)) t
  ) t
) t;
NOTICE:  start: 2016-06-15 14:27:24.782828
NOTICE:  end: 2016-06-15 14:27:36.035167, 00:00:11.252339

select f_loop_insert(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,10000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 14:28:04.030109
NOTICE:  end: 2016-06-15 14:28:40.120863, 00:00:36.090754



更新的例子

drop table test;
create table test(id int primary key, info text);

select f_bulk_insert1(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,1000000) t(k)
  ) t
) t;

CREATE OR REPLACE FUNCTION public.f_bulk_update(i_k integer[], i_v text[])
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare 
  i_length int := array_length(i_k,1);
  s timestamp;
  e timestamp;
begin 
  s := clock_timestamp(); 
  raise notice 'start: %', s;
  update test set info=i_v[i] from (select i from generate_series(1, i_length) as t(i)) t where id=i_k[i]; 
  e := clock_timestamp(); 
  raise notice 'end: %, %', e, e-s; 
end;
$function$;

select f_bulk_update(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,1000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 15:10:47.453093
NOTICE:  end: 2016-06-15 15:10:52.351686, 00:00:04.898593


CREATE OR REPLACE FUNCTION public.f_loop_update(i_k integer[], i_v text[])
 RETURNS void
 LANGUAGE plpgsql
 STRICT
AS $function$
declare 
  i_length int := array_length(i_k,1); 
  i int;
  s timestamp;
  e timestamp;
begin 
  s := clock_timestamp(); 
  raise notice 'start: %', s;
  for i in 1..i_length loop
    update test set info=i_v[i] where id=i_k[i]; 
  end loop;
  e := clock_timestamp(); 
  raise notice 'end: %, %', e, e-s; 
end;
$function$;

select f_loop_update(k,v) from (
  select array_agg(k) as k, array_agg(v) as v from (
    select k, md5(random()::text) v from generate_series(1,1000000) t(k)
  ) t
) t;
NOTICE:  start: 2016-06-15 15:11:08.170183
NOTICE:  end: 2016-06-15 15:11:21.350471, 00:00:13.180288

删除的例子就不再举例了,都差不多。

小结

.1. Oracle PL/SQL FORALL, BULK COLLATE 模式对于LOOP次数超过一定阈值时,性能提升非常多,可能和它的PL/sql语言设计有关。
.2. PostgreSQL plpgsql 语言的处理效率是非常高的,即使不使用BULK模式,你可以看到,LOOP的性能也已经非常好了,使用BULK模式后,性能更上一层。
.3. 目前可能有个query cache的问题,请注意,已报给了社区。后面会有改PG内核的解决方法。
https://www.postgresql.org/message-id/20160615054752.5792.1646%40wrigleys.postgresql.org
(建议可以设置function 的spi次数,或者直接使用generic plan)
http://blog.163.com/digoal@126/blog/static/1638770402012112452432251/
测试的时候你可以先调用5次小数据了(SPI),PLAN CACHE了再调用大的,否则会悲剧的,代码如下。

src/backend/utils/cache/plancache.c  
static bool
choose_custom_plan(CachedPlanSource *plansource, ParamListInfo boundParams)
{

00860     /* Generate custom plans until we have done at least 5 (arbitrary) */
00861     if (plansource->num_custom_plans < 5)
00862         return true;

    if (plansource->generic_cost < avg_custom_cost)
        return false;

    return true;

改动一下代码,重新编译PG,重启数据库即可。(让它一来就使用generic plan)

    /* Generate custom plans until we have done at least 5 (arbitrary) */
    // if (plansource->num_custom_plans < 5)
    //     return true;
...
    if (plansource->generic_cost < avg_custom_cost)
        return false;

    // return true;
    return false;

当然最好的解决办法是做成创建函数时的option,指定这个函数是否需要custom plan.

参考

http://docs.oracle.com/cd/B19306_01/appdev.102/b14261/tuning.htm#i48876
http://docs.oracle.com/cd/B19306_01/appdev.102/b14261/forall_statement.htm
http://www.postgresql.org/docs/9.5/static/arrays.html
http://www.oracle.com/technetwork/issue-archive/2012/12-sep/o52plsql-1709862.html
http://blog.csdn.net/leshami/article/details/7536926

上一篇:在Google Cloud platform上的Kubernetes集群部署HANA Express


下一篇:线程优先级|学习笔记