为什么叫"你是我的眼"?因为你看到的世界,就是我的世界。
PostgreSQL 9.5 的逻辑流复制功能越来越完善了,这也对其他工具提出了一定的要求。例如我们在使用流复制协议接口(非SQL接口)创建一个逻辑流复制slot的同时,会自动导出创建SLOT时的snapshot,有了这个SNAPSHOT ID,我们才能够将基础数据导出来,再加上从WAL decode出来的信息,从而实现逻辑复制。
pg_dump是一个数据备份工具,目前加入了对snapshot的支持,目的非常明显,主要是配合逻辑复制使用的。当然也可以配合其他复制工具使用,前提是这些工具能够导出snapshot。
Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)
The remote snapshot must have been exported by pg_export_snapshot() or been defined when creating a logical replication slot. This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.
测试:
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# select pg_export_snapshot();
pg_export_snapshot
--------------------
0000072C-1
(1 row)
先不要断开这个事务。等备份启动后再关闭即可(不需要等待备份结束)。
pg95@db-172-16-3-150-> pg_dump --snapshot=0000072C-1
使用这个SNAPSHOT导出。
对于逻辑复制,我们需要使用逻辑流复制协议创建slot,然后开启备份。
例子:
vi pg_hba.conf
# replication privilege.
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
host replication postgres ::1/128 trust
pg_ctl reload
在数据库端使用test_decoding插件对WAL中记录的逻辑变更进行解码(需要设置wal_level = logical)。
使用流复制协议连接数据库。
pg95@db-172-16-3-150-> psql 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres replication=database'
psql (9.5devel)
Type "help" for help.
postgres=# CREATE_REPLICATION_SLOT ab12 LOGICAL "/opt/pgsql9.5/lib/test_decoding.so";
slot_name | consistent_point | snapshot_name | output_plugin
-----------+------------------+---------------+------------------------------------
ab12 | 7/77B59A00 | 00000736-1 | /opt/pgsql9.5/lib/test_decoding.so
(1 row)
使用这个SNAPSHOT导出。
pg95@db-172-16-3-150-> pg_dump --snapshot=00000736-1
另外我们可以使用pg_recvlogical从这个slot开始接收逻辑变更:
pg_recvlogical -S ab12 -d postgres -v -f - --start
BEGIN 1849
table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12
COMMIT 1849
有了dump(基础备份)+逻辑变更(SQL)+exec sql模块,就可以完成基于SQL的逻辑复制。
使用SQL函数也可以消费slot的变更。
postgres=# select * from pg_replication_slots ;
-[ RECORD 9 ]+-----------------------------------
slot_name | ab12
plugin | /opt/pgsql9.5/lib/test_decoding.so
slot_type | logical
datoid | 13181
database | postgres
active | f
active_pid |
xmin |
catalog_xmin | 1850
restart_lsn | 7/77B5A3B0
postgres=# insert into t1 values (1);
INSERT 0 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('ab12', NULL, NULL, 'include-timestamp', '1', 'include-xids', '1');
location | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------
7/77B5ABB8 | 1852 | BEGIN 1852
7/77B5ABB8 | 1852 | table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:15
7/77B5AD98 | 1852 | COMMIT 1852 (at 2000-01-01 08:00:00+08)
(3 rows)
但是请注意,slot中的信息只会消费一次,所以一个slot对应一个消费者,如果有多个消费者,请使用多个slot。除非你的应用适合多个消费者使用一个SLOT.
[其他]
1. PostgreSQL 9.5 新增了一个参数log_replication_commands = on,打开的话在日志中会记录流复制协议的命令。
例如:
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2. 流复制协议详见PostgreSQL官方文档 Frontend/Backend Protocol 一章中的 Streaming Replication Protocol 一节。
[参考]
5. test_decoding支持的options, contrib/test_decoding/test_decoding.c
include-xids
include-timestamp
force-binary
skip-empty-xacts
only-local
如下:
foreach(option, ctx->output_plugin_options)
{
DefElem *elem = lfirst(option);
Assert(elem->arg == NULL || IsA(elem->arg, String));
if (strcmp(elem->defname, "include-xids") == 0)
{
/* if option does not provide a value, it means its value is true */
if (elem->arg == NULL)
data->include_xids = true;
else if (!parse_bool(strVal(elem->arg), &data->include_xids))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-timestamp") == 0)
{
if (elem->arg == NULL)
data->include_timestamp = true;
else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "force-binary") == 0)
{
bool force_binary;
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &force_binary))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
if (force_binary)
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
}
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
{
if (elem->arg == NULL)
data->skip_empty_xacts = true;
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "only-local") == 0)
{
if (elem->arg == NULL)
data->only_local = true;
else if (!parse_bool(strVal(elem->arg), &data->only_local))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" = \"%s\" is unknown",
elem->defname,
elem->arg ? strVal(elem->arg) : "(null)")));
}
6. src/backend/replication/walsender.c
创建逻辑slot时,自动创建snapshot。
/*
* Create a new replication slot.
*/
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
......
if (cmd->kind == REPLICATION_KIND_LOGICAL)
{
......
/*
* Export a plain (not of the snapbuild.c type) snapshot to the user
* that can be imported into another session.
*/
snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
......
注意:导出的snapshot只保留到同一连接收到下一条replication命令为止,执行下一条replication命令时会先清除之前导出的snapshot,因此务必在这之前让pg_dump导入这个snapshot。
/*
* Execute an incoming replication command.
*/
void
exec_replication_command(const char *cmd_string)
{
......
/*
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
* command arrives. Clean up the old stuff if there's anything.
*/
SnapBuildClearExportedSnapshot();
......