关于PG逻辑订阅判断数据是否同步的方法-阿里云开发者社区

开发者社区> 数据库> 正文

关于PG逻辑订阅判断数据是否同步的方法

简介: 关于PG逻辑订阅判断数据是否同步的方法

PG逻辑订阅过程中,怎么判断订阅端已经同步到哪儿了?

考虑过2种方案,哪个更合适

  • 订阅端的pg_stat_subscriptionlatest_end_lsn
  • 发布端的pg_stat_replication中的replay_lsn

1. 关于pg_stat_subscription中的latest_end_lsn

pg_stat_subscription中的received_lsnlatest_end_lsn比较像,它们的区别如下

  • received_lsn:最后一次接收到的预写日志位置
  • latest_end_lsn:报告给原始WAL发送程序的最后的预写日志位置

1.1 pg_stat_subscriptionlatest_end_lsn的来源

来源是全局数组LogicalRepCtx->workers[]

select * from pg_stat_subscription
  pg_stat_get_subscription()
    memcpy(&worker, &LogicalRepCtx->workers[i],sizeof(LogicalRepWorker));
    values[6] = LSNGetDatum(worker.reply_lsn);

1.2 LogicalRepWorker的分配

Launcher ApplyWorker时分配slot,通过bgw_main_arg参数传给ApplyWorker

ApplyLauncherMain(Datum main_arg)
  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
    /* Find unused worker slot. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (!w->in_use)
        {
            worker = w;
            slot = i;
            break;
        }
    }
    bgw.bgw_main_arg = Int32GetDatum(slot);
    RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)

1.3 latest_end_lsn的更新

订阅端只有收到发布端的keepalive消息,才会更新pg_stat_subscription.latest_end_lsn
由于不是每次send_feedback()后都会更新latest_end_lsn,所以latest_end_lsn可能比实际反馈给发布端的lsn要滞后。实测时也经常能看到10秒以上的延迟。
为防止wal send超时,当超过wal_sender_timeout / 2还没有收到接受端反馈时,发送端会主动发送keepalive消息。

LogicalRepApplyLoop(XLogRecPtr last_received)
  
    for (;;)
    {
    ...
        len = walrcv_receive(wrconn, &buf, &fd);
        if (len != 0)
        {
        
            if (c == 'w')
            {
                XLogRecPtr    start_lsn;
                XLogRecPtr    end_lsn;
                TimestampTz send_time;

                start_lsn = pq_getmsgint64(&s);
                end_lsn = pq_getmsgint64(&s);
                send_time = pq_getmsgint64(&s);

                if (last_received < start_lsn)
                    last_received = start_lsn;

                if (last_received < end_lsn)
                    last_received = end_lsn;

                UpdateWorkerStats(last_received, send_time, false);//更新pg_stat_subscription.received_lsn

                apply_dispatch(&s);
            }
            else if (c == 'k')
            {
                XLogRecPtr    end_lsn;
                TimestampTz timestamp;
                bool        reply_requested;

                end_lsn = pq_getmsgint64(&s);
                timestamp = pq_getmsgint64(&s);
                reply_requested = pq_getmsgbyte(&s);

                if (last_received < end_lsn)
                    last_received = end_lsn;

                send_feedback(last_received, reply_requested, false);//反馈订阅端的write/flush/reply lsn
                UpdateWorkerStats(last_received, timestamp, true);//更新pg_stat_subscription.received_lsn和pg_stat_subscription.latest_end_lsn
            }
        }
        send_feedback(last_received, false, false);//反馈订阅端的write/flush/reply lsn

2. 如何跟踪订阅端实际apply到哪里?

latest_end_lsn也能在一定程度上反映订阅端的apply位点,但是这和它本身的功能其实不是特别契合,而且它出现滞后的概率比较高,不是特别理想。

我们可以通过发布端的pg_stat_replication统计视图跟踪订阅端的apply位置。

同样参考上面LogicalRepApplyLoop()的代码,订阅端反馈自己复制位置的逻辑如下:

  • 如果没有pending的事务(所有和订阅相关的写事务已经在订阅端刷盘)
    反馈给sender:write=flush=apply=接受到最新wal位置
  • 如果有pending的事务
    反馈给sender:

    write=接受到最新wal位置
    flush=属于订阅范围的写事务已经在订阅端刷盘的位置
    apply=属于订阅范围的写事务已经在订阅端写盘的位置
    

由上面可以看出,逻辑订阅和物理复制不一样,物理复制是先写wal再apply这个WAL;逻辑订阅是先apply事务,再反馈这个事务产生的wal的flush位置

相关代码如下:

send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
      get_flush_position(&writepos, &flushpos, &have_pending_txes);
    /*
     * No outstanding transactions to flush, we can report the latest received
     * position. This is important for synchronous replication.
     */
    if (!have_pending_txes)
        flushpos = writepos = recvpos;
    ...
    pq_sendbyte(reply_message, 'r');
    pq_sendint64(reply_message, recvpos);    /* write */
    pq_sendint64(reply_message, flushpos);    /* flush */
    pq_sendint64(reply_message, writepos);    /* apply */
    pq_sendint64(reply_message, now);    /* sendTime */
    pq_sendbyte(reply_message, requestReply);    /* replyRequested */


static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr    local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)//lsn_mapping 在应用commit日志时更新
    {
        FlushPosition *pos =
        dlist_container(FlushPosition, node, iter.cur);

        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);//从lsn_mapping中移除已经本地刷盘的记录
            pfree(pos);
        }
        else
        {
            /*
             * Don't want to uselessly iterate over the rest of the list which
             * could potentially be long. Instead get the last element and
             * grab the write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

应用commit日志时,会将commit对应的远程lsn和本地lsn添加到lsn_mapping末尾

ApplyWorkerMain
  LogicalRepApplyLoop(origin_startpos);
    apply_dispatch(&s);
      apply_handle_commit(StringInfo s)
        replorigin_session_origin_lsn = commit_data.end_lsn; //更新pg_replication_origin_status
        replorigin_session_origin_timestamp = commit_data.committime;
        CommitTransactionCommand();
        store_flush_position(commit_data.end_lsn);
            /* Track commit lsn  */
            flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
            flushpos->local_end = XactLastCommitEnd;
            flushpos->remote_end = remote_lsn;
            dlist_push_tail(&lsn_mapping, &flushpos->node);

3. 发布端pg_stat_replication中的apply位点能否保证正确性?

首先,需要明确,只有出现以下情况时,拿到的apply位置才认为有误的

  1. 发布端更新了订阅表的表
  2. 更新这个表的事务已提交
  3. 订阅端还没有应用这个事务
  4. pg_stat_replication中看到的apply位点已经大于等于3的事务结束位置

当所有表都是r或s状态时,订阅端的apply worker顺序接受和应用WAL日志。
在订阅端本地提交完成前,不会实施后续的send_feedback(),所以不会产生超过实际提交位置的apply位点(甚至碰巧pg_stat_subscription中的latest_end_lsn也可以认为是对的)。

4. 发布端pg_stat_replication中的apply位点是否可能反馈不及时?

有可能。但是pg_stat_replication.replay_lsn滞后的概率低于pg_stat_subscription.latest_end_lsn

当订阅端已处于同步状态时,下面的情况下pg_stat_replication中的apply位点可能反馈不及时,比发布端的当前lsn滞后。

  1. 订阅端处于sleep状态,最多sleep 1秒
  2. 发布端发送非订阅表更新的消息(含keepalive)不及时

发送端为了防止sender超时,会及时发送keepalive保活,因此我们可以在发布端停止更新订阅表后,可以最多等待wal_sender_timeout一样大的时间。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章