从电波表到数据库小程序之 - 数据库异步广播(notify/listen)

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
简介: 标签 PostgreSQL , notify , listen , 异步消息 背景 小时候就梦想有个酷酷的电波表(虽然现在还没有拥有),不过电波表和PostgreSQL有什么关系呢?听我道来。

标签

PostgreSQL , notify , listen , 异步消息


背景

小时候就梦想有个酷酷的电波表(虽然现在还没有拥有),不过电波表和PostgreSQL有什么关系呢?听我道来。

pic

http://baike.baidu.com/view/1124741.htm

电波表内置高感度小型天线,接收标准电波进行自动对时,因而可以实现时间上的精准。在国际上,德国、英国、美国、日本都已经有标准电波的发送。2007年7月,在中国河南商丘建成的新电波塔已经开始发送电波。

标准电波接收(6局电波接收)原理

中国标准时间以10万年误差1秒的铯原子钟为基准。通过手表内置的高敏感度接收器接收以无线电波传送的标准时间信号,并自动校准手表走时。使手表显示的时间与标准时间同步,精确计时。在建筑物密集的室内也可以接收标准电波信号,显示正确的时间。

标准电波的接收范围是半径约1000~3000km, 深夜(最多6次)自动接收电波信号,不能收到标准电波信号时,自动接收GPS卫星电波获取时间信息并校正时间。中国的电波塔在河南省商丘市。

而用于接收电波信号的防震天线采用的是非晶材质,能够实现高敏感度接收。不但具备对应极端户外环境下使用的牢靠性,还可以确保稳定接收全球6局电波信号。

电波表是一个非常典型的广播应用,类似的还有组播(注意不是主播哦),类似的应用也很多,比如广播电视,电台等。

数据库广播

在数据库中,其实也有类似的应用,比如我前几天在文章中写的数据库到WEB客户端的广播,详见 :

《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》

实际上是利用了PostgreSQL数据库的异步消息机制,数据库往消息通道发送数据,应用程序可以监听对应的消息通道,获取数据库发出的数据。

通过异步消息在数据库中实现了一对多的广播效果。

SQL语法参考文档:

1. 向通道发送消息

https://www.postgresql.org/docs/9.6/static/sql-notify.html

2. 监听某通道的消息

https://www.postgresql.org/docs/9.6/static/sql-listen.html

3. 取消监听某通道

https://www.postgresql.org/docs/9.6/static/sql-unlisten.html

4. 数据库函数

查看会话监听了哪些通道,以及当前数据库的异步消息队列使用了多少。

pg_listening_channels()         setof text  channel names that the session is currently listening on  
pg_notification_queue_usage()   double          fraction of the asynchronous notification queue currently occupied (0-1)  

pg_notification_queue_usage用来计算已使用的异步消息页面占比,如果有监听,但是一直不消费,可能导致溢出。

src/backend/commands/async.c  

/*  
 * slru.c currently assumes that all filenames are four characters of hex  
 * digits. That means that we can use segments 0000 through FFFF.  
 * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us  
 * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.  
 *  
 * It's of course possible to enhance slru.c, but this gives us so much  
 * space already that it doesn't seem worth the trouble.  
 *  
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2  
 * pages, because more than that would confuse slru.c into thinking there  
 * was a wraparound condition.  With the default BLCKSZ this means there  
 * can be up to 8GB of queued-and-not-read data.  
 *  
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of  
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.  
 */  
#define QUEUE_MAX_PAGE                  (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)  


src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT        32  

异步消息编程

除了使用SQL来编写异步消息,还可以使用数据库的驱动来编写异步消息

c

参考libpq的异步消息部分

https://www.postgresql.org/docs/9.6/static/libpq-notify.html

PGnotify *PQnotifies(PGconn *conn);  

typedef struct pgNotify  
{  
    char *relname;              /* notification channel name */  
    int  be_pid;                /* process ID of notifying server process */  
    char *extra;                /* notification payload string */  
} PGnotify;  

文档中有一个例子如下

https://www.postgresql.org/docs/9.6/static/libpq-example.html#LIBPQ-EXAMPLE-2

java

参考文档

https://jdbc.postgresql.org/documentation/head/listennotify.html

应用举例

Broadcasting PostgreSQL NOTIFY messages to WebSocket Clients

The system works like this:

Client subscribes to a WebSocket topic...  

NOTIFY event on database server ->  
  PGNotificationListener on web server ->  
      Send Websocket notification on server ->  
         Receive Websocket event on browser.   

With the code below, if you call NOTIFY dml_events, 'some message'; in Postgres, it will be broadcast to all WebSocket clients.

Follow this answer regarding proper listener setup

URL:

http://blog.databasepatterns.com/2014/04/postgresql-nofify-websocket-spring-mvc.html

http://stackoverflow.com/questions/37916489/listen-notify-pgconnection-goes-down-java

The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected.   
Check out the BasicContext class lines 642 - 655:  

---  
public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {  

    name = nullToEmpty(name);  
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";  

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);  

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);  

    synchronized (notificationListeners) {  
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));  
    }  

}  
---  

If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire as seen from lines 690 - 710  

---  
  @Override  
  public synchronized void reportNotification(int processId, String channelName, String payload) {  

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();  
    while (iter.hasNext()) {  

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();  

      NotificationListener listener = entry.getValue().get();  
      if (listener == null) {  

        iter.remove();  
      }  
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {  

        listener.notification(processId, channelName, payload);  
      }  

    }  

}  
---  

To fix this, add your notification listeners as such:  

---  
/// Do not let this reference go out of scope!  
PGNotificationListener listener = new PGNotificationListener() {  

@Override  
public void notification(int processId, String channelName, String payload) {  
    // interesting code  
};  
pgConnection.addNotificationListener(listener);  
---  

Quite an odd use-case for weak references in my opinion...  

代码:

https://bitbucket.org/neilmcg/postgresql-websocket-example

其他编程语言的驱动,大多数是基于libpq的,不再举例。

参考

《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
19天前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
3月前
|
小程序 数据库
【微信小程序-原生开发】实用教程10 - 动态的新增、修改、删除(含微信云数据库的新增、修改、删除,表单弹窗、确认弹窗、日期选择器、单行输入框、多行输入框、滑动组件的使用)
【微信小程序-原生开发】实用教程10 - 动态的新增、修改、删除(含微信云数据库的新增、修改、删除,表单弹窗、确认弹窗、日期选择器、单行输入框、多行输入框、滑动组件的使用)
68 0
|
4月前
|
存储 前端开发 小程序
表白墙完善(数据库,前端,后端Servlet),再谈Cookie和Session。以及一个关于Cookie的练习小程序
表白墙完善(数据库,前端,后端Servlet),再谈Cookie和Session。以及一个关于Cookie的练习小程序
|
2月前
|
存储 小程序 关系型数据库
原生小程序 获取手机号并进行存储到mysql数据库
原生小程序 获取手机号并进行存储到mysql数据库
|
4月前
|
JavaScript Java 测试技术
基于ssm+vue.js+uniapp小程序的数据库课程在线教学附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的数据库课程在线教学附带文章和源代码部署视频讲解等
39 4
|
3月前
|
前端开发 小程序 API
【微信小程序】使用 Promise、async 和 await 将异步API 改写为同步
【微信小程序】使用 Promise、async 和 await 将异步API 改写为同步
49 0
|
3月前
|
小程序 JavaScript 安全
【微信小程序-原生开发】转发给好友/群,分享到朋友圈(含单页模式访问云开发数据库的方法)
【微信小程序-原生开发】转发给好友/群,分享到朋友圈(含单页模式访问云开发数据库的方法)
105 0
|
3月前
|
小程序 数据库
【微信小程序-原生开发】实用教程15 - 列表的排序、搜索(含云数据库常用查询条件的使用方法,t-search 组件的使用)
【微信小程序-原生开发】实用教程15 - 列表的排序、搜索(含云数据库常用查询条件的使用方法,t-search 组件的使用)
69 0
|
3月前
|
存储 小程序 数据库
【微信小程序-原生开发】实用教程08 - 开通微信云开发,操作云数据库新增数据(含修改数据权限),初始化云服务(含获取微信云环境 id),获取云数据,滚动公告栏
【微信小程序-原生开发】实用教程08 - 开通微信云开发,操作云数据库新增数据(含修改数据权限),初始化云服务(含获取微信云环境 id),获取云数据,滚动公告栏
52 0
|
4月前
|
Java 测试技术 数据安全/隐私保护
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
32 0
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
下一篇
无影云桌面