PostgreSQL standby recover的源码分析 (walreceiver唤醒时机? 为什么standby crash后walreceiver不会立即被唤醒?)

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 Tair(兼容Redis),内存型 2GB
简介:

标签

PostgreSQL , 流复制 , stream replication , wal receiver , 唤醒时机 , 状态机


背景

前段时间有位网友提的问题,

当PostgreSQL数据库的standby节点crash后再启动,发现standby节点的wal receiver进程很久才启动并开始从主节点接收WAL。

这段时间是在等待standby节点恢复pg_xlog目录中已有的xlog日志。

pic

这是为什么呢?

数据库crash后从哪个WAL位置开始恢复

PostgreSQL在crash后,需要从最近的一个检查点开始恢复,因为在每次检查点(如果是standby可能称为restart CKPT)开始后,发生变更的块都会记录对应的FULL PAGE到WAL日志文件中,从检查点恢复,可以保证不会因为操作系统的partial write 导致数据块的不一致。

pic

对于standby节点也是一样的,启动时,从最近的一次检查点开始恢复,它可以选择CKPT或者RESTART CKPT开始恢复,选最近的即可。

在standby上也可以创建检查点(称为restart ckpt),后面会讲到。

这里只解释了一个问题,就是数据库CRASH后从哪里开始恢复,还有一个问题没搞明白?

为什么wal receiver进程很久才启动并开始从主节点接收WAL?

什么时候启动wal receiver进程

得先讲讲数据库有几种恢复来源:

/*  
 * Codes indicating where we got a WAL file from during recovery, or where  
 * to attempt to get one.  
 */  
typedef enum  
{  
        XLOG_FROM_ANY = 0,                      /* request to read WAL from any source */  
        XLOG_FROM_ARCHIVE,                      /* restored using restore_command */  
        XLOG_FROM_PG_XLOG,                      /* existing file in pg_xlog */  
        XLOG_FROM_STREAM                        /* streamed from master */  
} XLogSource;  

3种恢复来源,

restore_command@recovery.conf , $PGDATA/pg_xlog , 以及wal receiver 。

然后我们再分析一下,它如何选择恢复来源的?

1. 首先会找pg_xlog目录有没有需要的日志(包括TLI),如果有,会从日志目录中直接读取pg_xlog进行恢复,直到PG_XLOG目录中没有需要的文件,则会将source置为下一个可用的source。

2. 当SOURCE=stream时,并且wal receiver进程没有启动时,发信号启动它。

3. 如果stream失败,会重新扫描tli,然后等待wal_retrieve_retry_interval这个时间间隔,

循环往复。

pic

代码详见

/*  
 * Open the WAL segment containing WAL position 'RecPtr'.  
 *  
 * The segment can be fetched via restore_command, or via walreceiver having  
 * streamed the record, or it can already be present in pg_xlog. Checking  
 * pg_xlog is mainly for crash recovery, but it will be polled in standby mode  
 * too, in case someone copies a new segment directly to pg_xlog. That is not  
 * documented or recommended, though.  
 *  
 * If 'fetching_ckpt' is true, we're fetching a checkpoint record, and should  
 * prepare to read WAL starting from RedoStartLSN after this.  
 *  
 * 'RecPtr' might not point to the beginning of the record we're interested  
 * in, it might also point to the page or segment header. In that case,  
 * 'tliRecPtr' is the position of the WAL record we're interested in. It is  
 * used to decide which timeline to stream the requested WAL from.  
 *  
 * If the record is not immediately available, the function returns false  
 * if we're not in standby mode. In standby mode, waits for it to become  
 * available.  
 *  
 * When the requested record becomes available, the function opens the file  
 * containing it (if not open already), and returns true. When end of standby  
 * mode is triggered by the user, and there is no more WAL available, returns  
 * false.  
 */  
static bool  
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,  
                                                        bool fetching_ckpt, XLogRecPtr tliRecPtr)  
{  
        static TimestampTz last_fail_time = 0;  
        TimestampTz now;  

        /*-------  
         * Standby mode is implemented by a state machine:  
         *  
         * 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just  
         *        pg_xlog (XLOG_FROM_XLOG)  
         * 2. Check trigger file  
         * 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)  
         * 4. Rescan timelines  
         * 5. Sleep wal_retrieve_retry_interval milliseconds, and loop back to 1.  
         *  
         * Failure to read from the current source advances the state machine to  
         * the next state.  
         *  
         * 'currentSource' indicates the current state. There are no currentSource  
         * values for "check trigger", "rescan timelines", and "sleep" states,  
         * those actions are taken when reading from the previous source fails, as  
         * part of advancing to the next state.  
         *-------  
         */  
        if (!InArchiveRecovery)  
                currentSource = XLOG_FROM_PG_XLOG;  
        else if (currentSource == 0)  


                currentSource = XLOG_FROM_ARCHIVE;  

        for (;;)  
        {  
                int                     oldSource = currentSource;  

                // 切换source  
                if (lastSourceFailed)  
                {  
                        switch (currentSource)  
                        {  
                                case XLOG_FROM_ARCHIVE:  
                                case XLOG_FROM_PG_XLOG:  
                ......  

                                        /*  
                                         * If primary_conninfo is set, launch walreceiver to try  
                                         * to stream the missing WAL.  
                                         *  
                                         * If fetching_ckpt is TRUE, RecPtr points to the initial  
                                         * checkpoint location. In that case, we use RedoStartLSN  
                                         * as the streaming start position instead of RecPtr, so  
                                         * that when we later jump backwards to start redo at  
                                         * RedoStartLSN, we will have the logs streamed already.  
                                         */  
                                        if (PrimaryConnInfo)  
                                        {  
                                                XLogRecPtr      ptr;  
                                                TimeLineID      tli;  

                                                if (fetching_ckpt)  
                                                {  
                                                        ptr = RedoStartLSN;  
                                                        tli = ControlFile->checkPointCopy.ThisTimeLineID;  
                                                }  
                                                else  
                                                {  
                                                        ptr = tliRecPtr;  
                                                        tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);  

                                                        if (curFileTLI > 0 && tli < curFileTLI)  
                                                                elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",  
                                                                         (uint32) (ptr >> 32), (uint32) ptr,  
                                                                         tli, curFileTLI);  
                                                }  
                                                curFileTLI = tli;  
                                                // 请求 fork walreceiver 进程  
                        RequestXLogStreaming(tli, ptr, PrimaryConnInfo,  
                                                                                         PrimarySlotName);  
                                                receivedUpto = 0;  
                                        }  

....  

                // 根据source执行  
                /*  
                 * We've now handled possible failure. Try to read from the chosen  
                 * source.  
                 */  
                lastSourceFailed = false;  

                switch (currentSource)  
                {  
                        case XLOG_FROM_ARCHIVE:  
                        case XLOG_FROM_PG_XLOG:  
                                /* Close any old file we might have open. */  
                                if (readFile >= 0)  
                                {  
                                        close(readFile);  
                                        readFile = -1;  
                                }  
                                /* Reset curFileTLI if random fetch. */  
                                if (randAccess)  
                                        curFileTLI = 0;  

                                /*  
                                 * Try to restore the file from archive, or read an existing  
                                 * file from pg_xlog.  
                                 */  
                                readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,  
                                                 currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :  
                                                                                          currentSource);  
                                if (readFile >= 0)      // 表示打开文件成功,继续在这个source里读下一个文件 ,直到失败  
                                        return true;    /* success! */  

                                /*  
                                 * Nope, not found in archive or pg_xlog.  
                                 */  
                                lastSourceFailed = true;  
                                break;  

                        case XLOG_FROM_STREAM:  

整理一下启动wal receiver的流程如下

pic

相关代码

src/include/storage/pmsignal.h

/*  
 * Reasons for signaling the postmaster.  We can cope with simultaneous  
 * signals for different reasons.  If the same reason is signaled multiple  
 * times in quick succession, however, the postmaster is likely to observe  
 * only one notification of it.  This is okay for the present uses.  
 */  
typedef enum  
{  
        PMSIGNAL_RECOVERY_STARTED,      /* recovery has started */  
        PMSIGNAL_BEGIN_HOT_STANDBY, /* begin Hot Standby */  
        PMSIGNAL_WAKEN_ARCHIVER,        /* send a NOTIFY signal to xlog archiver */  
        PMSIGNAL_ROTATE_LOGFILE,        /* send SIGUSR1 to syslogger to rotate logfile */  
        PMSIGNAL_START_AUTOVAC_LAUNCHER,        /* start an autovacuum launcher */  
        PMSIGNAL_START_AUTOVAC_WORKER,          /* start an autovacuum worker */  
        PMSIGNAL_BACKGROUND_WORKER_CHANGE,      /* background worker state change */  
        PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */  
        PMSIGNAL_ADVANCE_STATE_MACHINE,         /* advance postmaster's state machine */  

        NUM_PMSIGNALS                           /* Must be last value of enum! */  
} PMSignalReason;  

src/backend/postmaster/postmaster.c

#define StartupDataBase()               StartChildProcess(StartupProcess)  
#define StartBackgroundWriter() StartChildProcess(BgWriterProcess)  
#define StartCheckpointer()             StartChildProcess(CheckpointerProcess)  
#define StartWalWriter()                StartChildProcess(WalWriterProcess)  
#define StartWalReceiver()              StartChildProcess(WalReceiverProcess)  


/*  
 * sigusr1_handler - handle signal conditions from child processes  
 */  
static void  
sigusr1_handler(SIGNAL_ARGS)  
{  
....  
        if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER) &&  
                WalReceiverPID == 0 &&  
                (pmState == PM_STARTUP || pmState == PM_RECOVERY ||  
                 pmState == PM_HOT_STANDBY || pmState == PM_WAIT_READONLY) &&  
                Shutdown == NoShutdown)  
        {  
                /* Startup Process wants us to start the walreceiver process. */  
                WalReceiverPID = StartWalReceiver();  
        }  
....  



/*  
 * StartChildProcess -- start an auxiliary process for the postmaster  
 *  
 * "type" determines what kind of child will be started.  All child types  
 * initially go to AuxiliaryProcessMain, which will handle common setup.  
 *  
 * Return value of StartChildProcess is subprocess' PID, or 0 if failed  
 * to start subprocess.  
 */  
static pid_t  
StartChildProcess(AuxProcType type)  
{  
...  
#ifdef EXEC_BACKEND  
        pid = postmaster_forkexec(ac, av);  
#else                                                   /* !EXEC_BACKEND */  
        pid = fork_process();  

        if (pid == 0)                           /* child */  
        {  
                InitPostmasterChild();  

                /* Close the postmaster's sockets */  
                ClosePostmasterPorts(false);  

                /* Release postmaster's working memory context */  
                MemoryContextSwitchTo(TopMemoryContext);  
                MemoryContextDelete(PostmasterContext);  
                PostmasterContext = NULL;  

                AuxiliaryProcessMain(ac, av);  
                ExitPostmaster(0);  
        }  

src/backend/replication/walreceiverfuncs.c

/*  
 * Request postmaster to start walreceiver.  
 *  
 * recptr indicates the position where streaming should begin, conninfo  
 * is a libpq connection string to use, and slotname is, optionally, the name  
 * of a replication slot to acquire.  
 */  
void  
RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,  
                                         const char *slotname)  
{  

...  
        if (walrcv->walRcvState == WALRCV_STOPPED)  
        {  
                launch = true;  
                walrcv->walRcvState = WALRCV_STARTING;  
        }  
        else  
                walrcv->walRcvState = WALRCV_RESTARTING;  
        walrcv->startTime = now;  
...  
        if (launch)  
                SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);  
        else  
                SetLatch(&walrcv->latch);  
...  

src/backend/access/transam/xlog.c

/*  
 * Codes indicating where we got a WAL file from during recovery, or where  
 * to attempt to get one.  
 */  
typedef enum  
{  
        XLOG_FROM_ANY = 0,                      /* request to read WAL from any source */  
        XLOG_FROM_ARCHIVE,                      /* restored using restore_command */  
        XLOG_FROM_PG_XLOG,                      /* existing file in pg_xlog */  
        XLOG_FROM_STREAM                        /* streamed from master */  
} XLogSource;  


/*  
 * Keeps track of which source we're currently reading from. This is  
 * different from readSource in that this is always set, even when we don't  
 * currently have a WAL file open. If lastSourceFailed is set, our last  
 * attempt to read from currentSource failed, and we should try another source  
 * next.  
 */  
static XLogSource currentSource = 0;    /* XLOG_FROM_* code */  



/*  
 * Open the WAL segment containing WAL position 'RecPtr'.  
 *  
 * The segment can be fetched via restore_command, or via walreceiver having  
 * streamed the record, or it can already be present in pg_xlog. Checking  
 * pg_xlog is mainly for crash recovery, but it will be polled in standby mode  
 * too, in case someone copies a new segment directly to pg_xlog. That is not  
 * documented or recommended, though.  
 *  
 * If 'fetching_ckpt' is true, we're fetching a checkpoint record, and should  
 * prepare to read WAL starting from RedoStartLSN after this.  
 *  
 * 'RecPtr' might not point to the beginning of the record we're interested  
 * in, it might also point to the page or segment header. In that case,  
 * 'tliRecPtr' is the position of the WAL record we're interested in. It is  
 * used to decide which timeline to stream the requested WAL from.  
 *  
 * If the record is not immediately available, the function returns false  
 * if we're not in standby mode. In standby mode, waits for it to become  
 * available.  
 *  
 * When the requested record becomes available, the function opens the file  
 * containing it (if not open already), and returns true. When end of standby  
 * mode is triggered by the user, and there is no more WAL available, returns  
 * false.  
 */  
static bool  
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,  
                                                        bool fetching_ckpt, XLogRecPtr tliRecPtr)  
{  
        static TimestampTz last_fail_time = 0;  
        TimestampTz now;  

        /*-------  
         * Standby mode is implemented by a state machine:  
         *  
         * 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just  
         *        pg_xlog (XLOG_FROM_XLOG)  
         * 2. Check trigger file  
         * 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)  
         * 4. Rescan timelines  
         * 5. Sleep wal_retrieve_retry_interval milliseconds, and loop back to 1.  
         *  
         * Failure to read from the current source advances the state machine to  
         * the next state.  
         *  
         * 'currentSource' indicates the current state. There are no currentSource  
         * values for "check trigger", "rescan timelines", and "sleep" states,  
         * those actions are taken when reading from the previous source fails, as  
         * part of advancing to the next state.  
         *-------  
         */  
        if (!InArchiveRecovery)  
                currentSource = XLOG_FROM_PG_XLOG;  
        else if (currentSource == 0)  


                currentSource = XLOG_FROM_ARCHIVE;  

        for (;;)  
        {  
                int                     oldSource = currentSource;  

                // 切换source  
                if (lastSourceFailed)  
                {  
                        switch (currentSource)  
                        {  
                                case XLOG_FROM_ARCHIVE:  
                                case XLOG_FROM_PG_XLOG:  
                ......  

                                        /*  
                                         * If primary_conninfo is set, launch walreceiver to try  
                                         * to stream the missing WAL.  
                                         *  
                                         * If fetching_ckpt is TRUE, RecPtr points to the initial  
                                         * checkpoint location. In that case, we use RedoStartLSN  
                                         * as the streaming start position instead of RecPtr, so  
                                         * that when we later jump backwards to start redo at  
                                         * RedoStartLSN, we will have the logs streamed already.  
                                         */  
                                        if (PrimaryConnInfo)  
                                        {  
                                                XLogRecPtr      ptr;  
                                                TimeLineID      tli;  

                                                if (fetching_ckpt)  
                                                {  
                                                        ptr = RedoStartLSN;  
                                                        tli = ControlFile->checkPointCopy.ThisTimeLineID;  
                                                }  
                                                else  
                                                {  
                                                        ptr = tliRecPtr;  
                                                        tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);  

                                                        if (curFileTLI > 0 && tli < curFileTLI)  
                                                                elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",  
                                                                         (uint32) (ptr >> 32), (uint32) ptr,  
                                                                         tli, curFileTLI);  
                                                }  
                                                curFileTLI = tli;  
                                                // 请求 fork walreceiver 进程  
                        RequestXLogStreaming(tli, ptr, PrimaryConnInfo,  
                                                                                         PrimarySlotName);  
                                                receivedUpto = 0;  
                                        }  

....  

                // 根据source执行  
                /*  
                 * We've now handled possible failure. Try to read from the chosen  
                 * source.  
                 */  
                lastSourceFailed = false;  

                switch (currentSource)  
                {  
                        case XLOG_FROM_ARCHIVE:  
                        case XLOG_FROM_PG_XLOG:  
                                /* Close any old file we might have open. */  
                                if (readFile >= 0)  
                                {  
                                        close(readFile);  
                                        readFile = -1;  
                                }  
                                /* Reset curFileTLI if random fetch. */  
                                if (randAccess)  
                                        curFileTLI = 0;  

                                /*  
                                 * Try to restore the file from archive, or read an existing  
                                 * file from pg_xlog.  
                                 */  
                                readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,  
                                                 currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :  
                                                                                          currentSource);  
                                if (readFile >= 0)  
                                        return true;    /* success! */  

                                /*  
                                 * Nope, not found in archive or pg_xlog.  
                                 */  
                                lastSourceFailed = true;  
                                break;  

                        case XLOG_FROM_STREAM:  

standby的restart CKPT

Standby创建的检查点称为restart ckpt, 目的是防止正常的停止STANDBY后,还要从正常的检查点位置开始恢复。

建立了restart CKPT后,standby重启时,从restart CKPT开始恢复即可。

pic

代码详见

src/backend/postmaster/checkpointer.c

bool        do_restartpoint;  

do_restartpoint = RecoveryInProgress();  

                        if (!do_restartpoint)  
            {  
                CreateCheckPoint(flags);  
                ckpt_performed = true;  
            }  
            else  
                ckpt_performed = CreateRestartPoint(flags);  

src/backend/access/transam/xlog.c

/*  
 * This must be called ONCE during postmaster or standalone-backend shutdown  
 */  
void  
ShutdownXLOG(int code, Datum arg)  
{  
        /* Don't be chatty in standalone mode */  
        ereport(IsPostmasterEnvironment ? LOG : NOTICE,  
                        (errmsg("shutting down")));  

        if (RecoveryInProgress())  
                CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);  
        else  
        {  
                /*  
                 * If archiving is enabled, rotate the last XLOG file so that all the  
                 * remaining records are archived (postmaster wakes up the archiver  
                 * process one more time at the end of shutdown). The checkpoint  
                 * record will go to the next XLOG file and won't be archived (yet).  
                 */  
                if (XLogArchivingActive() && XLogArchiveCommandSet())  
                        RequestXLogSwitch();  

                CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);  
        }  
        ShutdownCLOG();  
        ShutdownCommitTs();  
        ShutdownSUBTRANS();  
        ShutdownMultiXact();  
}  




/*  
 * Establish a restartpoint if possible.  
 *  
 * This is similar to CreateCheckPoint, but is used during WAL recovery  
 * to establish a point from which recovery can roll forward without  
 * replaying the entire recovery log.  
 *  
 * Returns true if a new restartpoint was established. We can only establish  
 * a restartpoint if we have replayed a safe checkpoint record since last  
 * restartpoint.  
 */  
bool  
CreateRestartPoint(int flags)  
{  
...  

问题与改进建议

了解了原理,我们来想想现在的机制会存在什么问题。

1. 如果备库恢复速度较主慢,接收到的1024MB日志,只恢复到了512MB,然后wal receiver进程突然挂了。

此时,standby会将恢复source切到pg_xlog或resotre_command,由于pg_xlog里面还有512MB没有恢复,那么会等这512MB恢复完后,才会发生source的切换,再次唤醒wal receiver。

2. standby crash,没有产生shutdown restart CKPT.

crash后重启,需要从最近的restart ckpt或者ckpt进行恢复,如果这之间有许多PG_XLOG,那么也需要恢复一段时间,从而wal receiver的唤醒时间也会被拖长。

带来的问题就是:主库和备库的sender wal位点差异会受到一定的影响。如果正好此时主库挂了,缺失的日志可能会比较多。

改进建议

1. 备库在接收xlog时,记录xlog接收到的位点信息,从而XLOG不需要等apply位点来获取状态。

2. 并行接收,不要等APPLY请求唤醒WAL RECEIVER,使用独立的进程receive。

但是可能引入另一个问题,比如备库就是APPLY较慢,导致没有APPLY的XLOG堆积在备库的pg_xlog目录。

3. 并行恢复,由于PostgreSQL是物理的备库,效率已经很高了,通常不需要并行恢复,首先要考虑的是备库的IOPS能力。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
存储 关系型数据库 PostgreSQL
Postgresql内核源码分析-heapam分析
Postgresql内核源码分析-heapam分析
182 1
|
关系型数据库 PostgreSQL
Postgresql的XLOG累积源码分析
title: PGSQL的XLOG生成和清理逻辑 date: 2018-12-01 08:00:00 categories: Postgresql 总结归纳XLOG清理逻辑 WAL归档 # 在自动的WAL检查点之间的日志文件段的最大数量checkpoint_segments = # 在自动WAL检查点之间的最长时间checkpoint_timeout = # 缓解io压力ch
1529 0
|
存储 关系型数据库 PostgreSQL
|
关系型数据库 数据库 数据安全/隐私保护
|
关系型数据库 分布式数据库 PolarDB
《阿里云产品手册2022-2023 版》——PolarDB for PostgreSQL
《阿里云产品手册2022-2023 版》——PolarDB for PostgreSQL
363 0