PostgreSQL 事务回卷案例分析

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介:

作者信息


谢桂起(花名:渊渱) 2020年毕业后加入阿里云,一直从事RDS PostgreSQL相关工作,善于解决线上各类RDS PostgreSQL运维管控相关问题。


背景


前阵子某个客户反馈他的RDS PostgreSQL无法写入,报错信息如下:

postgres=# select * from test;
 id 
----
(0 rows)
postgres=# insert into test select 1;
ERROR:  database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT:  Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.

随后RDS工程师介入处理以后,该问题立马得到了解决。

XID基础原理


XID 定义


XID(Transaction ID)是 PostgreSQL 内部的事务编号,每个事务都会分配一个XID,依次递增。PostgreSQL 数据中每个元组头部都会保存着 插入 或者 删除 这条元组的XID(Transaction ID),然后内核通过这个 XID 构造数据库的一致性读。在事务隔离级别是 可重复读 的情况下,假设如有两个事务,xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元组,看不到 t_xmin > 200 的元组。

typedef uint32 TransactionId;  /* 事务号定义,32位无符号整数 */
typedef struct HeapTupleFields
{
  TransactionId t_xmin;    /* 插入该元组的事务号 */
  TransactionId t_xmax;    /* 删除或锁定该元组的事务号 */
    /*** 其它属性省略 ***/
} HeapTupleFields;
struct HeapTupleHeaderData
{
  union
  {
    HeapTupleFields t_heap;
    DatumTupleFields t_datum;
  }      t_choice;
    /*** 其它属性省略 ***/
};

XID 发行机制


从上面结构中我们可以看到,XID 是一个32位无符号整数,也就是 XID 的范围是 0到2^32-1;那么超过了 2^32-1的事务怎么办呢?其实 XID 是一个环,超过了 2^32-1 之后又会从头开始分配。通过源代码也证明了上述结论:

// 无效事务号
#define InvalidTransactionId    ((TransactionId) 0)
// 引导事务号,在数据库初始化过程(BKI执行)中使用
#define BootstrapTransactionId    ((TransactionId) 1)
// 冻结事务号用于表示非常陈旧的元组,它们比所有正常事务号都要早(也就是可见)
#define FrozenTransactionId      ((TransactionId) 2)
// 第一个正常事务号
#define FirstNormalTransactionId  ((TransactionId) 3)
// 把 FullTransactionId 的低32位作为无符号整数生成 xid
#define XidFromFullTransactionId(x)    ((uint32) (x).value)
static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
  dest->value++;
  while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
    dest->value++;
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
    /*** 省略 ***/
  FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
    /*** 省略 ***
  return full_xid;
}
static void
AssignTransactionId(TransactionState s)
{
    /*** 省略 ***/
  s->fullTransactionId = GetNewTransactionId(isSubXact);
  if (!isSubXact)
    XactTopFullTransactionId = s->fullTransactionId;
    /*** 省略 ***/
}
TransactionId
GetTopTransactionId(void)
{
  if (!FullTransactionIdIsValid(XactTopFullTransactionId))
    AssignTransactionId(&TopTransactionStateData);
  return XidFromFullTransactionId(XactTopFullTransactionId);
}

可以看到,新事务号保存在共享变量缓存中:ShmemVariableCache->nextFullXid,每发行一个事务号后,向上调整它的值,并跳过上述三个特殊值。三个特殊仠分别为0、1和2,作用可以看上面代码注释。

XID 回卷机制


前面说到,XID 是一个环,分配到 2^32-1 之后又从 3 开始,那么内核是怎么比较两个事务的大小的呢?比如 xid 经历了这样一个过程 3-> 2^32-1 -> 5,那么内核怎么样知道 5 这个事务在 2^32-1 后面呢?我们再看一下代码:

/*
 * TransactionIdPrecedes --- is id1 logically < id2?
 */
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
  /*
   * If either ID is a permanent XID then we can just do unsigned
   * comparison.  If both are normal, do a modulo-2^32 comparison.
   */
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 < id2);
  diff = (int32) (id1 - id2);
  return (diff < 0);
}

可以看到,内核使用了一个比较取巧的方法:(int32) (id1 - id2) < 0,32位有符号整数的取值范围是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以转换成 int32 会变成负数。但是这里面有一个问题,「最新事务号-最老事务号」 必须小于 2^31,一旦大于就会出现回卷,导致老事务产生的数据对新事务不可见。

XID 回卷预防


前面讲到,「最新事务号-最老事务号」 必须小于 2^31,否则会发生回卷导致老事务产生的数据对新事务不可见,那内核是怎么避免这个问题的呢?内核是这样处理的:通过定期把老事务产生的元组的 XID 更新为 FrozenTransactionId,即更新为2,来回收 XID,而 XID 为2 的元组对所有的事务可见,这个过程称为 XID 冻结,通过这个方式可以回收 XID 来保证 |最新事务号-最老事务号| < 2^31。
除了内核自动冻结回收XID,我们也可以通过命令或者 sql 的方式手动进行 xid 冻结回收

  • 查询数据库或表的年龄,数据库年龄指的是:「最新事务号-数据库中最老事务号」,表年龄指的是:「最新事务号-表中最老事务号」
# 查看每个库的年龄
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1个库每个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 
# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;


  • 手动冻结回收一张表的元组的 xid 的sql:


vacuum freeze 表名;


  • 手动冻结回收一个库里面的所有表 xid 的命令:


vacuumdb -d 库名 --freeze --jobs=30 -h 连接串 -p 端口号 -U 库Owner

冻结回收过程是一个重 IO 的操作,这个过程内核会描述表的所有页面,然后把符合要求的元组的 t_xmin 字段更新为 2,所以这个过程需要在业务低峰进行,避免影响业务。

与冻结回收相关的内核参数有三个:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于笔者对于这三个参数理解不深,就不在这里班门弄斧了,感兴趣的同学可以自行找资料了解一下。

解决方案


问题分析


基于上面的原理分析,我们知道,「最新事务号-最老事务号」 =  2^31-1000000,即当前可用的 xid 仅剩下一百万的时候,内核就会禁止实例写入并报错:database is not accepting commands to avoid wraparound data loss in database, 这个时候必须连到提示中的 "xxxx" 对表进行 freeze 回收更多的 XID。

void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
  TransactionId xidVacLimit;
  TransactionId xidWarnLimit;
  TransactionId xidStopLimit;
  TransactionId xidWrapLimit;
  TransactionId curXid;
  Assert(TransactionIdIsNormal(oldest_datfrozenxid));
  /*
     * xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将发生回卷
   */
  xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  if (xidWrapLimit < FirstNormalTransactionId)
    xidWrapLimit += FirstNormalTransactionId;
  /*
     * 一旦当前事务号到达xidStopLimit,实例将不可写入,保留 1000000 的xid用于vacuum
     * 每 vacuum 一张表需要占用一个xid
   */
  xidStopLimit = xidWrapLimit - 1000000;
  if (xidStopLimit < FirstNormalTransactionId)
    xidStopLimit -= FirstNormalTransactionId;
  /*
     * 一旦当前事务号到达xidWarnLimit,将不停地收到
     * WARNING:  database "xxxx" must be vacuumed within 2740112 transactions
   */
  xidWarnLimit = xidStopLimit - 10000000;
  if (xidWarnLimit < FirstNormalTransactionId)
    xidWarnLimit -= FirstNormalTransactionId;
  /*
     * 一旦当前事务号到达xidVacLimit将触发force autovacuums
   */
  xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
  if (xidVacLimit < FirstNormalTransactionId)
    xidVacLimit += FirstNormalTransactionId;
  /* Grab lock for just long enough to set the new limit values */
  LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
  ShmemVariableCache->oldestXid = oldest_datfrozenxid;
  ShmemVariableCache->xidVacLimit = xidVacLimit;
  ShmemVariableCache->xidWarnLimit = xidWarnLimit;
  ShmemVariableCache->xidStopLimit = xidStopLimit;
  ShmemVariableCache->xidWrapLimit = xidWrapLimit;
  ShmemVariableCache->oldestXidDB = oldest_datoid;
  curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
  LWLockRelease(XidGenLock);
  /* Log the info */
  ereport(DEBUG1,
      (errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
          xidWrapLimit, oldest_datoid)));
  /*
     * 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age
     * 触发 autovacuum 对年龄最老的数据库进行清理,如果有多个数据库达到要求,按年龄最老的顺序依次清理
   * 通过设置标志位标记当前 autovacuum 结束之后再来一次 autovacuum
     */
  if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
    IsUnderPostmaster && !InRecovery)
    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
  /* Give an immediate warning if past the wrap warn point */
  if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
  {
    char     *oldest_datname;
    if (IsTransactionState())
      oldest_datname = get_database_name(oldest_datoid);
    else
      oldest_datname = NULL;
    if (oldest_datname)
      ereport(WARNING,
          (errmsg("database \"%s\" must be vacuumed within %u transactions",
              oldest_datname,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
    else
      ereport(WARNING,
          (errmsg("database with OID %u must be vacuumed within %u transactions",
              oldest_datoid,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
  }
}
bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 >= id2);
  diff = (int32) (id1 - id2);
  return (diff >= 0);
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
  if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
  {
    TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
    TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
    TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
    Oid      oldest_datoid = ShmemVariableCache->oldestXidDB;
        /*** 省略 ***/
    if (IsUnderPostmaster &&
      TransactionIdFollowsOrEquals(xid, xidStopLimit))
    {
      char     *oldest_datname = get_database_name(oldest_datoid);
      /* complain even if that DB has disappeared */
      if (oldest_datname)
        ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                oldest_datname),
             errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
            /*** 省略 ***/
    }
        /*** 省略 ***/
  }
    /*** 省略 ***/
}

问题定位

# 查看每个库的年龄
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1个库每个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 
# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;

问题解决


  1. 通过上面的第一个 sql,查找年龄最大的数据库,数据库年龄指的是:|最新事务号-数据库中最老事务号|
  2. 通过上面第二个 sql,查找年龄最大的表,然后对表依次执行:vacuum freeze 表名,把表中的老事务号冻结回收,表年龄指的是:|最新事务号-表中最老事务号|
  3. 运维脚本


  • 单进程 Shell 脚本
# 对指定数据库中年龄最大的前 50 张表进行 vacuum freeze
for cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd  | grep -v row | grep vacuum`; do
    psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "$cmd"
done
  • 多进程 Python 脚本
from multiprocessing import Pool
import psycopg2
args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='数据库名',
            user='用户名', password='密码')
def vacuum_handler(sql):
    sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur = conn.cursor()
        cur.execute(sql_str)
        print cur.fetchall()
        conn.close()
    except Exception as e:
        print str(e)
# 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发执行
def multi_vacuum():
    pool = Pool(processes=32)
    sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql_str)
        rows = cur.fetchall()
        for row in rows:
            cmd = row['vacuum_cmd']
            pool.apply_async(vacuum_handler, (cmd, ))
        conn.close()
        pool.close()
        pool.join()
    except Exception as e:
        print str(e)
multi_vacuum()


友情提示


vacuum freeze 会扫描表的所有页面并更新,是一个重 IO 的操作,操作过程中一定要控制好并发数,否则非常容易把实例打挂。

加入我们JOIN US

欢迎加入阿里云数据库RDS团队,这里有大规模高并发的数据库场景,更有丰富的业务场景;欢迎有志之士一起加入阿里云数据库RDS团队,简历投递地址:vogts.wangt@alibaba-inc.com

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
29天前
|
存储 SQL 关系型数据库
MySQL的事务隔离级别
【10月更文挑战第17天】MySQL的事务隔离级别
97 43
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1638 14
|
2月前
|
存储 Oracle 关系型数据库
Oracle和MySQL有哪些区别?从基本特性、技术选型、字段类型、事务、语句等角度详细对比Oracle和MySQL
从基本特性、技术选型、字段类型、事务提交方式、SQL语句、分页方法等方面对比Oracle和MySQL的区别。
523 18
Oracle和MySQL有哪些区别?从基本特性、技术选型、字段类型、事务、语句等角度详细对比Oracle和MySQL
|
1月前
|
SQL 关系型数据库 MySQL
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
尼恩,一位40岁的资深架构师,通过其丰富的经验和深厚的技術功底,为众多读者提供了宝贵的面试指导和技术分享。在他的读者交流群中,许多小伙伴获得了来自一线互联网企业的面试机会,并成功应对了诸如事务ACID特性实现、MVCC等相关面试题。尼恩特别整理了这些常见面试题的系统化解答,形成了《MVCC 学习圣经:一次穿透MYSQL MVCC》PDF文档,旨在帮助大家在面试中展示出扎实的技术功底,提高面试成功率。此外,他还编写了《尼恩Java面试宝典》等资料,涵盖了大量面试题和答案,帮助读者全面提升技术面试的表现。这些资料不仅内容详实,而且持续更新,是求职者备战技术面试的宝贵资源。
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
|
2月前
|
SQL 关系型数据库 MySQL
MySQL基础:事务
本文详细介绍了数据库事务的概念及操作,包括事务的定义、开启、提交与回滚。事务作为一组不可分割的操作集合,确保了数据的一致性和完整性。文章还探讨了事务的四大特性(原子性、一致性、隔离性、持久性),并分析了并发事务可能引发的问题及其解决方案,如脏读、不可重复读和幻读。最后,详细讲解了不同事务隔离级别的特点和应用场景。
144 4
MySQL基础:事务
|
2月前
|
Oracle NoSQL 关系型数据库
主流数据库对比:MySQL、PostgreSQL、Oracle和Redis的优缺点分析
主流数据库对比:MySQL、PostgreSQL、Oracle和Redis的优缺点分析
420 2
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL】索引和事务
【MySQL】索引和事务
55 0
|
2月前
|
SQL Oracle 关系型数据库
详解 MySQL 的事务以及隔离级别
详解 MySQL 的事务以及隔离级别
41 0
|
3月前
|
SQL 关系型数据库 MySQL
Mysql原理与调优-事务与MVCC
【8月更文挑战第19天】
|
3月前
|
存储 SQL 关系型数据库
深入解析MySQL事务机制和锁机制
深入解析MySQL事务机制和锁机制

相关产品

  • 云原生数据库 PolarDB
  • 下一篇
    无影云桌面