关于PG逻辑订阅判断数据是否同步的方法

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 关于PG逻辑订阅判断数据是否同步的方法

PG逻辑订阅过程中,怎么判断订阅端已经同步到哪儿了?

考虑过2种方案,哪个更合适

  • 订阅端的pg_stat_subscriptionlatest_end_lsn
  • 发布端的pg_stat_replication中的replay_lsn

1. 关于pg_stat_subscription中的latest_end_lsn

pg_stat_subscription中的received_lsnlatest_end_lsn比较像,它们的区别如下

  • received_lsn:最后一次接收到的预写日志位置
  • latest_end_lsn:报告给原始WAL发送程序的最后的预写日志位置

1.1 pg_stat_subscriptionlatest_end_lsn的来源

来源是全局数组LogicalRepCtx->workers[]

select * from pg_stat_subscription
  pg_stat_get_subscription()
    memcpy(&worker, &LogicalRepCtx->workers[i],sizeof(LogicalRepWorker));
    values[6] = LSNGetDatum(worker.reply_lsn);

1.2 LogicalRepWorker的分配

Launcher ApplyWorker时分配slot,通过bgw_main_arg参数传给ApplyWorker

ApplyLauncherMain(Datum main_arg)
  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
    /* Find unused worker slot. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (!w->in_use)
        {
            worker = w;
            slot = i;
            break;
        }
    }
    bgw.bgw_main_arg = Int32GetDatum(slot);
    RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)

1.3 latest_end_lsn的更新

订阅端只有收到发布端的keepalive消息,才会更新pg_stat_subscription.latest_end_lsn
由于不是每次send_feedback()后都会更新latest_end_lsn,所以latest_end_lsn可能比实际反馈给发布端的lsn要滞后。实测时也经常能看到10秒以上的延迟。
为防止wal send超时,当超过wal_sender_timeout / 2还没有收到接受端反馈时,发送端会主动发送keepalive消息。

LogicalRepApplyLoop(XLogRecPtr last_received)
  
    for (;;)
    {
    ...
        len = walrcv_receive(wrconn, &buf, &fd);
        if (len != 0)
        {
        
            if (c == 'w')
            {
                XLogRecPtr    start_lsn;
                XLogRecPtr    end_lsn;
                TimestampTz send_time;

                start_lsn = pq_getmsgint64(&s);
                end_lsn = pq_getmsgint64(&s);
                send_time = pq_getmsgint64(&s);

                if (last_received < start_lsn)
                    last_received = start_lsn;

                if (last_received < end_lsn)
                    last_received = end_lsn;

                UpdateWorkerStats(last_received, send_time, false);//更新pg_stat_subscription.received_lsn

                apply_dispatch(&s);
            }
            else if (c == 'k')
            {
                XLogRecPtr    end_lsn;
                TimestampTz timestamp;
                bool        reply_requested;

                end_lsn = pq_getmsgint64(&s);
                timestamp = pq_getmsgint64(&s);
                reply_requested = pq_getmsgbyte(&s);

                if (last_received < end_lsn)
                    last_received = end_lsn;

                send_feedback(last_received, reply_requested, false);//反馈订阅端的write/flush/reply lsn
                UpdateWorkerStats(last_received, timestamp, true);//更新pg_stat_subscription.received_lsn和pg_stat_subscription.latest_end_lsn
            }
        }
        send_feedback(last_received, false, false);//反馈订阅端的write/flush/reply lsn

2. 如何跟踪订阅端实际apply到哪里?

latest_end_lsn也能在一定程度上反映订阅端的apply位点,但是这和它本身的功能其实不是特别契合,而且它出现滞后的概率比较高,不是特别理想。

我们可以通过发布端的pg_stat_replication统计视图跟踪订阅端的apply位置。

同样参考上面LogicalRepApplyLoop()的代码,订阅端反馈自己复制位置的逻辑如下:

  • 如果没有pending的事务(所有和订阅相关的写事务已经在订阅端刷盘)
    反馈给sender:write=flush=apply=接受到最新wal位置
  • 如果有pending的事务
    反馈给sender:

    write=接受到最新wal位置
    flush=属于订阅范围的写事务已经在订阅端刷盘的位置
    apply=属于订阅范围的写事务已经在订阅端写盘的位置
    

由上面可以看出,逻辑订阅和物理复制不一样,物理复制是先写wal再apply这个WAL;逻辑订阅是先apply事务,再反馈这个事务产生的wal的flush位置

相关代码如下:

send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
      get_flush_position(&writepos, &flushpos, &have_pending_txes);
    /*
     * No outstanding transactions to flush, we can report the latest received
     * position. This is important for synchronous replication.
     */
    if (!have_pending_txes)
        flushpos = writepos = recvpos;
    ...
    pq_sendbyte(reply_message, 'r');
    pq_sendint64(reply_message, recvpos);    /* write */
    pq_sendint64(reply_message, flushpos);    /* flush */
    pq_sendint64(reply_message, writepos);    /* apply */
    pq_sendint64(reply_message, now);    /* sendTime */
    pq_sendbyte(reply_message, requestReply);    /* replyRequested */


static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr    local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)//lsn_mapping 在应用commit日志时更新
    {
        FlushPosition *pos =
        dlist_container(FlushPosition, node, iter.cur);

        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);//从lsn_mapping中移除已经本地刷盘的记录
            pfree(pos);
        }
        else
        {
            /*
             * Don't want to uselessly iterate over the rest of the list which
             * could potentially be long. Instead get the last element and
             * grab the write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

应用commit日志时,会将commit对应的远程lsn和本地lsn添加到lsn_mapping末尾

ApplyWorkerMain
  LogicalRepApplyLoop(origin_startpos);
    apply_dispatch(&s);
      apply_handle_commit(StringInfo s)
        replorigin_session_origin_lsn = commit_data.end_lsn; //更新pg_replication_origin_status
        replorigin_session_origin_timestamp = commit_data.committime;
        CommitTransactionCommand();
        store_flush_position(commit_data.end_lsn);
            /* Track commit lsn  */
            flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
            flushpos->local_end = XactLastCommitEnd;
            flushpos->remote_end = remote_lsn;
            dlist_push_tail(&lsn_mapping, &flushpos->node);

3. 发布端pg_stat_replication中的apply位点能否保证正确性?

首先,需要明确,只有出现以下情况时,拿到的apply位置才认为有误的

  1. 发布端更新了订阅表的表
  2. 更新这个表的事务已提交
  3. 订阅端还没有应用这个事务
  4. pg_stat_replication中看到的apply位点已经大于等于3的事务结束位置

当所有表都是r或s状态时,订阅端的apply worker顺序接受和应用WAL日志。
在订阅端本地提交完成前,不会实施后续的send_feedback(),所以不会产生超过实际提交位置的apply位点(甚至碰巧pg_stat_subscription中的latest_end_lsn也可以认为是对的)。

4. 发布端pg_stat_replication中的apply位点是否可能反馈不及时?

有可能。但是pg_stat_replication.replay_lsn滞后的概率低于pg_stat_subscription.latest_end_lsn

当订阅端已处于同步状态时,下面的情况下pg_stat_replication中的apply位点可能反馈不及时,比发布端的当前lsn滞后。

  1. 订阅端处于sleep状态,最多sleep 1秒
  2. 发布端发送非订阅表更新的消息(含keepalive)不及时

发送端为了防止sender超时,会及时发送keepalive保活,因此我们可以在发布端停止更新订阅表后,可以最多等待wal_sender_timeout一样大的时间。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
关系型数据库 流计算 PostgreSQL
关于PostgreSQL逻辑订阅中的复制状态
关于PostgreSQL逻辑订阅中的复制状态
2672 0
|
16天前
|
SQL 存储 算法
基于对象 - 事件模式的数据计算问题
基于对象-事件模式的数据计算是商业中最常见的数据分析任务之一。对象如用户、账号、商品等,通过唯一ID记录其相关事件,如操作日志、交易记录等。这种模式下的统计任务包括无序计算(如交易次数、通话时长)和有序计算(如漏斗分析、连续交易检测)。尽管SQL在处理无序计算时表现尚可,但在有序计算中却显得力不从心,主要原因是其对跨行记录运算的支持较弱,且大表JOIN和大结果集GROUP BY的性能较差。相比之下,SPL语言通过强化离散性和有序集合的支持,能够高效地处理这类计算任务,避免了大表JOIN和复杂的GROUP BY操作,从而显著提升了计算效率。
|
23天前
|
消息中间件 存储 缓存
如何在无状态函数中实现事务性操作
如何在无状态函数中实现事务性操作
|
6月前
|
关系型数据库 MySQL 测试技术
当update修改数据与原数据相同时会再次执行吗
当update修改数据与原数据相同时会再次执行吗
50 1
|
调度
任务同步管理的方法
任务同步管理的方法
94 0
|
消息中间件 监控 关系型数据库
PostgreSQL 逻辑复制、增量复制、逻辑订阅、增量订阅中间件 amazonriver - HelloBike开源
标签 PostgreSQL , 逻辑复制 , 流复制 , testdecoding , amazonriver , hellobike 背景 amazonriver 是一个将postgresql的实时数据同步到es或kafka的服务。由hellobike开源。 版本支持 Postgresql 9.4 or later Kafka 0.8 or later Elas
1404 9
|
Web App开发 关系型数据库 PostgreSQL
|
SQL 数据库 Go
SQL Server 可更新订阅中有行筛选的同步复制移除项目而不重新初始化所有订阅!
原文:SQL Server 可更新订阅中有行筛选的同步复制移除项目而不重新初始化所有订阅! 在可更新订阅的同步复制中,有行筛选的项目表,移除的时候会提示重新初始化所有的快照并且应用此快照,这将导致所有的订阅数据库被重新初始化。
1137 0
|
数据库
SqlServer 可更新订阅队列读取器代理错误:试图进行的插入或更新已失败
原文:SqlServer 可更新订阅队列读取器代理错误:试图进行的插入或更新已失败 今天发现队列读取器代理不停地尝试启动但总是出错: 其中内容如下: 队列读取器代理在连接“PublicationServer”上的“pubDB”时遇到错误“试图进行的插入或更新已失败, 原因是目标视图或者目标视图所跨越的某一视图指定了 WITH CHECK OPTION, 而该操作的一个或多个结果行又不符合 CHECK OPTION 约束。
1444 0