背景
在大数据分析中,通常会有一些估值的需求,例如估计某个时间段有多少新增用户,估计某个时间段有多少用户。
常用的估值算法如HyperLogLog,还有一些其他的估值算法。
可以参考
http://www.pipelinedb.com/
我在几年前写过如何在PostgreSQL中使用HLL,请参考
https://github.com/aggregateknowledge/postgresql-hll
http://blog.163.com/digoal@126/blog/static/16387704020131264480325/
http://blog.163.com/digoal@126/blog/static/1638770402013127917876/
这里要说的是在Greenplum中的使用。
Greenplum是个分布式数据库系统,特别需要注意的是它的聚合用法与单节点不一样。
我之前写过基于PostgreSQL的另一个分布式系统Postgres-XC的聚合函数的原理和写法,有兴趣的同学可以参考如下
http://blog.163.com/digoal@126/blog/static/16387704020134222140958/
Greenplum聚合函数原理简介
其实Greenplum的聚合函数用法和postgres-xc及其的相似。
语法如下
CREATE [ORDERED] AGGREGATE name (input_data_type [ , ... ])
( SFUNC = sfunc,
STYPE = state_data_type
[, PREFUNC = prefunc]
[, FINALFUNC = ffunc]
[, INITCOND = initial_condition]
[, SORTOP = sort_operator] )
有两种聚合运算模式可选
1. 如果只配置了sfunc,则相关数据全部收集到master节点,在master节点对所有数据依条加上sfunc的结果(第一次可选为initcond)输入给sfunc计算,直到所有数据都跑完sfunc,最后如果设置了finalfunc,则计算并得到最终结果。
2. 如果同时配置了sfunc和prefunc,则在segment节点并行完成sfunc,然后将segment节点执行的结果发给master,在master调用prefunc进行再次聚合,输出结果,如果配置了finalfunc,则这个结果再给finalfunc执行并输出最终结果。
过程如下 :
sfunc( internal-state, next-data-values ) ---> next-internal-state
prefunc( internal-state, internal-state ) ---> next-internal-state
ffunc( internal-state ) ---> aggregate-value
在Greenplum上编译安装hll
git clone https://github.com/aggregateknowledge/postgresql-hll
cd postgresql-hll
export PATH=/home/digoal/gp/bin:$PATH
make clean
make
make install
确认hll.so正确的安装
$ ll /home/digoal/gp/lib/postgresql/hll.so
-rwxr-xr-x 1 digoal digoal 116889 Aug 25 22:25 /home/digoal/gp/lib/postgresql/hll.so
因为postgres-hll是兼容9.0以上的create extension模式的,所以得看看控制文件
cat hll.control
comment = 'type for storing hyperloglog data'
default_version = '2.10.0'
module_pathname = '$libdir/hll'
然后需要修改sql文件
vi hll--2.10.0.sql
# MODULE_PATHNAME替换成$libdir/hll
:%s/MODULE_PATHNAME/\$libdir\/hll/g
在需要使用hll的数据库执行hll--2.10.0.sql
psql -f ./hll--2.10.0.sql
测试
测试的数据类型包括int2, int4, int8, text, bytea.
分成20个组,一共100万数据。
postgres=# create table test(groupid int, id int, info text, si int2, bi int8, f bytea );
CREATE TABLE
postgres=# insert into test select mod(id,20), id, md5(id::text), mod(id, 32760), id*2, md5(id::text)::bytea from generate_series(1,1000000) t(id);
INSERT 0 1000000
postgres=# create table hll_test(groupid int, id hll, info hll, si hll, bi hll, f hll);
CREATE TABLE
按分组聚合后,插入一张结果表
postgres=# insert into hll_test select groupid, hll_add_agg(hll_hash_integer(id)), hll_add_agg(hll_hash_text(info)), hll_add_agg(hll_hash_smallint(si)), hll_add_agg(hll_hash_bigint(bi)), hll_add_agg(hll_hash_bytea(f)) from test group by groupid;
INSERT 0 20
查询每个分组的唯一值以及按阶段的唯一值。
每组,每个维度有多少唯一值
postgres=# select groupid, hll_cardinality(id), hll_cardinality(si), hll_cardinality(bi), hll_cardinality(f), hll_cardinality(info) from hll_test order by 1;
groupid | hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality
---------+------------------+------------------+------------------+------------------+------------------
0 | 48978.0103680088 | 1647.82616880183 | 50061.344798351 | 50291.2509120929 | 50291.2509120929
1 | 51741.8614936294 | 1612.36207917836 | 49284.9020691283 | 49566.9703531116 | 49566.9703531116
2 | 50125.2622181007 | 1656.78898398222 | 48857.7015986003 | 50829.2972953538 | 50829.2972953538
3 | 49432.4073263084 | 1638.90240740621 | 51253.0385002203 | 50091.9306033461 | 50091.9306033461
4 | 48917.5103132368 | 1652.30267331635 | 50832.4721751387 | 50482.4523284526 | 50482.4523284526
5 | 48869.3346787424 | 1686.19202905657 | 48969.8013300078 | 49222.6882488941 | 49222.6882488941
6 | 51705.6275895237 | 1661.28514385613 | 49935.7562032158 | 50130.294716446 | 50130.294716446
7 | 50394.3167326675 | 1679.36914656619 | 51657.2414098721 | 48798.3228462258 | 48798.3228462258
8 | 48718.737519243 | 1668.04794581865 | 49049.1473914624 | 50781.921725461 | 50781.921725461
9 | 50838.4114711808 | 1697.61425609909 | 49083.5959035864 | 51081.0313252514 | 51081.0313252514
10 | 49902.2001005683 | 1627.802109229 | 50436.564830256 | 50081.537109579 | 50081.537109579
11 | 48111.38122336 | 1634.45506581054 | 49123.4624391955 | 49922.9150526361 | 49922.9150526361
12 | 48846.4713840826 | 1599.2197880897 | 49787.1437434036 | 50234.5603529724 | 50234.5603529724
13 | 50941.2225741094 | 1679.36914656619 | 49016.4627290325 | 49298.6342587365 | 49298.6342587365
14 | 49463.2045504603 | 1683.91520836626 | 48084.4839509038 | 48703.0775230689 | 48703.0775230689
15 | 51273.4575447966 | 1652.30267331635 | 50053.6251645345 | 50288.1045937257 | 50288.1045937257
16 | 48350.9891007992 | 1634.45506581054 | 49485.1660515215 | 49953.3728129477 | 49953.3728129477
17 | 49278.5780270676 | 1661.28514385613 | 50607.8043111473 | 49688.123157263 | 49688.123157263
18 | 48704.9059740248 | 1650.06319797018 | 52629.3748697963 | 49426.729576471 | 49426.729576471
19 | 49326.5758972919 | 1634.45506581054 | 52245.5594837649 | 48453.7968119413 | 48453.7968119413
(20 rows)
所有数据范围,每个维度有多少唯一值
postgres=# SELECT hll_cardinality(hll_union_agg(id)), hll_cardinality(hll_union_agg(si)), hll_cardinality(hll_union_agg(bi)), hll_cardinality(hll_union_agg(f)), hll_cardinality(hll_union_agg(info)) FROM hll_test;
hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality
------------------+------------------+------------------+------------------+------------------
981424.027645005 | 32006.8065955451 | 1008610.09767335 | 1027038.36191106 | 1027038.36191106
(1 row)
指定数据范围,每个维度又多少唯一值
postgres=# SELECT hll_cardinality(hll_union_agg(id)), hll_cardinality(hll_union_agg(si)), hll_cardinality(hll_union_agg(bi)), hll_cardinality(hll_union_agg(f)), hll_cardinality(hll_union_agg(info)) FROM hll_test where groupid in (1,3,9);
hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality | hll_cardinality
------------------+------------------+------------------+------------------+------------------
149863.294108911 | 5227.10834581951 | 150151.868022589 | 150497.046827851 | 150497.046827851
(1 row)
对比PostgreSQL结果一致
结果略
debug
postgres=# select hll_print(hll_add(hll_add_agg(hll_hash_bigint(t)),hll_hash_bigint(1))) from generate_series(3,10) g(t);
hll_print
---------------------------------------------------------------------------
EXPLICIT, 9 elements, nregs=2048, nbits=5, expthresh=-1(160), sparseon=1:
0: -5469109305088493887
1: 19144387141682250
2: 489182038263080531
3: 1140754268591781659
4: 1310323436750511730
5: 1960224177162737638
6: 3522142095546486706
7: 4145513480871534457
8: 6574508035858270988
(1 row)
调整精度
调整当前会话精度
返回为老的值
digoal=> select * from hll_set_defaults(15,5,-1,1);
o_log2m | o_regwidth | o_expthresh | o_sparseon
---------+------------+-------------+------------
12 | 5 | -1 | 1
(1 row)
Time: 0.330 ms
调整默认精度
需要重新编译, 在hll.c中 :
// Defaults if type modifier values are not specified.
//
#define DEFAULT_LOG2M 15
#define DEFAULT_REGWIDTH 5
#define DEFAULT_EXPTHRESH -1
#define DEFAULT_SPARSEON 1
static int32 g_default_log2m = DEFAULT_LOG2M;
static int32 g_default_regwidth = DEFAULT_REGWIDTH;
static int64 g_default_expthresh = DEFAULT_EXPTHRESH;
static int32 g_default_sparseon = DEFAULT_SPARSEON;
优化
我们前面讲了GP的聚合分两种使用,一种是全部收到MASTER节点执行,另一种是两阶段聚合。
如果记录数少,其实没有必要使用两阶段聚合。
以hll_union_agg聚合为例
临时结果保存为stype: internal类型.
每次调用hll_union_trans函数,输入一条hll值以及临时结果internal,输出internal。
最后调用hll_pack,将internal转换为hll输出
-- Union aggregate function, returns hll.
--
CREATE AGGREGATE hll_union_agg (hll) (
SFUNC = hll_union_trans,
STYPE = internal,
FINALFUNC = hll_pack
);
-- Union aggregate transition function, first arg internal data
-- structure, second arg is a packed multiset.
--
CREATE FUNCTION hll_union_trans(internal, hll)
RETURNS internal
AS '$libdir/hll'
LANGUAGE C;
-- Converts internal data structure into packed multiset.
--
CREATE FUNCTION hll_pack(internal)
RETURNS hll
AS '$libdir/hll'
LANGUAGE C;
优化方法如下
在节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype
sfunc( internal-state, next-data-values ) ---> next-internal-state
segment第一阶段收集结果传输到master调用prefunc,输入(stype , stype),得到的结果为stype
prefunc( internal-state, internal-state ) ---> next-internal-state
最后再将stype转换为聚合的输出类型即可(可选使用finalfunc)。
hll_union_agg 优化例子
CREATE AGGREGATE gp_hll_union_agg (hll) (
SFUNC = hll_union,
prefunc = hll_union, -- 第二阶段函数
STYPE = hll
);
hll_add_agg 优化例子
# select hll_empty();
hll_empty
--------------
\021\213\177
(1 row)
CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (
SFUNC = hll_add,
STYPE = hll,
prefunc = hll_union, -- 第二阶段函数
initcond='\021\213\177' -- 初始值
);