Pgpool-II的在线恢复(online recovery,常用于在故障转移后把故障节点重新加入集群)功能需要用到pgpool-recovery扩展。该扩展提供了pgpool_recovery、pgpool_remote_start、pgpool_pgctl、pgpool_switch_xlog等几个用C语言实现的自定义函数,用于辅助online recovery工作。
一个PostgreSQL扩展通常由以下几部分组成:控制文件(本例中为pgpool_recovery.control)、创建扩展时执行的SQL脚本(本例中为pgpool_recovery--1.4.sql)、若干功能实现代码文件(通常用C语言开发)以及Makefile。
pgpool_recovery.control
# pgpool-recovery extension
comment = 'recovery functions for pgpool-II for V4.3'
default_version = '1.4'
module_pathname = '$libdir/pgpool-recovery'
relocatable = true
这个文件定义扩展的基本信息,包括说明(comment)、默认版本(default_version)、共享库路径(module_pathname)以及是否可重定位(relocatable)等;扩展的名称则取自控制文件的文件名。扩展创建成功后,在数据库里都能查询到对应的信息,见下图:
pgpool_recovery--1.4.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgpool_recovery" to load this file. \quit

-- pgpool_recovery(): one overload per Pgpool-II generation; all of them are
-- served by the same C entry point, which dispatches on the argument count.

-- 7 arguments: Pgpool-II 4.3 or later
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text,
                                IN primary_host text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- 6 arguments: Pgpool-II 4.1 or 4.2
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text)
RETURNS bool
AS '$libdir/pgpool-recovery', 'pgpool_recovery'
LANGUAGE C STRICT;

-- 5 arguments: Pgpool-II 4.0
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- 4 arguments: Pgpool-II 3.4 - 3.7
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- 3 arguments: the oldest form
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;

-- Run $PGDATA/pgpool_remote_start to start the recovered node.
CREATE FUNCTION pgpool_remote_start(IN remote_host text,
                                    IN remote_data_directory text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_remote_start'
LANGUAGE C STRICT;

-- Run "pg_ctl <action> [-m <stop_mode>]" against this server's data directory.
CREATE FUNCTION pgpool_pgctl(IN action text,
                             IN stop_mode text)
RETURNS bool
AS '$libdir/pgpool-recovery', 'pgpool_pgctl'
LANGUAGE C STRICT;

-- Switch WAL and wait until the finished segment shows up in archive_dir.
CREATE FUNCTION pgpool_switch_xlog(IN archive_dir text)
RETURNS text
AS 'MODULE_PATHNAME', 'pgpool_switch_xlog'
LANGUAGE C STRICT;
这个SQL脚本在执行CREATE EXTENSION时自动运行,执行成功后会创建pgpool_recovery(包含多个重载版本)、pgpool_remote_start、pgpool_pgctl、pgpool_switch_xlog四个用户自定义函数。
pgpool_recovery
该函数通过重载实现了多个版本:Pgpool-II 4.3及以后的版本使用七参数版本;4.1和4.2使用六参数版本;4.0使用五参数版本;3.4到3.7使用四参数版本。下面是七参数版本的定义:
-- 7-argument form, used by Pgpool-II 4.3 or later
CREATE FUNCTION pgpool_recovery(IN script_name text,
                                IN remote_host text,
                                IN remote_data_directory text,
                                IN primary_port text,
                                IN remote_node integer,
                                IN remote_port text,
                                IN primary_host text)
RETURNS bool
AS 'MODULE_PATHNAME', 'pgpool_recovery'
LANGUAGE C STRICT;
参数说明如下:
script_name -- 需要执行的脚本名(对应pgpool.conf中recovery_1st_stage_command或recovery_2nd_stage_command参数指定的脚本名,该脚本位于主节点的PGDATA目录下)
remote_host -- 需要recovery的主机名或者IP地址
remote_data_directory -- 需要recovery的节点的PGDATA目录
primary_port -- 主节点的PostgreSQL端口号
remote_node -- 需要recovery的后端节点编号(即pcp_recovery_node命令指定的节点ID,对应pgpool.conf中backend_hostnameN等参数的后缀N;注意它与pgpool etc目录下pgpool_node_id文件中保存的watchdog节点号不是同一个概念)
remote_port -- 需要recovery的节点PostgreSQL的端口号
primary_host -- 主节点主机名或者IP地址
pgpool_pgctl
CREATE FUNCTION pgpool_pgctl(IN action text,
                             IN stop_mode text)
RETURNS bool
AS '$libdir/pgpool-recovery', 'pgpool_pgctl'
LANGUAGE C STRICT;
这个函数根据参数拼接出一条pg_ctl命令(pg_ctl程序路径取自pgpool.pg_ctl配置参数,数据目录取自data_directory配置参数),并在后台执行,用于启停或重新加载PostgreSQL。参数说明如下:
action -- 传给pg_ctl的子命令,对于pgpool可能是start、stop、promote、reload等;其中只有reload在命令返回非零状态时会报错
stop_mode -- 对应pg_ctl的-m选项的参数,可能的取值有:smart、fast、immediate;传入空字符串时不附加-m选项
调用相关脚本的逻辑在src/pcp_con/recovery.c中实现:
/*
 * Call pgpool_recovery() function.
 *
 * "main_backend" is either primary backend node (in streaming replication
 * mode) or main backend node (in other mode).
 *
 * NOTE(review): only the signature is reproduced in this excerpt; "stage"
 * presumably selects the 1st/2nd stage recovery command and "recovery_node"
 * is the backend node being recovered -- see src/pcp_con/recovery.c.
 */
static void
exec_recovery(PGconn *conn, BackendInfo *main_backend,
			  BackendInfo *recovery_backend, char stage, int recovery_node)
执行pcp_recovery_node命令后,Pgpool-II会调用exec_recovery函数。该函数根据recovery所处的阶段(stage),通过调用pgpool_recovery()执行recovery_1st_stage_command或recovery_2nd_stage_command中指定的脚本。
pgpool-recovery.c
以下是上述函数的C语言实现,代码比较简单,也带有注释,这里就不逐一说明了。
/* PostgreSQL 8.4 needs this for textout *//* PostgreSQL 9.3 or later needs this */PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(pgpool_recovery); PG_FUNCTION_INFO_V1(pgpool_remote_start); PG_FUNCTION_INFO_V1(pgpool_pgctl); PG_FUNCTION_INFO_V1(pgpool_switch_xlog); externDatumpgpool_recovery(PG_FUNCTION_ARGS); externDatumpgpool_remote_start(PG_FUNCTION_ARGS); externDatumpgpool_pgctl(PG_FUNCTION_ARGS); externDatumpgpool_switch_xlog(PG_FUNCTION_ARGS); staticcharrecovery_script[1024]; staticcharcommand_text[1024]; staticOidget_function_oid(constchar*funcname, constchar*argtype, constchar*nspname); char*Log_line_prefix=NULL; Datumpgpool_recovery(PG_FUNCTION_ARGS) { intr; char*script=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); char*remote_host=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); char*remote_data_directory=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(2)))); if (!superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to use pgpool_recovery function")))); elog(ERROR, "must be superuser to use pgpool_recovery function"); if (PG_NARGS() >=7) /* Pgpool-II 4.3 or later */ { char*primary_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(3)))); intremote_node=PG_GETARG_INT32(4); char*remote_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(5)))); char*primary_host=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(6)))); snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d \"%s\" \"%s\"", DataDir, script, DataDir, remote_host, remote_data_directory, primary_port, remote_node, remote_port, primary_host); } elseif (PG_NARGS() >=6) /* Pgpool-II 4.1 or 4.2 */ { char*primary_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(3)))); intremote_node=PG_GETARG_INT32(4); 
char*remote_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(5)))); snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d \"%s\"", DataDir, script, DataDir, remote_host, remote_data_directory, primary_port, remote_node, remote_port); } elseif (PG_NARGS() >=5) /* Pgpool-II 4.0 */ { char*primary_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(3)))); intremote_node=PG_GETARG_INT32(4); snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d", DataDir, script, DataDir, remote_host, remote_data_directory, primary_port, remote_node); } elseif (PG_NARGS() >=4) /* Pgpool-II 3.4 - 3.7 */ { char*primary_port=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(3)))); snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\" \"%s\"", DataDir, script, DataDir, remote_host, remote_data_directory, primary_port); } else { snprintf(recovery_script, sizeof(recovery_script), "\"%s/%s\" \"%s\" \"%s\" \"%s\"", DataDir, script, DataDir, remote_host, remote_data_directory); } elog(DEBUG1, "recovery_script: %s", recovery_script); r=system(recovery_script); if (r!=0) { elog(ERROR, "pgpool_recovery failed"); } PG_RETURN_BOOL(true); } Datumpgpool_remote_start(PG_FUNCTION_ARGS) { intr; char*remote_host=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); char*remote_data_directory=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); if (!superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to use pgpool_remote_start function")))); elog(ERROR, "must be superuser to use pgpool_remote_start function"); snprintf(recovery_script, sizeof(recovery_script), "%s/%s %s %s", DataDir, REMOTE_START_FILE, remote_host, remote_data_directory); elog(DEBUG1, "recovery_script: %s", recovery_script); 
r=system(recovery_script); if (r!=0) { elog(ERROR, "pgpool_remote_start failed"); } PG_RETURN_BOOL(true); } Datumpgpool_pgctl(PG_FUNCTION_ARGS) { intr; char*action=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); char*stop_mode=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); char*pg_ctl; char*data_directory; if (!superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to use pgpool_pgctl function")))); elog(ERROR, "must be superuser to use pgpool_pgctl function"); pg_ctl=GetConfigOptionByName("pgpool.pg_ctl", NULL, false); data_directory=GetConfigOptionByName("data_directory", NULL, false); pg_ctl=GetConfigOptionByName("pgpool.pg_ctl", NULL); data_directory=GetConfigOptionByName("data_directory", NULL); if (strcmp(stop_mode, "") !=0) { snprintf(command_text, sizeof(command_text), "%s %s -D %s -m %s 2>/dev/null 1>/dev/null < /dev/null &", pg_ctl, action, data_directory, stop_mode); } else { snprintf(command_text, sizeof(command_text), "%s %s -D %s 2>/dev/null 1>/dev/null < /dev/null &", pg_ctl, action, data_directory); } elog(DEBUG1, "command_text: %s", command_text); r=system(command_text); if (strcmp(action, "reload") ==0&&r!=0) { elog(ERROR, "pgpool_pgctl failed"); } PG_RETURN_BOOL(true); } /** pgpool_switch_log is the same as pg_switch_xlog except that* it wait till archiving is completed.* We call xlog functions with the oid to avoid a compile error* at old PostgreSQL.*/Datumpgpool_switch_xlog(PG_FUNCTION_ARGS) { char*archive_dir; char*filename; charpath[MAXPGPATH]; structstatfst; Datumlocation; text*filename_t; text*result; Oidswitch_xlog_oid; Oidxlogfile_name_oid; char*pg_xlogfile_name_arg_type="text"; /** The argument data type of PG's pg_xlogfile_name() function has been* changed from text to pg_lsn since PostgreSQL 9.4*/char*pg_xlogfile_name_arg_type="pg_lsn"; archive_dir=DatumGetCString(DirectFunctionCall1(textout, 
PointerGetDatum(PG_GETARG_TEXT_P(0)))); if (stat(archive_dir, &fst) <0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", archive_dir))); elog(ERROR, "could not stat file \"%s\"", archive_dir); switch_xlog_oid=get_function_oid("pg_switch_xlog", NULL, "pg_catalog"); xlogfile_name_oid=get_function_oid("pg_xlogfile_name", pg_xlogfile_name_arg_type, "pg_catalog"); if (!switch_xlog_oid||!xlogfile_name_oid) { /* probably PostgreSQL is 10 or greater */switch_xlog_oid=get_function_oid("pg_switch_wal", NULL, "pg_catalog"); xlogfile_name_oid=get_function_oid("pg_walfile_name", pg_xlogfile_name_arg_type, "pg_catalog"); if (!switch_xlog_oid||!xlogfile_name_oid) elog(ERROR, "cannot find xlog functions"); } location=OidFunctionCall1(switch_xlog_oid, PointerGetDatum(NULL)); filename_t=DatumGetTextP(OidFunctionCall1(xlogfile_name_oid, location)); filename=DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(filename_t))); snprintf(path, MAXPGPATH, "%s/%s", archive_dir, filename); elog(LOG, "pgpool_switch_xlog: waiting for \"%s\"", path); while (stat(path, &fst) !=0||fst.st_size==0||fst.st_size% (1024*1024) !=0) { CHECK_FOR_INTERRUPTS(); sleep(1); } result=DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(path))); PG_RETURN_TEXT_P(result); } staticOidget_function_oid(constchar*funcname, constchar*argtype, constchar*nspname) { Oidtypid; Oidnspid; Oidfuncid; Oidoids[1]; oidvector*oid_v; HeapTupletup; if (argtype) { typid=TypenameGetTypid(argtype); elog(DEBUG1, "get_function_oid: %s typid: %d", argtype, typid); oids[0] =typid; oid_v=buildoidvector(oids, 1); } else { oid_v=buildoidvector(NULL, 0); } nspid=LookupExplicitNamespace(nspname); /** LookupExplicitNamespace() of PostgreSQL 9.3 or later, has third* argument "missing_ok" which suppresses ERROR exception, but returns* invalid_oid. 
See include/catalog/namespace.h*/nspid=LookupExplicitNamespace(nspname, false); elog(DEBUG1, "get_function_oid: oid of \"%s\": %d", nspname, nspid); tup=SearchSysCache(PROCNAMEARGSNSP, PointerGetDatum(funcname), PointerGetDatum(oid_v), ObjectIdGetDatum(nspid), 0); if (HeapTupleIsValid(tup)) { Form_pg_procproctup= (Form_pg_proc) GETSTRUCT(tup); funcid=proctup->oid; funcid=HeapTupleGetOid(tup); elog(DEBUG1, "get_function_oid: oid of \"%s\": %d", funcname, funcid); ReleaseSysCache(tup); returnfuncid; } return0; }