用PostgreSQL支持含有更新,删除,插入的实时流式计算

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
简介: 大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:https://yq.aliyun.com/ar.

大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。
如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。
在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:
https://yq.aliyun.com/articles/166

要支持更新和删除,思路是这样的,加一张前置表,这个前置表的某个字段用来记录字段的最终状态,即到达这个状态后,记录不会被更新或删除。
通过触发器来控制什么记录插入到流中同时从前置表删除,什么记录现暂存在前置表。
下面是例子
本文假设flag=2是最终状态,应用层自己来定义这个FLAG。

pipeline=# create table pret1(id serial primary key, info text, flag smallint);
CREATE TABLE

pipeline=# create stream s0 (like pret1);
CREATE STREAM

pipeline=# create continuous view v0 as select count(*) from s0;
CREATE CONTINUOUS VIEW

flag=2的记录旁路到流,其他记录放到前置表。

pipeline=# create or replace function tg0() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*);
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg0 before insert on pret1 for each row execute procedure tg0();
CREATE TRIGGER

更新后flag=2的记录旁路到流,并删除前置表的对应记录。

pipeline=# create or replace function tg1() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*); 
     delete from pret1 where id=new.id; 
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg1 before update on pret1 for each row execute procedure tg1();
CREATE TRIGGER

测试

pipeline=# insert into pret1(info,flag) values ('test',0);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  1 | test |    0
  2 | test |    1
(2 rows)

pipeline=# update pret1 set flag=2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# delete from pret1 ;
DELETE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# update pret1 set flag =10;
UPDATE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  4 | test |   10
(1 row)

pipeline=# update pret1 set flag =2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     3
(1 row)

详情请参考
http://docs.pipelinedb.com/introduction.html

如果你觉得这还不够爽,PostgreSQL还有kafka插件,可以类lambda的模式从kafka持续读数据,进行流式计算。
PostgreSQL就是个"老流氓",因为任何软件可能都和这只大象有一腿。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
SQL 关系型数据库 数据库
PostgreSQL 删除数据库
PostgreSQL 删除数据库
356 0
|
8天前
|
存储 关系型数据库 Serverless
PostgreSQL计算两个点之间的距离
PostgreSQL计算两个点之间的距离
96 60
|
关系型数据库 PostgreSQL
PostgreSQL 计算字符串字符数函数(CHAR_LENGTH(str))和字符串长度函数(LENGTH(str))
PostgreSQL 计算字符串字符数函数(CHAR_LENGTH(str))和字符串长度函数(LENGTH(str))
1836 0
|
5月前
|
SQL 关系型数据库 C语言
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
82 0
|
存储 SQL Oracle
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
AnalyticDB PostgreSQL 7.0 新增了存储过程功能的支持,让用户在使用ADB PG时能够更方便高效地开发业务,并能够更好地兼容Oracle等传统数仓的业务。
474 1
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
|
搜索推荐 关系型数据库 数据库
《阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践》电子版地址
阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践
129 0
《阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践》电子版地址
|
存储 SQL Oracle
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
快速学习10 PostgreSQL 表级复制-物化视图篇,支持异地,异构如 Oracle 到 pg 的物化视图
514 0
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
|
SQL 存储 关系型数据库
RDS For SQL Server删除数据库报错
RDS For SQL Server删除数据库报错
|
关系型数据库 大数据 测试技术
AnalyticDB PostgreSQL 7.0 新能力介绍 : 利用JIT加速计算
AnalyticDB PostgreSQL 7.0 发布, 即时编译(Just-In-Time,JIT)可以将某种形式的解释程序计算转变成原生程序,由CPU原生执行,从而得到加速。
278 0
AnalyticDB PostgreSQL 7.0 新能力介绍 : 利用JIT加速计算
|
关系型数据库 数据库 PostgreSQL
PostgreSQL 删除数据库
PostgreSQL 删除数据库
181 0
下一篇
无影云桌面