背景
场景:
- 重要数据在写入、更新、删除时实时告警或转存
- 流式数据(公务车电子围栏、刑侦数据探针、股票数据规则探针、服务器运行情况) 实时预警或事件触发
- 危险操作(DDL) 异步监控
规则+ 异步消息的优势:
1、通过规则过滤掉不需要写入的正常数据, 由于业务正常数据通常占比在99%以上, 从而大幅减轻写入量.
2、传统的利用定时器查询所有数据去发现问题, 还需要在时间、VAL、SID等层面去建立索引, 消耗大量存储, 同时索引增加写入RT,性能下降. 规则+异步完全规避这个问题.
3、可以实时发现并预警或触发其他动作
语法
postgres=# \h create rule
Command: CREATE RULE
Description: define a new rewrite rule
Syntax:
CREATE [ OR REPLACE ] RULE name AS ON event
TO table_name [ WHERE condition ]
DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }
where event can be one of:
SELECT | INSERT | UPDATE | DELETE
URL: https://www.postgresql.org/docs/14/sql-createrule.html
postgres=# \h listen
Command: LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
URL: https://www.postgresql.org/docs/14/sql-listen.html
postgres=# \h notify
Command: NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
URL: https://www.postgresql.org/docs/14/sql-notify.html
postgres=# \df *.*channel*
List of functions
Schema | Name | Result data type | Argument data types | Type
------------+-----------------------+------------------+---------------------+------
pg_catalog | pg_listening_channels | SETOF text | | func
(1 row)
postgres=# \df *.*notify*
List of functions
Schema | Name | Result data type | Argument data types | Type
------------+-----------+------------------+---------------------+------
pg_catalog | pg_notify | void | text, text | func
(1 row)
例子
机房传感器
create table tbl_sensor_log (
id serial8 primary key,
sid int,
val jsonb,
crt_time timestamp
);
定义规则, 发现异常数据向alert通道发送消息
create or replace rule r1 as on insert
to tbl_sensor_log
where coalesce(val['temp']::float4,0) >= 60
or coalesce(val['cpu_perct']::float4,0) >= 80
or coalesce(val['mem_perct']::float4,0) >= 80
or coalesce(val['io_perct']::float4,0) >= 80
do also
select pg_notify('alert', format('sensor: %s, ts:%s, val:%s', NEW.sid, NEW.crt_time, NEW.val));
定义规则(可选), 正常数据不写入
create or replace rule r2 as on insert
to tbl_sensor_log
where not (coalesce(val['temp']::float4,0) >= 60
or coalesce(val['cpu_perct']::float4,0) >= 80
or coalesce(val['mem_perct']::float4,0) >= 80
or coalesce(val['io_perct']::float4,0) >= 80)
do instead NOTHING;
postgres=# \d+ tbl_sensor_log;
Table "public.tbl_sensor_log"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
----------+-----------------------------+-----------+----------+--------------------------------------------+----------+-------------+--------------+-------------
id | bigint | | not null | nextval('tbl_sensor_log_id_seq'::regclass) | plain | | |
sid | integer | | | | plain | | |
val | jsonb | | | | extended | pglz | |
crt_time | timestamp without time zone | | | | plain | | |
Indexes:
"tbl_sensor_log_pkey" PRIMARY KEY, btree (id)
Rules:
r1 AS
ON INSERT TO tbl_sensor_log
WHERE COALESCE(new.val['temp'::text]::real, 0::real) >= 60::double precision OR COALESCE(new.val['cpu_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['mem_perct'::text]::real, 0::real) >= 80::double precision OR COALESCE(new.val['io_perct'::text]::real, 0::real) >= 80::double precision DO SELECT pg_notify('alert'::text, format('sensor: %s, val:%s'::text, new.sid, new.val)) AS pg_notify
Access method: heap
压测
CREATE TYPE sensor_js AS (temp float4, cpu_perct float4, mem_perct float4, io_perct float4);
insert into tbl_sensor_log (sid,val,crt_time)
values (
1,
row_to_json(row(1,80.1,2,99.11)::sensor_js)::jsonb,
now()
);
vi test.sql
\set sid random(1,1000000)
\set v1 random(1,61)
\set v2 random(1,81)
\set v3 random(1,81)
\set v4 random(1,81)
insert into tbl_sensor_log (sid,val,crt_time)
values (:sid, row_to_json(row(:v1,:v2,:v3,:v4)::sensor_js)::jsonb,now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 5 -j 5 -T 120
开启其他会话, 监听alert这个通道的异步消息.
PG 的异步消息为广播模式. 可以在多个会话监听同一个通道, 如果有多个业务希望接收同一类异步消息, 则可以这么做.
listen alter;
Asynchronous notification "alert" with payload "sensor: 459294, val:{"temp": 32, "io_perct": 81, "cpu_perct": 76, "mem_perct": 39}" received from server process with PID 1715.
Asynchronous notification "alert" with payload "sensor: 788337, val:{"temp": 60, "io_perct": 34, "cpu_perct": 12, "mem_perct": 53}" received from server process with PID 1714.
Asynchronous notification "alert" with payload "sensor: 421071, val:{"temp": 7, "io_perct": 81, "cpu_perct": 12, "mem_perct": 14}" received from server process with PID 1716.
Asynchronous notification "alert" with payload "sensor: 523366, val:{"temp": 13, "io_perct": 45, "cpu_perct": 70, "mem_perct": 80}" received from server process with PID 1713.
Asynchronous notification "alert" with payload "sensor: 94909, val:{"temp": 57, "io_perct": 1, "cpu_perct": 32, "mem_perct": 81}" received from server process with PID 1713.
Asynchronous notification "alert" with payload "sensor: 13910, val:{"temp": 61, "io_perct": 39, "cpu_perct": 39, "mem_perct": 2}" received from server process with PID 1714.
Asynchronous notification "alert" with payload "sensor: 252342, val:{"temp": 7, "io_perct": 31, "cpu_perct": 80, "mem_perct": 13}" received from server process with PID 1714.
Asynchronous notification "alert" with payload "sensor: 222983, val:{"temp": 56, "io_perct": 76, "cpu_perct": 80, "mem_perct": 25}" received from server process with PID 1715.
Asynchronous notification "alert" with payload "sensor: 913661, val:{"temp": 60, "io_perct": 23, "cpu_perct": 80, "mem_perct": 9}" received from server process with PID 1716.
压测数据分析:
1、在不开启rule时, 写入速度比开启rule快, 因为rule里面有CPU运算. 增加了RT.
但是这是纯计算, 没有IO, 内存等开销. 总体效率绝对比定时器后查询快很多.
progress: 1.0 s, 63373.9 tps, lat 0.078 ms stddev 0.066
progress: 2.0 s, 67591.2 tps, lat 0.074 ms stddev 0.044
progress: 3.0 s, 66330.3 tps, lat 0.075 ms stddev 0.039
progress: 4.0 s, 65786.8 tps, lat 0.076 ms stddev 0.038
progress: 5.0 s, 65436.3 tps, lat 0.076 ms stddev 0.043
progress: 6.0 s, 64276.1 tps, lat 0.077 ms stddev 0.042
progress: 7.0 s, 59162.6 tps, lat 0.084 ms stddev 0.045
progress: 8.0 s, 53887.5 tps, lat 0.092 ms stddev 0.048
progress: 1.0 s, 43413.8 tps, lat 0.114 ms stddev 0.084
progress: 2.0 s, 42803.5 tps, lat 0.116 ms stddev 0.040
progress: 3.0 s, 40092.0 tps, lat 0.124 ms stddev 0.176
progress: 4.0 s, 41419.0 tps, lat 0.120 ms stddev 0.046
progress: 5.0 s, 41637.6 tps, lat 0.120 ms stddev 0.040
progress: 6.0 s, 41918.2 tps, lat 0.119 ms stddev 0.040
progress: 7.0 s, 41753.3 tps, lat 0.119 ms stddev 0.038
progress: 8.0 s, 35983.6 tps, lat 0.139 ms stddev 0.042
在mac book pro上数据轻松破百万
postgres=# select count(*) from tbl_sensor_log;
count
---------
2624221
(1 row)
其他异步消息应用
202103/20210311_03.md 《Postgres Notify for Real Time Dashboards》
201807/20180716_01.md 《PostgreSQL 异步消息(LISTEN/NOTIFY)缓存多大?》
201807/20180713_03.md 《PostgreSQL 流式处理应用实践- 二手商品实时归类(异步消息notify/listen、阅后即焚)》
201711/20171111_01.md 《PostgreSQL 异步消息实践- Feed系统实时监测与响应(如电商主动服务) - 分钟级到毫秒级的实现》
201710/20171018_03.md 《[未完待续] PGQ 异步消息队列的使用》
201709/20170925_02.md 《PostgreSQL 事件触发器应用- DDL审计记录+ 异步通知(notify)》
201701/20170116_01.md 《从电波表到数据库小程序之- 数据库异步广播(notify/listen)》
201111/20111122_01.md 《PostgreSQL Notify/Listen Like ESB》
201701/20170113_03.md 《从微信小程序到数据库"小程序" , 鬼知道我经历了什么》