PostgreSQL 9.5 pg_dump新特性 你是我的眼

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:
为什么叫你是我的眼,因为你看到的世界,就是我的世界。

PostgreSQL 9.5 的逻辑流复制的功能越来越完善了,针对逻辑流复制,对其他工具也提出了一定的要求,例如我们在使用流复制协议接口(非SQL接口)创建一个逻辑流复制slot的同时,会自动导出创建SLOT时的snapshot,有了这个SNAPSHOT ID,我们才能够将基础数据弄出来,加上从WAL decode出来的信息,从而实现逻辑复制。
pg_dump是一个数据备份工具,目前加入了对snapshot的支持,目的非常明显,主要是配合逻辑复制使用的。当然也可以配合其他复制工具使用,需要导出snapshot。
Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)
The remote snapshot must have been exported by pg_export_snapshot() or been defined when creating a logical replication slot. This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.

测试:
 
 

postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# select pg_export_snapshot();
 pg_export_snapshot 
--------------------
 0000072C-1
(1 row)

先不要断开这个事务。等备份启动后再关闭即可(不需要等待备份结束)。

 

pg95@db-172-16-3-150-> pg_dump --snapshot=0000072C-1

使用这个SNAPSHOT导出。

对于逻辑复制,我们需要使用逻辑流复制协议创建slot,然后开启备份。
例子:
 
 

vi pg_hba.conf
# replication privilege.
local   replication     postgres                                trust
host    replication     postgres        127.0.0.1/32            trust
host    replication     postgres        ::1/128                 trust
pg_ctl reload


在数据库端使用test_encoding记录逻辑变更到WAL中。
使用流复制协议连接数据库。
 
 

pg95@db-172-16-3-150-> psql 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres replication=database' 
psql (9.5devel)
Type "help" for help.
postgres=# CREATE_REPLICATION_SLOT ab12 LOGICAL "/opt/pgsql9.5/lib/test_decoding.so";
 slot_name | consistent_point | snapshot_name |           output_plugin            
-----------+------------------+---------------+------------------------------------
 ab12      | 7/77B59A00       | 00000736-1    | /opt/pgsql9.5/lib/test_decoding.so
(1 row)


使用这个SNAPSHOT导出。
 
 

pg95@db-172-16-3-150-> pg_dump --snapshot=00000736-1


另外我们可以使用pg_recvlogical从这个slot开始接收逻辑变更:
 
 

pg_recvlogical -S ab12 -d postgres -v  -f - --start
BEGIN 1849
table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12
COMMIT 1849


有了dump(基础备份)+逻辑变更(SQL)+exec sql模块,就可以完成基于SQL的逻辑复制。

使用SQL函数也可以消费slot的变更。
 
 

-[ RECORD 9 ]+-----------------------------------
slot_name    | ab12
plugin       | /opt/pgsql9.5/lib/test_decoding.so
slot_type    | logical
datoid       | 13181
database     | postgres
active       | f
active_pid   | 
xmin         | 
catalog_xmin | 1850
restart_lsn  | 7/77B5A3B0
postgres=# select * from pg_replication_slots ;
postgres=# insert into t1 values (1);
INSERT 0 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('ab12', NULL, NULL, 'include-timestamp', '1', 'include-xids', '1');
  location  | xid  |                                                           data                                                 
          
------------+------+----------------------------------------------------------------------------------------------------------------
----------
 7/77B5ABB8 | 1852 | BEGIN 1852
 7/77B5ABB8 | 1852 | table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[b
igint]:15
 7/77B5AD98 | 1852 | COMMIT 1852 (at 2000-01-01 08:00:00+08)
(3 rows)


但是请注意,slot中的信息只会消费一次,所以一个slot对应一个消费者,如果有多个消费者,请使用多个slot。除非你的应用适合多个消费者使用一个SLOT.

[其他]
1. PostgreSQL 9.5 新增了一个参数log_replication_commands = on,打开的话在日志中会记录流复制协议的命令。
例如:
 
 

2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"re
ceived replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"

2. 流复制协议详见

[参考]
5. test_decoding支持的options, contrib/test_decoding/test_decoding.c
include-xids
include-timestamp
force-binary
skip-empty-xacts
only-local
如下:
 
 

        foreach(option, ctx->output_plugin_options)
        {
                DefElem    *elem = lfirst(option);

                Assert(elem->arg == NULL || IsA(elem->arg, String));

                if (strcmp(elem->defname, "include-xids") == 0)
                {
                        /* if option does not provide a value, it means its value is true */
                        if (elem->arg == NULL)
                                data->include_xids = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_xids))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "include-timestamp") == 0)
                {
                        if (elem->arg == NULL)
                                data->include_timestamp = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "force-binary") == 0)
                {
                        bool            force_binary;

                        if (elem->arg == NULL)
                                continue;
                        else if (!parse_bool(strVal(elem->arg), &force_binary))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                        if (force_binary)
                                opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
                }
                else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
                {

                        if (elem->arg == NULL)
                                data->skip_empty_xacts = true;
                        else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "only-local") == 0)
                {

                        if (elem->arg == NULL)
                                data->only_local = true;
                        else if (!parse_bool(strVal(elem->arg), &data->only_local))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else
                {
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("option \"%s\" = \"%s\" is unknown",
                                                        elem->defname,
                                                        elem->arg ? strVal(elem->arg) : "(null)")));
                }


6. src/backend/replication/walsender.c
创建逻辑slot时,自动创建snapshot。
 
 

/*
 * Create a new replication slot.
 */
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
......
        if (cmd->kind == REPLICATION_KIND_LOGICAL)
        {
......
                /*
                 * Export a plain (not of the snapbuild.c type) snapshot to the user
                 * that can be imported into another session.
                 */
                snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);

......


注意在执行replication命令时,会先释放snapshot,因此务必在这之前将这个snapshot 给pg_dump导入。
 
 

/*
 * Execute an incoming replication command.
 */
void
exec_replication_command(const char *cmd_string)
{
......
        /*
         * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
         * command arrives. Clean up the old stuff if there's anything.
         */
        SnapBuildClearExportedSnapshot();
......
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
SQL 关系型数据库 OLAP
|
10月前
|
存储 关系型数据库 数据库
探索PostgreSQL 14新特性--SEARCH和CYCLE
探索PostgreSQL 14新特性--SEARCH和CYCLE
51 0
|
10月前
|
算法
PostgreSQL16中pg_dump的LZ4和ZSTD压缩
PostgreSQL16中pg_dump的LZ4和ZSTD压缩
108 0
|
10月前
|
存储 缓存 关系型数据库
PostgreSQL 14新特性--减少索引膨胀
PostgreSQL 14新特性--减少索引膨胀
398 0
|
10月前
|
SQL 机器学习/深度学习 存储
PostgreSQL逻辑备份pg_dump使用及其原理解析
PostgreSQL逻辑备份pg_dump使用及其原理解析
182 0
|
SQL 存储 缓存
从研发角度深入了解RDS AliSQL内核2020新特性
内容简要: 一、关于内核 二、内核特性详解 一、 关于内核 (一)回归内核
从研发角度深入了解RDS AliSQL内核2020新特性
|
SQL 存储 缓存
从研发角度深入了解RDS AliSQL内核2020新特性 ——楼方鑫(黄忠)
从研发角度深入了解RDS AliSQL内核2020新特性 ——楼方鑫(黄忠)
从研发角度深入了解RDS AliSQL内核2020新特性    ——楼方鑫(黄忠)
|
SQL AliSQL Cloud Native
RDS发布会解读 | AliSQL内核新特性
AliSQL在2020年做了不少事情,有必要总结分享一下,以便让大家更好地知道有哪些特性,可以在哪些业务场景中使用到,也是为了在2021年更好地向前发展。
438 0
RDS发布会解读 | AliSQL内核新特性
|
SQL AliSQL Cloud Native
RDS发布会解读| AliSQL内核新特性
AliSQL在2020年做了不少事情,有必要总结分享一下,以便让大家更好地知道有哪些特性,可以在哪些业务场景中使用到,也是为了在2021年更好的向前发展。在年初时计划的一些企业级功能基本上都实现了,并且在过程中特别强调了功能的场景通用性,不再是从某个行业某个特定业务或应用场景设计(比如电商秒杀),而是从云上众多用户的不同场景出发,并且不需要用户应用或SQL改造配合(直接一个开关就可以开启的),还要求在RDS 56/57/80三个主流版本上都有同样的体验,从云场景而生并为云场景服务的技术,都是云原生技术。这一目标角度的调整的确是给自己加了不少难度,但研发让所有云上用户都能轻松受益享受技术红利的新
2464 0
RDS发布会解读| AliSQL内核新特性
|
SQL 存储 Oracle
PostgreSQL Oracle 兼容性 - Oracle 19c 新特性在PostgreSQL中的使用
PostgreSQL Oracle 兼容性 - Oracle 19c 新特性在PostgreSQL中的使用
2804 0

相关产品

  • 云原生数据库 PolarDB