PostgreSQL 的小玩具, async Notification as a chat group

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介:
PostgreSQL 提供了一个很有意思的异步消息通信功能。
利用这个功能,可以实现很多有趣的想法,例如离线聊天组,数据逻辑复制,审计日志等。
notify就是往一个指定的通道发消息。
postgres=# \h notify
Command:     NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
listen就是监听一个指定的通道。
postgres=# \h listen
Command:     LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
一个客户端可以往多个通道发消息,也可以监听来自多个通道的消息。
例子:
1. 大伙首先要加到一个聊天组(channel)
session A:
postgres=# listen cnpug;
LISTEN

session B:
postgres=# listen cnpug;
LISTEN
2. 大伙往这个聊天组发消息,自己发的消息会立即收到。
SESSION A:
postgres=# notify cnpug, 'hello, every body.';
NOTIFY
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
3. 大伙可以随时去取聊天组的历史消息,不是自己发的消息,要使用listen去获取。
SESSION B:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
如果很长时间没有接收消息,会有很多堆积的。
SESSION B:
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello1';
NOTIFY
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
4. 如果有新的小伙伴要加入聊天组,只能看到加入后大家发的消息,以前的消息是看不到的。
postgres=# listen cnpug;
LISTEN
以前发的消息不会显示,即使还在队列里面也不会显示。
5. 退出聊天组
unlisten cnpug;

除此之外,异步消息还可以用于审计,数据复制等场景。
例如tcn插件就是一个类似审计或数据复制的场景。
test=# create table tcndata
test-#   (
test(#     a int not null,
test(#     b date not null,
test(#     c text,
test(#     primary key (a, b)
test(#   );
CREATE TABLE
创建触发器,当发生dml操作时,调用triggered_change_notification函数发出notify.
test=# create trigger tcndata_tcn_trigger
test-#   after insert or update or delete on tcndata
test-#   for each row execute procedure triggered_change_notification();
CREATE TRIGGER
监听tcn通道
test=# listen tcn;
LISTEN
现在你会发现每当执行DML时,我们可以从tcn通道接收到triggered_change_notification函数发出的异步消息。
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-#                            (1, date '2012-12-23', 'another'),
test-#                            (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
使用这个方法,可以用于审计或数据异步复制。

异步消息的代码见:
src/backend/commands/async.c
注意允许的队列长度限制,超出的话会报队列满的错误。
另外需要注意一条消息的长度,和数据块的大小接近。
/*-------------------------------------------------------------------------
 * Async Notification Model as of 9.0:
 *
 * 1. Multiple backends on same machine. Multiple backends listening on
 *        several channels. (Channels are also called "conditions" in other
 *        parts of the code.)
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *        with actively-used pages mapped into shared memory by the slru.c module.
 *        All notification messages are placed in the queue and later read out
 *        by listening backends.
 *
 *        There is no central knowledge of which backend listens on which channel;
 *        every backend has its own list of interesting channels.
 *
 *        Although there is only one queue, notifications are treated as being
 *        database-local; this is done by including the sender's database OID
 *        in each notification message.  Listening backends ignore messages
 *        that don't match their database OID.  This is important because it
 *        ensures senders and receivers have the same database encoding and won't
 *        misinterpret non-ASCII text in the channel name or payload string.
 *
 *        Since notifications are not expected to survive database crashes,
 *        we can simply clean out the pg_notify data at any reboot, and there
 *        is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *        entering its PID into the array in AsyncQueueControl. It then scans all
 *        incoming notifications in the central queue and first compares the
 *        database OID of the notification with its own database OID and then
 *        compares the notified channel with the list of channels that it listens
 *        to. In case there is a match it delivers the notification event to its
 *        frontend.  Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *        a backend-local list which will not be processed until transaction end.
 *
 *        Duplicate notifications from the same transaction are sent out as one
 *        notification only. This is done to save work when for example a trigger
 *        on a 2 million row table fires a notification for each row that has been
 *        changed. If the application needs to receive every single notification
 *        that has been sent, it can easily add some unique string into the extra
 *        payload parameter.
 *
 *        When the transaction is ready to commit, PreCommit_Notify() adds the
 *        pending notifications to the head of the queue. The head pointer of the
 *        queue always points to the next free position and a position is just a
 *        page number and the offset in that page. This is done before marking the
 *        transaction as committed in clog. If we run into problems writing the
 *        notifications, we can still call elog(ERROR, ...) and the transaction
 *        will roll back.
 *
 *        Once we have put all of the notifications into the queue, we return to
 *        CommitTransaction() which will then do the actual transaction commit.
 *
 *        After commit we are called another time (AtCommit_Notify()). Here we
 *        make the actual updates to the effective listen state (listenChannels).
 *
 *        Finally, after we are out of the transaction altogether, we check if
 *        we need to signal listening backends.  In SignalBackends() we scan the
 *        list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
 *        to every listening backend (we don't know which backend is listening on
 *        which channel so we must signal them all). We can exclude backends that
 *        are already up to date, though.  We don't bother with a self-signal
 *        either, but just process the queue directly.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *        can call inbound-notify processing immediately if this backend is idle
 *        (ie, it is waiting for a frontend command and is not within a transaction
 *        block).  Otherwise the handler may only set a flag, which will cause the
 *        processing to occur just before we next go idle.
 *
 *        Inbound-notify processing consists of reading all of the notifications
 *        that have arrived since scanning last time. We read every notification
 *        until we reach either a notification from an uncommitted transaction or
 *        the head pointer's position. Then we check if we were the laziest
 *        backend: if our pointer is set to the same position as the global tail
 *        pointer is set, then we move the global tail pointer ahead to where the
 *        second-laziest backend is (in general, we take the MIN of the current
 *        head position and all active backends' new tail pointers). Whenever we
 *        move the global tail pointer we also truncate now-unused pages (i.e.,
 *        delete files in pg_notify/ that are no longer used).
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
 * can be varied without affecting anything but performance.  The maximum
 * amount of notification data that can be queued at one time is determined
 * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
 *-------------------------------------------------------------------------
 */

/*
 * Maximum size of a NOTIFY payload, including terminating NULL.  This
 * must be kept small enough so that a notification message fits on one
 * SLRU page.  The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH       (BLCKSZ - NAMEDATALEN - 128)

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
        int                     length;                 /* total allocated length of entry */
        Oid                     dboid;                  /* sender's database OID */
        TransactionId xid;                      /* sender's XID */
        int32           srcPid;                 /* sender's PID */
        char            data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/*
 * slru.c currently assumes that all filenames are four characters of hex
 * digits. That means that we can use segments 0000 through FFFF.
 * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
 * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 *
 * It's of course possible to enhance slru.c, but this gives us so much
 * space already that it doesn't seem worth the trouble.
 *
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
 * pages, because more than that would confuse slru.c into thinking there
 * was a wraparound condition.  With the default BLCKSZ this means there
 * can be up to 8GB of queued-and-not-read data.
 *
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
 */
#define QUEUE_MAX_PAGE                  (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

// src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT        32

[参考]
1. http://www.postgresql.org/docs/devel/static/tcn.html
2. http://www.postgresql.org/docs/9.4/static/libpq-notify.html
3. http://www.postgresql.org/docs/9.4/static/libpq-example.html#LIBPQ-EXAMPLE-2
4. contrib/tcn/tcn.c
5. http://www.postgresql.org/docs/9.4/static/sql-notify.html
6. http://www.postgresql.org/docs/9.4/static/sql-listen.html
7. http://www.postgresql.org/docs/9.4/static/sql-unlisten.html
8. http://www.postgresql.org/docs/9.4/static/contrib-dblink-get-notify.html
9. src/backend/commands/async.c
10. src/include/commands/async.h
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
5月前
|
Java 容器
【Azure 媒体服务】记使用 Media Service 的官网示例代码 Audio Analyzer 出现卡顿在 Creating event processor host .. 直到 Timeout 问题
【Azure 媒体服务】记使用 Media Service 的官网示例代码 Audio Analyzer 出现卡顿在 Creating event processor host .. 直到 Timeout 问题
|
5月前
|
JSON 数据格式 Python
【Azure 应用服务】Azure Function Python函数中,如何获取Event Hub Trigger的消息Event所属于的PartitionID呢?
【Azure 应用服务】Azure Function Python函数中,如何获取Event Hub Trigger的消息Event所属于的PartitionID呢?
|
关系型数据库 Java 数据库连接
PostgreSQL 14中连接参数target_session_attrs增强
PostgreSQL 14中连接参数target_session_attrs增强
142 0
|
存储 SQL 关系型数据库
FAQ系列 | index extensions特性介绍
FAQ系列 | index extensions特性介绍
|
关系型数据库 数据库 PostgreSQL
postgresql :ERROR: role “user001“ cannot be dropped because some objects depend on it
postgresql :ERROR: role “user001“ cannot be dropped because some objects depend on it
922 0
rocket:The broker does not support consumer to filter message by SQL92
rocket:The broker does not support consumer to filter message by SQL92
297 0