【重新发现PostgreSQL之美】- 7 垂帘听政 异步消息-阿里云开发者社区

开发者社区> 传说中的德哥> 正文

【重新发现PostgreSQL之美】- 7 垂帘听政 异步消息

简介: 大家好,这里是重新发现PostgreSQL之美 - 7 垂帘听政 异步消息
+关注继续查看

背景


场景:

  • 重要数据在写入、更新、删除时实时告警或转存
  • 流式数据(公务车电子围栏、刑侦数据探针、股票数据规则探针、服务器运行情况) 实时预警或事件触发
  • 危险操作(DDL) 异步监控

规则+ 异步消息的优势:
1、通过规则过滤掉不需要写入的正常数据, 由于业务正常数据通常占比在99%以上, 从而大幅减轻写入量.
2、传统的利用定时器查询所有数据去发现问题, 还需要在时间、VAL、SID等层面去建立索引, 消耗大量存储, 同时索引增加写入RT,性能下降. 规则+异步完全规避这个问题.
3、可以实时发现并预警或触发其他动作


postgres=# \h create rule  

Command:     CREATE RULE  

Description: define a new rewrite rule  

Syntax:  

CREATE [ OR REPLACE ] RULE name AS ON event  

    TO table_name [ WHERE condition ]  

    DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }  

  

where event can be one of:  

  

    SELECT | INSERT | UPDATE | DELETE  

  

URL: https://www.postgresql.org/docs/14/sql-createrule.html  

  

  

postgres=# \h listen  

Command:     LISTEN 

Description: listen for a notification  

Syntax:  

LISTEN channel  

  

URL: https://www.postgresql.org/docs/14/sql-listen.html  

  

postgres=# \h notify  

Command:     NOTIFY 

Description: generate a notification  

Syntax:  

NOTIFY channel [ , payload ]  

  

URL: https://www.postgresql.org/docs/14/sql-notify.html  

  

postgres=# \df *.*channel*  

                                 List of functions  

   Schema  |         Name          | Result data type | Argument data types | Type   

------------+-----------------------+------------------+---------------------+------  

 pg_catalog | pg_listening_channels | SETOF text       |                     | func  

(1 row)  

  

postgres=# \df *.*notify*  

                           List of functions  

   Schema  |   Name    | Result data type | Argument data types | Type   

------------+-----------+------------------+---------------------+------  

 pg_catalog | pg_notify | void             | text, text          | func  

(1 row)  

例子


机房传感器

create table tbl_sensor_log (  

id serial8 primary key,  

sid int,  

val jsonb,  

crt_time timestamp  

);  

定义规则, 发现异常数据向alert通道发送消息

create or replace rule r1 as on insert  

to tbl_sensor_log  

where coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['cpu_perct']::float4,0) >= 80 

or coalesce(val['mem_perct']::float4,0) >= 80 

or coalesce(val['io_perct']::float4,0) >= 80 

do also  

select pg_notify('alert', format('sensor: %s, ts:%s, val:%s', NEW.sid, NEW.crt_time, NEW.val));   

定义规则(可选), 正常数据不写入

create or replace rule r2 as on insert  

to tbl_sensor_log  

where not (coalesce(val['temp']::float4,0) >= 60  

or coalesce(val['cpu_perct']::float4,0) >= 80 

or coalesce(val['mem_perct']::float4,0) >= 80 

or coalesce(val['io_perct']::float4,0) >= 80) 

do instead NOTHING;   

postgres=# \d+ tbl_sensor_log;  

                                                                 Table "public.tbl_sensor_log" 

  Column |            Type             | Collation | Nullable |                  Default                   | Storage  | Compression | Stats target | Description   

----------+-----------------------------+-----------+----------+--------------------------------------------+----------+-------------+--------------+-------------  

 id      | bigint                     |           | not null | nextval('tbl_sensor_log_id_seq'::regclass) | plain    |             |              |   

 sid     | integer                    |           |          |                                            | plain   |             |              |   

 val     | jsonb                      |           |          |                                            | extended | pglz        |              |   

 crt_time | timestamp without time zone |           |         |                                            | plain    |             |              |   

Indexes:  

    "tbl_sensor_log_pkey" PRIMARY KEY, btree (id)  

Rules:  

    r1 AS 

    ON INSERT TO tbl_sensor_log  

   WHERE COALESCE(new.val['temp'::text]::real, 0::real) >= 60::double precision OR COALESCE(new.val['cpu_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['mem_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['io_perct'::text]::real, 0::real) >= 80::double precision DO  SELECT pg_notify('alert'::text, format('sensor: %s, val:%s'::text, new.sid, new.val)) AS pg_notify  

Access method: heap  

压测

CREATE TYPE sensor_js AS (temp float4, cpu_perct float4, mem_perct float4, io_perct float4);   

  

insert into tbl_sensor_log (sid,val,crt_time)  

values (  

  1,  

 row_to_json(row(1,80.1,2,99.11)::sensor_js)::jsonb,  

  now()  

);  

  

  

vi test.sql  

\set sid random(1,1000000)  

\set v1 random(1,61)  

\set v2 random(1,81)  

\set v3 random(1,81)  

\set v4 random(1,81)  

insert into tbl_sensor_log (sid,val,crt_time)  

values (:sid, row_to_json(row(:v1,:v2,:v3,:v4)::sensor_js)::jsonb,now());  

  

  

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 5 -j 5 -T 120  

开启其他会话, 监听alert这个通道的异步消息.

PG 的异步消息为广播模式. 可以在多个会话监听同一个通道, 如果有多个业务希望接收同一类异步消息, 则可以这么做.

listen alter;  

  

Asynchronous notification "alert" with payload "sensor: 459294, val:{"temp": 32, "io_perct": 81, "cpu_perct": 76, "mem_perct": 39}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 788337, val:{"temp": 60, "io_perct": 34, "cpu_perct": 12, "mem_perct": 53}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 421071, val:{"temp": 7, "io_perct": 81, "cpu_perct": 12, "mem_perct": 14}" received from server process with PID 1716.  

Asynchronous notification "alert" with payload "sensor: 523366, val:{"temp": 13, "io_perct": 45, "cpu_perct": 70, "mem_perct": 80}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 94909, val:{"temp": 57, "io_perct": 1, "cpu_perct": 32, "mem_perct": 81}" received from server process with PID 1713.  

Asynchronous notification "alert" with payload "sensor: 13910, val:{"temp": 61, "io_perct": 39, "cpu_perct": 39, "mem_perct": 2}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 252342, val:{"temp": 7, "io_perct": 31, "cpu_perct": 80, "mem_perct": 13}" received from server process with PID 1714.  

Asynchronous notification "alert" with payload "sensor: 222983, val:{"temp": 56, "io_perct": 76, "cpu_perct": 80, "mem_perct": 25}" received from server process with PID 1715.  

Asynchronous notification "alert" with payload "sensor: 913661, val:{"temp": 60, "io_perct": 23, "cpu_perct": 80, "mem_perct": 9}" received from server process with PID 1716.  

压测数据分析:

1、在不开启rule时, 写入速度比开启rule快, 因为rule里面有CPU运算. 增加了RT.

但是这是纯计算, 没有IO, 内存等开销. 总体效率绝对比定时器后查询快很多.

progress: 1.0 s, 63373.9 tps, lat 0.078 ms stddev 0.066 

progress: 2.0 s, 67591.2 tps, lat 0.074 ms stddev 0.044 

progress: 3.0 s, 66330.3 tps, lat 0.075 ms stddev 0.039 

progress: 4.0 s, 65786.8 tps, lat 0.076 ms stddev 0.038 

progress: 5.0 s, 65436.3 tps, lat 0.076 ms stddev 0.043 

progress: 6.0 s, 64276.1 tps, lat 0.077 ms stddev 0.042 

progress: 7.0 s, 59162.6 tps, lat 0.084 ms stddev 0.045 

progress: 8.0 s, 53887.5 tps, lat 0.092 ms stddev 0.048 

progress: 1.0 s, 43413.8 tps, lat 0.114 ms stddev 0.084 

progress: 2.0 s, 42803.5 tps, lat 0.116 ms stddev 0.040 

progress: 3.0 s, 40092.0 tps, lat 0.124 ms stddev 0.176 

progress: 4.0 s, 41419.0 tps, lat 0.120 ms stddev 0.046 

progress: 5.0 s, 41637.6 tps, lat 0.120 ms stddev 0.040 

progress: 6.0 s, 41918.2 tps, lat 0.119 ms stddev 0.040 

progress: 7.0 s, 41753.3 tps, lat 0.119 ms stddev 0.038 

progress: 8.0 s, 35983.6 tps, lat 0.139 ms stddev 0.042 

在mac book pro上数据轻松破百万

postgres=# select count(*) from tbl_sensor_log;  

  count   

---------  

 2624221 

(1 row)  

其他异步消息

202103/20210311_03.md  Postgres Notify for Real Time Dashboards

201807/20180716_01.md  PostgreSQL 异步消息(LISTEN/NOTIFY)缓存多大?》

201807/20180713_03.md  PostgreSQL 流式二手商品实时归类(异步消息notify/listen后即焚)

201711/20171111_01.md  PostgreSQL 异步消息- Feed统实时监测与响(电商主动服务) - 钟级到毫秒实现

201710/20171018_03.md  [未完待] PGQ 异步消息列的使用》

201709/20170925_02.md  PostgreSQL 事件触- DDL审计记录异步通知(notify)

201701/20170116_01.md  《从波表到数据小程序之数据异步广播(notify/listen)

201111/20111122_01.md  PostgreSQL Notify/Listen Like ESB

201701/20170113_03.md  《从微信小程序数据"小程序" , 鬼知道我经历了什么》

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
RocketMQ与MYSQL事务消息整合
rocketmq事务消息与mysql事物整合
8511 0
java Http消息传递之POST和GET两种方法
/** * 通过Get方法来向服务器传值和获取信息, * 这里举例假设的前提是,链接上服务器,服务器直接发送数据给本地 * * 大体的思路: * 1、首先通过URL地址来获得链接的借口 * 通过接口,来设置链接超时的时间,请求方式,是否可以输入输出数据 * 得到读取服务器内容的...
683 0
【问题】sql数据库报无效的数据证书,需重新安装
事情的经过:   今天打开sql2014数据库,没有成功运行,但是给我弹出一个“无效的数据证书,需要重新安装!”提示。什么情况。为什么,应该是前一天弄vs导致的。因为升级了vs2017,所以把以前的东西做了删除卸载,可能是我有些依赖组件给卸载了。
995 0
企业级数据库最佳选择!阿里云云数据库SQL Server 2019新版本重磅发布 !
2020年3月26日,阿里云云数据库SQL Server 2019版正式上线,并随之发布共享型规格族及云数据库SQL Server专属集群两个产品形态。企业客户的数据库上云有了更多选择。
910 0
重新学习Mysql数据库1:无废话MySQL入门
开始使用 我下面所有的SQL语句是基于MySQL 5.6+运行。 MySQL 为关系型数据库(Relational Database Management System),一个关系型数据库由一个或数个表格组成, 如图所.
851 0
Ubuntu移除mysql后重新安装
Ubuntu移除mysql后重新安装首先删除mysql: sudo apt-get remove mysql-*然后清理残留的数据 dpkg -l |grep ^rc|awk '{print $2}' |sudo xargs dpkg -P 它会跳出一个对话框,你选择yes就好了 然后安装mysq...
1009 0
179
文章
0
问答
来源圈子
更多
德哥:阿里巴巴高级产品专家, 阿里巴巴钻石布道师, 42项数据库专利, 目前任职于数据库架构组, 负责数据库开源,同时也是PostgreSQL 中国社区发起人之一, 负责PostgreSQL数据库在中国的技术落地与推广、人才培养,开设有开源社、德说、重新发现PG之美等专栏。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载