作者
digoal
日期
2023-08-22
标签
PostgreSQL , PolarDB , 数据库 , 教学
背景
非常欢迎数据库用户提出场景给我, 在此issue回复即可, 一起来建设沉浸式数据库学习教学素材库, 帮助开发者用好数据库, 提升开发者职业竞争力, 同时为企业降本提效.
本文的实验可以使用永久免费的阿里云云起实验室来完成.
如果你本地有docker环境也可以把镜像拉到本地来做实验:
x86_64机器使用以下docker image:
Apple Chip机器使用以下docker image:
业务场景介绍: 高并发秒杀业务
秒杀业务在电商中最为常见, 可以抽象成热点记录(行)的高并发更新. 而通常在数据库中最细粒度的锁是行锁, 所以热门商品将会被大量会话涌入, 出现锁等待, 甚至把数据库的会话占满, 导致其他请求无法获得连接产生业务故障.
实现和对比
创建商品表, 测试扣减库存操作.
drop table IF EXISTS tbl; create unlogged table tbl ( -- 测试使用unlogged table减少redo id int primary key, -- 商品id cnt int, -- 库存 ts timestamp -- 修改时间 );
插入一条记录, 初始设置20亿库存.
insert into tbl values (1, 2000000000, now());
增加实验环境数据库最大连接数
postgres=# alter system set max_connections =2000; ALTER SYSTEM docker stop pg docker start pg docker exec -ti pg bash
传统方法 设计和实验
编写测试脚本, 扣件商品id=1的库存.
vi t1.sql update tbl set cnt=cnt-1, ts=now() where id=1;
使用1920个并发连接进行测试:
pgbench -M prepared -n -r -f ./t1.sql -P 1 -c 1920 -j 8 -T 120
结果:
transaction type: ./t1.sql scaling factor: 1 query mode: prepared number of clients: 1920 number of threads: 8 duration: 120 s number of transactions actually processed: 31875 latency average = 7729.207 ms latency stddev = 15626.227 ms initial connection time = 1784.073 ms tps = 230.270300 (without initial connection time) statement latencies in milliseconds: 7722.072 update tbl set cnt=cnt-1, ts=now() where id=1;
tps: 230.270300
PolarDB|PG新方法1 设计和实验
使用skip locked跳过被锁的行, 减少冲突等待时长.
编写测试脚本, 扣件商品id=1的库存. 使用skip locked跳过被锁的行, 减少等待. 如果能返回商品id, 表示更新成功, 如果返回0条记录, 表示没有拿到锁.
vi t1.sql with tmp as ( select id from tbl where id=1 for update skip locked ) update tbl set cnt=cnt-1, ts=now() from tmp where tbl.id=tmp.id returning tbl.id;
使用1920个并发连接进行测试:
pgbench -M prepared -n -r -f ./t1.sql -P 1 -c 1920 -j 8 -T 120
结果:
transaction type: ./t1.sql scaling factor: 1 query mode: prepared number of clients: 1920 number of threads: 8 duration: 120 s number of transactions actually processed: 2847721 latency average = 77.784 ms latency stddev = 138.536 ms initial connection time = 1703.949 ms tps = 23915.353056 (without initial connection time)
tps: 23915.353056
PolarDB|PG新方法2 设计和实验
清理环境垃圾:
vacuum full tbl;
编写测试脚本, 扣件商品id=1的库存. 同时使用pg_try_advisory_xact_lock预判是否能拿到商品id=1的锁, 如果能返回商品id, 表示更新成功, 如果返回0条记录, 表示没有拿到锁.
vi t2.sql update tbl set cnt=cnt-1, ts=now() where id=1 and pg_try_advisory_xact_lock(1) returning id;
使用1920个并发连接进行测试:
pgbench -M prepared -n -r -f ./t2.sql -P 1 -c 1920 -j 8 -T 120
结果:
transaction type: ./t2.sql scaling factor: 1 query mode: prepared number of clients: 1920 number of threads: 8 duration: 120 s number of transactions actually processed: 13917081 latency average = 12.053 ms latency stddev = 30.646 ms initial connection time = 1879.213 ms tps = 116928.030422 (without initial connection time) statement latencies in milliseconds: 12.056 update tbl set cnt=cnt-1, ts=now() where id=1 and pg_try_advisory_xact_lock(1) returning id;
tps: 116928.030422
对照
test case | 传统方法 tps | skip locked方法 tps | advisory lock方法 tps | 性能提升倍数 |
高并发秒杀(热点记录更新) | 230.270300 | 23915.353056 | 116928.030422 | 507.785982 |
业务场景介绍: 高并发队列消费业务
《高并发队列处理业务的数据库性能优化 - IO扫描|CPU计算浪费 , 锁冲突 , 垃圾索引扫描浪费》
在跨境电商业务中可能涉及这样的场景, 由于有上下游产业链的存在,
- 1、用户下单后, 上下游厂商会在自己系统中生成一笔订单记录并反馈给对方,
- 2、在收到反馈订单后, 本地会先缓存反馈的订单记录队列,
- 3、然后后台再从缓存取出订单并进行处理.
这个过程的核心流程:
- 1、高速写入队列、
- 2、从队列按先后顺序提取并高速处理、
- 3、从队列清除已处理订单记录.
如果是高并发的处理, 因为大家都按一个顺序获取, 容易产生热点, 可能遇到取出队列遇到锁冲突瓶颈、IO扫描浪费、CPU计算浪费的瓶颈. 以及在清除已处理订单后, 索引版本未及时清理导致的回表版本判断带来的IO浪费和CPU运算浪费瓶颈等.
- 文末的《打车与宇宙大爆炸的关系》一文有相似问题和优化方法, 思路类似.
接下来的实验将给出“队列处理业务的数据库性能优化”优化方法和demo演示. 性能提升10到20倍.
实现和对比
1、上游写入订单处理队列表
drop table if exists t_order_q; create unlogged table t_order_q ( id serial8 primary key, -- 自增主键 order_id uuid unique, -- 上游传递过来的订单号 cts timestamp not null -- 上游传递过来的订单创建时间 ); -- create index on t_order_q (cts); -- 如果按订单时间先后取出处理, 则需要创建时间字段索引. 本实验按自增主键顺序处理, 则不需要时间索引.
2、取出并处理后的订单状态表
drop table if exists t_order_u; create unlogged table t_order_u ( id serial8 primary key, -- 自增主键 order_id uuid unique, -- 上游传递过来的订单号 cts timestamp not null, -- 上游传递过来的订单创建时间 uts timestamp not null, -- 订单处理时间 status int not null -- 订单处理状态标记 );
3、写入100万条订单队列
truncate t_order_q; insert into t_order_q (order_id, cts) select gen_random_uuid(), clock_timestamp() from generate_series(1,1000000);
传统方法 设计和实验
1、写pgbench压测脚本, 从t_order_q队列取出一条订单信息, 然后处理这条订单信息, 并将处理结果插入到t_order_u处理结果表.
vi t1.sql begin; select id as vid from t_order_q order by id for update limit 1 \gset with tmp as (delete from t_order_q where id = :vid returning order_id, cts) insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp; end;
2、压测256个并发消耗队列, 平均每个连接处理3906个事务.
select 1000000/256.0; 3906.2500000000000
3、压测结果
pgbench -M extended -f ./t1.sql -n -r -P 1 -c 256 -j 8 -t 3906
transaction type: ./t1.sql scaling factor: 1 query mode: extended number of clients: 256 number of threads: 8 number of transactions per client: 3906 number of transactions actually processed: 999936/999936 latency average = 111.243 ms latency stddev = 125.890 ms initial connection time = 234.312 ms tps = 2280.326174 (without initial connection time)
tps: 2280.326174
PolarDB|PG新方法1 设计和实验
先重新生成测试数据.
1、写pgbench压测脚本, 从t_order_q队列取出一条订单信息, 使用skip locked跳过被其他会话正在处理的订单, 然后处理这条订单信息, 并将处理结果插入到t_order_u处理结果表.
vi t2.sql begin; select id as vid from t_order_q order by id for update skip locked limit 1 \gset with tmp as (delete from t_order_q where id = :vid returning order_id, cts) insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp; end;
2、压测结果
pgbench -M extended -f ./t2.sql -n -r -P 1 -c 256 -j 8 -t 3906
transaction type: ./t2.sql scaling factor: 1 query mode: extended number of clients: 256 number of threads: 8 number of transactions per client: 3906 number of transactions actually processed: 999936/999936 latency average = 65.596 ms latency stddev = 104.377 ms initial connection time = 234.029 ms tps = 3795.525190 (without initial connection time)
tps: 3795.525190
PolarDB|PG新方法2 设计和实验
先重新生成测试数据.
1、写pgbench压测脚本, 从t_order_q队列取出1条订单数据(并且使用ad lock对队列ID加事务锁, 判断是否正在处理, 事务结束自动释放ad lock. ad lock也经常被用于秒杀场景泄压.), 然后处理这条订单信息, 并将处理结果插入到t_order_u处理结果表.
vi t3.sql with tmp as (delete from t_order_q where ctid = (select ctid from t_order_q where pg_try_advisory_xact_lock(id) order by id limit 1) returning order_id, cts) insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp; - 或 - - begin; - select id as v_id from t_order_q where pg_try_advisory_xact_lock(id) order by id limit 1 \gset - with tmp as (delete from t_order_q where id = :v_id returning order_id, cts) - insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp; - end; - - 或(sleep 模拟应用拿到需要处理的订单后的应用端操作增加的耗时.) - - begin; - select id as v_id from t_order_q where pg_try_advisory_xact_lock(id) order by id limit 1 \gset - \sleep 10ms - with tmp as (delete from t_order_q where id = :v_id returning order_id, cts) - insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp; - end;
2、压测结果
pgbench -M extended -f ./t3.sql -n -r -P 1 -c 256 -j 8 -t 3906
transaction type: ./t3.sql scaling factor: 1 query mode: extended number of clients: 256 number of threads: 8 number of transactions per client: 3906 number of transactions actually processed: 999936/999936 latency average = 20.404 ms latency stddev = 45.780 ms initial connection time = 239.823 ms tps = 12283.493260 (without initial connection time)
tps: 12283.493260
PolarDB|PG新方法3 设计和实验
先重新生成测试数据. 使用分区表, 每次从1个分区队列获取订单, 从物理层面进一步减少IO 和 CPU 浪费.
1、上游写入订单处理队列表
drop table if exists t_order_q; create unlogged table t_order_q ( id serial8 primary key, -- 自增主键 order_id uuid , -- 上游传递过来的订单号 cts timestamp not null -- 上游传递过来的订单创建时间 ) PARTITION BY hash (id) ; -- create index on t_order_q (cts); -- 如果按订单时间先后取出处理, 则需要创建时间字段索引. 本实验按自增主键顺序处理, 则不需要时间索引. do language plpgsql $$ declare x int := 256; begin for i in 0..x-1 loop execute format ($_$create unlogged table t_order_q_%s PARTITION OF t_order_q FOR VALUES WITH (MODULUS %s, REMAINDER %s);$_$, i, x, i ); end loop; end $$;
2、取出并处理后的订单状态表
drop table if exists t_order_u; create unlogged table t_order_u ( id serial8 primary key, -- 自增主键 order_id uuid unique, -- 上游传递过来的订单号 cts timestamp not null, -- 上游传递过来的订单创建时间 uts timestamp not null, -- 订单处理时间 status int not null -- 订单处理状态标记 );
3、写入100万条订单队列
truncate t_order_q; insert into t_order_q (order_id, cts) select gen_random_uuid(), clock_timestamp() from generate_series(1,1000000);
4、编写压测函数, 在函数中根据pgbench client_id来取模选择对应的表分区进行处理. (在实际的应用中也可以使用类似手段.)
《PostgreSQL Oracle 兼容性之 - DBMS_SQL(存储过程动态SQL中使用绑定变量-DB端prepare statement)》
create or replace function dyn_pre_orders(partitions int, client_id int) returns int8 as $$ declare suffix int := mod(client_id, partitions); begin execute format('execute p%s(%s)', suffix, client_id); return 0; exception when others then execute format('prepare p%s(int) as with tmp as (delete from t_order_q_%s where ctid = (select ctid from t_order_q_%s where pg_try_advisory_xact_lock(id) order by id limit 1) returning order_id, cts) insert into t_order_u (order_id,cts,uts,status) select tmp.order_id, tmp.cts, now(), 1 from tmp;', suffix, suffix, suffix); execute format('execute p%s(%s)', suffix, client_id); return 0; end; $$ language plpgsql strict;
5、写pgbench压测脚本.
vi t4.sql \set clients 256 select dyn_pre_orders(:clients, :client_id);
6、压测结果
pgbench -M extended -f ./t4.sql -n -r -P 1 -c 256 -j 8 -t 3906
transaction type: ./t4.sql scaling factor: 1 query mode: extended number of clients: 256 number of threads: 8 number of transactions per client: 3906 number of transactions actually processed: 999936/999936 latency average = 4.400 ms latency stddev = 12.287 ms initial connection time = 250.656 ms tps = 49892.858208 (without initial connection time) statement latencies in milliseconds: 0.000 \set clients 256 4.400 select dyn_pre_orders(:clients, :client_id);
tps: 49892.858208
对照
test case | 传统方法 tps | skip locked方法 tps | advisory lock方法 tps | advisory lock + 分区方法 tps | 性能提升倍数 |
高并发秒杀(热点记录更新) | 2280.326174 | 3795.525190 | 12283.493260 | 49892.858208 | 21.9 |
知识点
1、advisory lock:
轻量锁, 根据机器性能的不同, 每秒预计可处理50万次左右的轻量锁请求.
轻量锁分为共享、独占、无堵塞尝试等请求, 同时持有锁的范围分为会话或事务周期.
例如本例用到的 pg_try_advisory_xact_lock 表示事务级别尝试请求ID为N的轻量锁, 如果没有其他事务持有则返回true, 否则返回false.
更多信息请参考末尾文章.
2、select for update skip locked
跳过被锁的行, 与advisory lock功能相似, 但是实现方法不一样, skip locked经过的逻辑比advisory lock长(skip locked要经过tuple search, 已经到表内部. 而advisory lock在搜索tuple之前已经完成了判断), 所以效率与之相比偏低.
思考
1、为什么第一种方法会这么慢呢?
1920个连接中, 同一时刻只有1个连接能持有行锁, 它更新完释放锁, 然后等待队列中的第一个会话得到锁后进行更新. 因此始终有1919个连接处于等待状态, 处于等待队列中的第1920个会话要等的时间最长.
在实际秒杀场景中, 库存可能没有这么多, 所以等待都是浪费掉的, 因为等拿到锁可以更新的时候, 库存都已经变成0了, 前面等于白白占用了会话连接和数据库资源.
这种场景最容易引起数据库雪崩.
2、对于业务场景2, 还有什么方法提升性能?
2.1、减少浪费的IO和cpu计算:
- 在并发的情况下, order by id limit 1需要扫描若干行, 而不是1行, 因为可能有些ID已经被ad lock touch了, 浪费的pg_try_advisory_xact_lock() cpu ops计算次数约等于 n + n-1 + n-2 + ... + n-n, 浪费的IO约等于N.
优化方法:
- 固定N个链接, 按ID hash mod 取不同的数据分片, 从而减少浪费的IO和cpu计算.
- 或者将队列表拆分成几个分区表, 入库的时候 按id hash mode, 每个分区分配给不同的进程取数, 从而减少冲突和浪费的扫描提高并发.
2.2、提高index vacuum的频率, 减少因没有index version导致的垃圾数据判断带来的cpu和回表的IO浪费. 提升autovacuum_work_mem, 容纳下所有dead tuple ctid避免多次扫描index.
优化方法:
- 配置参数autovacuum_naptime、autovacuum_work_mem(或者老版本 maintenance_work_mem)即可.
2.3、使用并行vacuum, 配置max_parallel_maintenance_workers.
2.4、配置vacuum使用prefetch blocks, 减少io delay带来的vacuum 比较久的问题. (适合 单次IO delay较高, 但是吞吐没有瓶颈的云盘)
2.5、一次取出多条, 批量处理.
2.6、使用IOPS较高, 单次IO delay较低的本地nvme SSD.
更多请参考末尾文章.