标签
PostgreSQL , notify , listen , 异步消息
背景
小时候就梦想有个酷酷的电波表(虽然现在还没有拥有),不过电波表和PostgreSQL有什么关系呢?听我道来。
http://baike.baidu.com/view/1124741.htm
电波表内置高感度小型天线,接收标准电波进行自动对时,因而可以实现时间上的精准。在国际上,德国、英国、美国、日本都已经有标准电波的发送。2007年7月,在中国河南商丘建成的新电波塔已经开始发送电波。
标准电波接收(6局电波接收)原理
中国标准时间以10万年误差1秒的铯原子钟为基准。通过手表内置的高敏感度接收器接收以无线电波传送的标准时间信号,并自动校准手表走时。使手表显示的时间与标准时间同步,精确计时。在建筑物密集的室内也可以接收标准电波信号,显示正确的时间。
标准电波的接收范围是半径约1000~3000km, 深夜(最多6次)自动接收电波信号,不能收到标准电波信号时,自动接收GPS卫星电波获取时间信息并校正时间。中国的电波塔在河南省商丘市。
而用于接收电波信号的防震天线采用的是非晶材质,能够实现高敏感度接收。不但具备对应极端户外环境下使用的牢靠性,还可以确保稳定接收全球6局电波信号。
电波表是一个非常典型的广播应用,类似的还有组播(注意不是主播哦),类似的应用也很多,比如广播电视,电台等。
数据库广播
在数据库中,其实也有类似的应用,比如我前几天在文章中写的数据库到WEB客户端的广播,详见 :
《从微信小程序 到 数据库"小程序" , 鬼知道我经历了什么》
实际上是利用了PostgreSQL数据库的异步消息机制,数据库往消息通道发送数据,应用程序可以监听对应的消息通道,获取数据库发出的数据。
通过异步消息在数据库中实现了一对多的广播效果。
SQL语法参考文档:
1. 向通道发送消息
https://www.postgresql.org/docs/9.6/static/sql-notify.html
2. 监听某通道的消息
https://www.postgresql.org/docs/9.6/static/sql-listen.html
3. 取消监听某通道
https://www.postgresql.org/docs/9.6/static/sql-unlisten.html
4. 数据库函数
查看会话监听了哪些通道,以及当前数据库的异步消息队列使用了多少。
pg_listening_channels() setof text channel names that the session is currently listening on
pg_notification_queue_usage() double fraction of the asynchronous notification queue currently occupied (0-1)
pg_notification_queue_usage用来计算已使用的异步消息页面占比,如果有监听,但是一直不消费,可能导致溢出。
src/backend/commands/async.c
/*
* slru.c currently assumes that all filenames are four characters of hex
* digits. That means that we can use segments 0000 through FFFF.
* Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
* the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
*
* It's of course possible to enhance slru.c, but this gives us so much
* space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT 32
异步消息编程
除了使用SQL来编写异步消息,还可以使用数据库的驱动来编写异步消息
c
参考libpq的异步消息部分
https://www.postgresql.org/docs/9.6/static/libpq-notify.html
PGnotify *PQnotifies(PGconn *conn);
typedef struct pgNotify
{
char *relname; /* notification channel name */
int be_pid; /* process ID of notifying server process */
char *extra; /* notification payload string */
} PGnotify;
文档中有一个例子如下
https://www.postgresql.org/docs/9.6/static/libpq-example.html#LIBPQ-EXAMPLE-2
java
参考文档
https://jdbc.postgresql.org/documentation/head/listennotify.html
应用举例
Broadcasting PostgreSQL NOTIFY messages to WebSocket Clients
The system works like this:
Client subscribes to a WebSocket topic...
NOTIFY event on database server ->
PGNotificationListener on web server ->
Send Websocket notification on server ->
Receive Websocket event on browser.
With the code below, if you call NOTIFY dml_events, 'some message'; in Postgres, it will be broadcast to all WebSocket clients.
Follow this answer regarding proper listener setup
URL:
http://blog.databasepatterns.com/2014/04/postgresql-nofify-websocket-spring-mvc.html
http://stackoverflow.com/questions/37916489/listen-notify-pgconnection-goes-down-java
The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected.
Check out the BasicContext class lines 642 - 655:
---
public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {
name = nullToEmpty(name);
channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";
Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);
NotificationKey key = new NotificationKey(name, channelNameFilterPattern);
synchronized (notificationListeners) {
notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
}
}
---
If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire as seen from lines 690 - 710
---
@Override
public synchronized void reportNotification(int processId, String channelName, String payload) {
Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();
NotificationListener listener = entry.getValue().get();
if (listener == null) {
iter.remove();
}
else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {
listener.notification(processId, channelName, payload);
}
}
}
---
To fix this, add your notification listeners as such:
---
/// Do not let this reference go out of scope!
PGNotificationListener listener = new PGNotificationListener() {
@Override
public void notification(int processId, String channelName, String payload) {
// interesting code
};
pgConnection.addNotificationListener(listener);
---
Quite an odd use-case for weak references in my opinion...
代码:
https://bitbucket.org/neilmcg/postgresql-websocket-example
其他编程语言的驱动,大多数是基于libpq的,不再举例。