PostgreSQL 10.0 逻辑复制原理与最佳实践

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

标签

PostgreSQL , logical replication , 逻辑复制 , 最佳实践


背景

PostgreSQL 从2010年发布的9.0开始支持流式物理复制,备库可以作为只读库打开,提供给用户使用。

物理复制的好处

1. 物理层面完全一致,这是许多商业数据库的惯用手段。例如Oracle的DG。

2. 延迟低,事务执行过程中产生REDO record,实时的在备库apply,事务结束时,备库立马能见到数据。不论事务多大,都一样。

3. 物理复制的一致性、可靠性达到了金融级的需求,不必担心数据逻辑层面不一致。

但是物理复制要求主备块级完全一致,所以有一些无法覆盖的应用场景,例如备库不仅要只读,还要可写。又比如备库不需要完全和主库一致,只需要复制部分数据,或者备库要从多个数据源复制数据,等等。

物理复制无法覆盖的场景

1. 数据库实例的部分,例如单个数据库或者某些表的复制需求。

例如某个游戏业务,账号体系是一套数据库,如果全国各地有多个接入点,全部都连到中心数据库进行认证可能不太科学。那么就希望将登陆需要用到的一些数据表同步到多个数据中心,而不是整个数据库实例。

2. 数据到达subcriber后,针对不同数据,设置触发器。

3. 将多个数据库实例的数据,同步到一个目标数据库。

例如多个数据库同步到一个大的数据仓库。

4. 在不同的数据库版本之间,复制数据

5. 将一个数据库实例的不同数据,复制到不同的目标库。

例如省级数据库的数据,按地区划分,分别复制到不同的地区。

6. 在多个数据库实例之间,共享部分数据。

例如某个业务按用户ID哈希,拆分成了8个数据库,但是有些小的维度表,需要在多个数据库之间共享。

以上场景是物理复制无法覆盖的。

逻辑复制应运而生,实际上,从2014年发布的9.4版本开始,PostgreSQL就支持逻辑复制了,只是一直没有将其引入内核。

2017年即将发布的10.0,将会在内核层面支持基于REDO流的逻辑复制。

另一个好消息是,你可以针对同一个数据库实例,同时使用逻辑复制和物理复制,因为他们都是基于REDO的。

下面我们来看一下逻辑复制的概念、架构、监控、安全、最佳实践。

逻辑复制概念

PostgreSQL 逻辑复制是事务级别的复制,引入了几个概念

publication - 发布者

发布者指数据上游节点,你需要将哪些表发布出去?

上游节点需要配置这些东西

1. 需要将数据库的REDO的wal_level配置为logical。

2. 需要发布逻辑复制的表,必须配置表的REPLICA IDENTITY,即如何标示老的记录。

被复制的表,建议有PK约束。

alter table table_name  
    REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING }  

解释

REPLICA IDENTITY  

This form changes the information which is written to the write-ahead log to identify rows which are updated or deleted.   

This option has no effect except when logical replication is in use.   

记录PK列的  
1. DEFAULT (the default for non-system tables) records the old values of the columns of the primary key, if any.   

记录指定索引列(索引的所有列须是not null列,其实和PK一样,但是某些情况下,你可以选一个比PK更小的UK)  
2. USING INDEX records the old values of the columns covered by the named index, which must be unique, not partial, not deferrable, and include only columns marked NOT NULL.   

记录完整记录  
3. FULL records the old values of all columns in the row.   

啥也不记录,这样做是否不支持update, delete?   
user_catalog_table=true或者系统表,默认为replica identity nothing啥也不记录。如果这种表发布出去了,允许insert,但是执行delete或者update时,会报错。  
4. NOTHING records no information about the old row (This is the default for system tables.)   

仅仅当数据有变更时才会记录old value,比如delete。 或者update前后old.*<>new.*。      
In all cases, no old values are logged unless at least one of the columns that would be logged differs between the old and new versions of the row.  

什么是system table?

指catalog或者user_catalog_table = true的表,不允许修改列的数据类型。

postgres=# \d+ c  
                           Table "public.c"  
 Column |  Type   | Modifiers | Storage  | Stats target | Description   
--------+---------+-----------+----------+--------------+-------------  
 id     | integer |           | plain    |              |   
 info   | text    |           | extended |              |   
 c      | text    |           | extended |              |   
Replica Identity: FULL  
Options: user_catalog_table=true  

postgres=# alter table c alter column c type int8;  
ERROR:  column "c" cannot be cast automatically to type bigint  
HINT:  You might need to specify "USING c::bigint".  

解释

create table table_name () with (user_catalog_table =true);  


user_catalog_table (boolean)  
Declare the table as an additional catalog table for purposes of logical replication. See Section 48.6.2, “Capabilities” for details. This parameter cannot be set for TOAST tables.  


To decode, format and output changes, output plugins can use most of the backend's normal infrastructure, including calling output functions.   
Read only access to relations is permitted as long as only relations are accessed that either have been created by initdb in the pg_catalog schema,   
or have been marked as user provided catalog tables using  

ALTER TABLE user_catalog_table SET (user_catalog_table = true);  
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);  

Any actions leading to transaction ID assignment are prohibited.   
That, among others, includes writing to tables, performing DDL changes, and calling txid_current().  

3. output plugin

发布者还需要一个output plugin,将redo按发布的定义,解析成需要的格式,等待订阅者的订阅。

https://www.postgresql.org/docs/devel/static/logicaldecoding-output-plugin.html

是不是有点像这个呢?

《PostgreSQL 闪回 - flash back query emulate by trigger》

发布语法

创建发布

Command:     CREATE PUBLICATION  
Description: define a new publication  
Syntax:  
CREATE PUBLICATION name  
    [ FOR TABLE table_name [, ...]  
      | FOR ALL TABLES ]  
    [ WITH ( option [, ... ] ) ]  

where option can be:  

      PUBLISH INSERT | NOPUBLISH INSERT  
    | PUBLISH UPDATE | NOPUBLISH UPDATE  
    | PUBLISH DELETE | NOPUBLISH DELETE  

默认发布insert,update,delete。  

修改发布

Command:     ALTER PUBLICATION  
Description: change the definition of a publication  
Syntax:  
ALTER PUBLICATION name WITH ( option [, ... ] )  

where option can be:  

      PUBLISH INSERT | NOPUBLISH INSERT  
    | PUBLISH UPDATE | NOPUBLISH UPDATE  
    | PUBLISH DELETE | NOPUBLISH DELETE  

ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }  
ALTER PUBLICATION name ADD TABLE table_name [, ...]  
ALTER PUBLICATION name SET TABLE table_name [, ...]  
ALTER PUBLICATION name DROP TABLE table_name [, ...]  

发布者小结

1. 目前仅仅支持发布表,不允许发布其他对象。

2. 同一张表,可以发布多次。

3. 在同一个数据库中,可以创建多个publication,但是不能重名,通过系统表查看已创建的publication

postgres=# \d pg_publication  
            Table "pg_catalog.pg_publication"  
    Column    |  Type   | Collation | Nullable | Default   
--------------+---------+-----------+----------+---------  
 pubname      | name    |           | not null |   
 pubowner     | oid     |           | not null |   
 puballtables | boolean |           | not null |   
 pubinsert    | boolean |           | not null |   
 pubupdate    | boolean |           | not null |   
 pubdelete    | boolean |           | not null |   
Indexes:  
    "pg_publication_oid_index" UNIQUE, btree (oid)  
    "pg_publication_pubname_index" UNIQUE, btree (pubname)  

4. 允许使用all tables发布所有表。

5. 一个publication允许有多个订阅者。

6. 目前publication仅支持insert, update, delete。

7. 允许发布时,选择发布insert、update、delete,比如只发布insert,而不发布update, delete。

8. 当发布了表的update, delete时,表必须设置replica identity,即如何标示OLD TUPLE,通过pk或者uk或者full。如果设置了nothing,则执行update,delete时会报错

alter table table_name  
    REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING }  

报错例子

postgres=# delete from c;  
ERROR:  cannot delete from table "c" because it does not have replica identity and publishes deletes  
HINT:  To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.  

9. create publication或者alter publication,发布或者修改发布内容中添加或者删除表时,都是事务级别,不会出现复制了部分事务的情况。 so the table will start or stop replicating at the correct snapshot once the transaction has committed。

10. 发布者需要设置wal_level=logical,同时开启足够的worker,设置足够大的replication slot,设置足够多的sender。

因为每一个订阅,都要消耗掉一个replication slot,需要消耗一个wal sender,一个worker进程。

发布者的pg_hba.conf需要设置replication条目,允许订阅者连接。

发布者的数据库中,必须有replication角色的用户,或者超级用户,并且订阅者要使用它通过流复制协议连接到发布者。

subscription - 订阅者

订阅者,需要指定发布者的连接信息,以及 publication name,同时指定需要在publication数据库中创建的slot name。

在同一个数据库中,可以创建多个订阅。

订阅者和发布者的角色可以同时出现在同一个实例的同一个数据库中。

订阅语法

创建订阅

Command:     CREATE SUBSCRIPTION  
Description: define a new subscription  
Syntax:  
CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION { publication_name [, ...] } [ WITH ( option [, ... ] ) ]  

where option can be:  

    | ENABLED | DISABLED  
    | CREATE SLOT | NOCREATE SLOT  
    | SLOT NAME = slot_name  

修改订阅

Command:     ALTER SUBSCRIPTION  
Description: change the definition of a subscription  
Syntax:  
ALTER SUBSCRIPTION name WITH ( option [, ... ] ) ]  

where option can be:  

  SLOT NAME = slot_name  

ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }  
ALTER SUBSCRIPTION name CONNECTION 'conninfo'  
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...]  
ALTER SUBSCRIPTION name ENABLE  
ALTER SUBSCRIPTION name DISABLE  

订阅者小结

1. 订阅者需要通过流复制协议连接到发布者,同时需要在发布者创建replication slot。

因此发布者的pg_hba.conf中需要配置相应的replication条目,允许订阅者通过流复制协议连接。

同时连接发布者的用户,必须具备replication权限,或者具备超级用户权限。

2. 同一个数据库中,可以创建多个subscription,这些subscription可以连自一个或多个发布者。

3. 当同一个数据库中有多个subscription时,如果这些subscriptions是来自同一个发布者,那么他们之间发布的表不能重叠。

也就是说,订阅者的同一张表,不能接受来自同一个源的多个发布。

例如

发布者

create table public.a (id int primary key, info text);  

create publication pub1 for table a;  

create publication pub2 for table a;  

订阅者

A表接受了同一个源的多次发布,会报错。

create table public.a (id int primary key, info text);  

create subscription sub1 connection 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres' publication pub1;  

create subscription sub2 connection 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres' publication pub2;  

4. 每一个订阅,都需要在发布端创建一个slot,可以使用slot name = ?指定,或者默认为subscription name。

即使是同一个发布端,只要订阅了多次,就需要创建多个SLOT,因为slot中记录了同步的LSN信息。

例如

create table public.a (id int primary key, info text);  
create table public.b (id int primary key, info text);  

create publication pub1 for table a;  

create publication pub2 for table b;  

订阅者

create table public.a (id int primary key, info text);  
create table public.b (id int primary key, info text);  

create subscription sub1 connection 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres' publication pub1;  

create subscription sub2 connection 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres' publication pub2;  

这种情况,对于这个订阅者,建议合并成一个,例如

create subscription sub1 connection 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres' publication pub1, pub2;  

5. pg_dump导出数据库逻辑数据时,默认不会导出subscription的定义,除非使用选项 --include-subscriptions

6. 在创建subscription或者alter subscription时,可以使用enable来启用该订阅,或者使用disable暂停该订阅。

7. 如果要完全删除订阅,使用drop subscription,注意,删除订阅后,本地的表不会被删除,数据也不会清除,仅仅是不在接收该订阅的上游信息。

这个也很好理解,因为同一个表可能接收多个订阅。删订阅和删表是两码事。

8. 删除订阅后,如果要重新使用该订阅,数据需要resync,比如订阅的上游节点有100万数据,resync会将这100万数据同步过来。随后进入增量同步。

将来10.0正式发布时,也许会提供一个选项,选择要不要resync。

(目前来说,一次订阅,意味着这些被订阅的表会和发布端一模一样(只要发布端发布了insert,update,delete语句)。如果发布端只发布了insert,那么源表的update和delete不会被订阅)

9. 订阅时,不会自动创建发布端的表,所以表需要在订阅端先创建好。

将来10.0正式发布时,也许会填补这个功能。

目前发布端和订阅端的表定义必须完全一致,包括

schema,表名必须一致。

字段名和字段类型必须一致。

字段顺序可以不一致。

除了表,其他对象都不能被订阅,例如你不能将表订阅到一张视图中。

10. 必须使用超级用户创建订阅

逻辑复制的冲突

逻辑复制,本质上是事务层级的复制,需要在订阅端执行SQL。

如果订阅端执行SQL失败(或者说引发了任何错误,包括约束等),都会导致该订阅暂停。

注意,update, delete没有匹配的记录时,不会报错,也不会导致订阅暂停。

用户可以在订阅端数据库日志中查看错误原因。

冲突修复方法

1. 通过修改订阅端的数据,解决冲突。例如insert违反了唯一约束时,可以删除订阅端造成唯一约束冲突的记录先DELETE掉。然后使用ALTER SUBSCRIPTION name ENABLE让订阅继续。

2. 在订阅端调用pg_replication_origin_advance(node_name text, pos pg_lsn)函数,node_name就是subscription name,pos指重新开始的LSN,从而跳过有冲突的事务。

pg_replication_origin_advance(node_name text, pos pg_lsn)     

Set replication progress for the given node to the given position.   

This primarily is useful for setting up the initial position or a new position after configuration changes and similar.   

Be aware that careless use of this function can lead to inconsistently replicated data.  

当前的lsn通过pg_replication_origin_status.remote_lsn查看。

https://www.postgresql.org/docs/devel/static/view-pg-replication-origin-status.html

逻辑复制架构

pic

pic

1. 在创建subscription后,订阅者会在发布端创建一个快照,同时将发布端的数据,在同一个快照内的视角,发送给订阅端。

例如订阅了发布端的A,B,C三张表,那么这三张表的当前快照下的数据,会发送给订阅端。

2. 订阅端接收完快照后,发布端会从快照的这个LSN开始,从WAL(REDO)日志中,根据发布定义的表以及过滤条件(INSERT\UPDATE\DELETE),按事务组装复制的消息包,通过流复制协议发送给订阅端的apply(wal receiver)进程。

3. 订阅端接收到消息包之后,对于同一个订阅(wal reciever或applyer进程)来说,会按照事务的先后顺序,按事务apply。所以在订阅端,apply也是事务一致的。

将来可能会考虑组复制,提高并发性。

PS

其实你可以把不同的表分别放在不同的订阅中,这样就是并行的了。但是消耗的wal sender进程与连接会多一些。

4. 在订阅端,wal receiver(applyer)的session_replication_role会设置为replica。

这个影响数据库的trigger和rule。

参考

《PostgreSQL trigger/rule based replication configure, DISABLE/ENABLE [ REPLICA | ALWAYS ] TRIGGER | RULE》

《Can session_replication_role used like MySQL's BlackHole Engine?》

逻辑复制的监控

逻辑复制依旧使用的是2010年推出的流复制协议,所以监控手段差别不大。

订阅端视图pg_stat_subscription

                      View "pg_catalog.pg_stat_subscription"  
        Column         |           Type           | Collation | Nullable | Default   
-----------------------+--------------------------+-----------+----------+---------  
 subid                 | oid                      |           |          |   
 subname               | name                     |           |          |   
 pid                   | integer                  |           |          |   
 received_lsn          | pg_lsn                   |           |          |   
 last_msg_send_time    | timestamp with time zone |           |          |   
 last_msg_receipt_time | timestamp with time zone |           |          |   
 latest_end_lsn        | pg_lsn                   |           |          |   
 latest_end_time       | timestamp with time zone |           |          |   

每一个subcription有一条记录,一个订阅可能有多个active subscription workers。

对于一个已激活(enabled)的订阅,对应有1个apply进程,所以在这个视图中有一条记录。

一个暂停或者crash的订阅,在这个视图中不会有记录。

逻辑复制安全、权限

发布端

1. 必须设置pg_hba.conf,允许订阅端通过流复制连接发布端

2. wal_level必须设置为logical,记录逻辑复制的一些额外信息

3. 订阅端配置的conninfo中,发布端的角色必须具备replication权限,或者超级用户权限

4. 使用某个用户在某个数据库中创建publication,这个用户必须对该数据库具备create权限。

订阅端

1. 订阅端创建subscription的用户,必须是超级用户

权限检测仅仅在连接发布端的时候,后期不会检测,比如从发布端获取数据,或者apply数据时,不再检测是否为超级用户。

逻辑复制的postgresql.conf配置

发布端

1. wal_level=logical

2. max_replication_slots,每一个订阅需要消耗一个slot,每一个指定了slot的流式物理复制也要消耗一个slot。

3. max_wal_senders,每一个slot要使用一个wal sender,每一个流式物理复制也要使用一个wal sender。

4. max_worker_processes,必须大于等于max_wal_senders加并行计算进程,或者其他插件需要fork的进程数。

订阅端

1. max_replication_slots,大于等于该实例总共需要创建的订阅数

2. max_logical_replication_workers,大于等于该实例总共需要创建的订阅数

3. max_worker_processes, 大于等于max_logical_replication_workers + 1 + CPU并行计算 + 其他插件需要fork的进程数.

逻辑复制最佳实践case

1. 部署PostgreSQL 10.0

wget https://ftp.postgresql.org/pub/snapshot/dev/postgresql-snapshot.tar.bz2  

tar -jxvf postgresql-snapshot.tar.bz2  

cd postgresql-10devel  

export USE_NAMED_POSIX_SEMAPHORES=1  
LIBS=-lpthread CC="/home/digoal/gcc6.2.0/bin/gcc" CFLAGS="-O3 -flto" ./configure --prefix=/home/digoal/pgsql10  
LIBS=-lpthread CC="/home/digoal/gcc6.2.0/bin/gcc" CFLAGS="-O3 -flto" make world -j 64  
LIBS=-lpthread CC="/home/digoal/gcc6.2.0/bin/gcc" CFLAGS="-O3 -flto" make install-world  

2. 规划数据目录

mkdir /disk1/digoal/pgdata/pg_root1922  

3. 配置环境变量

vi env_pg10.sh   
export PS1="$USER@`/bin/hostname -s`-> "  
export PGPORT=1922  
export PGDATA=/disk1/digoal/pgdata/pg_root1922  
export LANG=en_US.utf8  
export PGHOME=/home/digoal/pgsql10  
export LD_LIBRARY_PATH=/home/digoal/python2.7.12/lib:/home/digoal/gcc6.2.0/lib64:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH  
export LD_RUN_PATH=$LD_LIBRARY_PATH  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=/home/digoal/python2.7.12/bin:/home/digoal/gcc6.2.0/bin:$PGHOME/bin:$PATH:.  
export MANPATH=$PGHOME/share/man:$MANPATH  
export PGHOST=$PGDATA  
export PGUSER=postgres  
export PGDATABASE=postgres  
alias rm='rm -i'  
alias ll='ls -lh'  
unalias vi  

4. 初始化数据库

initdb -D $PGDATA -U postgres -E SQL_ASCII --locale=C  

5. 配置postgresql.conf

listen_addresses = '0.0.0.0'  
port = 1922  
max_connections = 1000  
superuser_reserved_connections = 3  
unix_socket_directories = '.'  
shared_buffers = 32GB  
maintenance_work_mem = 2GB  
autovacuum_work_mem = 2GB  
dynamic_shared_memory_type = posix  
vacuum_cost_delay = 0  
bgwriter_delay = 10ms  
bgwriter_lru_maxpages = 1000  
bgwriter_lru_multiplier = 10.0  
bgwriter_flush_after = 0  
max_worker_processes = 128  
max_parallel_workers_per_gather = 8  
max_parallel_workers = 32             
max_logical_replication_workers = 32  
backend_flush_after = 0  
wal_level = logical  
synchronous_commit = off  
full_page_writes = off  
wal_buffers = 2047MB  
wal_writer_delay = 10ms  
wal_writer_flush_after = 0  
checkpoint_timeout = 55min  
max_wal_size = 64GB  
min_wal_size = 32GB  
checkpoint_completion_target = 0.05  
checkpoint_flush_after = 0  
max_wal_senders = 32  
max_replication_slots = 32  
random_page_cost = 1.5  
parallel_tuple_cost = 0  
parallel_setup_cost = 0  
min_parallel_table_scan_size = 8MB  
min_parallel_index_scan_size = 512kB  
effective_cache_size = 512GB  
wal_receiver_status_interval = 1s  
hot_standby_feedback = on  
log_destination = 'csvlog'  
logging_collector = on  
log_truncate_on_rotation = on  
log_checkpoints = on  
log_connections = on  
log_disconnections = on  
log_error_verbosity = verbose    
log_timezone = 'PRC'  
log_autovacuum_min_duration = 0  
autovacuum_naptime = 20s  
autovacuum_vacuum_scale_factor = 0.01  
autovacuum_analyze_scale_factor = 0.05  
autovacuum_vacuum_cost_delay = 0  
datestyle = 'iso, mdy'  
timezone = 'PRC'  
lc_messages = 'C'  
lc_monetary = 'C'  
lc_numeric = 'C'  
lc_time = 'C'  
default_text_search_config = 'pg_catalog.english'  

6. 配置pg_hba.conf

host    replication     postgres        127.0.0.1/32            trust  
host    replication     postgres        ::1/128                 trust  
host all all 0.0.0.0/0 md5  
host replication postgres 0.0.0.0/0 md5  

7. 配置发布端

postgres=# create database src with template template0 ;  
CREATE DATABASE  
src=# create table public.t1(id int primary key, info text, crt_time timestamp);  
CREATE TABLE  

src=# create publication pub1 for table public.t1;  
CREATE PUBLICATION  

查看当前数据库有哪些发布

src=# select * from pg_publication;  
 pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete   
---------+----------+--------------+-----------+-----------+-----------  
 pub1    |       10 | f            | t         | t         | t  
(1 row)  

订阅端创建成功后,可以查看发布端的slot

postgres=# select * from pg_stat_replication ;  
-[ RECORD 1 ]----+------------------------------  
pid              | 48097  
usesysid         | 10  
usename          | postgres  
application_name | sub1_from_pub1  
client_addr      | xxx.xxx.xxx.xxx  
client_hostname  |   
client_port      | 57452  
backend_start    | 2017-02-28 14:01:37.686917+08  
backend_xmin     |   
state            | streaming  
sent_location    | 0/170EB58   -- 已发送截止至该LSN  
write_location   | 0/170EB58  
flush_location   | 0/170EB58  
replay_location  | 0/170EB58  
sync_priority    | 0  
sync_state       | async  

postgres=# select * from pg_replication_slots ;  
-[ RECORD 1 ]-------+---------------  
slot_name           | sub1_from_pub1  
plugin              | pgoutput  
slot_type           | logical  
datoid              | 24669  
database            | src  
temporary           | f  
active              | t  
active_pid          | 48097  
xmin                |   
catalog_xmin        | 31813036  
restart_lsn         | 1/DFE44F68   -- 订阅者也许需要从这个LSN开始接收,或者说,这个位置以及以后的日志都要保留,不会被checkpoint清除。  
confirmed_flush_lsn | 2/90BAEE70   -- 订阅者报告的,已经接收到该LSN的位置。  

8. 配置订阅端

postgres=# create database dst with template template0;  
CREATE DATABASE  
postgres=# \c dst  
You are now connected to database "dst" as user "postgres".  
dst=# create table public.t1(id int primary key, info text, crt_time timestamp);  
CREATE TABLE  

dst=# create subscription sub1_from_pub1 connection 'hostaddr=xxx.xxx.xxx.xxx port=1922 user=postgres dbname=src' publication pub1 with (enabled, create slot, slot name='sub1_from_pub1');  
NOTICE:  created replication slot "sub1_from_pub1" on publisher  
CREATE SUBSCRIPTION  

查看整个数据库集群有哪些订阅

dst=# select * from pg_subscription ;  
 subdbid |    subname     | subowner | subenabled |                      subconninfo                      |  subslotname   | subpublications   
---------+----------------+----------+------------+-------------------------------------------------------+----------------+-----------------  
   24680 | sub1_from_pub1 |       10 | t          | hostaddr=xxx.xxx.xxx.xxx port=1922 user=postgres dbname=src | sub1_from_pub1 | {pub1}  
(3 rows)  

postgres=# select * from pg_stat_subscription ;  
-[ RECORD 1 ]---------+------------------------------  
subid                 | 24689  
subname               | sub1_from_pub1  
pid                   | 48096  
received_lsn          | 0/7BE6AA68  -- 这是个相对LSN,从0开始算。         
last_msg_send_time    |   
last_msg_receipt_time | 2017-02-28 14:12:43.363582+08  
latest_end_lsn        | 0/7AED52D8  -- 这个也是相对LSN,订阅者会FEEDBACK LSN给发布,这个LSN表示已经反馈给发布者wal sender进程的LSN。所以比received_lsn略小    
latest_end_time       | 2017-02-28 14:12:41.334091+08  

9. 小事务压测

vi test.sql  
\set id random(1,100000000)  
insert into public.t1 values (:id,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 120 src  

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 21912364  
latency average = 0.350 ms  
latency stddev = 4.398 ms  
tps = 182522.845458 (including connections establishing)  
tps = 182543.043852 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set id random(1,100000000)  
         0.349  insert into t1 values (:id,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

观测延迟

发布端decode, output plugin压力较大,写入TPS达到18万时,延迟较严重

48096 digoal  20   0 38.3g 5.4g 2.1g R 100.0  1.1   1:40.37 postgres: bgworker: logical replication worker for subscription 24689                      
48097 digoal  20   0 34.9g 5408 3856 S 40.3   0.0   0:38.24 postgres: wal sender process postgres 127.0.0.1(57452) idle  

发布端

postgres=# select pg_size_pretty(pg_wal_location_diff(pg_current_wal_insert_location(), sent_location)), pg_size_pretty(pg_wal_location_diff(pg_current_wal_insert_location(), replay_location)), * from pg_stat_replication ;  
-[ RECORD 1 ]----+------------------------------  
pg_size_pretty   | 5072 MB  
pg_size_pretty   | 5263 MB  
pid              | 48097  
usesysid         | 10  
usename          | postgres  
application_name | sub1_from_pub1  
client_addr      | 127.0.0.1  
client_hostname  |   
client_port      | 57452  
backend_start    | 2017-02-28 14:01:37.686917+08  
backend_xmin     |   
state            | streaming  
sent_location    | 0/FC673B10  
write_location   | 0/F0761CC0  
flush_location   | 0/F075FB70  
replay_location  | 0/F0761CC0  
sync_priority    | 0  
sync_state       | async  

\watch 1  

订阅端

postgres=# select pg_size_pretty(pg_wal_location_diff(received_lsn, latest_end_lsn)), * from pg_stat_subscription ;  
-[ RECORD 1 ]---------+------------------------------  
pg_size_pretty        | 68 MB  
subid                 | 24689  
subname               | sub1_from_pub1  
pid                   | 48096  
received_lsn          | 1/2F894A60  
last_msg_send_time    |   
last_msg_receipt_time | 2017-02-28 14:18:48.447466+08  
latest_end_lsn        | 1/2B498FC8  
latest_end_time       | 2017-02-28 14:18:38.431168+08  

\watch 1  

订阅端还可以查看pg_replication_origin_status,

remote_lsn指已复制到订阅者的发布者的LSN。

local_lsn指本地已持久化REDO的LSN,只有在这个LSN之前的DIRTY PAGE才能flush到磁盘。

dst=# select * from pg_replication_origin_status;  
 local_id | external_id | remote_lsn | local_lsn    
----------+-------------+------------+------------  
        1 |             | 0/0        | 0/0  
        3 | pg_24689    | 6/5931F948 | 7/A79CA450  
(2 rows)  

对应的数据结构

/*  
 * Replay progress of a single remote node.  
 */  
typedef struct ReplicationState  
{  
        /*  
         * Local identifier for the remote node.  
         */  
        RepOriginId roident;  

        /*  
         * Location of the latest commit from the remote side.  
         */  
        XLogRecPtr      remote_lsn;  

        /*  
         * Remember the local lsn of the commit record so we can XLogFlush() to it  
         * during a checkpoint so we know the commit record actually is safe on  
         * disk.  
         */  
        XLogRecPtr      local_lsn;  

        /*  
         * PID of backend that's acquired slot, or 0 if none.  
         */  
        int                     acquired_by;  

        /*  
         * Lock protecting remote_lsn and local_lsn.  
         */  
        LWLock          lock;  
} ReplicationState;  

延迟较大

物理流复制几乎0延迟,而逻辑复制带来的延迟是比较大的。

10. 大事务压测

\c src  

delete from t1;  
DELETE 46844120  

观测延迟

\c dst  

select * from t1 limit 1;  
\watch 1  

开始时间  
Tue 28 Feb 2017 04:04:08 PM CST (every 1s)  

 id | info |          crt_time            
----+------+----------------------------  
  1 | test | 2017-02-28 14:04:03.120126  
(1 row)  

结束时间  

延迟较大

逻辑复制实时性

实际上PostgreSQL的逻辑复制也是流式的,事务没有结束时,发布端事务过程中产生的变更会实时的同步并在订阅端APPLY。

但是逻辑复制始终是基于行的,延迟和块级复制还是差很大。

关于订阅端异步提交

订阅端如果开启了异步提交,那么哪些脏页(shared buffer)能flush到磁盘呢?

查看pg_replication_origin_status,

remote_lsn指已复制到订阅者的发布者的LSN。

local_lsn指本地已持久化REDO的LSN,只有在这个LSN之前的DIRTY PAGE才能flush到磁盘。(这样做可以保证可靠性和一致性)

dst=# select * from pg_replication_origin_status;  
 local_id | external_id | remote_lsn | local_lsn    
----------+-------------+------------+------------  
        1 |             | 0/0        | 0/0  
        3 | pg_24689    | 6/5931F948 | 7/A79CA450  
(2 rows)  

注意事项

1. 如果被订阅的数据有主外键约束,请将其作为一个订阅。否则可能会有约束的问题

2. 因为逻辑复制在订阅端实际上是逻辑写操作,请尽量避免锁冲突。比如truncate , alter table.

3. 在订阅端,delete, update 不存在记录,不报错

参考

https://www.postgresql.org/docs/devel/static/logical-replication.html

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
3月前
|
SQL 关系型数据库 数据库
[postgresql]逻辑备份与还原
[postgresql]逻辑备份与还原
|
关系型数据库 数据管理 Go
《PostgreSQL数据分区:原理与实战》
《PostgreSQL数据分区:原理与实战》
201 0
|
6月前
|
缓存 运维 关系型数据库
PostgreSQL技术大讲堂 - 第43讲:流复制原理
PostgreSQL技术大讲堂 - 第43讲:流复制原理
287 2
|
6月前
|
SQL 关系型数据库 分布式数据库
PolarDB for PostgreSQL逻辑复制问题之逻辑复制冲突如何解决
PolarDB for PostgreSQL是基于PostgreSQL开发的一款云原生关系型数据库服务,它提供了高性能、高可用性和弹性扩展的特性;本合集将围绕PolarDB(pg)的部署、管理和优化提供指导,以及常见问题的排查和解决办法。
|
11月前
|
存储 人工智能 关系型数据库
postgresql从入门到精通教程 - 第36讲:postgresql逻辑备份
postgresql从入门到精通教程 - 第36讲:postgresql逻辑备份
201 1
|
存储 关系型数据库 Go
《深入PostgreSQL的存储引擎:原理与性能》
《深入PostgreSQL的存储引擎:原理与性能》
684 0
|
安全 关系型数据库 数据库
《确保安全:PostgreSQL安全配置与最佳实践》
《确保安全:PostgreSQL安全配置与最佳实践》
655 0
|
存储 关系型数据库 Go
《PostgreSQL备份与恢复:步骤与最佳实践》
《PostgreSQL备份与恢复:步骤与最佳实践》
529 0
|
监控 关系型数据库 API
PostgreSQL 13、14中逻辑复制/解码改进
PostgreSQL 13、14中逻辑复制/解码改进
239 0
|
存储 关系型数据库 分布式数据库
PostgreSQL 14中两阶段提交的逻辑解码
PostgreSQL 14中两阶段提交的逻辑解码
197 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版