PostgreSQL Logical Replication

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 限制及特性 1、只支持普通表生效,不支持序列、视图、物化视图、外部表、分区表和大对象 2、只支持普通表的DML(INSERT、UPDATE、DELETE)操作,不支持truncate、DDL操作 3、需要同步的表必须设置REPLICA IDENTITY 不能为noting(默认值是default).

限制及特性

1、只支持普通表生效,不支持序列、视图、物化视图、外部表、分区表和大对象
2、只支持普通表的DML(INSERT、UPDATE、DELETE)操作,不支持truncate、DDL操作
3、需要同步的表必须设置REPLICA IDENTITY 不能为noting(默认值是default),同时表中必须包含主键,否则delete和update报错
4、一个publisher可以包含一张或多张表,一张表可以有一个或多个publishers
5、一个发布者可以有多个订阅者订阅,一个订阅者也可以同时订阅多个发布者,在同一个数据库下订阅者不能对同一个发布者的表重复订阅(避免数据冲突)
6、逻辑复制不同于流复制,不是严格的主从关系,订阅者端的普通表依然可以进行增删改操作
7、同步表的表结构需要在发布者和订阅者两边保持一致(列的顺序允许不一样,但是列对应的数据类型必须一致)
8、如果订阅者端的数据被误删,想要从发布者重新copy同步表的数据,只能以重建同步表所在的订阅者的方式来实现

环境搭建

配置数据库参数

1、修改postgresql.conf数据库参数文件(修改这些参数需要重启数据库)
   a、发布者端设置
      设置wal_level级别为logical:wal_level = logical
      设置max_wal_senders,此参数值要不小于max_replication_slots的参数值,默认值是10
      设置max_replication_slots,此参数值不少于subscriptions的个数,默认值是10
   b、订阅者端设置
      设置wal_level级别为logical:wal_level = logical
      设置max_logical_replication_workers,此参数值不少于订阅者的个数,默认是4
      设置max_worker_processes,此参数值不少于max_logical_replication_workers值+1 

2、在pg_hba.conf添加白名单(根据真实情况自行限制网段)
   host     all     repuser     0.0.0.0/0     md5

3、创建专门用于逻辑复制的超户(建议使用uuid作为密码)
   create user repuser superuser login password 'repuser1234';

创建发布者

publisher是逻辑复制的起点

--查看pub_db数据库的发布者
pub_db=# \dRp
                  List of publications
 Name | Owner | All tables | Inserts | Updates | Deletes 
------+-------+------------+---------+---------+---------
(0 rows)

--在pub_db数据库上创建名为mypub的发布者
pub_db=# CREATE PUBLICATION mypub;
CREATE PUBLICATION
pub_db=# 
pub_db=# \dRp
                    List of publications
 Name  |  Owner   | All tables | Inserts | Updates | Deletes 
-------+----------+------------+---------+---------+---------
 mypub | postgres | f          | t       | t       | t
(1 row)

--查看mypub发布的详细信息
pub_db=# \dRp+
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
(1 row)

创建订阅者

subscriber是逻辑复制的下游

--查看sub_db数据库下的订阅者
sub_db=# \dRs
        List of subscriptions
 Name | Owner | Enabled | Publication 
------+-------+---------+-------------
(0 rows)

--在sub_db数据库上创建名为mysub的订阅者
sub_db=# CREATE SUBSCRIPTION mysub CONNECTION 'dbname=pub_db host=l-test1 user=repuser password=repuser1234 port=6432' PUBLICATION mypub;
NOTICE:  created replication slot "mysub" on publisher
CREATE SUBSCRIPTION
sub_db=# 
sub_db=# 
sub_db=# \dRs
          List of subscriptions
 Name  |  Owner   | Enabled | Publication 
-------+----------+---------+-------------
 mysub | postgres | t       | {mypub}
(1 row)

--查看订阅者mysub的详细信息
sub_db=# \dRs+
                                                                List of subscriptions
 Name  |  Owner   | Enabled | Publication | Synchronous commit |                                      Conninfo                                       
-------+----------+---------+-------------+--------------------+-------------------------------------------------------------------------------------
 mysub | postgres | t       | {mypub}     | off                | dbname=pub_db host=l-test1 user=repuser password=repuser1234 port=6432
(1 row)

添加需要同步的表

发布者端

--创建表
pub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE
pub_db=# insert into logical_tb1(id,col1) select generate_series(1,2000),'tester';
INSERT 0 2000

--添加到发布者mypub
pub_db=# alter publication mypub add table logical_tb1;
ALTER PUBLICATION

--查看发布者的详细信息
pub_db=# \dRp+ mypub
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
Tables:
    "public.logical_tb1"

订阅者端

--创建相同的表
sub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE

--刷新一下订阅者
sub_db=# ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
ALTER SUBSCRIPTION

--查询数据是否已经同步过来
sub_db=# select count(*) from logical_tb1;
 count 
-------
  2000
(1 row)

约束条件表

对于没有任何约束条件的普通表实现逻辑同步很简单,直接将表添加到发布者即可,但是有约束条件的表如何实现逻辑同步呢?

条件约束

先给出结论:对于条件符合约束的数据在订阅端不影响同步,不符合条件约束的数据在订阅端会同步报错

--在sub_db上的同步表添加一个check 约束
sub_db=# alter table logical_tb1 add constraint col1_check check(col1 = 'test');
ALTER TABLE
sub_db=# \d logical_tb1
                     Table "public.logical_tb1"
 Column |           Type           | Collation | Nullable | Default 
--------+--------------------------+-----------+----------+---------
 id     | integer                  |           | not null | 
 col1   | character varying(8)     |           |          | 
 col2   | numeric(10,2)            |           |          | 
 col3   | json                     |           |          | 
 col4   | hstore                   |           |          | 
 col5   | timestamp with time zone |           |          | now()
 col6   | integer[]                |           |          | 
 col7   | ltree                    |           |          | 
Indexes:
    "logical_tb1_pkey" PRIMARY KEY, btree (id)
Check constraints:
    "col1_check" CHECK (col1::text = 'test'::text)

--在pub_db上的同步表插入两条测试数据
pub_db=# insert into logical_tb1 values(1,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1
pub_db=# insert into logical_tb1 values(2,'test2','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1

--检查sub_db上同步表的数据同步情况
sub_db=# select * from logical_tb1 ;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+------+--------+------------+-----------+-------------------------------+------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
(1 row)

--检查日志发现了报错:
ERROR:  new row for relation "logical_tb1" violates check constraint "col1_check"
DETAIL:  Failing row contains (2, test2, 999.99, {"ja":"1"}, "ha"=>"1", 2018-04-11 15:29:12.395614+08, {9}, null).

--删除sub_db上同步表的约束后,数据继续同步
sub_db=# alter table logical_tb1 drop CONSTRAINT col1_check;
ALTER TABLE
sub_db=# select * from logical_tb1 ;
 id | col1  |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+-------+--------+------------+-----------+-------------------------------+------+------
  1 | test  | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
  2 | test2 | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:29:12.395614+08 | {9}  | 
(2 rows)

外键约束

先给出结论:如果发布端没有外键约束条件,而订阅端有外键约束条件,同步数据不受订阅端外键约束条件影响

--订阅者端创建非同步表
sub_db=# create table logical_tb2(id int primary key,for_id int);
CREATE TABLE

--订阅者端同步表添加列col8并添加外键约束,引用logical_tb2的主键
sub_db=# alter table logical_tb1 add column col8 int references logical_tb2(id);
ALTER TABLE

--订阅端插入初始化数据
sub_db=# insert into logical_tb2 values(1,1),(2,2),(3,3);
INSERT 0 3

sub_db=# select * from logical_tb2;
 id | for_id 
----+--------
  1 |      1
  2 |      2
  3 |      3
(3 rows)

--发布者端插入违反外键约束的数据(发布者端没有外加约束)
pub_db=# insert into logical_tb1 values(5,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}','beijing.haidian',4);
INSERT 0 1

--观察订阅者端的数据,依然能同步进来,不受外键约束的影响
sub_db=# select * from logical_tb1;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 |      col7       | col8 
----+------+--------+------------+-----------+-------------------------------+------+-----------------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  |                 |     
  2 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:40:28.794865+08 | {9}  |                 |     
  3 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:39:15.537949+08 | {9}  | beijing.haidian |    1
  4 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:40:28.885829+08 | {9}  | beijing.haidian |    4
  5 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:51:50.167068+08 | {9}  | beijing.haidian |    4
(5 rows)
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
4月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
存储 关系型数据库 数据库
How to Optimize PostgreSQL Logical Replication
How to Optimize PostgreSQL Logical Replication
121 0
How to Optimize PostgreSQL Logical Replication
|
SQL 关系型数据库 Java
PostgreSQL增量订阅方案:利用Logical Decoding订阅增量
PostgreSQL增量订阅方案:利用Logical Decoding订阅增量
1117 0
PostgreSQL增量订阅方案:利用Logical Decoding订阅增量
|
SQL Web App开发 关系型数据库
|
关系型数据库 流计算 PostgreSQL
|
流计算 关系型数据库 网络协议