【重新发现PostgreSQL之美】- 7 垂帘听政 异步消息

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 大家好,这里是重新发现PostgreSQL之美 - 7 垂帘听政 异步消息

背景


场景:

  • 重要数据在写入、更新、实时告警或转
  • 流式数据(务车电围栏、刑数据探、股票数据规则、服器运行情况) 实时预警或事件触发
  • 操作(DDL) 异步

规则+ 异步消息的优势:
1
、通过规则过滤掉不需要写入的正常数据, 由于业务正常数据通常占比在99%以上, 从而大幅减轻写入量.
2
、传统的利用定时器查询所有数据去发现问题, 还需要在时间、VALSID等层面去建立索引, 消耗大量存储, 同时索引增加写入RT,性能下降. 规则+异步完全规避这个问题.
3
、可以实时发现并预警或触发其他动作


postgres=# \h create rule  

Command:     CREATE RULE  

Description: define a new rewrite rule  

Syntax:  

CREATE [ OR REPLACE ] RULE name AS ON event  

   TO table_name [ WHERE condition ]  

   DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }  

 

where event can be one of:  

 

   SELECT | INSERT | UPDATE | DELETE  

 

URL: https://www.postgresql.org/docs/14/sql-createrule.html  

 

 

postgres=# \h listen  

Command:     LISTEN

Description: listen for a notification  

Syntax:  

LISTEN channel  

 

URL: https://www.postgresql.org/docs/14/sql-listen.html  

 

postgres=# \h notify  

Command:     NOTIFY

Description: generate a notification  

Syntax:  

NOTIFY channel [ , payload ]  

 

URL: https://www.postgresql.org/docs/14/sql-notify.html  

 

postgres=# \df *.*channel*  

                                List of functions  

  Schema  |         Name          | Result data type | Argument data types | Type  

------------+-----------------------+------------------+---------------------+------  

pg_catalog | pg_listening_channels | SETOF text       |                     | func  

(1 row)  

 

postgres=# \df *.*notify*  

                          List of functions  

  Schema  |   Name    | Result data type | Argument data types | Type  

------------+-----------+------------------+---------------------+------  

pg_catalog | pg_notify | void             | text, text          | func  

(1 row)  

例子


机房传感器

create table tbl_sensor_log (  

id serial8 primary key,  

sid int,  

val jsonb,  

crt_time timestamp  

);  

定义规则, 发现异常数据向alert通道发送消息

create or replace rule r1 as on insert  

to tbl_sensor_log  

where coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['cpu_perct']::float4,0) >= 80

or coalesce(val['mem_perct']::float4,0) >= 80

or coalesce(val['io_perct']::float4,0) >= 80

do also  

select pg_notify('alert', format('sensor: %s, ts:%s, val:%s', NEW.sid, NEW.crt_time, NEW.val));  

定义规则(可选), 正常数据不写入

create or replace rule r2 as on insert  

to tbl_sensor_log  

where not (coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['cpu_perct']::float4,0) >= 80

or coalesce(val['mem_perct']::float4,0) >= 80

or coalesce(val['io_perct']::float4,0) >= 80)

do instead NOTHING;  

postgres=# \d+ tbl_sensor_log;  

                                                                Table "public.tbl_sensor_log"

 Column |            Type             | Collation | Nullable |                  Default                   | Storage  | Compression | Stats target | Description  

----------+-----------------------------+-----------+----------+--------------------------------------------+----------+-------------+--------------+-------------  

id      | bigint                     |           | not null | nextval('tbl_sensor_log_id_seq'::regclass) | plain    |             |              |  

sid     | integer                    |           |          |                                            | plain   |             |              |  

val     | jsonb                      |           |          |                                            | extended | pglz        |              |  

crt_time | timestamp without time zone |           |         |                                            | plain    |             |              |  

Indexes:  

   "tbl_sensor_log_pkey" PRIMARY KEY, btree (id)  

Rules:  

   r1 AS

   ON INSERT TO tbl_sensor_log  

  WHERE COALESCE(new.val['temp'::text]::real, 0::real) >= 60::double precision OR COALESCE(new.val['cpu_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['mem_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['io_perct'::text]::real, 0::real) >= 80::double precision DO  SELECT pg_notify('alert'::text, format('sensor: %s, val:%s'::text, new.sid, new.val)) AS pg_notify  

Access method: heap  

压测

CREATE TYPE sensor_js AS (temp float4, cpu_perct float4, mem_perct float4, io_perct float4);  

 

insert into tbl_sensor_log (sid,val,crt_time)  

values (  

 1,  

row_to_json(row(1,80.1,2,99.11)::sensor_js)::jsonb,  

 now()  

);  

 

 

vi test.sql  

\set sid random(1,1000000)  

\set v1 random(1,61)  

\set v2 random(1,81)  

\set v3 random(1,81)  

\set v4 random(1,81)  

insert into tbl_sensor_log (sid,val,crt_time)  

values (:sid, row_to_json(row(:v1,:v2,:v3,:v4)::sensor_js)::jsonb,now());  

 

 

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 5 -j 5 -T 120  

开启其他会话, 监听alert这个通道的异步消息.

PG 的异步消息为广播模式. 可以在多个会话监听同一个通道, 如果有多个业务希望接收同一类异步消息, 则可以这么做.

listen alter;  

 

Asynchronous notification "alert" with payload "sensor: 459294, val:{"temp": 32, "io_perct": 81, "cpu_perct": 76, "mem_perct": 39}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 788337, val:{"temp": 60, "io_perct": 34, "cpu_perct": 12, "mem_perct": 53}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 421071, val:{"temp": 7, "io_perct": 81, "cpu_perct": 12, "mem_perct": 14}" received from server process with PID 1716.  

Asynchronous notification "alert" with payload "sensor: 523366, val:{"temp": 13, "io_perct": 45, "cpu_perct": 70, "mem_perct": 80}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 94909, val:{"temp": 57, "io_perct": 1, "cpu_perct": 32, "mem_perct": 81}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 13910, val:{"temp": 61, "io_perct": 39, "cpu_perct": 39, "mem_perct": 2}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 252342, val:{"temp": 7, "io_perct": 31, "cpu_perct": 80, "mem_perct": 13}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 222983, val:{"temp": 56, "io_perct": 76, "cpu_perct": 80, "mem_perct": 25}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 913661, val:{"temp": 60, "io_perct": 23, "cpu_perct": 80, "mem_perct": 9}" received from server process with PID 1716.  

压测数据分析:

1、在不开启rule, 写入速度比开启rule, 因为rule里面有CPU运算. 增加了RT.

但是这是纯计算, 没有IO, 内存等开销. 总体效率绝对比定时器后查询快很多.

progress: 1.0 s, 63373.9 tps, lat 0.078 ms stddev 0.066

progress: 2.0 s, 67591.2 tps, lat 0.074 ms stddev 0.044

progress: 3.0 s, 66330.3 tps, lat 0.075 ms stddev 0.039

progress: 4.0 s, 65786.8 tps, lat 0.076 ms stddev 0.038

progress: 5.0 s, 65436.3 tps, lat 0.076 ms stddev 0.043

progress: 6.0 s, 64276.1 tps, lat 0.077 ms stddev 0.042

progress: 7.0 s, 59162.6 tps, lat 0.084 ms stddev 0.045

progress: 8.0 s, 53887.5 tps, lat 0.092 ms stddev 0.048

progress: 1.0 s, 43413.8 tps, lat 0.114 ms stddev 0.084

progress: 2.0 s, 42803.5 tps, lat 0.116 ms stddev 0.040

progress: 3.0 s, 40092.0 tps, lat 0.124 ms stddev 0.176

progress: 4.0 s, 41419.0 tps, lat 0.120 ms stddev 0.046

progress: 5.0 s, 41637.6 tps, lat 0.120 ms stddev 0.040

progress: 6.0 s, 41918.2 tps, lat 0.119 ms stddev 0.040

progress: 7.0 s, 41753.3 tps, lat 0.119 ms stddev 0.038

progress: 8.0 s, 35983.6 tps, lat 0.139 ms stddev 0.042

mac book pro上数据轻松破百万

postgres=# select count(*) from tbl_sensor_log;  

 count  

---------  

2624221

(1 row)  

其他异步消息

202103/20210311_03.md  Postgres Notify for Real Time Dashboards

201807/20180716_01.md  PostgreSQL 异步消息(LISTEN/NOTIFY)缓存多大?》

201807/20180713_03.md  PostgreSQL 流式- 二手商品实时归类(异步消息notify/listen后即焚)

201711/20171111_01.md  PostgreSQL 异步消息- Feed统实时监测与响(电商主动服务) - 钟级到毫秒实现

201710/20171018_03.md  [未完待] PGQ 异步消息列的使用》

201709/20170925_02.md  PostgreSQL 事件触- DDL审计记录+ 异步通知(notify)

201701/20170116_01.md  《从波表到数据小程序之- 数据异步广播(notify/listen)

201111/20111122_01.md  PostgreSQL Notify/Listen Like ESB

201701/20170113_03.md  《从微信小程序数据"小程序" , 鬼知道我经历了什么》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
缓存 关系型数据库 数据库
|
18天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
44 3
|
18天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
47 3
|
18天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE 'log_%';`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
62 2
|
1月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
210 15
|
25天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
1月前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。
|
1月前
|
SQL 关系型数据库 MySQL
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。