pgpool-recovery扩展分析

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: Pgpool-II的在线恢复(online recovery)功能(常用于故障转移后把节点重新加入集群)需要用到pgpool-recovery扩展。该扩展提供了pgpool_recovery、pgpool_remote_start、pgpool_pgctl、pgpool_switch_xlog等几个用C语言实现的自定义函数,用于辅助online recovery工作。

Pgpool-II的在线恢复(online recovery)功能(常用于故障转移后把节点重新加入集群)需要用到pgpool-recovery扩展。该扩展提供了pgpool_recovery、pgpool_remote_start、pgpool_pgctl、pgpool_switch_xlog等几个用C语言实现的自定义函数,用于辅助online recovery工作。

image.png

一个PostgreSQL扩展通常由以下几部分组成:一个控制文件(这里是pgpool_recovery.control)、创建扩展时需要执行的SQL脚本(pgpool_recovery--1.4.sql)、若干功能实现代码文件(通常用C语言开发)以及Makefile文件。

pgpool_recovery.control

# pgpool-recovery extension
# Control file read by CREATE EXTENSION pgpool_recovery.
# comment: description stored for the extension once it is created.
comment = 'recovery functions for pgpool-II for V4.3'
# default_version: version installed by default; selects the install script
# pgpool_recovery--1.4.sql.
default_version = '1.4'
# module_pathname: shared library substituted for MODULE_PATHNAME in the
# install script.
module_pathname = '$libdir/pgpool-recovery'
# relocatable: the extension's objects may be moved to another schema.
relocatable = true

这个文件定义扩展的基本信息,包括扩展的名称、说明、版本信息等,扩展创建成功后,在数据库里都能查询到对应的信息,见下图:

image.png

image.png

pgpool_recovery--1.4.sql

-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgpool_recovery" to load this file. \quit

-- pgpool_recovery: one overload per argument count; the C function uses
-- PG_NARGS() to tell which Pgpool-II version is calling.

-- Pgpool-II 4.3 or later
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text,
                                IN primary_host text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- Pgpool-II 4.1 or 4.2
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- Pgpool-II 4.0
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- Pgpool-II 3.4 - 3.7
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- three-argument form used by the oldest callers
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

CREATE FUNCTION pgpool_remote_start(IN remote_host text,
                                    IN remote_data_directory text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_remote_start'
LANGUAGE C STRICT;

CREATE FUNCTION pgpool_pgctl(IN action text,
                             IN stop_mode text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_pgctl'
LANGUAGE C STRICT;

CREATE FUNCTION pgpool_switch_xlog(IN archive_dir text)
RETURNS text
AS 'MODULE_PATHNAME', 'pgpool_switch_xlog'
LANGUAGE C STRICT;

这个SQL脚本在创建扩展时自动执行,执行成功后会创建pgpool_recovery、pgpool_remote_start、pgpool_pgctl、pgpool_switch_xlog四个用户自定义函数;其中pgpool_recovery按参数个数(3到7个)重载了五个版本,所以实际一共会创建八个函数。

image.png

pgpool_recovery

该函数通过重载实现了多个版本:Pgpool-II 4.3及以上使用带有七个参数的版本;4.1和4.2使用六个参数的版本;4.0使用五参数版本;3.4到3.7使用四参数版本;只有三个参数的版本则供更早的调用方使用。

-- Seven-argument form, used by Pgpool-II 4.3 or later
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text,
                                IN primary_host text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

参数说明如下:

script_name -- 需要执行的脚本名(对应pgpool.conf中recovery_1st_stage_command或recovery_2nd_stage_command参数指定的脚本名),脚本需放在主节点的数据目录(PGDATA)下

remote_host -- 需要recovery的主机名或者IP地址

remote_data_directory -- 需要recovery的节点的PGDATA目录

primary_port -- 主节点的PostgreSQL端口号

remote_node -- 需要recovery的后端节点的节点号(即pgpool.conf中backend_hostnameN等参数的编号N,也就是pcp_recovery_node指定的节点ID;注意它不是pgpool etc目录下pgpool_node_id文件中的watchdog节点ID)

remote_port -- 需要recovery的节点PostgreSQL的端口号

primary_host -- 主节点主机名或者IP地址


pgpool_pgctl

CREATE FUNCTION pgpool_pgctl(IN action text,
                             IN stop_mode text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_pgctl'
LANGUAGE C STRICT;

这个函数根据参数生成启停PostgreSQL的命令并执行。参数说明如下:

action -- 传给pg_ctl的子命令,对于pgpool可能是start、stop、reload、promote等;函数只在action为reload时检查命令的返回值

stop_mode -- 对应pg_ctl的-m选项的参数,可能的取值有smart、fast、immediate;传入空字符串时生成的命令不带-m选项


调用相关脚本的逻辑在src/pcp_con/recovery.c中实现:

/** Call pgpool_recovery() function.** "main_backend" is either primary backend node (in streaming replication* mode) or main backend node (in other mode).*/staticvoidexec_recovery(PGconn*conn, BackendInfo*main_backend, BackendInfo*recovery_backend, charstage, intrecovery_node)

Pgpool-II的pcp_recovery_node工具会触发Pgpool-II执行exec_recovery函数。此函数负责调度,根据recovery的进度执行recovery_1st_stage_command和recovery_2nd_stage_command中指定的脚本。

pgpool-recovery.c(扩展本身的C源文件,注意不是上文Pgpool-II源码中的src/pcp_con/recovery.c)

函数的C语言实现,代码比较简单,注释也很详细,这里就不做说明了。

#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include "postgres.h"#include "fmgr.h"#include "miscadmin.h"#include "executor/spi.h"#include "funcapi.h"#include "catalog/namespace.h"#include "catalog/pg_proc.h"#include "utils/syscache.h"#include "utils/builtins.h"             /* PostgreSQL 8.4 needs this for textout */#include "utils/guc.h"#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 90300)#include "access/htup_details.h"        /* PostgreSQL 9.3 or later needs this */#endif#define REMOTE_START_FILE "pgpool_remote_start"#include <stdlib.h>#ifdef PG_MODULE_MAGICPG_MODULE_MAGIC;
#endifPG_FUNCTION_INFO_V1(pgpool_recovery);
PG_FUNCTION_INFO_V1(pgpool_remote_start);
PG_FUNCTION_INFO_V1(pgpool_pgctl);
PG_FUNCTION_INFO_V1(pgpool_switch_xlog);
externDatumpgpool_recovery(PG_FUNCTION_ARGS);
externDatumpgpool_remote_start(PG_FUNCTION_ARGS);
externDatumpgpool_pgctl(PG_FUNCTION_ARGS);
externDatumpgpool_switch_xlog(PG_FUNCTION_ARGS);
staticcharrecovery_script[1024];
staticcharcommand_text[1024];
staticOidget_function_oid(constchar*funcname, constchar*argtype, constchar*nspname);
char*Log_line_prefix=NULL;
Datumpgpool_recovery(PG_FUNCTION_ARGS)
{
intr;
char*script=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(0))));
char*remote_host=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(1))));
char*remote_data_directory=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(2))));
if (!superuser())
#ifdef ERRCODE_INSUFFICIENT_PRIVILEGEereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to use pgpool_recovery function"))));
#elseelog(ERROR, "must be superuser to use pgpool_recovery function");
#endifif (PG_NARGS() >=7)            /* Pgpool-II 4.3 or later */        {
char*primary_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(3))));
intremote_node=PG_GETARG_INT32(4);
char*remote_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(5))));
char*primary_host=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(6))));
snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d \"%s\" \"%s\"",
DataDir, script, DataDir, remote_host,
remote_data_directory, primary_port, remote_node, remote_port, primary_host);
        }
elseif (PG_NARGS() >=6)               /* Pgpool-II 4.1 or 4.2 */        {
char*primary_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(3))));
intremote_node=PG_GETARG_INT32(4);
char*remote_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(5))));
snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d \"%s\"",
DataDir, script, DataDir, remote_host,
remote_data_directory, primary_port, remote_node, remote_port);
        }
elseif (PG_NARGS() >=5)               /* Pgpool-II 4.0 */        {
char*primary_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(3))));
intremote_node=PG_GETARG_INT32(4);
snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d",
DataDir, script, DataDir, remote_host,
remote_data_directory, primary_port, remote_node);
        }
elseif (PG_NARGS() >=4)       /* Pgpool-II 3.4 - 3.7 */        {
char*primary_port=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(3))));
snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\"",
DataDir, script, DataDir, remote_host,
remote_data_directory, primary_port);
        }
else        {
snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\"",
DataDir, script, DataDir, remote_host,
remote_data_directory);
        }
elog(DEBUG1, "recovery_script: %s", recovery_script);
r=system(recovery_script);
if (r!=0)
        {
elog(ERROR, "pgpool_recovery failed");
        }
PG_RETURN_BOOL(true);
}
Datumpgpool_remote_start(PG_FUNCTION_ARGS)
{
intr;
char*remote_host=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(0))));
char*remote_data_directory=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(1))));
if (!superuser())
#ifdef ERRCODE_INSUFFICIENT_PRIVILEGEereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to use pgpool_remote_start function"))));
#elseelog(ERROR, "must be superuser to use pgpool_remote_start function");
#endifsnprintf(recovery_script, sizeof(recovery_script),
"%s/%s %s %s", DataDir, REMOTE_START_FILE,
remote_host, remote_data_directory);
elog(DEBUG1, "recovery_script: %s", recovery_script);
r=system(recovery_script);
if (r!=0)
        {
elog(ERROR, "pgpool_remote_start failed");
        }
PG_RETURN_BOOL(true);
}
Datumpgpool_pgctl(PG_FUNCTION_ARGS)
{
intr;
char*action=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(0))));
char*stop_mode=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(1))));
char*pg_ctl;
char*data_directory;
if (!superuser())
#ifdef ERRCODE_INSUFFICIENT_PRIVILEGEereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to use pgpool_pgctl function"))));
#elseelog(ERROR, "must be superuser to use pgpool_pgctl function");
#endif#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 90600)pg_ctl=GetConfigOptionByName("pgpool.pg_ctl", NULL, false);
data_directory=GetConfigOptionByName("data_directory", NULL, false);
#elsepg_ctl=GetConfigOptionByName("pgpool.pg_ctl", NULL);
data_directory=GetConfigOptionByName("data_directory", NULL);
#endifif (strcmp(stop_mode, "") !=0)
        {
snprintf(command_text, sizeof(command_text),
"%s %s -D %s -m %s 2>/dev/null 1>/dev/null < /dev/null &",
pg_ctl, action, data_directory, stop_mode);
        }
else        {
snprintf(command_text, sizeof(command_text),
"%s %s -D %s 2>/dev/null 1>/dev/null < /dev/null &",
pg_ctl, action, data_directory);
        }
elog(DEBUG1, "command_text: %s", command_text);
r=system(command_text);
if (strcmp(action, "reload") ==0&&r!=0)
        {
elog(ERROR, "pgpool_pgctl failed");
        }
PG_RETURN_BOOL(true);
}
/** pgpool_switch_log is the same as pg_switch_xlog except that* it wait till archiving is completed.* We call xlog functions with the oid to avoid a compile error* at old PostgreSQL.*/Datumpgpool_switch_xlog(PG_FUNCTION_ARGS)
{
char*archive_dir;
char*filename;
charpath[MAXPGPATH];
structstatfst;
Datumlocation;
text*filename_t;
text*result;
Oidswitch_xlog_oid;
Oidxlogfile_name_oid;
#if !defined(PG_VERSION_NUM) || (PG_VERSION_NUM < 90400)char*pg_xlogfile_name_arg_type="text";
#else/** The argument data type of PG's pg_xlogfile_name() function has been* changed from text to pg_lsn since PostgreSQL 9.4*/char*pg_xlogfile_name_arg_type="pg_lsn";
#endifarchive_dir=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(PG_GETARG_TEXT_P(0))));
if (stat(archive_dir, &fst) <0)
#ifdef ERRCODE_INSUFFICIENT_PRIVILEGEereport(ERROR,
                                (errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", archive_dir)));
#elseelog(ERROR, "could not stat file \"%s\"", archive_dir);
#endifswitch_xlog_oid=get_function_oid("pg_switch_xlog", NULL, "pg_catalog");
xlogfile_name_oid=get_function_oid("pg_xlogfile_name", pg_xlogfile_name_arg_type, "pg_catalog");
if (!switch_xlog_oid||!xlogfile_name_oid)
        {
/* probably PostgreSQL is 10 or greater */switch_xlog_oid=get_function_oid("pg_switch_wal", NULL, "pg_catalog");
xlogfile_name_oid=get_function_oid("pg_walfile_name", pg_xlogfile_name_arg_type, "pg_catalog");
if (!switch_xlog_oid||!xlogfile_name_oid)
elog(ERROR, "cannot find xlog functions");
        }
location=OidFunctionCall1(switch_xlog_oid, PointerGetDatum(NULL));
filename_t=DatumGetTextP(OidFunctionCall1(xlogfile_name_oid, location));
filename=DatumGetCString(DirectFunctionCall1(textout,
PointerGetDatum(filename_t)));
snprintf(path, MAXPGPATH, "%s/%s", archive_dir, filename);
elog(LOG, "pgpool_switch_xlog: waiting for \"%s\"", path);
while (stat(path, &fst) !=0||fst.st_size==0||fst.st_size% (1024*1024) !=0)
        {
CHECK_FOR_INTERRUPTS();
sleep(1);
        }
result=DatumGetTextP(DirectFunctionCall1(textin,
CStringGetDatum(path)));
PG_RETURN_TEXT_P(result);
}
staticOidget_function_oid(constchar*funcname, constchar*argtype, constchar*nspname)
{
#ifndef PROCNAMENSPOidtypid;
Oidnspid;
Oidfuncid;
Oidoids[1];
oidvector*oid_v;
HeapTupletup;
if (argtype)
        {
typid=TypenameGetTypid(argtype);
elog(DEBUG1, "get_function_oid: %s typid: %d", argtype, typid);
oids[0] =typid;
oid_v=buildoidvector(oids, 1);
        }
else        {
oid_v=buildoidvector(NULL, 0);
        }
#if !defined(PG_VERSION_NUM) || (PG_VERSION_NUM < 90300)nspid=LookupExplicitNamespace(nspname);
#else/** LookupExplicitNamespace() of PostgreSQL 9.3 or later, has third* argument "missing_ok" which suppresses ERROR exception, but returns* invalid_oid. See include/catalog/namespace.h*/nspid=LookupExplicitNamespace(nspname, false);
#endifelog(DEBUG1, "get_function_oid: oid of \"%s\": %d", nspname, nspid);
tup=SearchSysCache(PROCNAMEARGSNSP,
PointerGetDatum(funcname),
PointerGetDatum(oid_v),
ObjectIdGetDatum(nspid),
0);
if (HeapTupleIsValid(tup))
        {
#if defined(PG_VERSION_NUM) && (PG_VERSION_NUM >= 120000)Form_pg_procproctup= (Form_pg_proc) GETSTRUCT(tup);
funcid=proctup->oid;
#elsefuncid=HeapTupleGetOid(tup);
#endifelog(DEBUG1, "get_function_oid: oid of \"%s\": %d", funcname, funcid);
ReleaseSysCache(tup);
returnfuncid;
        }
#endifreturn0;
}
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
负载均衡
Pgpool-II实现高可用+读写分离+负载均衡(七)---- recovery_1st_stage分析
recovery_1st_stage是Pgpool online recovery的第一阶段,位于PG_DATA目录下,主要功能就是使用pg_basebackup恢复(recovery)从节点。
LXJ
|
关系型数据库 Shell PostgreSQL
PostgreSQL recovery.conf恢复配置
PostgreSQL recovery.conf恢复配置
LXJ
533 0
|
SQL Oracle 关系型数据库
xDB Replication Server - PostgreSQL, Oracle, SQL Server, PPAS 全量、增量(redo log based, or trigger based)同步(支持single-master, mult-master同步, 支持DDL)
xDB Replication Server - PostgreSQL, Oracle, SQL Server, PPAS 全量、增量(redo log based, or trigger based)同步(支持single-master, mult-master同步, 支持DDL)
928 0
|
负载均衡 网络协议 Oracle
Oracle 11g R2 RAC高可用连接特性 – SCAN详解1
<p style="line-height:1.5em; font-family:'Lucida Grande','Lucida Sans Unicode',Verdana,Arial,sans-serif; font-size:1em; margin-top:1.2em; margin-bottom:1.2em"> 昨天帮朋友解决11g RAC SCAN问题,当时为这朋友简单解答了一些
2256 0
|
Oracle 关系型数据库 测试技术
Oracle 数据库重放(Database Replay)功能演示
https://www.cnblogs.com/jyzhao/p/5072800.html 我们可以捕获生产环境的工作量,在测试环境上重放,从而在不影响生产环境的前提下做一些改动测试。捕获:需要Oracle版本为10.2.0.4或更高.重放:需要Oracle版本为11g Release 1或更新. 本文环境:RHEL6.4 + Oracle 11.2.0.4下面介绍一下执行Database Replay的Workflow。
1320 0
|
Oracle 关系型数据库 Shell