开发者社区> 德哥> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

PostgreSQL 逻辑订阅 - DDL 订阅 实现方法

简介:
+关注继续查看

标签

PostgreSQL , 逻辑订阅 , 逻辑复制 , DDL 复制 , udf , 触发器 , 事件触发器


背景

逻辑订阅是PostgreSQL 10内置的功能,通过逻辑订阅,可以实现部分数据的同步要求。例如可以做到行级、表级、库级的订阅。

《使用PostgreSQL逻辑订阅实现multi-master》

《PostgreSQL 逻辑订阅 - 给业务架构带来了什么希望?》

《PostgreSQL 10.0 preview 逻辑订阅 - 原理与最佳实践》

逻辑订阅实际上和MySQL binglog复制类似,但是DDL是不写表的,那么DDL如何复制呢?

pic

有两种方法,将DDL记录下来,写入表中,在订阅端对这个表创建触发器,在触发器内执行DDL即可。

需要被订阅的对象,建上对应的触发器即可。

注意:

使用这种方法,目标端建议schema, 表结构都与订阅端一样。如果schema不一样,那么在触发器中,设置一下search_path再执行对应的DDL。

event trigger方法

略,用法请参考PostgreSQL手册。

封装UDF的方法

本文重点介绍这个方法。

1、主库:创建存储DDL的目标表

create table ddl_record(  
  id serial8 primary key,   -- 主键  
  content text,             -- DDL内容  
  tbl_name name,            -- 表名  
  sub     boolean,          -- 是否需要发布此表, true 表示发布, false 表示不发布  
  pub_name name[],          -- 发布到哪些publication, 使用数组表示  
  crt_time timestamp        -- 时间  
);  

2、主库:创建测试表

create table t_test (  
  id serial primary key,  
  info text,  
  crt_time timestamp  
);  

3、主库:创建封装DDL的UDF

create or replace function exec_ddl(v_content text, v_tbl_name name default null, v_sub boolean default null, v_pub_name name[] default null) returns void as $$  
declare  
  pub_name name;  
begin  
    
  -- 检查DDL与tbl_name中对象名是否一致。初略检查。  
  if v_content !~* v_tbl_name then  
    raise exception 'you must specify correct table name with $1 and $2';  
  end if;  
    
  -- 执行DDL  
  execute v_content;  
    
  -- 插入订阅表  
  insert into ddl_record(content, tbl_name, sub, pub_name, crt_time) values (v_content, v_tbl_name, v_sub, v_pub_name, clock_timestamp());  
    
  -- 如果包含create table,并选择了发布,那么发布此表  
  if v_sub and lower(v_content) ~ 'create +table' then  
    foreach pub_name in array v_pub_name loop  
      execute format ('ALTER PUBLICATION %s add table %s', pub_name, v_tbl_name);  
    end loop;  
  end if;  
end;  
$$ language plpgsql strict;  

4、主库:创建发布

CREATE PUBLICATION pub1;  
  
ALTER PUBLICATION pub1 ADD TABLE ddl_record, t_test;  

5、备库:创建初次订阅

初次订阅,需要订阅ddl_RECORD和t_test

首先要创建TABLE

create table ddl_record(  
  id serial8 primary key,   -- 主键  
  content text,             -- DDL内容  
  tbl_name name,            -- 表名  
  sub     boolean,          -- 是否需要订阅此表  
  pub_name name[],          -- 发布到哪些publication  
  crt_time timestamp        -- 时间  
);  
  
create table t_test (  
  id serial primary key,  
  info text,  
  crt_time timestamp  
);  

订阅,指定发布端的连接地址,注意如果需要密码,也请输入。(具体的发布详细用法,可以参考本文末尾)

create subscription sub1 connection 'hostaddr=127.0.0.1 port=1999 user=postgres dbname=postgres' publication pub1;  

创建ddl_record触发器函数,用DBLINK来调用DDL(可能是BUG,后面能修复),确保127.0.0.1访问不需要密码。

不要使用这个触发器函数:

create or replace function tg_exec_ddl() returns trigger as $$  
declare  
  sub_name text := TG_ARGV[0];  
  v_port text;  
  conn text;  
begin  
  -- dblink  
  CREATE EXTENSION if not exists dblink;  
    
  show port into v_port;  
  conn := format('hostaddr=127.0.0.1 port=%s user=%s dbname=%s', v_port, current_user, current_database);  
  -- set search_path=xxx;  
  if NEW.sub then                        -- 仅复制UDF中设置为发布=true了的DDL  
    execute NEW.content;  
    perform pg_sleep(1);  
      
    if NEW.content ~* 'create +table' then  
      -- 刷新订阅,否则新增的发布表不会被订阅到  
      -- copy_data (boolean)  
      -- Specifies whether the existing data in the publications that are being subscribed   
      -- to should be copied once the replication starts. The default is true.  
      -- 新增的订阅表,是否需要copy数据。(老的不管)  
      execute format('ALTER SUBSCRIPTION %s REFRESH PUBLICATION with (copy_data=true)', sub_name) ;    
    end if;  
  end if;  
  --   
  return null;  
end;  
$$ language plpgsql strict;  

请使用这个触发器函数:

create or replace function tg_exec_ddl() returns trigger as $$  
declare  
  sub_name text := TG_ARGV[0];  
  v_port text;  
  conn text;  
begin  
  -- dblink  
  CREATE EXTENSION if not exists dblink;  
    
  show port into v_port;  
  conn := format('hostaddr=127.0.0.1 port=%s user=%s dbname=%s', v_port, current_user, current_database);  
  -- set search_path=xxx;  
  if NEW.sub then                        -- 仅复制UDF中设置为发布=true了的DDL  
    execute format('select dblink_exec(''%s'', ''%s'')', conn, NEW.content);  
    perform pg_sleep(1);  
      
    if NEW.content ~* 'create +table' then  
      -- 刷新订阅,否则新增的发布表不会被订阅到  
      -- copy_data (boolean)  
      -- Specifies whether the existing data in the publications that are being subscribed   
      -- to should be copied once the replication starts. The default is true.  
      -- 新增的订阅表,是否需要copy数据。(老的不管)  
      execute format('select dblink_exec(''%s'', ''ALTER SUBSCRIPTION %s REFRESH PUBLICATION with (copy_data=true)'')', conn, sub_name) ;    
    end if;  
  end if;  
  --   
  return null;  
end;  
$$ language plpgsql strict;  

创建ddl_record插入触发器,指定subscript的名称

create trigger tg_exec_ddl after insert on ddl_record for each row execute procedure tg_exec_ddl('sub1');  

设置允许replica执行触发器,一定要执行,否则订阅端不会触发。

alter table ddl_record enable always trigger tg_exec_ddl  ;  
  
  
  
  
DISABLE/ENABLE [ REPLICA | ALWAYS ] TRIGGER  
  
These forms configure the firing of trigger(s) belonging to the table.   
  
A disabled trigger is still known to the system,   
but is not executed when its triggering event occurs.   
  
For a deferred trigger, the enable status is checked when the event occurs,   
not when the trigger function is actually executed.   
  
One can disable or enable a single trigger specified by name,   
or all triggers on the table,   
or only user triggers (this option excludes internally generated constraint   
triggers such as those that are used to implement foreign key constraints   
or deferrable uniqueness and exclusion constraints).   
  
Disabling or enabling internally generated constraint triggers requires   
superuser privileges; it should be done with caution since of course   
the integrity of the constraint cannot be guaranteed if the triggers   
are not executed.   
  
The trigger firing mechanism is also affected by the configuration variable   
session_replication_role.   
  
Simply enabled triggers will fire when the replication role is “origin”   
(the default) or “local”.   
  
Triggers configured as ENABLE REPLICA will only fire if the session is in “replica” mode,   
and triggers configured as ENABLE ALWAYS will fire regardless of the current replication mode.  
  
This command acquires a SHARE ROW EXCLUSIVE lock.  

6、主库:使用UDF执行DDL,修改已有被订阅表的表结构

select exec_ddl(  
$_$  
alter table t_test add column c1 int  
$_$,  
't_test',  
true,  
array['pub1']  
);  

备库:订阅端也自动添加了这个字段

postgres=# \d t_test  
                                       Table "public.t_test"  
  Column  |            Type             | Collation | Nullable |              Default                 
----------+-----------------------------+-----------+----------+------------------------------------  
 id       | integer                     |           | not null | nextval('t_test_id_seq'::regclass)  
 info     | text                        |           |          |   
 crt_time | timestamp without time zone |           |          |   
 c1       | integer                     |           |          |   
Indexes:  
    "t_test_pkey" PRIMARY KEY, btree (id)  

7、主库:使用UDF执行DDL,新增一张表

select exec_ddl($_$  
create table t123 (id int primary key, info text, crt_time timestamp)  
$_$,  
't123',  
true,  
array['pub1']  
);  

备库:订阅端也自动创建了这张表

postgres=# \dt  
           List of relations  
 Schema |    Name    | Type  |  Owner     
--------+------------+-------+----------  
 public | ddl_record | table | postgres  
 public | t123       | table | postgres  
 public | t_test     | table | postgres  
 public | test       | table | postgres  
(4 rows)  

精细化

可以将以上过程再精细化一下,例如通过标记,设置为是否需要将此DDL在目标端执行,是否需要在目标端执行订阅。

如果有多个订阅端,可以按订阅端精细化管理。

最终实现在主库通过UDF执行DDL,订阅端自动识别:

是否需要执行DDL,是否需要将此次新增的对象执行订阅等。

以上功能都可以在这套方法中实现,无非就是加一些参数、ddl_record的字段以及目标端trigger逻辑修改等。

参考

《use event trigger function record user who alter table's SQL》

《PostgreSQL 事件触发器 - PostgreSQL 9.3 Event Trigger》

《PostgreSQL 事件触发器应用 - DDL审计记录》

《PostgreSQL Oracle 兼容性之 - 事件触发器实现类似Oracle的回收站功能》

《PostgreSQL 事件触发器 - DDL审计 , DDL逻辑复制 , 打造DDL统一管理入》

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
RDS数据库和ECS自建数据库主从复制,数据同步
设置阿里云RDS和ECS上自建的数据库达成主从分离。
17177 0
MSSQL-最佳实践-实例级别数据库上云RDS SQL Server
--- title: MSSQL-最佳实践-实例级别数据库上云RDS SQL Server author: 风移 --- # 摘要 到目前,我们完成了SQL Server备份还原专题系列八篇月报分享:三种常见的数据库备份、备份策略的制定、查找备份链、数据库的三种恢复模式与备份之间的关系、利用文件组实现冷热数据隔离备份方案、如何监控备份还原进度、阿里云RDS SQL自动化迁移上云的一种
1598 0
免费享受同城双可用区高可用容错能力!阿里云云数据库RDS新增可用区6月汇总(内含福利)
6月份,阿里云云数据库 MySQL 版,云数据库 PPAS 版,云数据库 SQL Server 版,云数据库 PostgreSQL 版均宣布新增可用区,用户在控制台上按需求创建实例,即可享受同城双可用区高可用容错能力。接下来小编将为大家详细列出新增可用区。
2729 0
MSSQL · 最佳实践 · 实例级别数据库上云RDS SQL Server
摘要 到目前,我们完成了SQL Server备份还原专题系列八篇月报分享:三种常见的数据库备份、备份策略的制定、查找备份链、数据库的三种恢复模式与备份之间的关系、利用文件组实现冷热数据隔离备份方案、如何监控备份还原进度、阿里云RDS SQL自动化迁移上云的一种解决方案以及上个月分享的RDS SDK实现数据库迁移上阿里云,本期我们分享如何将用户线下或者ECS上自建实例级别数据库一键迁移上阿里云RDS SQL Server。
1675 0
阿里云云数据库RDS秒级监控功能解锁,通宵加班找故障将成为过去式
每一个奋斗在前线的数据库管理员和运维人员似乎运气都不太好,这些人都绝对经历过的诡异事件就是:逢年过节必出故障,明明眼看着要休假了,又接到故障通知,只好通宵加班找问题。没问题的时候可能大家都不会想到你,一出问题就先拿运维试问,于是每逢佳节便出现拜数据库的戏谑图片。
2716 0
重磅干货免费下载!阿里云RDS团队论文被数据库顶会SIGMOD 2018收录
来自阿里云RDS团队的论文“**TcpRT: Instrument and Diagnostic Analysis System for Service Quality of Cloud Databases at Massive Scale in Real-time” (TcpRT:面向大规模海量云数据库的服务质量实时采集与诊断系统)**被数据库顶会SIGMOD 2018收录。
10381 0
参与 API 创新应用大赛,体验RDS CloudDBA数据库性能优化 API
阿里云的RDS数据库,有开发者所需要的一系列的功能,但很多功能很多开发者可能并没有使用过。这里,介绍一个RDS比较有用的功能:CloudDBA数据库性能优化 API。
12948 0
从运维的角度分析使用阿里云数据库RDS的必要性--你不应该在阿里云上使用自建的MySQL/SQL Server/Oracle/PostgreSQL数据库
开宗明义,你不应该在阿里云上使用自建的MySQL or SQL Server数据库,对了,还有Oracle or PostgreSQL数据库。 云数据库 RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务。
4454 0
【元气云妹】数据库RDS的连接方式
如何通过DMS和MySQL-Front客户端连接RDS实例
2164 0
+关注
德哥
公益是一辈子的事, I am digoal, just do it.
文章
问答
来源圈子
更多
让用户数据永远在线,让数据无缝的自由流动
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
PolarDB for PostgreSQL三节点功能介绍
立即下载
PostgreSQL复制原理及高可用集群
立即下载
PolarDB for PostgreSQL 源码与应用实战
立即下载