PgSQL · 功能分析 · Listen/Notify 功能

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS AI 助手,专业版
简介: Listen 和 Notify 是PG很有意思的一个功能,可以用来进行多应用间的通信。它们可以在SQL中使用,也可以用C、JDBC里面的API调用。下面介绍一下其使用方法和内核实现。 使用方法 用一个简单的例子,来看一下Listen/Notify如何使用。假设我们有两个应用A和B,部署在不同的机

Listen 和 Notify 是PG很有意思的一个功能,可以用来进行多应用间的通信。它们可以在SQL中使用,也可以用C、JDBC里面的API调用。下面介绍一下其使用方法和内核实现。

使用方法

用一个简单的例子,来看一下Listen/Notify如何使用。假设我们有两个应用A和B,部署在不同的机器上:A机器处理前端用户请求,同时需要将一些可以异步执行的任务,分配给后台服务器B。B接收到任务并处理完成后反馈给A结果。

--分别在A和B两台机器上初始化PG连接,它们互相监听对方消息,A负责派发任务给B

--机器A初始化与PG的连接
session A:
listen workA;
commit;

--机器B初始化与PG的连接
session B:
listen workerB;
commit;

--A派发任务给B
session A:
begin;
notify workerB, 'do job 1001';
commit;

--B接受消息
session B:
begin;
Asynchronous notification "worker1" with payload "1001" received from server process with PID 29826.
commit;

--B解析消息(可用脚本或应用实现),然后完成任务,发送反馈给A
session B:
....
begin;
notify workA 'job 1001 done';
commit;

利用上面的步骤,A和B两个机器通过PG完成了通信。在上面的过程中,需要注意的是:

  1. B要想接受到消息,必须在A Notify之前运行了Listen命令;
  2. A需要使用事务commit操作来触发消息发送;
  3. 消息是异步发送到B的,即无论B的状态如何,消息都会先到达PG的消息队列(每个PG实例只有一个唯一的存放所有消息的队列);B要查看消息,如果使用的是psql客户端,则需要先发送带有事务操作的命令(如begin、commit或rollback)给PG;
  4. A 如果连续发送多个消息,B会一次性收到这些消息;
  5. 在C代码里面,你可以使用如下的调用来获取所有已到达的消息,如果没有消息到达,则进入睡眠状态。
    while (1)
    {
        sock = PQsocket(conn);

        /* Monitor socket. Sleep if there is no message */
        select(sock + 1, &input_mask, NULL, NULL, NULL) 

        /* Now check for input */
        PQconsumeInput(conn);

        /* Loop until all notifications currently received have been handled */
        while ((notify = PQnotifies(conn)) != NULL)
        {
            /* Received some message, print it out */
            fprintf(stderr,
                    "ASYNC NOTIFY of '%s' received from backend PID %d\n",
                    notify->relname, notify->be_pid);
            PQfreemem(notify);
         }
    }

内核实现

Listen/Notify的实现其实比较简单。主要的数据结构是一个消息队列(asyncQueueControl->tailasyncQueueControl->head分别指向队列尾和队列头)和一个进程状态数组(asyncQueueControl->backend),如下图所示:

消息通知机制

消息队列里面存放了所有进程的所有通知消息,而状态数组存放了所有执行了Listen命令、准备接收异步消息的进程的状态信息。状态数组中含有每个进程已经读取到的消息在队列里面的位置指针。如果有了新消息,进程就从此指针往后取,直到读取全部消息。

当一个连接的后台进程接收到Listen命令时,先将Listen的信息记录下来,然后在事务提交时,执行Listen操作,即把本进程放入状态数组(参见Exec_ListenPreCommit函数)。

执行Notify命令时,Async_Notify函数负责把通知放入pendingNotifies链表。在事务Commit操作前后,执行下面的逻辑:

  1. 调用PreCommit_Notify函数,将pendingNotifies链表中的消息,放入全局消息队列;
  2. 执行Commit操作;
  3. 利用调用链ProcessCompletedNotifies->SignalBackends->SendProcSignal->kill,向其他所有状态数组中的进程,发出通知信号。

另一方面,每个进程在接收到信号后,利用函数HandleNotifyInterrupt处理信号。如果当前进程处于事务中,则不立即处理消息,等到事务提交完毕,调用prepare_for_client_read读取下一个用户命令时,利用ProcessIncomingNotify处理消息;否则,立即调用ProcessIncomingNotify处理消息。ProcessIncomingNotify最终调用NotifyMyFrontEnd发送消息到客户端:

ProcessIncomingNotify --> asyncQueueReadAllNotifications() --> NotifyMyFrontEnd

注意,客户端收到消息后,并不立即显示出来,而是需要用API进行获取。例如,psql就是在执行下一个命令时(如begin、commit),会顺便把收到的消息显示出来的。

总结

Listen/Notify是一个轻量级的应用间通信机制,有了它,具有访问数据库能力的应用可以轻易的利用PG实现互操作。当然,由于消息队列是存放在内存里面的,在发生实例宕机等问题时,消息将丢失,对可靠性要求高的应用,需要自己进行消息持久化(如利用PG存储消息,进行持久化)。

目录
相关文章
|
关系型数据库 数据库 PostgreSQL
PostgreSQL 内存表可选项 - unlogged table
标签 PostgreSQL , 内存表 , unlogged table 背景 内存表,通常被用于不需要持久化,变更频繁,访问RT低的场景。 目前社区版本PostgreSQL没有内存表的功能,postgrespro提供了两个插件可以实现类似内存表的功能。
4058 0
|
存储 安全 关系型数据库
PostgreSQL物化视图增量更新扩展 -- pg_ivm
PostgreSQL不支持物化视图增量更新,需要定期执行REFRESH MATERIALIZED VIEW命令刷新物化视图。Incremental View Maintenance (IVM)是一种使物化视图保持最新的方法,其中只计算增量更改并将其应用于视图,而不是REFRESH MATERIALIZED VIEW那样从头开始重新计算内容。当只更改视图的一小部分时,IVM可以比重新计算更高效地更新物化视图。
|
6月前
|
消息中间件 SQL 关系型数据库
什么是实时数据同步?纯干货解读!
在数据处理中,数据同步问题常常导致报表不准、决策滞后。本文深入解析实时数据同步的重要性与实现方法,帮助你解决80%的同步难题,提升数据效率与业务响应速度。
什么是实时数据同步?纯干货解读!
|
SQL 关系型数据库 PostgreSQL
|
6月前
|
人工智能 JavaScript 前端开发
用 Go 语言轻松构建 MCP 服务器
本文介绍了使用 Go 语言构建 MCP 服务器的完整过程,涵盖创建服务器实例、注册工具、资源和提示词,以及通过 stdio 和 sse 模式启动服务的方法,帮助开发者快速集成 LLM 应用与外部系统。
|
存储 Oracle 关系型数据库
【实操】单表数据量 200 GB,PostgreSQL 怎么应对??
【实操】单表数据量 200 GB,PostgreSQL 怎么应对??
734 1
|
SQL 关系型数据库 Go
《增强你的PostgreSQL:最佳扩展和插件推荐》
《增强你的PostgreSQL:最佳扩展和插件推荐》
1649 0
|
11月前
|
关系型数据库 Linux 数据库
PostgreSQL 入门指南:安装、配置与基本命令
本文从零开始,详细介绍如何在 Windows、Linux 和 macOS 上安装和配置 PostgreSQL,涵盖30+个实操代码示例。内容包括安装步骤、配置远程访问和用户权限、基础数据库操作命令(如创建表、插入和查询数据),以及常见问题的解决方案。通过学习,你将掌握 PostgreSQL 的基本使用方法,并为后续深入学习打下坚实基础。
12655 1
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
5422 1
|
JavaScript
基于Vue2.X/Vue3.X对Monaco Editor在线代码编辑器进行封装与使用
这篇文章介绍了如何在Vue 2.X和Vue 3.X项目中封装和使用Monaco Editor在线代码编辑器,包括安装所需依赖、创建封装组件、在父组件中调用以及处理Vue 3中可能遇到的问题。
3455 1
基于Vue2.X/Vue3.X对Monaco Editor在线代码编辑器进行封装与使用