标签
PostgreSQL , 多阶段聚合 , 直连segment , gp_dist_random('gp_id')
背景
聚合操作是分析型场景中最常见的需求之一,在Greenplum中,数据已分布存储,聚合操作需要多阶段执行。
实际上PostgreSQL 9.6开始支持并行聚合后,聚合的方法也与分布式数据库的多阶段聚合类似。
创建聚合函数时,必须实现多阶段的API才能够让聚合操作真正的并行起来。
有些插件可能因为某些原因没有实现多阶段聚合,例如roaring bitmap插件,参考:
《Greenplum roaring bitmap与业务场景 (类阿里云RDS PG varbitx, 应用于海量用户 实时画像和圈选、透视)》
还有什么方法可以让聚合并行起来呢?
1、gp_dist_random('gp_id')
2、直连segment
3、gpdb mapreduce接口
1 gp_dist_random('gp_id')
Greenplum内部有一个函数接口 gp_dist_random('gp_id'),优化器会将调用这个函数接口的QUERY下发到各个SEGMENT直接执行。
统计数据库大小时也用到了
select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');
源码如下
/*
 * pg_database_size_name
 *
 * SQL-callable function that takes a database name (argument 0) and returns
 * an int64 size for that database. Raises ERRCODE_UNDEFINED_DATABASE when the
 * name does not resolve to a valid OID.
 */
Datum
pg_database_size_name(PG_FUNCTION_ARGS)
{
int64 size = 0;
Name dbName = PG_GETARG_NAME(0);
Oid dbOid = get_database_oid(NameStr(*dbName));
/* Unknown database name: report an error instead of returning a size. */
if (!OidIsValid(dbOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist",
NameStr(*dbName))));
/* Size as computed by this process (presumably the local instance only -- confirm in calculate_database_size). */
size = calculate_database_size(dbOid);
/*
 * On the dispatcher, build the same size query against gp_dist_random('gp_id')
 * and add whatever get_size_from_segDBs() returns for it to the local size.
 */
if (Gp_role == GP_ROLE_DISPATCH)
{
StringInfoData buffer;
initStringInfo(&buffer);
appendStringInfo(&buffer, "select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');", NameStr(*dbName));
size += get_size_from_segDBs(buffer.data);
}
PG_RETURN_INT64(size);
}
1、以rb插件为例,当前的聚合是单阶段聚合,所有数据都收集到MASTER后才开始聚合,所以很慢。
test=# explain select rb_and_cardinality_agg(b) from testpay1;
QUERY PLAN
-------------------------------------------------------------------------------------------------
Aggregate (cost=908857.80..908857.81 rows=1 width=4)
-> Gather Motion 256:1 (slice1; segments: 256) (cost=0.00..907979.68 rows=351246 width=37)
-> Seq Scan on testpay1 (cost=0.00..5277.46 rows=1373 width=37)
Settings: effective_cache_size=8GB; gp_statistics_use_fkeys=on
Optimizer status: legacy query optimizer
(5 rows)
2、为了使用gp_dist_random('gp_id')来实现并行多阶段聚合,我们需要定义一个函数接口,让这个接口来执行prefunc的动作,比如先在SEGMENT聚合一次。
test=> CREATE OR REPLACE FUNCTION get_rb(v_sql text) RETURNS roaringbitmap AS $$
DECLARE
  v_bitmap roaringbitmap;  -- receives the single value produced by v_sql
BEGIN
  EXECUTE v_sql INTO v_bitmap;  -- run the caller-supplied statement as dynamic SQL
  RETURN v_bitmap;
END;
$$ LANGUAGE plpgsql STRICT;
CREATE FUNCTION
3、但是很遗憾的是,GPDB内部做了保护,如果UDF里面访问的表不是系统表(replication table,或者叫全副本表,非分布式表),数据库会拒绝直接在segment访问。
(目标:rb_and_agg在所有的segment直接执行,返回rb类型,然后再返回给MASTER,执行cardinality操作。实现并行)
test=> explain analyze select get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$) from gp_dist_random('gp_id');
NOTICE: function cannot execute on segment because it accesses relation "public.testpay1" (functions.c:155) (seg3 slice1 11.180.113.94:3068 pid=54354) (cdbdisp.c:1326)
DETAIL:
SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"
PL/pgSQL function "get_rb" line 4 at execute statement
test=> explain analyze select rb_and_cardinality_agg(get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$)) from gp_dist_random('gp_id');
NOTICE: query plan with multiple segworker groups is not supported (cdbdisp.c:302)
HINT: likely caused by a function that reads or modifies data in a distributed table
CONTEXT: SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"
PL/pgSQL function "get_rb" line 4 at execute statement
4、而如果UDF里面访问的是系统表(replication table,或者叫全副本表,非分布式表),数据库允许直接在segment访问。
CREATE OR REPLACE FUNCTION get_catalog(v_sql text) RETURNS int8 AS $$
DECLARE
  v_value int8;  -- receives the single value produced by v_sql
BEGIN
  EXECUTE v_sql INTO v_value;  -- run the caller-supplied statement as dynamic SQL
  RETURN v_value;
END;
$$ LANGUAGE plpgsql STRICT;
test=> explain analyze select get_catalog($$select max(oid::int8) from pg_class$$) from gp_dist_random('gp_id');
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
Gather Motion 256:1 (slice1; segments: 256) (cost=0.00..1.01 rows=1 width=0)
Rows out: 256 rows at destination with 2.887 ms to first row, 6.589 ms to end, start offset by 1.203 ms.
-> Seq Scan on gp_id (cost=0.00..1.01 rows=1 width=0)
Rows out: Avg 1.0 rows x 256 workers. Max 1 rows (seg0) with 1.243 ms to first row, 1.244 ms to end, start offset by 3.534 ms.
Slice statistics:
(slice0) Executor memory: 495K bytes.
(slice1) Executor memory: 139K bytes avg x 256 workers, 139K bytes max (seg0).
Statement statistics:
Memory used: 2047000K bytes
Settings: effective_cache_size=8GB; gp_statistics_use_fkeys=on
Optimizer status: legacy query optimizer
Total runtime: 8.015 ms
(12 rows)
5、保护代码如下
src/backend/executor/functions.c
110 /**
111 * Walker for querytree_safe_for_segment.
112 */
113 bool querytree_safe_for_segment_walker(Node *expr, void *context)
114 {
115 Assert(context == NULL); /* the walker carries no state */
116
117 if (!expr)
118 {
119 /**
120 * Do not end recursion just because we have reached one leaf node.
121 */
122 return false;
123 }
124
125 switch(nodeTag(expr))
126 {
127 case T_Query:
128 {
129 Query *q = (Query *) expr;
130
131 if (!allow_segment_DML && /* unless allow_segment_DML is set, reject anything but a plain SELECT (no INTO, no result relation) */
132 (q->commandType != CMD_SELECT
133 || q->intoClause != NULL
134 || q->resultRelation > 0))
135 {
136 ereport(ERROR,
137 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
138 errmsg("function cannot execute on segment because it issues a non-SELECT statement")));
139 }
140
141 ListCell * f = NULL;
142 foreach(f,q->rtable) /* inspect every range-table entry of this query */
143 {
144 RangeTblEntry *rte = (RangeTblEntry *) lfirst(f);
145
146 if (rte->rtekind == RTE_RELATION) /* only entries that reference a relation are checked */
147 {
148 Assert(rte->relid != InvalidOid);
149
150 Oid namespaceId = get_rel_namespace(rte->relid);
151
152 Assert(namespaceId != InvalidOid);
153
154 if (!(IsSystemNamespace(namespaceId) || /* the relation's namespace must be system, toast or AO-segment; any other namespace is rejected */
155 IsToastNamespace(namespaceId) ||
156 IsAoSegmentNamespace(namespaceId)))
157 {
158 ereport(ERROR, /* raises the "accesses relation" error, naming the schema-qualified relation */
159 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
160 errmsg("function cannot execute on segment because it accesses relation \"%s.%s\"",
161 quote_identifier(get_namespace_name(namespaceId)),
162 quote_identifier(get_rel_name(rte->relid)))));
163 }
164 }
165 }
166 query_tree_walker(q, querytree_safe_for_segment_walker, context, 0); /* apply the same walker to the Query's own sub-nodes */
167 break;
168 }
169 default:
170 break;
171 }
172
173 return expression_tree_walker(expr, querytree_safe_for_segment_walker, context); /* continue the walk into child expressions */
174 }
src/backend/cdb/dispatcher/cdbdisp.c
36 /*
37 * cdbdisp_dispatchToGang:
38 * Send the strCommand SQL statement to the subset of all segdbs in the cluster
39 * specified by the gang parameter. cancelOnError indicates whether an error
40 * occurring on one of the qExec segdbs should cause all still-executing commands to cancel
41 * on other qExecs. Normally this would be true. The commands are sent over the libpq
42 * connections that were established during cdblink_setup. They are run inside of threads.
43 * The number of segdbs handled by any one thread is determined by the
44 * guc variable gp_connections_per_thread.
45 *
46 * The caller must provide a CdbDispatchResults object having available
47 * resultArray slots sufficient for the number of QEs to be dispatched:
48 * i.e., resultCapacity - resultCount >= gp->size. This function will
49 * assign one resultArray slot per QE of the Gang, paralleling the Gang's
50 * db_descriptors array. Success or failure of each QE will be noted in
51 * the QE's CdbDispatchResult entry; but before examining the results, the
52 * caller must wait for execution to end by calling CdbCheckDispatchResult().
53 *
54 * The CdbDispatchResults object owns some malloc'ed storage, so the caller
55 * must make certain to free it by calling cdbdisp_destroyDispatcherState().
56 *
57 * When dispatchResults->cancelOnError is false, strCommand is to be
58 * dispatched to every connected gang member if possible, despite any
59 * cancellation requests, QE errors, connection failures, etc.
60 *
61 * NB: This function should return normally even if there is an error.
62 * It should not longjmp out via elog(ERROR, ...), ereport(ERROR, ...),
63 * PG_THROW, CHECK_FOR_INTERRUPTS, etc.
64 */
65 void
66 cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
67 struct Gang *gp,
68 int sliceIndex,
69 CdbDispatchDirectDesc *disp_direct)
70 {
71 struct CdbDispatchResults *dispatchResults = ds->primaryResults;
72
73 Assert(Gp_role == GP_ROLE_DISPATCH); /* only valid on the dispatcher */
74 Assert(gp && gp->size > 0);
75 Assert(dispatchResults && dispatchResults->resultArray);
76
77 if (dispatchResults->writer_gang)
78 {
79 /*
80 * Are we dispatching to the writer-gang when it is already busy ?
81 */
82 if (gp == dispatchResults->writer_gang)
83 {
84 if (dispatchResults->writer_gang->dispatcherActive) /* the writer gang is already marked active: refuse a second dispatch to it */
85 {
86 ereport(ERROR, /* NOTE(review): the header comment says this function should not ereport(ERROR), yet this path does */
87 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
88 errmsg("query plan with multiple segworker groups is not supported"),
89 errhint("likely caused by a function that reads or modifies data in a distributed table")));
90 }
91
92 dispatchResults->writer_gang->dispatcherActive = true; /* mark the writer gang as busy before dispatching */
93 }
94 }
95
96 /*
97 * WIP: will use a function pointer for implementation later, currently just use an internal function to move dispatch
98 * thread related code into a separate file.
99 */
100 (pDispatchFuncs->dispatchToGang)(ds, gp, sliceIndex, disp_direct); /* hand the dispatch over to the implementation behind pDispatchFuncs */
101 }
6、如果你的GPDB没有RB插件,可以使用普通类型测试模拟这个问题
postgres=# create table test(id int, info text, crt_time timestamp);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
postgres=# insert into test select id, md5(random()::text), clock_timestamp() from generate_series(1,1000000) t(id);
INSERT 0 1000000
CREATE OR REPLACE FUNCTION get_max(v_sql text) RETURNS int AS $$
DECLARE
  v_max int;  -- receives the single value produced by v_sql
BEGIN
  EXECUTE v_sql INTO v_max;  -- run the caller-supplied statement as dynamic SQL
  RETURN v_max;
END;
$$ LANGUAGE plpgsql STRICT;
postgres=# \set VERBOSITY verbose
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');
ERROR: 0A000: function cannot execute on segment because it accesses relation "public.test" (seg0 slice1 127.0.0.1:25432 pid=1443)
DETAIL:
SQL statement "select max(id) from test"
PL/pgSQL function "get_max" line 4 at EXECUTE statement
LOCATION: cdbdisp_finishCommand, cdbdisp.c:254
7、用修改元数据的方法欺骗不了GPDB,因为保护不是在元数据层面判断,而是在执行层面:保护代码并不读取gp_distribution_policy中的分布策略,而是在segment上执行函数时,检查被访问的表所在的schema是否属于系统命名空间。
postgres=# create table tmp_gp_distribution_policy as select * from gp_distribution_policy where localoid='test'::regclass;
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'localoid' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
SELECT 1
postgres=# set allow_system_table_mods=DML;
SET
postgres=# delete from gp_distribution_policy where localoid='test'::regclass;
DELETE 1
依旧报同样的错误
postgres=# \set VERBOSITY verbose
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');
ERROR: 0A000: function cannot execute on segment because it accesses relation "public.test" (seg0 slice1 127.0.0.1:25432 pid=1443)
DETAIL:
SQL statement "select max(id) from test"
PL/pgSQL function "get_max" line 4 at EXECUTE statement
LOCATION: cdbdisp_finishCommand, cdbdisp.c:254
2 直连SEGMENT
《Greenplum segment节点直接读写配置与性能》
《Greenplum & PostgreSQL UPSERT udf 实现 - 2 batch批量模式》
《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》
这个方法是可行的,不过过于麻烦:需要用户逐个直连每个SEGMENT执行查询,再自行合并各SEGMENT返回的结果。
postgres=# select * from gp_segment_configuration where content<>'-1' and role='p';
dbid | content | role | preferred_role | mode | status | port | hostname | address | replication_port
------+---------+------+----------------+------+--------+-------+-------------------------+-----------+------------------
2 | 0 | p | p | s | u | 25432 | iZbp13nu0s9j3x3op4zpd4Z | localhost |
3 | 1 | p | p | s | u | 25433 | iZbp13nu0s9j3x3op4zpd4Z | localhost |
(2 rows)
PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432 -c "select max(id) from test"
max
---------
1000000
(1 row)
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25433 -c "select max(id) from test"
max
--------
999999
(1 row)
digoal@iZbp13nu0s9j3x3op4zpd4Z-> psql -c "select greatest(1000000,999999)"
greatest
----------
1000000
(1 row)
小结
1、gp_dist_random('gp_id') 的方法,因为内部做了保护,目前只适用于系统表(复制表),不适合分布式表。(用户感知)
2、使用直连SEGMENT的方法,可行,但是操作过于繁琐,而且需要用户直连SEGMENT。(用户感知)
3、最好的方法,依旧是聚合接口本身支持prefunc API,内部多阶段并行。(用户无感知)
参考
直连SEGMENT
《Greenplum segment节点直接读写配置与性能》
《Greenplum & PostgreSQL UPSERT udf 实现 - 2 batch批量模式》
《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》
多阶段聚合
《PostgreSQL 11 preview - 多阶段并行聚合array_agg, string_agg》
《PostgreSQL 10 自定义并行计算聚合函数的原理与实践 - (含array_agg合并多个数组为单个一元数组的例子)》
《HybridDB PostgreSQL "Sort、Group、distinct 聚合、JOIN" 不惧怕数据倾斜的黑科技和原理 - 多阶段聚合》
《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》
《Postgres-XC customized aggregate introduction》
《PostgreSQL aggregate function customize》
《Greenplum roaring bitmap与业务场景 (类阿里云RDS PG varbitx, 应用于海量用户 实时画像和圈选、透视)》