Using the hll estimation extension in Greenplum, and ideas for optimizing distributed aggregate functions

Background

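In big-data analytics there is often a need for estimates. Examples are estimating how many new users appeared in a given time period, or how many distinct users were active in it.

HyperLogLog is a commonly used estimation algorithm, and there are others as well.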

See, for example:
http://www.pipelinedb.com/

A few years ago I wrote about how to use HLL in PostgreSQL; see:
https://github.com/aggregateknowledge/postgresql-hll
http://blog.163.com/digoal@126/blog/static/16387704020131264480325/
http://blog.163.com/digoal@126/blog/static/1638770402013127917876/

This article covers using it in Greenplum.

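Greenplum is a distributed database system. One thing that needs special attention is that aggregates work differently there than on a single node.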

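I previously wrote about how aggregate functions work, and how to write them, in Postgres-XC, another PostgreSQL-based distributed system. If you are interested, see: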

http://blog.163.com/digoal@126/blog/static/16387704020134222140958/

A brief introduction to how Greenplum aggregate functions work

Greenplum aggregates work in a way that is very similar to Postgres-XC's.

The syntax is:

CREATE [ORDERED] AGGREGATE name (input_data_type [ , ... ])  
( SFUNC = sfunc,  
STYPE = state_data_type  
[, PREFUNC = prefunc]  
[, FINALFUNC = ffunc]  
[, INITCOND = initial_condition]  
[, SORTOP = sort_operator] )  

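There are two aggregation modes to choose from:
1. If only sfunc is configured, all relevant rows are gathered on the master node. The master feeds the rows to sfunc one at a time, together with the previous sfunc result (for the first call, initcond if one is set), until every row has gone through sfunc. Finally, if finalfunc is configured, it is applied to produce the final result.

2. If both sfunc and prefunc are configured, the segment nodes run sfunc in parallel and send their results to the master. The master calls prefunc to aggregate those results again. If finalfunc is configured, the merged result is then passed to finalfunc to produce the final result.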

The functions involved in this process:

sfunc( internal-state, next-data-values ) ---> next-internal-state  
prefunc( internal-state, internal-state ) ---> next-internal-state  
ffunc( internal-state ) ---> aggregate-value  
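To make the two modes concrete, here is a small Python model of them. It is only a sketch under stated assumptions:
- The aggregate is a hypothetical average whose state is a (sum, count) pair.
- `run_on_master`, `run_two_phase` and the `avg_*` functions are illustrative names, not Greenplum APIs.
- The "segments" are plain Python lists.

Both modes return the same answer as long as prefunc correctly merges two partial states.

```python
from functools import reduce

# Hypothetical average aggregate whose state is a (sum, count) pair.
INITCOND = (0, 0)

def avg_sfunc(state, value):
    """sfunc(internal-state, next-data-value) -> next-internal-state"""
    return (state[0] + value, state[1] + 1)

def avg_prefunc(a, b):
    """prefunc(internal-state, internal-state) -> next-internal-state"""
    return (a[0] + b[0], a[1] + b[1])

def avg_ffunc(state):
    """ffunc(internal-state) -> aggregate-value"""
    total, count = state
    return total / count if count else None

def run_on_master(segments):
    """Mode 1: ship every row to the master and run sfunc there, row by row."""
    rows = [v for seg in segments for v in seg]
    return avg_ffunc(reduce(avg_sfunc, rows, INITCOND))

def run_two_phase(segments):
    """Mode 2: sfunc on each segment, then prefunc on the master."""
    partials = [reduce(avg_sfunc, seg, INITCOND) for seg in segments]
    # The master merges one partial state per segment instead of every row.
    return avg_ffunc(reduce(avg_prefunc, partials, INITCOND))

segments = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]   # rows held by three segments
print(run_on_master(segments), run_two_phase(segments))  # 5.0 5.0
```

In mode 2 the master receives one small state per segment rather than every row. This is the reason a correct prefunc matters for large tables.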

Building and installing hll on Greenplum

git clone https://github.com/aggregateknowledge/postgresql-hll  
cd postgresql-hll  
export PATH=/home/digoal/gp/bin:$PATH  
make clean  
make  
make install  

Confirm that hll.so was installed correctly:

$ ll /home/digoal/gp/lib/postgresql/hll.so  
-rwxr-xr-x 1 digoal digoal 116889 Aug 25 22:25 /home/digoal/gp/lib/postgresql/hll.so  

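postgresql-hll is packaged for the CREATE EXTENSION mechanism, which is available since PostgreSQL 9.1. The Greenplum version used here does not support it, so the extension script has to be run by hand. First, look at the control file: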

cat hll.control   

comment = 'type for storing hyperloglog data'  
default_version = '2.10.0'  
module_pathname = '$libdir/hll'  

Then edit the SQL script:

vi hll--2.10.0.sql   

# replace MODULE_PATHNAME with $libdir/hll  

:%s/MODULE_PATHNAME/\$libdir\/hll/g  
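If you prefer a non-interactive step, for example in a build script, the same substitution can be done with a few lines of Python. `fix_module_pathname` and `fix_file` are hypothetical helper names, and `fix_file` rewrites the file in place.

```python
from pathlib import Path

def fix_module_pathname(sql_text: str, libpath: str = "$libdir/hll") -> str:
    """Replace every MODULE_PATHNAME placeholder, like the vi substitution above."""
    return sql_text.replace("MODULE_PATHNAME", libpath)

def fix_file(path: str) -> None:
    """Rewrite the script in place (hypothetical helper)."""
    p = Path(path)
    p.write_text(fix_module_pathname(p.read_text()))

# Usage, run in the postgresql-hll source directory:
# fix_file("hll--2.10.0.sql")
print(fix_module_pathname("AS 'MODULE_PATHNAME'"))  # AS '$libdir/hll'
```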

Run hll--2.10.0.sql in every database that needs hll:

psql -f ./hll--2.10.0.sql  

Testing

The test covers the data types int2, int4, int8, text and bytea.
The 1,000,000 rows are split into 20 groups.

postgres=# create table test(groupid int, id int, info text, si int2, bi int8, f bytea );  
CREATE TABLE  
postgres=# insert into test select mod(id,20), id, md5(id::text), mod(id, 32760), id*2, md5(id::text)::bytea from generate_series(1,1000000) t(id);  
INSERT 0 1000000  
postgres=# create table hll_test(groupid int, id hll, info hll, si hll, bi hll, f hll);  
CREATE TABLE  

Aggregate by group and insert the resulting sketches into a result table:

postgres=# insert into hll_test select groupid, hll_add_agg(hll_hash_integer(id)), hll_add_agg(hll_hash_text(info)), hll_add_agg(hll_hash_smallint(si)), hll_add_agg(hll_hash_bigint(bi)), hll_add_agg(hll_hash_bytea(f)) from test group by groupid;  
INSERT 0 20  
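hll_add_agg folds hashed values into a HyperLogLog sketch. The sketch is an array of small registers that remember, per bucket, the longest run of zero bits seen in the hashes. Below is a minimal Python sketch of that idea. It is not postgresql-hll's implementation:
- It uses BLAKE2b in place of the MurmurHash3 the extension uses.
- It keeps only a dense register array, so the EXPLICIT and SPARSE representations are omitted.
- `hash64`, `hll_add` and `hll_cardinality` here are hypothetical Python helpers.

log2m = 11 and regwidth = 5 are assumed to mirror the nregs=2048, nbits=5 values reported by hll_print later in this article.

```python
import hashlib
import math

LOG2M = 11                       # 2**11 = 2048 registers
M = 1 << LOG2M
REGWIDTH = 5                     # 5-bit registers hold values 0..31
MAX_REG = (1 << REGWIDTH) - 1

def hash64(value) -> int:
    """Stand-in 64-bit hash; postgresql-hll uses MurmurHash3 instead."""
    digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little")

def hll_add(registers: list, h: int) -> None:
    idx = h & (M - 1)            # low LOG2M bits choose the register
    w = h >> LOG2M               # the remaining 64 - LOG2M bits
    # rho = 1-based position of the lowest set bit; an all-zero word gets the maximum
    rho = (w & -w).bit_length() if w else 64 - LOG2M + 1
    registers[idx] = max(registers[idx], min(rho, MAX_REG))

def hll_cardinality(registers: list) -> float:
    alpha = 0.7213 / (1 + 1.079 / M)                  # bias constant for large M
    est = alpha * M * M / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if est <= 2.5 * M and zeros:                      # small range: linear counting
        est = M * math.log(M / zeros)
    return est

registers = [0] * M
for i in range(1, 50001):        # 50,000 distinct ids, about one group's worth
    hll_add(registers, hash64(i))
print(round(hll_cardinality(registers)))
```

Only the registers are stored, so the sketch size depends on log2m and regwidth, not on how many rows were added.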

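Now query the number of distinct values per group, and across ranges of groups.

Distinct values of each column in each group (the five hll_cardinality columns are, in order, id, si, bi, f and info):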

postgres=# select groupid, hll_cardinality(id), hll_cardinality(si), hll_cardinality(bi), hll_cardinality(f), hll_cardinality(info) from hll_test order by 1;  
 groupid | hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality    
---------+------------------+------------------+------------------+------------------+------------------  
       0 | 48978.0103680088 | 1647.82616880183 |  50061.344798351 | 50291.2509120929 | 50291.2509120929  
       1 | 51741.8614936294 | 1612.36207917836 | 49284.9020691283 | 49566.9703531116 | 49566.9703531116  
       2 | 50125.2622181007 | 1656.78898398222 | 48857.7015986003 | 50829.2972953538 | 50829.2972953538  
       3 | 49432.4073263084 | 1638.90240740621 | 51253.0385002203 | 50091.9306033461 | 50091.9306033461  
       4 | 48917.5103132368 | 1652.30267331635 | 50832.4721751387 | 50482.4523284526 | 50482.4523284526  
       5 | 48869.3346787424 | 1686.19202905657 | 48969.8013300078 | 49222.6882488941 | 49222.6882488941  
       6 | 51705.6275895237 | 1661.28514385613 | 49935.7562032158 |  50130.294716446 |  50130.294716446  
       7 | 50394.3167326675 | 1679.36914656619 | 51657.2414098721 | 48798.3228462258 | 48798.3228462258  
       8 |  48718.737519243 | 1668.04794581865 | 49049.1473914624 |  50781.921725461 |  50781.921725461  
       9 | 50838.4114711808 | 1697.61425609909 | 49083.5959035864 | 51081.0313252514 | 51081.0313252514  
      10 | 49902.2001005683 |   1627.802109229 |  50436.564830256 |  50081.537109579 |  50081.537109579  
      11 |   48111.38122336 | 1634.45506581054 | 49123.4624391955 | 49922.9150526361 | 49922.9150526361  
      12 | 48846.4713840826 |  1599.2197880897 | 49787.1437434036 | 50234.5603529724 | 50234.5603529724  
      13 | 50941.2225741094 | 1679.36914656619 | 49016.4627290325 | 49298.6342587365 | 49298.6342587365  
      14 | 49463.2045504603 | 1683.91520836626 | 48084.4839509038 | 48703.0775230689 | 48703.0775230689  
      15 | 51273.4575447966 | 1652.30267331635 | 50053.6251645345 | 50288.1045937257 | 50288.1045937257  
      16 | 48350.9891007992 | 1634.45506581054 | 49485.1660515215 | 49953.3728129477 | 49953.3728129477  
      17 | 49278.5780270676 | 1661.28514385613 | 50607.8043111473 |  49688.123157263 |  49688.123157263  
      18 | 48704.9059740248 | 1650.06319797018 | 52629.3748697963 |  49426.729576471 |  49426.729576471  
      19 | 49326.5758972919 | 1634.45506581054 | 52245.5594837649 | 48453.7968119413 | 48453.7968119413  
(20 rows)  
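To judge these estimates, the exact answers can be computed by replaying the data generation from the INSERT above: groupid = mod(id,20), si = mod(id,32760), with id running from 1 to 1,000,000.
- Every group holds 50,000 distinct ids.
- Because 32760 is a multiple of 20, each group sees exactly 1638 distinct si values.
- bi = id*2 is one-to-one with id, and md5 collisions are not expected at this scale. So bi, info and f should have the same exact counts as id.

For id and si, the per-group estimates in the table lie within about 4% of these exact values. The minimal Python check below uses `exact`, which is a hypothetical helper name.

```python
from collections import Counter

NROWS, NGROUPS, SI_MOD = 1_000_000, 20, 32760

id_count = Counter()                          # ids are unique, so counting rows is enough
si_values = {g: set() for g in range(NGROUPS)}
for i in range(1, NROWS + 1):
    g = i % NGROUPS                           # groupid = mod(id, 20)
    id_count[g] += 1
    si_values[g].add(i % SI_MOD)              # si = mod(id, 32760)

def exact(groups):
    """Exact distinct counts of id and si over the given groups."""
    n_id = sum(id_count[g] for g in groups)   # groups partition the ids
    n_si = len(set().union(*(si_values[g] for g in groups)))
    return n_id, n_si

print(exact([0]))              # (50000, 1638)
print(exact(range(NGROUPS)))   # (1000000, 32760)
print(exact([1, 3, 9]))        # (150000, 4914)
```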

Distinct values of each column across all data (same column order: id, si, bi, f, info):

postgres=# SELECT hll_cardinality(hll_union_agg(id)), hll_cardinality(hll_union_agg(si)), hll_cardinality(hll_union_agg(bi)), hll_cardinality(hll_union_agg(f)), hll_cardinality(hll_union_agg(info)) FROM hll_test;  
 hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality    
------------------+------------------+------------------+------------------+------------------  
 981424.027645005 | 32006.8065955451 | 1008610.09767335 | 1027038.36191106 | 1027038.36191106  
(1 row)  

Distinct values of each column within a chosen range (groups 1, 3 and 9):

postgres=# SELECT hll_cardinality(hll_union_agg(id)), hll_cardinality(hll_union_agg(si)), hll_cardinality(hll_union_agg(bi)), hll_cardinality(hll_union_agg(f)), hll_cardinality(hll_union_agg(info)) FROM hll_test where groupid in (1,3,9);  
 hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality  | hll_cardinality    
------------------+------------------+------------------+------------------+------------------  
 149863.294108911 | 5227.10834581951 | 150151.868022589 | 150497.046827851 | 150497.046827851  
(1 row)  
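Merging the stored per-group sketches with hll_union_agg is valid because the union of two HLL sketches with the same log2m and regwidth is a register-wise maximum. The union of the group sketches is therefore the same sketch you would get by hashing all rows of those groups directly. Because max is idempotent, a value seen in two groups is not counted twice.

A minimal Python sketch of that property follows. BLAKE2b stands in for the extension's MurmurHash3, and `sketch` and `hll_union` are hypothetical helpers, not the extension's C functions.

```python
import hashlib

LOG2M, REGWIDTH = 11, 5
M, MAX_REG = 1 << LOG2M, (1 << REGWIDTH) - 1

def sketch(values):
    """Dense HLL registers for values (BLAKE2b stands in for postgresql-hll's MurmurHash3)."""
    regs = [0] * M
    for v in values:
        h = int.from_bytes(hashlib.blake2b(str(v).encode(), digest_size=8).digest(), "little")
        idx, w = h & (M - 1), h >> LOG2M
        rho = (w & -w).bit_length() if w else 64 - LOG2M + 1
        regs[idx] = max(regs[idx], min(rho, MAX_REG))
    return regs

def hll_union(a, b):
    """Union of two sketches with the same log2m/regwidth: register-wise max."""
    return [max(x, y) for x, y in zip(a, b)]

# Per-group sketches for groups 1, 3 and 9, using ids 1..30000 to keep it quick.
groups = {g: [i for i in range(1, 30001) if i % 20 == g] for g in (1, 3, 9)}
merged = hll_union(hll_union(sketch(groups[1]), sketch(groups[3])), sketch(groups[9]))
direct = sketch(groups[1] + groups[3] + groups[9])
print(merged == direct)  # True
```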

The results are identical to those from PostgreSQL (output omitted).

Debugging: hll_print shows a sketch's internal representation.

postgres=# select hll_print(hll_add(hll_add_agg(hll_hash_bigint(t)),hll_hash_bigint(1))) from generate_series(3,10) g(t);  
                                 hll_print                                   
---------------------------------------------------------------------------  
 EXPLICIT, 9 elements, nregs=2048, nbits=5, expthresh=-1(160), sparseon=1:   
 0: -5469109305088493887                                                     
 1:    19144387141682250                                                     
 2:   489182038263080531                                                     
 3:  1140754268591781659                                                     
 4:  1310323436750511730                                                     
 5:  1960224177162737638                                                     
 6:  3522142095546486706                                                     
 7:  4145513480871534457                                                     
 8:  6574508035858270988   
(1 row)  
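The header reads EXPLICIT, 9 elements. With only 9 distinct hashes (3..10 plus 1), the sketch stores the raw 64-bit hashes instead of registers. expthresh=-1 means the threshold is chosen automatically. As I understand postgresql-hll's storage documentation, EXPLICIT is kept only while it takes no more space than the FULL register array, and the 160 in parentheses is consistent with that rule.

The Python check below covers only that arithmetic. `auto_expthresh` and `representation` are hypothetical helpers. The exact boundary comparison is an assumption, and the SPARSE/FULL choice past the threshold is not modeled.

```python
def auto_expthresh(nregs: int, regwidth: int) -> int:
    """How many 8-byte explicit hashes fit in the space of a FULL register array."""
    full_bytes = (nregs * regwidth + 7) // 8    # 2048 registers * 5 bits = 1280 bytes
    return full_bytes // 8                      # each explicit element is an int64

def representation(n_distinct: int, nregs: int = 2048, regwidth: int = 5) -> str:
    if n_distinct == 0:
        return "EMPTY"
    if n_distinct <= auto_expthresh(nregs, regwidth):   # boundary handling assumed
        return "EXPLICIT"
    return "SPARSE or FULL"                     # register-based; not modeled here

print(auto_expthresh(2048, 5))   # 160
print(representation(9))         # EXPLICIT
```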

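Adjusting precision

Adjust the precision for the current session with hll_set_defaults(log2m, regwidth, expthresh, sparseon).

The function returns the previous values: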

digoal=> select * from hll_set_defaults(15,5,-1,1);  
 o_log2m | o_regwidth | o_expthresh | o_sparseon   
---------+------------+-------------+------------  
      12 |          5 |          -1 |          1  
(1 row)  
Time: 0.330 ms  

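Changing the default precision
This requires recompiling; the defaults are set in hll.c. In the listing below, DEFAULT_LOG2M has been changed to 15. The upstream default is 11, which matches nregs=2048 in the hll_print output.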

// Defaults if type modifier values are not specified.  
//  
#define DEFAULT_LOG2M           15        
#define DEFAULT_REGWIDTH        5  
#define DEFAULT_EXPTHRESH       -1  
#define DEFAULT_SPARSEON        1  

static int32 g_default_log2m = DEFAULT_LOG2M;  
static int32 g_default_regwidth = DEFAULT_REGWIDTH;  
static int64 g_default_expthresh = DEFAULT_EXPTHRESH;  
static int32 g_default_sparseon = DEFAULT_SPARSEON;  
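log2m sets the number of registers, m = 2^log2m. Under the standard HyperLogLog analysis, the relative standard error is about 1.04/sqrt(m), while a FULL sketch needs m * regwidth bits. The short Python table below covers the log2m values that appear in this article: 11 from the hll_print output, 12 as the old session value, and 15 in the modified hll.c. It shows how precision is traded for space. `tradeoff` is a hypothetical helper.

```python
import math

def tradeoff(log2m: int, regwidth: int = 5):
    m = 1 << log2m
    std_error = 1.04 / math.sqrt(m)          # standard HLL relative standard error
    full_bytes = m * regwidth // 8           # size of the FULL register array
    return m, std_error, full_bytes

for log2m in (11, 12, 15):
    m, err, size = tradeoff(log2m)
    print(f"log2m={log2m:2d}  registers={m:5d}  std error~{err:.2%}  full size={size} bytes")
```

Each +1 in log2m doubles the sketch size but only divides the error by about sqrt(2).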

Optimization

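As described earlier, Greenplum aggregates run in one of two ways: either all rows are collected on the master, or the aggregation is done in two phases.
With few rows, two-phase aggregation isn't really necessary. With large data volumes, however, running the first phase on the segments avoids shipping every row to the master.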

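Take the hll_union_agg aggregate as an example.

Its intermediate result is kept in an stype of type internal.

Each call to hll_union_trans takes one hll value plus the internal intermediate state and returns the updated internal state.

Finally, hll_pack converts the internal state into an hll value. The aggregate defines no prefunc, so Greenplum has to run it entirely on the master. An internal state is an in-memory structure that, as far as I know, cannot be shipped between nodes in any case.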

-- Union aggregate function, returns hll.  
--  
CREATE AGGREGATE hll_union_agg (hll) (  
       SFUNC = hll_union_trans,  
       STYPE = internal,  
       FINALFUNC = hll_pack  
);  

-- Union aggregate transition function, first arg internal data  
-- structure, second arg is a packed multiset.  
--  
CREATE FUNCTION hll_union_trans(internal, hll)  
     RETURNS internal  
     AS '$libdir/hll'  
     LANGUAGE C;  

-- Converts internal data structure into packed multiset.  
--  
CREATE FUNCTION hll_pack(internal)  
     RETURNS hll  
     AS '$libdir/hll'  
     LANGUAGE C;  

The optimization works as follows.

On each segment, sfunc aggregates the rows. Its inputs are the stype intermediate state and the next value of input_type, and its output is the stype:

sfunc( internal-state, next-data-values ) ---> next-internal-state  

The results of this first phase are sent from the segments to the master. The master calls prefunc with inputs (stype, stype) and gets an stype back:

prefunc( internal-state, internal-state ) ---> next-internal-state  

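Finally, convert the stype into the aggregate's output type (optionally with a finalfunc). When the stype is already hll, as in the examples below, no finalfunc is needed.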

Optimized hll_union_agg example

CREATE AGGREGATE gp_hll_union_agg (hll) ( 
  SFUNC = hll_union, 
  prefunc = hll_union, -- second-phase function
  STYPE = hll 
); 
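Reusing hll_union as both SFUNC and PREFUNC works because union (register-wise max) is associative, commutative and idempotent, so the way rows are split across segments does not matter.

No INITCOND is given. I assume hll_union is declared STRICT in hll--2.10.0.sql, and the input and state types are both hll. Under PostgreSQL-style rules, the aggregate then starts from the first non-NULL input and skips NULLs.

The Python model below mimics that. `strict_agg` and `gp_union_agg` are hypothetical helpers, and the register arrays are filled with random values rather than real sketches.

```python
import random

M = 2048  # registers per sketch (log2m = 11)

def hll_union(a, b):
    """Register-wise max, the HLL union of two sketches with equal parameters."""
    return [max(x, y) for x, y in zip(a, b)]

def strict_agg(sfunc, inputs):
    """No INITCOND + strict sfunc: NULLs are skipped, the first non-NULL input becomes the state."""
    state = None
    for v in inputs:
        if v is None:
            continue
        state = v if state is None else sfunc(state, v)
    return state

def gp_union_agg(segments):
    # Phase 1: every segment unions its own rows (SFUNC = hll_union).
    partials = [strict_agg(hll_union, seg) for seg in segments]
    # Phase 2: the master unions one partial per segment (PREFUNC = hll_union).
    return strict_agg(hll_union, partials)

rng = random.Random(42)
rows = [[rng.randint(0, 31) for _ in range(M)] for _ in range(12)]   # fake register arrays
rows[5] = None                                                       # a NULL hll value
segments = [rows[0:4], rows[4:7], rows[7:12]]
print(gp_union_agg(segments) == strict_agg(hll_union, rows))  # True
```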

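Optimized hll_add_agg example

Here the input type (hll_hashval) differs from the state type (hll), so the state must start from an empty hll. Get its literal from hll_empty():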

# select hll_empty();
  hll_empty   
--------------
 \021\213\177
(1 row)

CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (
  SFUNC = hll_add, 
  STYPE = hll, 
  prefunc = hll_union, -- second-phase function
  initcond='\021\213\177'  -- initial value: the empty hll returned by hll_empty()
); 
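The INITCOND string is the empty hll value in bytea escape format. The octal escapes \021, \213 and \177 are the bytes 0x11, 0x8B and 0x7F. My reading of the postgresql-hll storage specification is:
- First byte: version and type.
- Second byte: regwidth-1 and log2m.
- Third byte: a sparse flag and an encoded expthresh.

Decoding the bytes that way gives version 1, EMPTY, regwidth 5, log2m 11, expthresh -1 and sparseon 1. These are the same parameters hll_print reported. If that reading is right, the INITCOND hardcodes the default parameters. After changing the defaults (for example the DEFAULT_LOG2M 15 rebuild), you would presumably regenerate it with select hll_empty(). `decode_header` is a hypothetical helper.

```python
TYPES = {0: "UNDEFINED", 1: "EMPTY", 2: "EXPLICIT", 3: "SPARSE", 4: "FULL"}

def decode_header(raw: bytes) -> dict:
    version, type_id = raw[0] >> 4, raw[0] & 0x0F   # high nibble: version, low: type
    regwidth = (raw[1] >> 5) + 1                     # top 3 bits store regwidth - 1
    log2m = raw[1] & 0x1F                            # low 5 bits store log2m
    sparseon = (raw[2] >> 6) & 0x01                  # second-highest bit: sparse enabled
    code = raw[2] & 0x3F                             # 63 = auto (-1), 0 = explicit off
    expthresh = -1 if code == 63 else (0 if code == 0 else 1 << (code - 1))
    return {"version": version, "type": TYPES.get(type_id, "?"),
            "regwidth": regwidth, "log2m": log2m,
            "expthresh": expthresh, "sparseon": sparseon}

print(decode_header(b"\021\213\177"))
```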