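PostgreSQL provides an interesting asynchronous messaging feature.
With it you can build many interesting things, such as a simple chat group, logical data replication, or audit logging.
NOTIFY sends a message to a specified channel.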
postgres=# \h notify
Command: NOTIFY
Description: generate a notification
Syntax:
NOTIFY channel [ , payload ]
LISTEN subscribes the current session to a specified channel.
postgres=# \h listen
Command: LISTEN
Description: listen for a notification
Syntax:
LISTEN channel
A client can send messages to multiple channels, and can also listen on multiple channels at once.
Example:
1. Everyone first joins a chat group (a channel).
session A:
postgres=# listen cnpug;
LISTEN
session B:
postgres=# listen cnpug;
LISTEN
2. Anyone can post to the chat group. A session that is listening on the channel receives its own message immediately.
SESSION A:
postgres=# notify cnpug, 'hello, every body.';
NOTIFY
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
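3. Messages sent by other sessions wait in the queue until you pick them up. psql only checks for pending notifications after it runs a command, so re-running listen here (any other command would also do) displays them.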
SESSION B:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello, every body." received from server process with PID 45729.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
If a session does not pick up its messages for a long time, they pile up in the queue.
SESSION B:
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello';
NOTIFY
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
postgres=# notify cnpug, 'hello1';
NOTIFY
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
SESSION A:
postgres=# listen cnpug;
LISTEN
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello" received from server process with PID 46882.
Asynchronous notification "cnpug" with payload "hello1" received from server process with PID 46882.
4. A newcomer who joins the chat group only sees messages sent after joining; earlier messages are not visible.
postgres=# listen cnpug;
LISTEN
Messages sent earlier are not shown, even if they are still in the queue.
5. Leave the chat group
unlisten cnpug;
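The behaviour in step 4 follows from how the queue is read: each listening backend keeps its own read position, and a new listener starts from the current head of the queue. Below is a minimal sketch of that idea in Python, not PostgreSQL's actual C code. The NotifyQueue class and its method names are hypothetical and exist only for illustration, and the model ignores transactions, databases and signals.

```python
class NotifyQueue:
    """Toy model: one shared queue, each listener keeps its own read position."""

    def __init__(self):
        self.messages = []    # (channel, payload) tuples in arrival order
        self.positions = {}   # listener name -> index of next unread message
        self.channels = {}    # listener name -> set of channels listened on

    def listen(self, listener, channel):
        # A new listener starts at the current head, so older entries are skipped.
        if listener not in self.positions:
            self.positions[listener] = len(self.messages)
            self.channels[listener] = set()
        self.channels[listener].add(channel)

    def notify(self, channel, payload):
        self.messages.append((channel, payload))

    def poll(self, listener):
        # Read everything that arrived since the last poll, keep matching channels.
        start = self.positions[listener]
        self.positions[listener] = len(self.messages)
        wanted = self.channels[listener]
        return [p for (c, p) in self.messages[start:] if c in wanted]


q = NotifyQueue()
q.listen("A", "cnpug")
q.notify("cnpug", "hello")
q.listen("C", "cnpug")        # C joins after the first message was sent
q.notify("cnpug", "hello1")
print(q.poll("A"))            # ['hello', 'hello1']
print(q.poll("C"))            # ['hello1']
```

Session C never sees "hello" even though the entry is still sitting in the list, which mirrors the psql transcript above.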
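Beyond this, asynchronous messages can also be used for auditing, data replication, and similar scenarios.
The tcn contrib module is one example of an audit-like or replication-like use. It must be installed in the database first (create extension tcn;).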
test=# create table tcndata
test-# (
test(# a int not null,
test(# b date not null,
test(# c text,
test(# primary key (a, b)
test(# );
CREATE TABLE
Create a trigger so that every DML operation calls the triggered_change_notification function, which issues a NOTIFY.
test=# create trigger tcndata_tcn_trigger
test-# after insert or update or delete on tcndata
test-# for each row execute procedure triggered_change_notification();
CREATE TRIGGER
Listen on the tcn channel
test=# listen tcn;
LISTEN
Now every DML statement produces asynchronous messages on the tcn channel, sent by the triggered_change_notification function.
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-# (1, date '2012-12-23', 'another'),
test-# (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
This approach can be used for auditing or for asynchronous data replication.
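A consumer that wants to act on these messages has to take the payload apart first. The sketch below parses payloads of the shape shown in the transcript above: a double-quoted table name, an operation letter (I, U or D), then "column"='value' pairs for the primary key. It assumes that embedded quotes are doubled inside names and values, as the tcn documentation describes. The function name parse_tcn_payload is made up for this example and is not part of tcn.

```python
import re

IDENT = r'"((?:[^"]|"")*)"'   # double-quoted identifier, embedded " doubled
VALUE = r"'((?:[^']|'')*)'"   # single-quoted value, embedded ' doubled
PAIR_RE = re.compile("," + IDENT + "=" + VALUE)
PAYLOAD_RE = re.compile(IDENT + r",([IUD])((?:," + IDENT + "=" + VALUE + ")*)")


def parse_tcn_payload(payload):
    """Split a tcn payload into (table, operation, {key column: value})."""
    m = PAYLOAD_RE.fullmatch(payload)
    if m is None:
        raise ValueError("not a tcn payload: %r" % payload)
    table = m.group(1).replace('""', '"')
    operation = m.group(2)
    keys = {
        name.replace('""', '"'): value.replace("''", "'")
        for name, value in PAIR_RE.findall(m.group(3))
    }
    return table, operation, keys


print(parse_tcn_payload('"tcndata",U,"a"=\'1\',"b"=\'2012-12-23\''))
# ('tcndata', 'U', {'a': '1', 'b': '2012-12-23'})
```

The payload carries only the primary key, so a replication or audit consumer would still have to fetch the row itself if it needs the other columns.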
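The code for asynchronous messaging is in:
src/backend/commands/async.c
Mind the limit on the queue size: once the queue is full, NOTIFY fails with a queue-full error.
Also mind the maximum length of a single message. The payload limit is BLCKSZ - NAMEDATALEN - 128 bytes, a little under one data block (8000 bytes with the default 8kB block size and a NAMEDATALEN of 64).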
/*-------------------------------------------------------------------------
* Async Notification Model as of 9.0:
*
* 1. Multiple backends on same machine. Multiple backends listening on
* several channels. (Channels are also called "conditions" in other
* parts of the code.)
*
* 2. There is one central queue in disk-based storage (directory pg_notify/),
* with actively-used pages mapped into shared memory by the slru.c module.
* All notification messages are placed in the queue and later read out
* by listening backends.
*
* There is no central knowledge of which backend listens on which channel;
* every backend has its own list of interesting channels.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
* in each notification message. Listening backends ignore messages
* that don't match their database OID. This is important because it
* ensures senders and receivers have the same database encoding and won't
* misinterpret non-ASCII text in the channel name or payload string.
*
* Since notifications are not expected to survive database crashes,
* we can simply clean out the pg_notify data at any reboot, and there
* is no need for WAL support or fsync'ing.
*
* 3. Every backend that is listening on at least one channel registers by
* entering its PID into the array in AsyncQueueControl. It then scans all
* incoming notifications in the central queue and first compares the
* database OID of the notification with its own database OID and then
* compares the notified channel with the list of channels that it listens
* to. In case there is a match it delivers the notification event to its
* frontend. Non-matching events are simply skipped.
*
* 4. The NOTIFY statement (routine Async_Notify) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
* Duplicate notifications from the same transaction are sent out as one
* notification only. This is done to save work when for example a trigger
* on a 2 million row table fires a notification for each row that has been
* changed. If the application needs to receive every single notification
* that has been sent, it can easily add some unique string into the extra
* payload parameter.
*
* When the transaction is ready to commit, PreCommit_Notify() adds the
* pending notifications to the head of the queue. The head pointer of the
* queue always points to the next free position and a position is just a
* page number and the offset in that page. This is done before marking the
* transaction as committed in clog. If we run into problems writing the
* notifications, we can still call elog(ERROR, ...) and the transaction
* will roll back.
*
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
* make the actual updates to the effective listen state (listenChannels).
*
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that
* are already up to date, though. We don't bother with a self-signal
* either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle
* (ie, it is waiting for a frontend command and is not within a transaction
* block). Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
* the head pointer's position. Then we check if we were the laziest
* backend: if our pointer is set to the same position as the global tail
* pointer is set, then we move the global tail pointer ahead to where the
* second-laziest backend is (in general, we take the MIN of the current
* head position and all active backends' new tail pointers). Whenever we
* move the global tail pointer we also truncate now-unused pages (i.e.,
* delete files in pg_notify/ that are no longer used).
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
* PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies.
*
* The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
*-------------------------------------------------------------------------
*/
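Point 4 of the comment above describes two things that are easy to miss: notifications stay in a backend-local list until commit, and identical notifications within one transaction are collapsed into one. Here is a toy model of just those two rules. The Session class and its names are hypothetical; real PostgreSQL does this in Async_Notify and PreCommit_Notify.

```python
class Session:
    """Toy model of one backend's pending-notification list."""

    def __init__(self, shared_queue):
        self.shared_queue = shared_queue   # stands in for the central queue
        self.pending = []                  # backend-local, not yet visible to others

    def notify(self, channel, payload=""):
        event = (channel, payload)
        # Identical channel + payload pairs in one transaction are kept once.
        if event not in self.pending:
            self.pending.append(event)

    def commit(self):
        # Pending notifications reach the shared queue only at commit.
        self.shared_queue.extend(self.pending)
        self.pending = []

    def rollback(self):
        # An aborted transaction sends nothing.
        self.pending = []


queue = []
s = Session(queue)
for _ in range(3):
    s.notify("cnpug", "hello")
print(queue)      # [] (nothing is visible before commit)
s.commit()
print(queue)      # [('cnpug', 'hello')]
```

In the psql transcript earlier, each notify ran in its own autocommit transaction, which is why all ten "hello" messages arrived separately. Inside a single transaction they would have been delivered as one, unless each payload carried something unique.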
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
* must be kept small enough so that a notification message fits on one
* SLRU page. The magic fudge factor here is noncritical as long as it's
* more than AsyncQueueEntryEmptySize --- we make it significantly bigger
* than that, so changes in that data structure won't affect user-visible
* restrictions.
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
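To put a number on this macro: the sketch below evaluates it for a stock build. BLCKSZ = 8192 and NAMEDATALEN = 64 are assumptions here, since both are compile-time settings and may differ on a custom build. The helper payload_fits is a hypothetical client-side check, not a PostgreSQL function.

```python
BLCKSZ = 8192       # assumed default block size
NAMEDATALEN = 64    # assumed default identifier length limit

# Same arithmetic as the C macro; the result includes the terminating byte.
NOTIFY_PAYLOAD_MAX_LENGTH = BLCKSZ - NAMEDATALEN - 128


def payload_fits(payload, encoding="utf-8"):
    """Return True if the encoded payload is short enough for NOTIFY."""
    # One byte is reserved for the terminator, so the payload itself
    # must be strictly shorter than the limit.
    return len(payload.encode(encoding)) < NOTIFY_PAYLOAD_MAX_LENGTH


print(NOTIFY_PAYLOAD_MAX_LENGTH)     # 8000
print(payload_fits("x" * 7999))      # True
print(payload_fits("x" * 8000))      # False
```

The limit is counted in bytes, not characters, so text in a multi-byte encoding reaches it sooner than its character count suggests.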
/*
* Struct representing an entry in the global notify queue
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
* (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
* The "length" field should always be rounded up to the next QUEUEALIGN
* multiple so that all fields are properly aligned.
*/
typedef struct AsyncQueueEntry
{
int length; /* total allocated length of entry */
Oid dboid; /* sender's database OID */
TransactionId xid; /* sender's XID */
int32 srcPid; /* sender's PID */
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
/*
* slru.c currently assumes that all filenames are four characters of hex
* digits. That means that we can use segments 0000 through FFFF.
* Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
* the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
*
* It's of course possible to enhance slru.c, but this gives us so much
* space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
// src/include/access/slru.h:#define SLRU_PAGES_PER_SEGMENT 32
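The 8GB figure in the comment above can be checked with a few lines of arithmetic. This is only a sketch: it assumes the default BLCKSZ of 8192 and the SLRU_PAGES_PER_SEGMENT value of 32 quoted above, and the Python variable names simply mirror the C macros.

```python
BLCKSZ = 8192                  # assumed default block size
SLRU_PAGES_PER_SEGMENT = 32    # from src/include/access/slru.h, as quoted above

# Same expression as the C macro.
QUEUE_MAX_PAGE = SLRU_PAGES_PER_SEGMENT * 0x10000 - 1

# At most half of the page range may be in use at once (integer division, as in C).
max_queued_pages = QUEUE_MAX_PAGE // 2
max_queued_bytes = max_queued_pages * BLCKSZ

print(QUEUE_MAX_PAGE)                  # 2097151
print(max_queued_pages)                # 1048575
print(max_queued_bytes)                # 8589926400
print(max_queued_bytes / 2**30)        # just under 8.0 (GiB)
```

So the queue can hold just under 8 GiB of unread notifications before NOTIFY starts failing, which only happens when some listening session stops reading for a long time.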
[References]
1. http://www.postgresql.org/docs/devel/static/tcn.html
2. http://www.postgresql.org/docs/9.4/static/libpq-notify.html
3. http://www.postgresql.org/docs/9.4/static/libpq-example.html#LIBPQ-EXAMPLE-2
4. contrib/tcn/tcn.c
5. http://www.postgresql.org/docs/9.4/static/sql-notify.html
6. http://www.postgresql.org/docs/9.4/static/sql-listen.html
7. http://www.postgresql.org/docs/9.4/static/sql-unlisten.html
8. http://www.postgresql.org/docs/9.4/static/contrib-dblink-get-notify.html
9. src/backend/commands/async.c
10. src/include/commands/async.h