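Most stream-processing products only support APPEND ONLY workloads: rows can be inserted, but never updated or deleted.
If you need real-time stream computation that also handles updates and deletes, here is one way to do it in PostgreSQL (the session below uses PipelineDB).
Before going further, you may want to read my earlier article on how PG handles real-time stream processing at a trillion records per day: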
https://yq.aliyun.com/articles/166
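To support updates and deletes, the idea is to put a front table in front of the stream. One of its columns records the row's state. Once a row reaches the final state, it will no longer be updated or deleted.
Triggers decide which rows are inserted into the stream (and removed from the front table) and which rows are held in the front table for now.
Here is an example.
This article assumes flag=2 is the final state. The application defines the meaning of this flag itself.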
pipeline=# create table pret1(id serial primary key, info text, flag smallint);
CREATE TABLE
pipeline=# create stream s0 (like pret1);
CREATE STREAM
pipeline=# create continuous view v0 as select count(*) from s0;
CREATE CONTINUOUS VIEW
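Rows inserted with flag=2 bypass the front table and go straight to the stream. All other rows are stored in the front table. Because tg0 returns NULL for flag=2 rows, those rows are never written to pret1.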
pipeline=# create or replace function tg0() returns trigger as
$$
declare
begin
if new.flag=2 then
insert into s0 values (new.*);
return null;
end if;
return new;
end;
$$
language plpgsql strict;
CREATE FUNCTION
pipeline=# create trigger tg0 before insert on pret1 for each row execute procedure tg0();
CREATE TRIGGER
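To make the routing rule of tg0 concrete, here is a minimal in-memory sketch in Python. It is not PipelineDB code. The names `insert_row`, `front_table`, `stream` and `FINAL_FLAG` are hypothetical stand-ins for pret1, s0 and the flag=2 convention assumed above.

```python
# Hypothetical in-memory model of the BEFORE INSERT trigger tg0 (not PipelineDB code).
FINAL_FLAG = 2  # assumption mirroring the article: flag=2 is the final state


def insert_row(front_table, stream, row_id, info, flag):
    """Route a new row the way tg0 does: final rows go to the stream only."""
    row = {"id": row_id, "info": info, "flag": flag}
    if flag == FINAL_FLAG:
        stream.append(row)      # like "insert into s0 values (new.*)"
        return False            # like "return null": the row is not stored in pret1
    front_table[row_id] = row   # like "return new": the row is kept in pret1
    return True


front, s0 = {}, []
insert_row(front, s0, 1, "test", 0)
insert_row(front, s0, 2, "test", 2)
print(sorted(front), len(s0))
```

Only row 1 stays in the front table. Row 2 is already final, so in this model it exists only in the stream.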
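When an update sets flag=2, the row is diverted to the stream and its record in the front table is deleted. Because the trigger then returns NULL, the UPDATE itself is skipped, which is why psql reports UPDATE 0 for such rows in the test below.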
pipeline=# create or replace function tg1() returns trigger as
$$
declare
begin
if new.flag=2 then
insert into s0 values (new.*);
delete from pret1 where id=new.id;
return null;
end if;
return new;
end;
$$
language plpgsql strict;
CREATE FUNCTION
pipeline=# create trigger tg1 before update on pret1 for each row execute procedure tg1();
CREATE TRIGGER
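The update path of tg1 can be sketched the same way. This is again a hypothetical in-memory model, not PipelineDB code. `update_flag` and the dict-based front table are illustrative names only. The return value mimics the row count psql reports.

```python
# Hypothetical in-memory model of the BEFORE UPDATE trigger tg1 (not PipelineDB code).
FINAL_FLAG = 2  # assumption mirroring the article: flag=2 is the final state


def update_flag(front_table, stream, row_id, new_flag):
    """Apply 'update pret1 set flag=new_flag where id=row_id' as tg1 would.

    Returns the number of rows psql would report as updated (0 or 1)."""
    if row_id not in front_table:
        return 0
    new_row = dict(front_table[row_id], flag=new_flag)
    if new_flag == FINAL_FLAG:
        stream.append(new_row)      # insert into s0 values (new.*)
        del front_table[row_id]     # delete from pret1 where id=new.id
        return 0                    # return null: the UPDATE itself is skipped
    front_table[row_id] = new_row   # return new: an ordinary update
    return 1


front = {1: {"id": 1, "info": "test", "flag": 0},
         2: {"id": 2, "info": "test", "flag": 1}}
s0 = []
# Iterate over a copy of the keys, because final rows are deleted during the loop.
reported = sum(update_flag(front, s0, rid, 2) for rid in list(front))
print(reported, front, len(s0))
```

This reproduces the shape of the first update in the test: the reported count is 0, the front table is empty, and both rows have entered the stream.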
Test
pipeline=# insert into pret1(info,flag) values ('test',0);
INSERT 0 1
pipeline=# select * from v0;
count
-------
(0 rows)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
count
-------
(0 rows)
pipeline=# select * from pret1;
id | info | flag
----+------+------
1 | test | 0
2 | test | 1
(2 rows)
pipeline=# update pret1 set flag=2;
UPDATE 0
pipeline=# select * from pret1;
id | info | flag
----+------+------
(0 rows)
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# delete from pret1 ;
DELETE 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# update pret1 set flag =10;
UPDATE 1
pipeline=# select * from v0;
count
-------
2
(1 row)
pipeline=# select * from pret1;
id | info | flag
----+------+------
4 | test | 10
(1 row)
pipeline=# update pret1 set flag =2;
UPDATE 0
pipeline=# select * from pret1;
id | info | flag
----+------+------
(0 rows)
pipeline=# select * from v0;
count
-------
3
(1 row)
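As a cross-check of the session above, the sketch below replays the same statements against a hypothetical in-memory model of pret1, tg0/tg1, s0 and v0. The class `FrontTablePipeline` is purely illustrative. It models `count(*)` on v0 as the number of rows that have entered the stream, and it reports 0 where psql shows "(0 rows)" for the still-empty continuous view.

```python
# Hypothetical model of the front table + triggers + stream + continuous view.
FINAL_FLAG = 2  # assumption from the article: flag=2 means "final state"


class FrontTablePipeline:
    def __init__(self):
        self.next_id = 1   # mimics the serial column id
        self.front = {}    # pret1: rows that have not reached the final state
        self.stream = []   # s0: rows that have reached the final state

    def insert(self, info, flag):
        row_id = self.next_id  # the serial value is consumed even if the row is diverted
        self.next_id += 1
        row = {"id": row_id, "info": info, "flag": flag}
        if flag == FINAL_FLAG:          # tg0: divert to the stream, skip the insert
            self.stream.append(row)
        else:
            self.front[row_id] = row

    def update_all(self, flag):
        """update pret1 set flag=<flag>; returns the count psql would report."""
        updated = 0
        for row_id in list(self.front):
            row = dict(self.front[row_id], flag=flag)
            if flag == FINAL_FLAG:      # tg1: divert, delete from pret1, skip the update
                self.stream.append(row)
                del self.front[row_id]
            else:
                self.front[row_id] = row
                updated += 1
        return updated

    def delete_all(self):
        """delete from pret1; there is no delete trigger, so nothing reaches the stream."""
        deleted = len(self.front)
        self.front.clear()
        return deleted

    def v0_count(self):
        """count(*) over everything that has entered the stream."""
        return len(self.stream)


p = FrontTablePipeline()
counts = []
p.insert("test", 0); counts.append(p.v0_count())
p.insert("test", 1); counts.append(p.v0_count())
p.update_all(2); counts.append(p.v0_count())
p.insert("test", 1); p.delete_all(); counts.append(p.v0_count())
p.insert("test", 1); counts.append(p.v0_count())
p.update_all(10); counts.append(p.v0_count())
p.update_all(2); counts.append(p.v0_count())
print(counts)
```

The design choice this highlights: a row is counted exactly once, at the moment it reaches the final state. Rows deleted before that point never touch the stream.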
For details, see
http://docs.pipelinedb.com/introduction.html
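If that still isn't enough, PostgreSQL also has a Kafka plugin that can continuously read data from Kafka in a Lambda-architecture-like style for stream computation.
PostgreSQL is a shameless old rascal: just about any piece of software you can name seems to be carrying on with this elephant.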