利用PostgreSQL的xmax实现无锁的并发队列处理

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: PostgreSQL的表里有几个系统隐藏列,xmax是其中一个,某些场景下我们可以利用PostgreSQL的xmax实现无锁的并发更新。本文介绍的消息或者任务队列的应用场景就是一例。

PostgreSQL的表里有几个系统隐藏列,xmax是其中一个,某些场景下我们可以利用PostgreSQL的xmax实现无锁的并发更新。本文介绍的消息或者任务队列的应用场景就是一例。

1. 场景和问题

当前台请求很频繁时,我们可能会把由此产生一些耗时而不紧急的任务作为后台作业延后处理,这样前台请求可以得到快速响应。
这些后台作业临时存放在一个表里,我们暂时称其为消息表,然后由后台进程处理这些消息。由于这个后台任务的工作量比较重,有时需要多个进程同时工作。这时需要考虑两个问题。

1)正确性
  消息不能被遗漏也不能被重复处理。
2)并发性能
  多个进程应避免争抢同一个消息。

2. 方案

2.1 方案1

为确保“正确性”,可以在获取消息时,用select ... for update给获得的消息加个锁,处理完把消息它删掉。这样 使用 select ... for update取消息时,如果该消息已被其它进程锁住它会等待,直到锁住 该消息的事务结束, 如果这条消息已被删除,那么 select ... for update会继续查找下一条消息。
难点是让“多个进程不争抢同一个消息”。为解决这个问题,可以用某种算法对消息划分子集,每个后台进程只处理特定的消息子集。
下面是个例子。

描述
有10个后台进程,每个后台进程分配一个0~9个编号。消息按照id对10取模,取模的值即其对应的后台进程的编号 。

数据定义

点击(此处)折叠或打开

  1. postgres=# create table msg(id int primary key,msg text);
  2. CREATE TABLE
  3. postgres=# insert into msg select id,id::text from generate_series(1,1000000) id;
  4. INSERT 0 1000000

消息处理
以编号为3的后台进程为例。

点击(此处)折叠或打开

  1. postgres=# begin;
  2. BEGIN
  3. postgres=# select id from msg where mod(id,10) = 3 order by id limit 1 for update;
  4.  id
  5. ----
  6.  3
  7. (1 row)

  8. postgres=# (后面的处理略)

2.2 方案2

方案1有很多缺陷。
1)后台进程数必须事先确定
2)每个进程必须提前知道自己的编号
3)不同后台进程的工作量可能不均匀
4)消息的顺序和处理顺序可能不一致

其实利用PostgreSQL特有的隐藏列xmax,可以有一种更好的解决方案。如下
  1. postgres=# begin;
  2. BEGIN
  3. postgres=# select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update;
  4.  id
  5. ----
  6.   2
  7. (1 row)

  8. postgres=# (后面的处理略)

下面解释一下。
xmax代表更新,删除或锁住( 使用for update )了该元组的事务。所以当xmax对应的事务还活着,并且这个事务不是自己,那么表示别的事务正在处理这个元组,只要通过where条件跳过这样的元组就可以避免和其他事务发生竞争了。
具体到这条SQL,主要是下面4个用”or“连接起来查询条件。
1)
  1. xmax = 0 or
从逻辑上讲,这个条件也可以不要。2)已经包含了1)的情况,但从性能上考虑还是需要的。
  
2)

  1. (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or
排除 事务快照中的所有活动事务。并且由于txid_snapshot_xip()取到的事务快照可能会有滞后,所以对大于等于 事务快照的xmax属性的未来事务也统统排除

3)

  1. (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or
条件2)中有个事务比较的逻辑,不过那里简单的使用”事务快照的xmin到xmax+1000之间发生事务回卷的更极端情况就不考虑了,即使在这种情况下,也不会遗漏消息处理,顶多多一次消息争用而已)

4)

  1. xmax::text::bigint = txid_current()
原本以为,事务快照中的活动事务会包含自身事务,但实测发现居然不包含自己。但是进而又发现, 自身事务的事务ID可能会比txid_snapshot_xip()取到的事务快照的xmax要大,所以必须要加上这个条件。

3. 验证

下面对方案2进行实测验证。

3.1 环境
测试环境为个人PC上的VMware虚拟机
PC
 CPU:Intel Core i5-3470 3.2G(4核)
 MEM:6GB
 SSD:OCZ-VERTEX4 128GB(VMware虚拟机所在磁盘,非系统盘)
 OS:Win7


VMware虚拟机
 CPU:4核
 MEM:1GB
 OS:CentOS 6.5
 PG:PostgreSQL 9.3.4(shared_buffers = 128MB,其他是默认值)

3.2  数据定义
postgres=# create table msg(id int primary key,msg text);
CREATE TABLE
postgres=# insert into msg select id,id::text from generate_series(1,1000000) id;
INSERT 0 1000000

3.3 消息处理
仅使用简单的消息删除进行测试,通过pgbench查看单并发和多并发时的消息处理性能。

3.3.1 不使用使用xmax

只通过for update加锁防止消息被重复处理。
  1. -bash-4.1$ cat test1.sql
  2. delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

1并发时,tps为641。
  1. -bash-4.1$ pgbench -n -r -c 1 -j 1 -t 1000 -p 5433 -f test1.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 1
  6. number of threads: 1
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 1000/1000
  9. latency average: 0.000 ms
  10. tps = 640.605552 (including connections establishing)
  11. tps = 641.918154 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     1.556288    delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

10并发时,tps为1714,比单并发提升2.67倍。
  1. -bash-4.1$ pgbench -n -r -c 10 -j 10 -t 1000 -p 5433 -f test1.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 10
  6. number of threads: 10
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 10000/10000
  9. latency average: 0.000 ms
  10. tps = 1702.379757 (including connections establishing)
  11. tps = 1714.727506 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     5.744313    delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

3.3.2 使用xmax

结合使用for update和xmax,既防止消息被重复处理又避免消息争用。

  1. -bash-4.1$ cat test2.sql
  2. delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

1并发时,tps为607。
  1. -bash-4.1$ pgbench -n -r -c 1 -j 1 -t 1000 -p 5433 -f test2.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 1
  6. number of threads: 1
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 1000/1000
  9. latency average: 0.000 ms
  10. tps = 607.107038 (including connections establishing)
  11. tps = 607.815535 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     1.643752    delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

10并发时,tps为2276,比单并发提升3.75倍。因为CPU是4核,所以这个提升幅度已经比较不错了。
虽然看似用了xmax后提升也不算太大(2276/1714=1.33倍),但是这个测试的消息处理只是简单的delete,如果处理复杂了,无锁方案的威力就更能显现出来了。
  1. -bash-4.1$ pgbench -n -r -c 10 -j 10 -t 1000 -p 5433 -f test2.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 10
  6. number of threads: 10
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 10000/10000
  9. latency average: 0.000 ms
  10. tps = 2268.052507 (including connections establishing)
  11. tps = 2276.312206 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     4.350373    delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

注1)以上测试的前后,通过执行"select count(*) from msg",验证被删除的记录数和执行的事务数相同,即没有发生同一个消息被2个进程重复处理的情况。
注2)如果要一次处理一批消息,可以修改limit值,并把delete语句中的"id = ..."改成"id in ..."
  1. delete from msg where id in (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 10 for update) returning *;


4. 总结

虽然有观点认为PostgreSQL的MVCC实现机制不如那个谁谁谁的好,但是活用好PG的MVCC,比如由MVCC带来的隐藏列xmax,有时也会带来意想不到的惊喜。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
关系型数据库 分布式数据库 数据库
沉浸式学习PostgreSQL|PolarDB 2: 电商高并发秒杀业务、跨境电商高并发队列消费业务
业务场景介绍: 高并发秒杀业务 秒杀业务在电商中最为常见, 可以抽象成热点记录(行)的高并发更新. 而通常在数据库中最细粒度的锁是行锁, 所以热门商品将会被大量会话涌入, 出现锁等待, 甚至把数据库的会话占满, 导致其他请求无法获得连接产生业务故障. 业务场景介绍: 高并发队列消费业务 在跨境电商业务中可能涉及这样的场景, 由于有上下游产业链的存在, 1、用户下单后, 上下游厂商会在自己系统中生成一笔订单记录并反馈给对方, 2、在收到反馈订单后, 本地会先缓存反馈的订单记录队列, 3、然后后台再从缓存取出订单并进行处理.
421 1
|
关系型数据库 测试技术 分布式数据库
PolarDB | PostgreSQL 高并发队列处理业务的数据库性能优化实践
在电商业务中可能涉及这样的场景, 由于有上下游关系的存在, 1、用户下单后, 上下游厂商会在自己系统中生成一笔订单记录并反馈给对方, 2、在收到反馈订单后, 本地会先缓存反馈的订单记录队列, 3、然后后台再从缓存取出订单并进行处理. 如果是高并发的处理, 因为大家都按一个顺序获取, 容易产生热点, 可能遇到取出队列遇到锁冲突瓶颈、IO扫描浪费、CPU计算浪费的瓶颈. 以及在清除已处理订单后, 索引版本未及时清理导致的回表版本判断带来的IO浪费和CPU运算浪费瓶颈等. 本文将给出“队列处理业务的数据库性能优化”优化方法和demo演示. 性能提升10到20倍.
830 4
|
存储 运维 Cloud Native
【实操系列】基于AnalyticDB PostgreSQL数据共享实现企业级跨多业务的敏捷分析
云数据仓库AnalyticDB PostgreSQL 版发布了最新自研的云原生架构实例,实现了跨实例间的数据共享能力。允许进行跨实例间的实时数据共享且无需进行数据迁移,可支持构建安全、高效、灵活的数据分析场景。本文介绍了依托数据共享实现云数仓跨多业务实例的敏捷数据分析方案;
【实操系列】基于AnalyticDB PostgreSQL数据共享实现企业级跨多业务的敏捷分析
|
SQL 存储 Oracle
PostgreSQL 事务隔离级别的实现和多版本并发控制|学习笔记
快速学习 PostgreSQL 事务隔离级别的实现和多版本并发控制
PostgreSQL 事务隔离级别的实现和多版本并发控制|学习笔记
|
关系型数据库 测试技术 PostgreSQL
postgresql实现影响分析
通过postgresql模仿分析假如城市发布通知,位于街道的人员是否受到了影响
124 0
postgresql实现影响分析
|
弹性计算 资源调度 运维
【实操系列】 AnalyticDB PostgreSQL发布实例计划管理功能,实现资源分时弹性&分时启停
本文将对AnalyticDB PostgreSQL产品的计划任务管理功能以及其背后的实现机制和最佳实践做详细介绍。
【实操系列】 AnalyticDB PostgreSQL发布实例计划管理功能,实现资源分时弹性&分时启停
|
存储 Kubernetes 负载均衡
「在 Kubernetes 上运行 Pgpool-Il」实现 PostgreSQL 查询(读)负载均衡和连接池
「在 Kubernetes 上运行 Pgpool-Il」实现 PostgreSQL 查询(读)负载均衡和连接池
356 0
「在 Kubernetes 上运行 Pgpool-Il」实现 PostgreSQL 查询(读)负载均衡和连接池
|
存储 SQL 关系型数据库
数据库误操作后悔药来了:AnalyticDB PostgreSQL教你实现分布式一致性备份恢复
本文将介绍AnalyticDB PostgreSQL版备份恢复的原理与使用方法。
884 0
数据库误操作后悔药来了:AnalyticDB PostgreSQL教你实现分布式一致性备份恢复
|
关系型数据库 MySQL 数据库
PostgreSQL数据库实现表字段的自增
PostgreSQL数据库实现表字段的自增
1894 0
|
Oracle 关系型数据库 数据库
关于PostgreSQL数据库兼容Oracle数据库闪回查询的实现方案
注:关于在PostgreSQL上面实现Oracle数据库的闪回功能(闪回查询 闪回表 闪回删除…)的这个想法已经有很长时间了,但是鉴于本人的能力 精力和身体条件 迟迟没有完成。期间也有很多的小伙伴跟我一起研究过这个功能,但是最终都因为各种各样的问题 没有做下去。Oracle数据库闪回功能跨越版本较大,功能也比较强大 在PostgreSQL数据库上实现,需要对数据库内核有很深入的理解 两大数据库不同的底层原理也终将影响各自的实现策略,PostgreSQL标记删除就地插入的特点和基于事务快照行可见性的特性是我们可以开发PostgreSQL闪回查询的大前提。本文主要介绍 实现闪回查询的 一种实现方案
328 0