Continuous transforms 可以进行数据的转换,数据是不进行存储,主要是可以加入到其他的stream pipeline 中,或者写到其他外部
存储中,和存储过程结合使用,当前默认内置一个pipeline_stream_insert方便数据写入其他strem
注意不支持聚合操作
docker-compose
version: '3.6'
services:
postgres:
image: pipelinedb/pipelinedb
ports:
- "5432:5432"
参考语法
CREATE CONTINUOUS TRANSFORM name AS query [ THEN EXECUTE PROCEDURE function_name ( arguments ) ]
query 查询说明
SELECT expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
where any expression in the SELECT statement can't contain an aggregate and
from_item can be one of:
stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
Continuous transforms 输出流
Continuous transforms 输出流,可以方便其他transforms或者Continuous view 读取
- 参考
创建 CONTINUOUS TRANSFORM
CREATE CONTINUOUS TRANSFORM t AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
使用
CREATE CONTINUOUS VIEW v AS
SELECT sum(y) FROM output_of('t');
参考例子
- 创建两个stream
CREATE STREAM mystream3 (x integer, y integer);
CREATE STREAM mystream4 (x integer, y integer);
- 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW v4 AS
SELECT x,y FROM mystream3 ;
CREATE CONTINUOUS VIEW v5 AS
SELECT x,y FROM mystream4 ;
- 创建CONTINUOUS TRANSFORM
当insert 到mystream3 中的x为偶数的时候执行插入mystream4
CREATE CONTINUOUS TRANSFORM t3 AS
SELECT x::int, y::int FROM mystream3 WHERE mod(x, 2) = 0
THEN EXECUTE PROCEDURE pipeline_stream_insert('mystream4');
- 数据插入&& 查询结果
x 插入数据为1奇数
insert into mystream3(x,y) values(1,2);
select * from v4;
select * from v5;
x 插入数据为2奇数
insert into mystream3(x,y) values(2,5);
select * from v4;
select * from v5;
- 使用CONTINUOUS TRANSFORM 的output steam
CREATE CONTINUOUS VIEW v6 AS
SELECT x,y FROM output_of('t3') ;
插入数据&&查询
insert into mystream3(x,y) values(4,7);
select * from v6;
参考资料
http://docs.pipelinedb.com/continuous-transforms.html