我们知道PostgreSQL 9.4新增了逻辑流复制的功能, 在客户端连接数据库服务器时, 通过发送给数据库的startup packet来判断是否要数据库启动wal sender, 并且如何来识别是逻辑复制还是普通的流复制.
数据库处理启动包的代码9.3和9.4的区别 :
PostgreSQL 9.4的
ProcessStartupPacket@src/backend/postmaster/postmaster.c
9.4根据am_db_walsender标示是否逻辑复制.
PostgreSQL 9.3的
/*
* Read a client's startup packet and do something according to it.
*
* Returns STATUS_OK or STATUS_ERROR, or might call ereport(FATAL) and
* not return at all.
*
* (Note that ereport(FATAL) stuff is sent to the client, so only use it
* if that's what you want. Return STATUS_ERROR if you don't want to
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
static int
ProcessStartupPacket(Port *port, bool SSLdone)
...
if (strcmp(nameptr, "database") == 0)
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
{
/*
* Due to backward compatibility concerns the replication
* parameter is a hybrid beast which allows the value to be
* either boolean or the string 'database'. The latter
* connects to a specific database which is e.g. required for
* logical decoding while.
*/
if (strcmp(valptr, "database") == 0)
{
am_walsender = true;
am_db_walsender = true;
}
else if (!parse_bool(valptr, &am_walsender))
ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for parameter \"replication\""),
errhint("Valid values are: false, 0, true, 1, database.")));
}
else
{
/* Assume it's a generic GUC option */
port->guc_options = lappend(port->guc_options,
pstrdup(nameptr));
port->guc_options = lappend(port->guc_options,
pstrdup(valptr));
}
在9.4中, replication=database字符串时, 启动walsender, 并且连接到一个数据库, 逻辑复制必须连接到一个数据库 这是和9.3的差异
am_walsender = true;
am_db_walsender = true;
replication=0或1时, am_walsender=0/1, am_db_walsender = 默认的false.
9.4根据am_db_walsender标示是否逻辑复制.
/*
* Normal walsender backends, e.g. for streaming replication, are not
* connected to a particular database. But walsenders used for logical
* replication need to connect to a specific database. We allow streaming
* replication commands to be issued even if connected to a database as it
* can make sense to first make a basebackup and then stream changes
* starting from that.
*/
if (am_walsender && !am_db_walsender)
port->database_name[0] = '\0';
PostgreSQL 9.3的
ProcessStartupPacket@src/backend/postmaster/postmaster.c
ProcessStartupPacket@src/backend/postmaster/postmaster.c
if (strcmp(nameptr, "database") == 0)
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
{
if (!parse_bool(valptr, &am_walsender))
ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"replication\"")));
}
else
{
/* Assume it's a generic GUC option */
port->guc_options = lappend(port->guc_options,
pstrdup(nameptr));
port->guc_options = lappend(port->guc_options,
pstrdup(valptr));
}
PostgreSQL 9.3 没有am_db_walsender 这个变量. 因为9.3不支持logical replication. 这个变量时9.4新增的.
从上面的内容可以看到, 我们在启动包中加入这些信息, 服务端是如何来处理的.
例如加入replication=database, 那么就表示这是一个逻辑复制链接, 数据库服务端要启动一个wal sender.
而如果加入其它的, 如database=mydbname, 则表示要连接到哪个数据库.
user=username则表示用哪个用户连接.
例如, 使用psql可以设置一些变量, 包括options :
pg94@db-172-16-3-150-> psql "user=digoal"
psql: FATAL: role "digoal" does not exist
pg94@db-172-16-3-150-> psql "user=postgres"
psql (9.4devel)
Type "help" for help.
pg94@db-172-16-3-150-> psql "options=-B=1024kB"
psql: FATAL: parameter "shared_buffers" cannot be changed without restarting the server
这个options是postgres 的options. 在psql中设置没有效果.
详见man postgres
其它还可以设置guc的变量
设置流复制连接 :
因为没有配置, 报连接不上, 修改pg_hba.conf后重试
pg94@db-172-16-3-150-> psql "encoding=sql_"
psql: invalid connection option "encoding"
pg94@db-172-16-3-150-> psql "client_encoding=sql_a"
psql (9.4devel)
Type "help" for help.
digoal=# show client_encoding;
client_encoding
-----------------
UTF8
(1 row)
设置流复制连接 :
pg94@db-172-16-3-150-> psql "replication=database"
psql: FATAL: no pg_hba.conf entry for replication connection from host "[local]", user "postgres", SSL off
因为没有配置, 报连接不上, 修改pg_hba.conf后重试
local replication all trust
vi postgresql.conf
wal_level = logical
max_wal_senders = 10
pg_ctl restart -m fast
pg94@db-172-16-3-150-> psql "replication=database"
psql (9.4devel)
Type "help" for help.
digoal=# create table test(id int);
ERROR: syntax error
digoal=# select 1;
ERROR: syntax error
digoal=# \set VERBOSITY verbose
digoal=# create table test(id int);
ERROR: 42601: syntax error
LOCATION: replication_yyerror, repl_scanner.l:214
注意到, 在startup packet中标识了replication这个变量后, 语法解析会加入repl_scanner.l的部分, 所以只支持流复制协议的语法.
流复制协议的语法参考 :
例如 :
[参考]
digoal=# IDENTIFY_SYSTEM;
systemid | timeline | xlogpos | dbname
---------------------+----------+-------------+--------
6010111239203060891 | 1 | 18/52969CB0 | digoal
(1 row)
digoal=# TIMELINE_HISTORY 1;
ERROR: 58P01: could not open file "pg_xlog/00000001.history": No such file or directory
LOCATION: SendTimeLineHistory, walsender.c:457
digoal=# CREATE_REPLICATION_SLOT slotname PHYSICAL;
ERROR: 53400: all replication slots are in use
HINT: Free one or increase max_replication_slots.
LOCATION: ReplicationSlotCreate, slot.c:243
vi postgresql.conf
max_replication_slots = 10
pg_ctl restart -m fast
pg94@db-172-16-3-150-> psql "replication=database"
psql (9.4devel)
Type "help" for help.
digoal=# CREATE_REPLICATION_SLOT slotname PHYSICAL;
slot_name | consistent_point | snapshot_name | output_plugin
-----------+------------------+---------------+---------------
slotname | 0/0 | |
(1 row)
好了, 写到这里, 大家要了解流复制协议, 可以看看文档, 代码
2. src/backend/postmaster/postmaster.c
3. src/backend/replication/repl_scanner.l