PostgreSQL 事务回卷案例分析

简介:
+关注继续查看

作者信息


谢桂起(花名:渊渱) 2020年毕业后加入阿里云,一直从事RDS PostgreSQL相关工作,善于解决线上各类RDS PostgreSQL运维管控相关问题。


背景


前阵子某个客户反馈他的RDS PostgreSQL无法写入,报错信息如下:

postgres=# select * from test;
 id 
----
(0 rows)

postgres=# insert into test select 1;
ERROR:  database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT:  Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.

随后RDS工程师介入处理以后,该问题立马得到了解决。

XID基础原理


XID 定义


XID(Transaction ID)是 PostgreSQL 内部的事务编号,每个事务都会分配一个XID,依次递增。PostgreSQL 数据中每个元组头部都会保存着 插入 或者 删除 这条元组的XID(Transaction ID),然后内核通过这个 XID 构造数据库的一致性读。在事务隔离级别是 可重复读 的情况下,假设如有两个事务,xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元组,看不到 t_xmin > 200 的元组。

typedef uint32 TransactionId;  /* 事务号定义,32位无符号整数 */

typedef struct HeapTupleFields
{
  TransactionId t_xmin;    /* 插入该元组的事务号 */
  TransactionId t_xmax;    /* 删除或锁定该元组的事务号 */

    /*** 其它属性省略 ***/
} HeapTupleFields;

struct HeapTupleHeaderData
{
  union
  {
    HeapTupleFields t_heap;
    DatumTupleFields t_datum;
  }      t_choice;

    /*** 其它属性省略 ***/
};

XID 发行机制


从上面结构中我们可以看到,XID 是一个32位无符号整数,也就是 XID 的范围是 0到2^32-1;那么超过了 2^32-1的事务怎么办呢?其实 XID 是一个环,超过了 2^32-1 之后又会从头开始分配。通过源代码也证明了上述结论:

// 无效事务号
#define InvalidTransactionId    ((TransactionId) 0)
// 引导事务号,在数据库初始化过程(BKI执行)中使用
#define BootstrapTransactionId    ((TransactionId) 1)
// 冻结事务号用于表示非常陈旧的元组,它们比所有正常事务号都要早(也就是可见)
#define FrozenTransactionId      ((TransactionId) 2)
// 第一个正常事务号
#define FirstNormalTransactionId  ((TransactionId) 3)
// 把 FullTransactionId 的低32位作为无符号整数生成 xid
#define XidFromFullTransactionId(x)    ((uint32) (x).value)

static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
  dest->value++;
  while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
    dest->value++;
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
    /*** 省略 ***/
  FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
    /*** 省略 ***
  return full_xid;
}

static void
AssignTransactionId(TransactionState s)
{
    /*** 省略 ***/
  s->fullTransactionId = GetNewTransactionId(isSubXact);
  if (!isSubXact)
    XactTopFullTransactionId = s->fullTransactionId;
    /*** 省略 ***/
}

TransactionId
GetTopTransactionId(void)
{
  if (!FullTransactionIdIsValid(XactTopFullTransactionId))
    AssignTransactionId(&TopTransactionStateData);
  return XidFromFullTransactionId(XactTopFullTransactionId);
}

可以看到,新事务号保存在共享变量缓存中:ShmemVariableCache->nextFullXid,每发行一个事务号后,向上调整它的值,并跳过上述三个特殊值。三个特殊仠分别为0、1和2,作用可以看上面代码注释。

XID 回卷机制


前面说到,XID 是一个环,分配到 2^32-1 之后又从 3 开始,那么内核是怎么比较两个事务的大小的呢?比如 xid 经历了这样一个过程 3-> 2^32-1 -> 5,那么内核怎么样知道 5 这个事务在 2^32-1 后面呢?我们再看一下代码:

/*
 * TransactionIdPrecedes --- is id1 logically < id2?
 */
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
  /*
   * If either ID is a permanent XID then we can just do unsigned
   * comparison.  If both are normal, do a modulo-2^32 comparison.
   */
  int32    diff;

  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 < id2);

  diff = (int32) (id1 - id2);
  return (diff < 0);
}

可以看到,内核使用了一个比较取巧的方法:(int32) (id1 - id2) < 0,32位有符号整数的取值范围是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以转换成 int32 会变成负数。但是这里面有一个问题,「最新事务号-最老事务号」 必须小于 2^31,一旦大于就会出现回卷,导致老事务产生的数据对新事务不可见。

XID 回卷预防


前面讲到,「最新事务号-最老事务号」 必须小于 2^31,否则会发生回卷导致老事务产生的数据对新事务不可见,那内核是怎么避免这个问题的呢?内核是这样处理的:通过定期把老事务产生的元组的 XID 更新为 FrozenTransactionId,即更新为2,来回收 XID,而 XID 为2 的元组对所有的事务可见,这个过程称为 XID 冻结,通过这个方式可以回收 XID 来保证 |最新事务号-最老事务号| < 2^31。
除了内核自动冻结回收XID,我们也可以通过命令或者 sql 的方式手动进行 xid 冻结回收

  • 查询数据库或表的年龄,数据库年龄指的是:「最新事务号-数据库中最老事务号」,表年龄指的是:「最新事务号-表中最老事务号」
# 查看每个库的年龄
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1个库每个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;


  • 手动冻结回收一张表的元组的 xid 的sql:


vacuum freeze 表名;


  • 手动冻结回收一个库里面的所有表 xid 的命令:


vacuumdb -d 库名 --freeze --jobs=30 -h 连接串 -p 端口号 -U 库Owner

冻结回收过程是一个重 IO 的操作,这个过程内核会描述表的所有页面,然后把符合要求的元组的 t_xmin 字段更新为 2,所以这个过程需要在业务低峰进行,避免影响业务。

与冻结回收相关的内核参数有三个:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于笔者对于这三个参数理解不深,就不在这里班门弄斧了,感兴趣的同学可以自行找资料了解一下。

解决方案


问题分析


基于上面的原理分析,我们知道,「最新事务号-最老事务号」 =  2^31-1000000,即当前可用的 xid 仅剩下一百万的时候,内核就会禁止实例写入并报错:database is not accepting commands to avoid wraparound data loss in database, 这个时候必须连到提示中的 "xxxx" 对表进行 freeze 回收更多的 XID。

void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
  TransactionId xidVacLimit;
  TransactionId xidWarnLimit;
  TransactionId xidStopLimit;
  TransactionId xidWrapLimit;
  TransactionId curXid;

  Assert(TransactionIdIsNormal(oldest_datfrozenxid));

  /*
     * xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将发生回卷
   */
  xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  if (xidWrapLimit < FirstNormalTransactionId)
    xidWrapLimit += FirstNormalTransactionId;

  /*
     * 一旦当前事务号到达xidStopLimit,实例将不可写入,保留 1000000 的xid用于vacuum
     * 每 vacuum 一张表需要占用一个xid
   */
  xidStopLimit = xidWrapLimit - 1000000;
  if (xidStopLimit < FirstNormalTransactionId)
    xidStopLimit -= FirstNormalTransactionId;

  /*
     * 一旦当前事务号到达xidWarnLimit,将不停地收到
     * WARNING:  database "xxxx" must be vacuumed within 2740112 transactions
   */
  xidWarnLimit = xidStopLimit - 10000000;
  if (xidWarnLimit < FirstNormalTransactionId)
    xidWarnLimit -= FirstNormalTransactionId;

  /*
     * 一旦当前事务号到达xidVacLimit将触发force autovacuums
   */
  xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
  if (xidVacLimit < FirstNormalTransactionId)
    xidVacLimit += FirstNormalTransactionId;

  /* Grab lock for just long enough to set the new limit values */
  LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
  ShmemVariableCache->oldestXid = oldest_datfrozenxid;
  ShmemVariableCache->xidVacLimit = xidVacLimit;
  ShmemVariableCache->xidWarnLimit = xidWarnLimit;
  ShmemVariableCache->xidStopLimit = xidStopLimit;
  ShmemVariableCache->xidWrapLimit = xidWrapLimit;
  ShmemVariableCache->oldestXidDB = oldest_datoid;
  curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
  LWLockRelease(XidGenLock);

  /* Log the info */
  ereport(DEBUG1,
      (errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
          xidWrapLimit, oldest_datoid)));

  /*
     * 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age
     * 触发 autovacuum 对年龄最老的数据库进行清理,如果有多个数据库达到要求,按年龄最老的顺序依次清理
   * 通过设置标志位标记当前 autovacuum 结束之后再来一次 autovacuum
     */
  if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
    IsUnderPostmaster && !InRecovery)
    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

  /* Give an immediate warning if past the wrap warn point */
  if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
  {
    char     *oldest_datname;

    if (IsTransactionState())
      oldest_datname = get_database_name(oldest_datoid);
    else
      oldest_datname = NULL;

    if (oldest_datname)
      ereport(WARNING,
          (errmsg("database \"%s\" must be vacuumed within %u transactions",
              oldest_datname,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
    else
      ereport(WARNING,
          (errmsg("database with OID %u must be vacuumed within %u transactions",
              oldest_datoid,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
  }
}

bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 >= id2);

  diff = (int32) (id1 - id2);
  return (diff >= 0);
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);

  if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
  {
    TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
    TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
    TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
    Oid      oldest_datoid = ShmemVariableCache->oldestXidDB;

        /*** 省略 ***/
    if (IsUnderPostmaster &&
      TransactionIdFollowsOrEquals(xid, xidStopLimit))
    {
      char     *oldest_datname = get_database_name(oldest_datoid);

      /* complain even if that DB has disappeared */
      if (oldest_datname)
        ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                oldest_datname),
             errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
            /*** 省略 ***/
    }
        /*** 省略 ***/
  }
    /*** 省略 ***/
}

问题定位

# 查看每个库的年龄
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1个库每个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;

问题解决


  1. 通过上面的第一个 sql,查找年龄最大的数据库,数据库年龄指的是:|最新事务号-数据库中最老事务号|
  2. 通过上面第二个 sql,查找年龄最大的表,然后对表依次执行:vacuum freeze 表名,把表中的老事务号冻结回收,表年龄指的是:|最新事务号-表中最老事务号|
  3. 运维脚本


  • 单进程 Shell 脚本
# 对指定数据库中年龄最大的前 50 张表进行 vacuum freeze

for cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd  | grep -v row | grep vacuum`; do
    psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "$cmd"
done
  • 多进程 Python 脚本
from multiprocessing import Pool
import psycopg2

args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='数据库名',
            user='用户名', password='密码')

def vacuum_handler(sql):
    sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur = conn.cursor()
        cur.execute(sql_str)
        print cur.fetchall()
        conn.close()
    except Exception as e:
        print str(e)

# 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发执行
def multi_vacuum():
    pool = Pool(processes=32)
    sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql_str)
        rows = cur.fetchall()
        for row in rows:
            cmd = row['vacuum_cmd']
            pool.apply_async(vacuum_handler, (cmd, ))
        conn.close()
        pool.close()
        pool.join()
    except Exception as e:
        print str(e)


multi_vacuum()


友情提示


vacuum freeze 会扫描表的所有页面并更新,是一个重 IO 的操作,操作过程中一定要控制好并发数,否则非常容易把实例打挂。

加入我们JOIN US

欢迎加入阿里云数据库RDS团队,这里有大规模高并发的数据库场景,更有丰富的业务场景;欢迎有志之士一起加入阿里云数据库RDS团队,简历投递地址:vogts.wangt@alibaba-inc.com

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
19天前
|
关系型数据库 分布式数据库 PolarDB
沉浸式学习PostgreSQL|PolarDB 15: 企业ERP软件、网站、分析型业务场景、营销场景人群圈选, 任意字段组合条件数据筛选
在企业ERP软件、网站中经常会有一些让用户输入筛选条件(或者勾选筛选条件)的地方, 一个页面可能出现若干个选项, 每个选项用户可以进行勾选或下拉框选择. 例如淘宝网, 发货地是哪里, 商品价格范围, 商品类目, 内存大小, .... 很多选项提供选择. 分析业务场景, 经常会使用大宽表来表示对象的特征, 每个字段代表一个特征维度, 然后通过各个字段的组合条件来进行数据的统计分析. 营销场景, 和前面分析场景类似, 通过各个字段的组合条件圈选目标用户. 通常一个选项代表一个对象的某属性, 用户可能根据任意组合条件进行筛选, 本实验目标学习如何快速在任意字段组合条件输入搜索到满足条件的数据.
399 0
|
27天前
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
212 1
|
2月前
|
JSON 关系型数据库 分布式数据库
|
2月前
|
存储 关系型数据库 PostgreSQL
Postgresql内核源码分析-heapam分析
Postgresql内核源码分析-heapam分析
|
2月前
|
SQL 存储 自然语言处理
玩转阿里云PostgreSQL,通过pg_jieba对豆瓣影评进行热评分析
在当今社交媒体的时代,人们通过各种平台分享自己的生活、观点和情感。然而,对于平台管理员和品牌经营者来说,了解用户的情感和意见变得至关重要。为了帮助他们更好地了解用户的情感倾向,我们可以使用PostgreSQL中的pg_jieba插件对这些发帖进行分词和情感分析,来构建一个社交媒体情感分析系统,系统将根据用户的发帖内容,自动判断其情感倾向是积极、消极还是中性,并将结果存储在数据库中。 本文通过针对kaggle数据集中的豆瓣影评的中文评论数据,通过阿里云的PostgreSQL中的pg_jieba插件进行分词(可自定义多个词典,并且切换自定义词典进行分词),基于分词的结果进行统计分析。
|
3月前
|
关系型数据库 数据库 PostgreSQL
PostgreSQL pg_rewind报错分析
PostgreSQL pg_rewind报错分析
34 0
|
3月前
|
关系型数据库 PostgreSQL
PostgreSQL pg_ctl start超时分析
PostgreSQL pg_ctl start超时分析
44 0
|
5月前
|
SQL Cloud Native 关系型数据库
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版功能演示(上)——六、查看分析执行计划
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版功能演示(上)——六、查看分析执行计划
|
9月前
|
并行计算 算法 Cloud Native
PolarDB 开源版 使用PostGIS 数据寻龙点穴(空间聚集分析)- 大数据与GIS分析解决线下店铺选址问题
寻龙点穴是风水学术语。古人说:三年寻龙,十年点穴。意思就是说,学会寻龙脉要很长的时间,但要懂得点穴,并且点得准则难上加难,甚至须要用“十年”时间。 但是,若没正确方法,就是用百年时间,也不能够点中风水穴心聚气的真点,这样一来,寻龙的功夫也白费了。 准确地点正穴心,并不是一件容易的事,对初学者来说如此,就是久年经验老手,也常常点错点偏。 寻龙点穴旨在寻找龙气聚集之地,而现实中,我们也有类似需求,比如找的可能是人气聚集之地。 PolarDB 开源版 使用PostGIS 数据寻龙点穴(空间聚集分析)- 大数据与GIS分析解决线下店铺选址问题
382 0
PolarDB 开源版 使用PostGIS 数据寻龙点穴(空间聚集分析)- 大数据与GIS分析解决线下店铺选址问题
|
10月前
|
存储 并行计算 Cloud Native
PolarDB 开源版 通过rdkit 支撑生物、化学分子结构数据存储与计算、分析
PolarDB 的云原生存算分离架构, 具备低廉的数据存储、高效扩展弹性、高速多机并行计算能力、高速数据搜索和处理; PolarDB与计算算法结合, 将实现双剑合璧, 推动业务数据的价值产出, 将数据变成生产力. 本文将介绍PolarDB 开源版 通过rdkit 支撑生物、化学分子结构数据存储与计算、分析
290 0
相关产品
云数据库 Redis 版
云数据库 MongoDB 版
云数据库 RDS
推荐文章
更多