A little PostgreSQL toy: async notification as a chat group

Summary:
PostgreSQL provides an interesting asynchronous messaging facility. It can be used to implement many fun ideas, such as an offline chat group, logical data replication, or audit logging.
NOTIFY sends a message to a named channel.
postgres=# \h notify
Command:     NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
LISTEN subscribes to a named channel.
postgres=# \h listen
Command:     LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
A single client can send messages to multiple channels, and can listen on multiple channels at the same time.
Example:
1. Everyone first joins a chat group (a channel).
SESSION A:
postgres=# listen cnpug;
LISTEN

SESSION B:
postgres=# listen cnpug;
LISTEN
2. Anyone can post to the group; your own messages are echoed back immediately.
SESSION A:
postgres=# notify cnpug, 'hello, every body.';
NOTIFY
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
3. Members can pick up pending group messages at any time. Messages sent by other sessions are delivered the next time the session issues a command, such as another LISTEN.
SESSION B:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
If a session goes a long time without picking up messages, many of them accumulate in the queue.
SESSION B:
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello1';
NOTIFY
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.

SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
4. A newcomer joining the group only sees messages sent after it joined; earlier messages are invisible to it.
postgres=# listen cnpug;
LISTEN
Messages sent before joining are not displayed, even if they are still sitting in the queue.
5. Leave the chat group:
unlisten cnpug;
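The same chat loop can be driven from application code. Below is a minimal sketch assuming the psycopg2 driver and a reachable server (the DSN is up to you); `format_notification` merely mimics psql's display format.

```python
import select

def format_notification(channel, payload, pid):
    """Render a notification the way psql prints it."""
    return ('Asynchronous notification "%s" with payload "%s" '
            'received from server process with PID %d.' % (channel, payload, pid))

def chat_listen(dsn, channel="cnpug"):
    """Join a chat channel and print messages as they arrive.

    Assumes psycopg2 is installed and `dsn` points at a reachable server.
    """
    import psycopg2
    conn = psycopg2.connect(dsn)
    conn.set_session(autocommit=True)      # so LISTEN takes effect at once
    conn.cursor().execute("LISTEN %s;" % channel)
    try:
        while True:
            # Sleep on the connection's socket instead of polling the server.
            if select.select([conn], [], [], 5.0) == ([], [], []):
                continue                   # timed out; wait again
            conn.poll()                    # absorb pending notifications
            while conn.notifies:
                n = conn.notifies.pop(0)
                print(format_notification(n.channel, n.payload, n.pid))
    finally:
        conn.close()                       # leaving the group implies UNLISTEN
```

Waiting on the connection's file descriptor with select() is the usual pattern here: the backend signals the client when a notification arrives, so no server round-trips are wasted on polling.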

Beyond chat, asynchronous notifications can also be used for scenarios such as auditing and data replication.
For example, the tcn extension (loaded with CREATE EXTENSION tcn;) implements an audit-/replication-style use case.
test=# create table tcndata
test-#   (
test(#     a int not null,
test(#     b date not null,
test(#     c text,
test(#     primary key (a, b)
test(#   );
CREATE TABLE
Create a trigger so that any DML on the table calls the triggered_change_notification function, which issues a NOTIFY:
test=# create trigger tcndata_tcn_trigger
test-#   after insert or update or delete on tcndata
test-#   for each row execute procedure triggered_change_notification();
CREATE TRIGGER
Listen on the tcn channel (the extension's default channel name):
test=# listen tcn;
LISTEN
Now, every time DML executes, the session receives the asynchronous messages emitted by triggered_change_notification on the tcn channel.
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-#                            (1, date '2012-12-23', 'another'),
test-#                            (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
This technique can serve as a basis for auditing or asynchronous data replication.
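The payload format shown above is easy to consume programmatically. The helper below is my own sketch, not part of the tcn extension; it splits a payload into table, operation, and primary-key values, and assumes doubled quotes are used to escape quotes embedded in identifiers or values.

```python
import re

def parse_tcn_payload(payload):
    """Split a triggered_change_notification payload such as
       "tcndata",I,"a"='1',"b"='2012-12-22'
    into (table, operation, {pk column: value})."""
    m = re.match(r'^"((?:[^"]|"")*)",([IUD])(.*)$', payload)
    if m is None:
        raise ValueError("unrecognized tcn payload: %r" % payload)
    table = m.group(1).replace('""', '"')
    op = m.group(2)          # I = insert, U = update, D = delete
    pkeys = {}
    for col, val in re.findall(r'"((?:[^"]|"")*)"=\'((?:[^\']|\'\')*)\'',
                               m.group(3)):
        pkeys[col.replace('""', '"')] = val.replace("''", "'")
    return table, op, pkeys
```

Events parsed this way could then feed an audit log table or a downstream replica.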

The async notification implementation lives in:
src/backend/commands/async.c
Note the limit on total queue size; exceeding it raises a queue-full error. Also note the limit on a single message's length, which is close to the database block size.
/*-------------------------------------------------------------------------
 * Async Notification Model as of 9.0:
 *
 * 1. Multiple backends on same machine. Multiple backends listening on
 *        several channels. (Channels are also called "conditions" in other
 *        parts of the code.)
 *
 * 2. There is one central queue in disk-based storage (directory pg_notify/),
 *        with actively-used pages mapped into shared memory by the slru.c module.
 *        All notification messages are placed in the queue and later read out
 *        by listening backends.
 *
 *        There is no central knowledge of which backend listens on which channel;
 *        every backend has its own list of interesting channels.
 *
 *        Although there is only one queue, notifications are treated as being
 *        database-local; this is done by including the sender's database OID
 *        in each notification message.  Listening backends ignore messages
 *        that don't match their database OID.  This is important because it
 *        ensures senders and receivers have the same database encoding and won't
 *        misinterpret non-ASCII text in the channel name or payload string.
 *
 *        Since notifications are not expected to survive database crashes,
 *        we can simply clean out the pg_notify data at any reboot, and there
 *        is no need for WAL support or fsync'ing.
 *
 * 3. Every backend that is listening on at least one channel registers by
 *        entering its PID into the array in AsyncQueueControl. It then scans all
 *        incoming notifications in the central queue and first compares the
 *        database OID of the notification with its own database OID and then
 *        compares the notified channel with the list of channels that it listens
 *        to. In case there is a match it delivers the notification event to its
 *        frontend.  Non-matching events are simply skipped.
 *
 * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
 *        a backend-local list which will not be processed until transaction end.
 *
 *        Duplicate notifications from the same transaction are sent out as one
 *        notification only. This is done to save work when for example a trigger
 *        on a 2 million row table fires a notification for each row that has been
 *        changed. If the application needs to receive every single notification
 *        that has been sent, it can easily add some unique string into the extra
 *        payload parameter.
 *
 *        When the transaction is ready to commit, PreCommit_Notify() adds the
 *        pending notifications to the head of the queue. The head pointer of the
 *        queue always points to the next free position and a position is just a
 *        page number and the offset in that page. This is done before marking the
 *        transaction as committed in clog. If we run into problems writing the
 *        notifications, we can still call elog(ERROR, ...) and the transaction
 *        will roll back.
 *
 *        Once we have put all of the notifications into the queue, we return to
 *        CommitTransaction() which will then do the actual transaction commit.
 *
 *        After commit we are called another time (AtCommit_Notify()). Here we
 *        make the actual updates to the effective listen state (listenChannels).
 *
 *        Finally, after we are out of the transaction altogether, we check if
 *        we need to signal listening backends.  In SignalBackends() we scan the
 *        list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
 *        to every listening backend (we don't know which backend is listening on
 *        which channel so we must signal them all). We can exclude backends that
 *        are already up to date, though.  We don't bother with a self-signal
 *        either, but just process the queue directly.
 *
 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
 *        can call inbound-notify processing immediately if this backend is idle
 *        (ie, it is waiting for a frontend command and is not within a transaction
 *        block).  Otherwise the handler may only set a flag, which will cause the
 *        processing to occur just before we next go idle.
 *
 *        Inbound-notify processing consists of reading all of the notifications
 *        that have arrived since scanning last time. We read every notification
 *        until we reach either a notification from an uncommitted transaction or
 *        the head pointer's position. Then we check if we were the laziest
 *        backend: if our pointer is set to the same position as the global tail
 *        pointer is set, then we move the global tail pointer ahead to where the
 *        second-laziest backend is (in general, we take the MIN of the current
 *        head position and all active backends' new tail pointers). Whenever we
 *        move the global tail pointer we also truncate now-unused pages (i.e.,
 *        delete files in pg_notify/ that are no longer used).
 *
 * An application that listens on the same channel it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.
 *
 * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
 * can be varied without affecting anything but performance.  The maximum
 * amount of notification data that can be queued at one time is determined
 * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
 *-------------------------------------------------------------------------
 */
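The tail-advance rule from step 5 above can be modeled in a few lines. This is a toy illustration only: positions are simplified to plain page numbers here, while async.c tracks (page, offset) pairs.

```python
def advance_tail(head, backend_positions):
    """New global tail = MIN of the head and all listeners' read positions.

    The laziest backend pins the tail; pages wholly behind the returned
    position could then be truncated from pg_notify/.
    """
    return min([head] + list(backend_positions))
```

For example, `advance_tail(100, [40, 97, 99])` returns 40: the laziest listener pins the tail, and with no listeners at all the tail simply tracks the head.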

/*
 * Maximum size of a NOTIFY payload, including terminating NULL.  This
 * must be kept small enough so that a notification message fits on one
 * SLRU page.  The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH       (BLCKSZ - NAMEDATALEN - 128)

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
        int                     length;                 /* total allocated length of entry */
        Oid                     dboid;                  /* sender's database OID */
        TransactionId xid;                      /* sender's XID */
        int32           srcPid;                 /* sender's PID */
        char            data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/*
 * slru.c currently assumes that all filenames are four characters of hex
 * digits. That means that we can use segments 0000 through FFFF.
 * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
 * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 *
 * It's of course possible to enhance slru.c, but this gives us so much
 * space already that it doesn't seem worth the trouble.
 *
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
 * pages, because more than that would confuse slru.c into thinking there
 * was a wraparound condition.  With the default BLCKSZ this means there
 * can be up to 8GB of queued-and-not-read data.
 *
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
 */
#define QUEUE_MAX_PAGE                  (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

// src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT        32

[References]
1. http://www.postgresql.org/docs/devel/static/tcn.html
2. http://www.postgresql.org/docs/9.4/static/libpq-notify.html
3. http://www.postgresql.org/docs/9.4/static/libpq-example.html#LIBPQ-EXAMPLE-2
4. contrib/tcn/tcn.c
5. http://www.postgresql.org/docs/9.4/static/sql-notify.html
6. http://www.postgresql.org/docs/9.4/static/sql-listen.html
7. http://www.postgresql.org/docs/9.4/static/sql-unlisten.html
8. http://www.postgresql.org/docs/9.4/static/contrib-dblink-get-notify.html
9. src/backend/commands/async.c
10. src/include/commands/async.h