用PostgreSQL支持含有更新,删除,插入的实时流式计算

大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。
如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。
在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:
https://yq.aliyun.com/articles/166

要支持更新和删除,思路是这样的,加一张前置表,这个前置表的某个字段用来记录字段的最终状态,即到达这个状态后,记录不会被更新或删除。
通过触发器来控制什么记录插入到流中同时从前置表删除,什么记录现暂存在前置表。
下面是例子
本文假设flag=2是最终状态,应用层自己来定义这个FLAG。

pipeline=# create table pret1(id serial primary key, info text, flag smallint);
CREATE TABLE

pipeline=# create stream s0 (like pret1);
CREATE STREAM

pipeline=# create continuous view v0 as select count(*) from s0;
CREATE CONTINUOUS VIEW

flag=2的记录旁路到流,其他记录放到前置表。

pipeline=# create or replace function tg0() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*);
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg0 before insert on pret1 for each row execute procedure tg0();
CREATE TRIGGER

更新后flag=2的记录旁路到流,并删除前置表的对应记录。

pipeline=# create or replace function tg1() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*); 
     delete from pret1 where id=new.id; 
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg1 before update on pret1 for each row execute procedure tg1();
CREATE TRIGGER

测试

pipeline=# insert into pret1(info,flag) values ('test',0);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  1 | test |    0
  2 | test |    1
(2 rows)

pipeline=# update pret1 set flag=2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# delete from pret1 ;
DELETE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# update pret1 set flag =10;
UPDATE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  4 | test |   10
(1 row)

pipeline=# update pret1 set flag =2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     3
(1 row)

详情请参考
http://docs.pipelinedb.com/introduction.html

如果你觉得这还不够爽,PostgreSQL还有kafka插件,可以类lambda的模式从kafka持续读数据,进行流式计算。
PostgreSQL就是个"老流氓",因为任何软件可能都和这只大象有一腿。

上一篇:检查网络是否连接


下一篇:PostgreSQL 动态更新 C 语言函数