A PostgreSQL toy: async notification as a chat group

Introduction:
PostgreSQL ships with an interesting asynchronous messaging facility.
It can be used to build all sorts of fun things, such as offline chat groups, logical data replication, and audit logs.
NOTIFY sends a message to a named channel.
postgres=# \h notify
Command:     NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
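Besides the NOTIFY statement there is also the built-in function pg_notify(channel, payload), which is convenient when the channel name or payload must be computed at runtime (the payload text below is just an illustration):
postgres=# select pg_notify('cnpug', 'hello via function');
 pg_notify 
-----------
 
(1 row)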
LISTEN subscribes the current session to a named channel.
postgres=# \h listen
Command:     LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
A single client can post to multiple channels and listen on multiple channels at the same time, as the quick sketch below shows.
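A quick sketch, with made-up channel names ch1 and ch2:
postgres=# listen ch1;
LISTEN
postgres=# listen ch2;
LISTEN
postgres=# notify ch2, 'posted to the second channel';
NOTIFY
psql then prints the corresponding "Asynchronous notification" line for every channel the session is subscribed to.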
Example:
1. First, everyone joins the chat group (a channel).
SESSION A:
postgres=# listen cnpug;
LISTEN

SESSION B:
postgres=# listen cnpug;
LISTEN
2. Everyone posts messages to the chat group; a message you send yourself is delivered back to you immediately.
SESSION A:
postgres=# notify cnpug, 'hello, every body.';
NOTIFY
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
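Note that the sender receives its own notifications too; as the design notes quoted at the end of this article explain, an application can filter these out by comparing the be_pid in the message with its own backend PID.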
3. Messages from others can be picked up at any time; they sit in the queue until the session next talks to the server, so issuing LISTEN again (any command would do, since the session is already subscribed) retrieves them.
SESSION B:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
If you go a long time without picking anything up, messages pile up:
SESSION B:
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello1';
NOTIFY
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
4. A newcomer joining the chat group sees only the messages sent after joining; earlier messages are invisible.
postgres=# listen cnpug;
LISTEN
Messages sent before the LISTEN are not displayed, even if they are still sitting in the queue.
5. Leaving the chat group:
unlisten cnpug;
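To leave every chat group at once, UNLISTEN * cancels all of the session's registrations:
postgres=# unlisten *;
UNLISTEN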

Beyond chat, asynchronous notifications also fit auditing, data replication, and similar scenarios.
The tcn extension, for example, implements just such an audit- or replication-style use case (install it first with create extension tcn;).
test=# create table tcndata
test-#   (
test(#     a int not null,
test(#     b date not null,
test(#     c text,
test(#     primary key (a, b)
test(#   );
CREATE TABLE
Create a trigger that invokes the triggered_change_notification function on every DML operation, which in turn issues a NOTIFY.
test=# create trigger tcndata_tcn_trigger
test-#   after insert or update or delete on tcndata
test-#   for each row execute procedure triggered_change_notification();
CREATE TRIGGER
Listen on the tcn channel (the extension's default channel name).
test=# listen tcn;
LISTEN
From now on, every DML statement yields asynchronous messages emitted by triggered_change_notification on the tcn channel.
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-#                            (1, date '2012-12-23', 'another'),
test-#                            (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
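Each payload carries the quoted table name, a one-letter operation code (I for insert, U for update, D for delete), and the primary-key columns with their values, which is enough for a downstream consumer to look up the changed row.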
This technique can be used for auditing or asynchronous data replication; a hand-rolled variant is sketched below.
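As a minimal hand-rolled sketch (the function name audit_notify and the channel name audit are invented here), a plpgsql trigger can push entire rows as JSON through pg_notify; keep the ~8000-byte payload limit discussed below in mind:

create or replace function audit_notify() returns trigger as $$
begin
  -- send table name, operation, and the affected row as JSON;
  -- NEW is null for DELETE and OLD is null for INSERT, so coalesce
  -- picks whichever row image exists
  perform pg_notify('audit',
    TG_TABLE_NAME || ',' || TG_OP || ',' ||
    coalesce(row_to_json(NEW)::text, row_to_json(OLD)::text));
  return null;
end;
$$ language plpgsql;

create trigger tcndata_audit_trigger
  after insert or update or delete on tcndata
  for each row execute procedure audit_notify();

A session that has executed listen audit then receives one JSON-bearing notification per modified row.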

The asynchronous notification code lives in:
src/backend/commands/async.c
Mind the queue length limit: once the queue is full, further NOTIFYs raise an error.
Also mind the size limit on a single message, which is close to the database block size.
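On PostgreSQL 9.5 and later, the fill level of the notification queue can be checked with pg_notification_queue_usage(), which returns the fraction (0 to 1) of the queue currently occupied:
postgres=# select pg_notification_queue_usage();
 pg_notification_queue_usage 
-----------------------------
                           0
(1 row)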
/*-------------------------------------------------------------------------
 * Async Notification Model as of 9.0:
 *
 * 1. Multiple backends on same machine. Multiple backends listening on
 *        several channels. (Channels are also called "conditions" in other
 *        parts of the code.)
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *        with actively-used pages mapped into shared memory by the slru.c module.
 *        All notification messages are placed in the queue and later read out
 *        by listening backends.
 *
 *        There is no central knowledge of which backend listens on which channel;
 *        every backend has its own list of interesting channels.
 *
 *        Although there is only one queue, notifications are treated as being
 *        database-local; this is done by including the sender's database OID
 *        in each notification message.  Listening backends ignore messages
 *        that don't match their database OID.  This is important because it
 *        ensures senders and receivers have the same database encoding and won't
 *        misinterpret non-ASCII text in the channel name or payload string.
 *
 *        Since notifications are not expected to survive database crashes,
 *        we can simply clean out the pg_notify data at any reboot, and there
 *        is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *        entering its PID into the array in AsyncQueueControl. It then scans all
 *        incoming notifications in the central queue and first compares the
 *        database OID of the notification with its own database OID and then
 *        compares the notified channel with the list of channels that it listens
 *        to. In case there is a match it delivers the notification event to its
 *        frontend.  Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *        a backend-local list which will not be processed until transaction end.
 *
 *        Duplicate notifications from the same transaction are sent out as one
 *        notification only. This is done to save work when for example a trigger
 *        on a 2 million row table fires a notification for each row that has been
 *        changed. If the application needs to receive every single notification
 *        that has been sent, it can easily add some unique string into the extra
 *        payload parameter.
 *
 *        When the transaction is ready to commit, PreCommit_Notify() adds the
 *        pending notifications to the head of the queue. The head pointer of the
 *        queue always points to the next free position and a position is just a
 *        page number and the offset in that page. This is done before marking the
 *        transaction as committed in clog. If we run into problems writing the
 *        notifications, we can still call elog(ERROR, ...) and the transaction
 *        will roll back.
 *
 *        Once we have put all of the notifications into the queue, we return to
 *        CommitTransaction() which will then do the actual transaction commit.
 *
 *        After commit we are called another time (AtCommit_Notify()). Here we
 *        make the actual updates to the effective listen state (listenChannels).
 *
 *        Finally, after we are out of the transaction altogether, we check if
 *        we need to signal listening backends.  In SignalBackends() we scan the
 *        list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
 *        to every listening backend (we don't know which backend is listening on
 *        which channel so we must signal them all). We can exclude backends that
 *        are already up to date, though.  We don't bother with a self-signal
 *        either, but just process the queue directly.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *        can call inbound-notify processing immediately if this backend is idle
 *        (ie, it is waiting for a frontend command and is not within a transaction
 *        block).  Otherwise the handler may only set a flag, which will cause the
 *        processing to occur just before we next go idle.
 *
 *        Inbound-notify processing consists of reading all of the notifications
 *        that have arrived since scanning last time. We read every notification
 *        until we reach either a notification from an uncommitted transaction or
 *        the head pointer's position. Then we check if we were the laziest
 *        backend: if our pointer is set to the same position as the global tail
 *        pointer is set, then we move the global tail pointer ahead to where the
 *        second-laziest backend is (in general, we take the MIN of the current
 *        head position and all active backends' new tail pointers). Whenever we
 *        move the global tail pointer we also truncate now-unused pages (i.e.,
 *        delete files in pg_notify/ that are no longer used).
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
 * can be varied without affecting anything but performance.  The maximum
 * amount of notification data that can be queued at one time is determined
 * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
 *-------------------------------------------------------------------------
 */
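Point 4 above is easy to verify from psql: notifications queued inside a transaction that rolls back are never delivered, and duplicates within one transaction are collapsed into a single delivery (the cnpug channel from the earlier example is reused here):

postgres=# begin;
BEGIN
postgres=# notify cnpug, 'never delivered';
NOTIFY
postgres=# rollback;
ROLLBACK
postgres=# begin;
BEGIN
postgres=# notify cnpug, 'dup';
NOTIFY
postgres=# notify cnpug, 'dup';
NOTIFY
postgres=# commit;
COMMIT

Listeners on cnpug see nothing from the first transaction and receive the 'dup' payload exactly once from the second.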

/*
 * Maximum size of a NOTIFY payload, including terminating NULL.  This
 * must be kept small enough so that a notification message fits on one
 * SLRU page.  The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH       (BLCKSZ - NAMEDATALEN - 128)
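With the default BLCKSZ of 8192 and the default NAMEDATALEN of 64, this works out to 8192 - 64 - 128 = 8000 bytes, matching the documented rule that a NOTIFY payload must be shorter than 8000 bytes.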

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
        int                     length;                 /* total allocated length of entry */
        Oid                     dboid;                  /* sender's database OID */
        TransactionId xid;                      /* sender's XID */
        int32           srcPid;                 /* sender's PID */
        char            data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/*
 * slru.c currently assumes that all filenames are four characters of hex
 * digits. That means that we can use segments 0000 through FFFF.
 * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
 * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 *
 * It's of course possible to enhance slru.c, but this gives us so much
 * space already that it doesn't seem worth the trouble.
 *
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
 * pages, because more than that would confuse slru.c into thinking there
 * was a wraparound condition.  With the default BLCKSZ this means there
 * can be up to 8GB of queued-and-not-read data.
 *
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
 */
#define QUEUE_MAX_PAGE                  (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

// src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT        32
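Plugging in the numbers: QUEUE_MAX_PAGE = 32 * 0x10000 - 1 = 2097151, so roughly 1048576 pages may hold data at once, and at the default 8KB block size that is 1048576 * 8192 bytes = 8GB of queued-and-not-read data, the figure cited in the comment above.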

[References]
1. http://www.postgresql.org/docs/devel/static/tcn.html
2. http://www.postgresql.org/docs/9.4/static/libpq-notify.html
3. http://www.postgresql.org/docs/9.4/static/libpq-example.html#LIBPQ-EXAMPLE-2
4. contrib/tcn/tcn.c
5. http://www.postgresql.org/docs/9.4/static/sql-notify.html
6. http://www.postgresql.org/docs/9.4/static/sql-listen.html
7. http://www.postgresql.org/docs/9.4/static/sql-unlisten.html
8. http://www.postgresql.org/docs/9.4/static/contrib-dblink-get-notify.html
9. src/backend/commands/async.c
10. src/include/commands/async.h