PostgreSQL 9.5 pg_dump新特性 你是我的眼

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介:
为什么叫你是我的眼,因为你看到的世界,就是我的世界。

PostgreSQL 9.5 的逻辑流复制的功能越来越完善了,针对逻辑流复制,对其他工具也提出了一定的要求,例如我们在使用流复制协议接口(非SQL接口)创建一个逻辑流复制slot的同时,会自动导出创建SLOT时的snapshot,有了这个SNAPSHOT ID,我们才能够将基础数据弄出来,加上从WAL decode出来的信息,从而实现逻辑复制。
pg_dump是一个数据备份工具,目前加入了对snapshot的支持,目的非常明显,主要是配合逻辑复制使用的。当然也可以配合其他复制工具使用,需要导出snapshot。
Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)
The remote snapshot must have been exported by pg_export_snapshot() or been defined when creating a logical replication slot. This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.

测试:
 
 

postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# select pg_export_snapshot();
 pg_export_snapshot 
--------------------
 0000072C-1
(1 row)

先不要断开这个事务。等备份启动后再关闭即可(不需要等待备份结束)。

 

pg95@db-172-16-3-150-> pg_dump --snapshot=0000072C-1

使用这个SNAPSHOT导出。

对于逻辑复制,我们需要使用逻辑流复制协议创建slot,然后开启备份。
例子:
 
 

vi pg_hba.conf
# replication privilege.
local   replication     postgres                                trust
host    replication     postgres        127.0.0.1/32            trust
host    replication     postgres        ::1/128                 trust
pg_ctl reload


在数据库端使用test_encoding记录逻辑变更到WAL中。
使用流复制协议连接数据库。
 
 

pg95@db-172-16-3-150-> psql 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres replication=database' 
psql (9.5devel)
Type "help" for help.
postgres=# CREATE_REPLICATION_SLOT ab12 LOGICAL "/opt/pgsql9.5/lib/test_decoding.so";
 slot_name | consistent_point | snapshot_name |           output_plugin            
-----------+------------------+---------------+------------------------------------
 ab12      | 7/77B59A00       | 00000736-1    | /opt/pgsql9.5/lib/test_decoding.so
(1 row)


使用这个SNAPSHOT导出。
 
 

pg95@db-172-16-3-150-> pg_dump --snapshot=00000736-1


另外我们可以使用pg_recvlogical从这个slot开始接收逻辑变更:
 
 

pg_recvlogical -S ab12 -d postgres -v  -f - --start
BEGIN 1849
table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12
COMMIT 1849


有了dump(基础备份)+逻辑变更(SQL)+exec sql模块,就可以完成基于SQL的逻辑复制。

使用SQL函数也可以消费slot的变更。
 
 

-[ RECORD 9 ]+-----------------------------------
slot_name    | ab12
plugin       | /opt/pgsql9.5/lib/test_decoding.so
slot_type    | logical
datoid       | 13181
database     | postgres
active       | f
active_pid   | 
xmin         | 
catalog_xmin | 1850
restart_lsn  | 7/77B5A3B0
postgres=# select * from pg_replication_slots ;
postgres=# insert into t1 values (1);
INSERT 0 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('ab12', NULL, NULL, 'include-timestamp', '1', 'include-xids', '1');
  location  | xid  |                                                           data                                                 
          
------------+------+----------------------------------------------------------------------------------------------------------------
----------
 7/77B5ABB8 | 1852 | BEGIN 1852
 7/77B5ABB8 | 1852 | table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[b
igint]:15
 7/77B5AD98 | 1852 | COMMIT 1852 (at 2000-01-01 08:00:00+08)
(3 rows)


但是请注意,slot中的信息只会消费一次,所以一个slot对应一个消费者,如果有多个消费者,请使用多个slot。除非你的应用适合多个消费者使用一个SLOT.

[其他]
1. PostgreSQL 9.5 新增了一个参数log_replication_commands = on,打开的话在日志中会记录流复制协议的命令。
例如:
 
 

2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"re
ceived replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"

2. 流复制协议详见

[参考]
5. test_decoding支持的options, contrib/test_decoding/test_decoding.c
include-xids
include-timestamp
force-binary
skip-empty-xacts
only-local
如下:
 
 

        foreach(option, ctx->output_plugin_options)
        {
                DefElem    *elem = lfirst(option);

                Assert(elem->arg == NULL || IsA(elem->arg, String));

                if (strcmp(elem->defname, "include-xids") == 0)
                {
                        /* if option does not provide a value, it means its value is true */
                        if (elem->arg == NULL)
                                data->include_xids = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_xids))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "include-timestamp") == 0)
                {
                        if (elem->arg == NULL)
                                data->include_timestamp = true;
                        else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "force-binary") == 0)
                {
                        bool            force_binary;

                        if (elem->arg == NULL)
                                continue;
                        else if (!parse_bool(strVal(elem->arg), &force_binary))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                        if (force_binary)
                                opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
                }
                else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
                {

                        if (elem->arg == NULL)
                                data->skip_empty_xacts = true;
                        else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else if (strcmp(elem->defname, "only-local") == 0)
                {

                        if (elem->arg == NULL)
                                data->only_local = true;
                        else if (!parse_bool(strVal(elem->arg), &data->only_local))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                 strVal(elem->arg), elem->defname)));
                }
                else
                {
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("option \"%s\" = \"%s\" is unknown",
                                                        elem->defname,
                                                        elem->arg ? strVal(elem->arg) : "(null)")));
                }


6. src/backend/replication/walsender.c
创建逻辑slot时,自动创建snapshot。
 
 

/*
 * Create a new replication slot.
 */
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
......
        if (cmd->kind == REPLICATION_KIND_LOGICAL)
        {
......
                /*
                 * Export a plain (not of the snapbuild.c type) snapshot to the user
                 * that can be imported into another session.
                 */
                snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);

......


注意在执行replication命令时,会先释放snapshot,因此务必在这之前将这个snapshot 给pg_dump导入。
 
 

/*
 * Execute an incoming replication command.
 */
void
exec_replication_command(const char *cmd_string)
{
......
        /*
         * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
         * command arrives. Clean up the old stuff if there's anything.
         */
        SnapBuildClearExportedSnapshot();
......
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
存储 关系型数据库 数据库
深入了解 PostgreSQL:功能、特性和部署
PostgreSQL,通常简称为Postgres,是一款强大且开源的关系型数据库管理系统(RDBMS),它在数据存储和处理方面提供了广泛的功能和灵活性。本文将详细介绍 PostgreSQL 的功能、特性以及如何部署和使用它。
702 1
深入了解 PostgreSQL:功能、特性和部署
|
3月前
|
SQL 关系型数据库 数据库
PostgreSQL常用命令,启动连接,pg_dump导入导出
PostgreSQL常用命令,启动连接,pg_dump导入导出
|
关系型数据库 大数据 PostgreSQL
PostgreSQL16-新特性-并行聚合
PostgreSQL16-新特性-并行聚合
144 0
|
存储 关系型数据库 数据库
探索PostgreSQL 14新特性--SEARCH和CYCLE
探索PostgreSQL 14新特性--SEARCH和CYCLE
89 0
|
算法
PostgreSQL16中pg_dump的LZ4和ZSTD压缩
PostgreSQL16中pg_dump的LZ4和ZSTD压缩
200 0
|
缓存 监控 关系型数据库
[译]PostgreSQL16-新特性-新增IO统计视图:pg_stat_io
[译]PostgreSQL16-新特性-新增IO统计视图:pg_stat_io
237 0
|
存储 缓存 关系型数据库
PostgreSQL 14新特性--减少索引膨胀
PostgreSQL 14新特性--减少索引膨胀
477 0
|
SQL 机器学习/深度学习 存储
PostgreSQL逻辑备份pg_dump使用及其原理解析
PostgreSQL逻辑备份pg_dump使用及其原理解析
274 0
|
存储 SQL Oracle
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
AnalyticDB PostgreSQL 7.0 新增了存储过程功能的支持,让用户在使用ADB PG时能够更方便高效地开发业务,并能够更好地兼容Oracle等传统数仓的业务。
495 1
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
|
SQL 关系型数据库 Linux
知识分享之PostgreSQL——OIDS的特性与新版本去除SQL
之前一直使用的PostgreSQL 9.6系列版本,由于官方不再维护了,就准备换成最新稳定版本的,查看了一下官方版本说明,发现13系列版本是目前稳定性较好的版本,于是兴冲冲的更换了过来,但随之而来的就是一些新特性,其中就比如表中的OID字段,这个字段是对象标识符,之前能用于行标记,现在发现只有表才具有这个隐藏字段,行数据没有这个支持了,于是就需要将老版本的表进行关闭掉这个字段。下面我们就开始关闭之旅。
166 0
知识分享之PostgreSQL——OIDS的特性与新版本去除SQL

相关产品

  • 云原生数据库 PolarDB
  • 下一篇
    无影云桌面