开发者社区> 德哥> 正文

PostgreSQL 逻辑订阅 - DDL 订阅 实现方法

简介:
+关注继续查看

标签

PostgreSQL , 逻辑订阅 , 逻辑复制 , DDL 复制 , udf , 触发器 , 事件触发器


背景

逻辑订阅是PostgreSQL 10内置的功能,通过逻辑订阅,可以实现部分数据的同步要求。例如可以做到行级、表级、库级的订阅。

《使用PostgreSQL逻辑订阅实现multi-master》

《PostgreSQL 逻辑订阅 - 给业务架构带来了什么希望?》

《PostgreSQL 10.0 preview 逻辑订阅 - 原理与最佳实践》

逻辑订阅实际上和MySQL binglog复制类似,但是DDL是不写表的,那么DDL如何复制呢?

pic

有两种方法,将DDL记录下来,写入表中,在订阅端对这个表创建触发器,在触发器内执行DDL即可。

需要被订阅的对象,建上对应的触发器即可。

注意:

使用这种方法,目标端建议schema, 表结构都与订阅端一样。如果schema不一样,那么在触发器中,设置一下search_path再执行对应的DDL。

event trigger方法

略,用法请参考PostgreSQL手册。

封装UDF的方法

本文重点介绍这个方法。

1、主库:创建存储DDL的目标表

create table ddl_record(  
  id serial8 primary key,   -- 主键  
  content text,             -- DDL内容  
  tbl_name name,            -- 表名  
  sub     boolean,          -- 是否需要发布此表, true 表示发布, false 表示不发布  
  pub_name name[],          -- 发布到哪些publication, 使用数组表示  
  crt_time timestamp        -- 时间  
);  

2、主库:创建测试表

create table t_test (  
  id serial primary key,  
  info text,  
  crt_time timestamp  
);  

3、主库:创建封装DDL的UDF

create or replace function exec_ddl(v_content text, v_tbl_name name default null, v_sub boolean default null, v_pub_name name[] default null) returns void as $$  
declare  
  pub_name name;  
begin  
    
  -- 检查DDL与tbl_name中对象名是否一致。初略检查。  
  if v_content !~* v_tbl_name then  
    raise exception 'you must specify correct table name with $1 and $2';  
  end if;  
    
  -- 执行DDL  
  execute v_content;  
    
  -- 插入订阅表  
  insert into ddl_record(content, tbl_name, sub, pub_name, crt_time) values (v_content, v_tbl_name, v_sub, v_pub_name, clock_timestamp());  
    
  -- 如果包含create table,并选择了发布,那么发布此表  
  if v_sub and lower(v_content) ~ 'create +table' then  
    foreach pub_name in array v_pub_name loop  
      execute format ('ALTER PUBLICATION %s add table %s', pub_name, v_tbl_name);  
    end loop;  
  end if;  
end;  
$$ language plpgsql strict;  

4、主库:创建发布

CREATE PUBLICATION pub1;  
  
ALTER PUBLICATION pub1 ADD TABLE ddl_record, t_test;  

5、备库:创建初次订阅

初次订阅,需要订阅ddl_RECORD和t_test

首先要创建TABLE

create table ddl_record(  
  id serial8 primary key,   -- 主键  
  content text,             -- DDL内容  
  tbl_name name,            -- 表名  
  sub     boolean,          -- 是否需要订阅此表  
  pub_name name[],          -- 发布到哪些publication  
  crt_time timestamp        -- 时间  
);  
  
create table t_test (  
  id serial primary key,  
  info text,  
  crt_time timestamp  
);  

订阅,指定发布端的连接地址,注意如果需要密码,也请输入。(具体的发布详细用法,可以参考本文末尾)

create subscription sub1 connection 'hostaddr=127.0.0.1 port=1999 user=postgres dbname=postgres' publication pub1;  

创建ddl_record触发器函数,用DBLINK来调用DDL(可能是BUG,后面能修复),确保127.0.0.1访问不需要密码。

不要使用这个触发器函数:

create or replace function tg_exec_ddl() returns trigger as $$  
declare  
  sub_name text := TG_ARGV[0];  
  v_port text;  
  conn text;  
begin  
  -- dblink  
  CREATE EXTENSION if not exists dblink;  
    
  show port into v_port;  
  conn := format('hostaddr=127.0.0.1 port=%s user=%s dbname=%s', v_port, current_user, current_database);  
  -- set search_path=xxx;  
  if NEW.sub then                        -- 仅复制UDF中设置为发布=true了的DDL  
    execute NEW.content;  
    perform pg_sleep(1);  
      
    if NEW.content ~* 'create +table' then  
      -- 刷新订阅,否则新增的发布表不会被订阅到  
      -- copy_data (boolean)  
      -- Specifies whether the existing data in the publications that are being subscribed   
      -- to should be copied once the replication starts. The default is true.  
      -- 新增的订阅表,是否需要copy数据。(老的不管)  
      execute format('ALTER SUBSCRIPTION %s REFRESH PUBLICATION with (copy_data=true)', sub_name) ;    
    end if;  
  end if;  
  --   
  return null;  
end;  
$$ language plpgsql strict;  

请使用这个触发器函数:

create or replace function tg_exec_ddl() returns trigger as $$  
declare  
  sub_name text := TG_ARGV[0];  
  v_port text;  
  conn text;  
begin  
  -- dblink  
  CREATE EXTENSION if not exists dblink;  
    
  show port into v_port;  
  conn := format('hostaddr=127.0.0.1 port=%s user=%s dbname=%s', v_port, current_user, current_database);  
  -- set search_path=xxx;  
  if NEW.sub then                        -- 仅复制UDF中设置为发布=true了的DDL  
    execute format('select dblink_exec(''%s'', ''%s'')', conn, NEW.content);  
    perform pg_sleep(1);  
      
    if NEW.content ~* 'create +table' then  
      -- 刷新订阅,否则新增的发布表不会被订阅到  
      -- copy_data (boolean)  
      -- Specifies whether the existing data in the publications that are being subscribed   
      -- to should be copied once the replication starts. The default is true.  
      -- 新增的订阅表,是否需要copy数据。(老的不管)  
      execute format('select dblink_exec(''%s'', ''ALTER SUBSCRIPTION %s REFRESH PUBLICATION with (copy_data=true)'')', conn, sub_name) ;    
    end if;  
  end if;  
  --   
  return null;  
end;  
$$ language plpgsql strict;  

创建ddl_record插入触发器,指定subscript的名称

create trigger tg_exec_ddl after insert on ddl_record for each row execute procedure tg_exec_ddl('sub1');  

设置允许replica执行触发器,一定要执行,否则订阅端不会触发。

alter table ddl_record enable always trigger tg_exec_ddl  ;  
  
  
  
  
DISABLE/ENABLE [ REPLICA | ALWAYS ] TRIGGER  
  
These forms configure the firing of trigger(s) belonging to the table.   
  
A disabled trigger is still known to the system,   
but is not executed when its triggering event occurs.   
  
For a deferred trigger, the enable status is checked when the event occurs,   
not when the trigger function is actually executed.   
  
One can disable or enable a single trigger specified by name,   
or all triggers on the table,   
or only user triggers (this option excludes internally generated constraint   
triggers such as those that are used to implement foreign key constraints   
or deferrable uniqueness and exclusion constraints).   
  
Disabling or enabling internally generated constraint triggers requires   
superuser privileges; it should be done with caution since of course   
the integrity of the constraint cannot be guaranteed if the triggers   
are not executed.   
  
The trigger firing mechanism is also affected by the configuration variable   
session_replication_role.   
  
Simply enabled triggers will fire when the replication role is “origin”   
(the default) or “local”.   
  
Triggers configured as ENABLE REPLICA will only fire if the session is in “replica” mode,   
and triggers configured as ENABLE ALWAYS will fire regardless of the current replication mode.  
  
This command acquires a SHARE ROW EXCLUSIVE lock.  

6、主库:使用UDF执行DDL,修改已有被订阅表的表结构

select exec_ddl(  
$_$  
alter table t_test add column c1 int  
$_$,  
't_test',  
true,  
array['pub1']  
);  

备库:订阅端也自动添加了这个字段

postgres=# \d t_test  
                                       Table "public.t_test"  
  Column  |            Type             | Collation | Nullable |              Default                 
----------+-----------------------------+-----------+----------+------------------------------------  
 id       | integer                     |           | not null | nextval('t_test_id_seq'::regclass)  
 info     | text                        |           |          |   
 crt_time | timestamp without time zone |           |          |   
 c1       | integer                     |           |          |   
Indexes:  
    "t_test_pkey" PRIMARY KEY, btree (id)  

7、主库:使用UDF执行DDL,新增一张表

select exec_ddl($_$  
create table t123 (id int primary key, info text, crt_time timestamp)  
$_$,  
't123',  
true,  
array['pub1']  
);  

备库:订阅端也自动创建了这张表

postgres=# \dt  
           List of relations  
 Schema |    Name    | Type  |  Owner     
--------+------------+-------+----------  
 public | ddl_record | table | postgres  
 public | t123       | table | postgres  
 public | t_test     | table | postgres  
 public | test       | table | postgres  
(4 rows)  

精细化

可以将以上过程再精细化一下,例如通过标记,设置为是否需要将此DDL在目标端执行,是否需要在目标端执行订阅。

如果有多个订阅端,可以按订阅端精细化管理。

最终实现在主库通过UDF执行DDL,订阅端自动识别:

是否需要执行DDL,是否需要将此次新增的对象执行订阅等。

以上功能都可以在这套方法中实现,无非就是加一些参数、ddl_record的字段以及目标端trigger逻辑修改等。

参考

《use event trigger function record user who alter table's SQL》

《PostgreSQL 事件触发器 - PostgreSQL 9.3 Event Trigger》

《PostgreSQL 事件触发器应用 - DDL审计记录》

《PostgreSQL Oracle 兼容性之 - 事件触发器实现类似Oracle的回收站功能》

《PostgreSQL 事件触发器 - DDL审计 , DDL逻辑复制 , 打造DDL统一管理入》

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
mysqldump常用于MySQL数据库逻辑备份
mysqldump常用于MySQL数据库逻辑备份。   1、各种用法说明      A. 最简单的用法: mysqldump -uroot -pPassword [database name] > [dump file]      上述命令将指定数据库备份到某dump文件(转储文件)中,比如: mysqldump -uroot -p123 test > test.dump      生成的test.dump文件中包含建表语句(生成数据库结构哦)和插入数据的insert语句。
1055 0
如何暂停sqlserver数据订阅服务
原文:如何暂停sqlserver数据订阅服务 从 Management Studio 启动和停止快照代理或日志读取器代理 在 Management Studio 中连接到发布服务器,然后展开服务器节点和“复制”文件夹。
1794 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18999 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
25240 0
使用PostgreSQL逻辑订阅实现multi-master
标签 PostgreSQL , multi master , 逻辑订阅 背景 很多业务要求多活,但是多活中最难搞定的实际上是数据库,大多数业务通过分流,例如将数据根据UID切分到不同的IDC,同一个UID的数据永远只会写到一个IDC中,然后通过数据复制技术,将对应的数据复制到其他的IDC。
4277 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
20698 0
+关注
德哥
公益是一辈子的事, I'm digoal, just do it.
2153
文章
245
问答
来源圈子
更多
阿里云数据库:帮用户承担一切数据库风险,给您何止是安心!支持关系型数据库:MySQL、SQL Server、PostgreSQL、PPAS(完美兼容Oracle)、自研PB级数据存储的分布式数据库Petadata、自研金融级云数据库OceanBase支持NoSQL数据库:MongoDB、Redis、Memcache更有褚霸、丁奇、德哥、彭立勋、玄惭、叶翔等顶尖数据库专家服务。
+ 订阅
相关文档: 云数据库 OceanBase 版 可信账本数据库 云原生关系型数据库 PolarDB PostgreSQL引擎
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载