标签
PostgreSQL , 高并发消费 , pg_try_advisory_xact_lock , 秒杀 , 任务分配
背景
给任务分配线程ID,或让线程去抢占任务执行,是任务分配系统中的基本需求。
目的是能够快速的消耗掉所有的任务,同又要保证两点:
1、所有任务都被领取。
2、每个任务只能被一个线程领取。
3、每个线程同一时间只能领取一个任务。
实际上在数据库中, 就是一个高并发的,实时更新系统,设计时要尽量避免冲突,提高处理吞吐。
PostgreSQL的UDF,advisory lock是一个很好的功能点,可以实现高并发、高可靠的任务分配。
其中,秒杀例子:
《HTAP数据库 PostgreSQL 场景与性能测试之 30 - (OLTP) 秒杀 - 高并发单点更新》
给任务分配唯一JAVA线程
例子1
功能描述:
有1000个java线程/进程,需要为具体的某个任务选举出一个master,并把选举结果写入到table中,记录任务ID与master线程/线程ID。
或者说:1每个任务的选举都被投票一次;2每个任务都只有一个master。
如果某个线程已经是某个任务的master,那这个线程/进程不参与选举。
1、JAVA线程与任务ID对应关系表
create table java_pool (
tid int primary key , -- JAVA 线程ID
taskid int unique -- 任务 ID
);
2、插入1000个线程ID
insert into java_pool select generate_series(1,1000);
3、输入任务ID,返回JAVA线程ID,表示这个任务分配给某个JAVA线程ID。
create or replace function set_master(v_taskid int) returns int as $$
declare
res int;
begin
-- set lock_timeout = '10 ms';
-- 使用adlock,消除唯一约束时的等待以及更新时的锁等待。
update java_pool set taskid=$1
where tid in
(select tid from java_pool where pg_try_advisory_xact_lock(tid) and pg_try_advisory_xact_lock($1) and taskid is null limit 1)
and pg_try_advisory_xact_lock($1)
returning tid into res;
return res;
exception when unique_violation then
return -1; -- this task already set other tid
when others then
return -2; -- lock timeout, other session is setting the same taskid.
end;
$$ language plpgsql strict;
释放TID
update java_pool set taskid=null where tid=? and pg_try_advisory_xact_lock(?) and taskid is not null;
4、压测
vi test.sql
\set taskid random(100,100000)
\set tid random(1,1000)
select set_master(:taskid);
update java_pool set taskid=null where tid=:tid and pg_try_advisory_xact_lock(:tid) and taskid is not null;
5、压测结果
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 6876049
latency average = 0.558 ms
latency stddev = 0.508 ms
tps = 57298.823054 (including connections establishing)
tps = 57301.470302 (excluding connections establishing)
例子2
功能描述:
在一个table中,每一行记录了一个任务,需要把每个任务分配一个java执行线程/进程。
总的线程数/进程数多于任务数,并要求在table中记录当前任务分配到的线程/进程ID。
或者说,是多个java线程/进程需要争抢一个任务,需要某个方式实现: 1每个任务都被争抢到;2每个任务只被一个java线程/进程争抢到。
10万任务(已知)
100万线程(ID未知)
1、建表
create table task_pool (
taskid int primary key , -- 任务ID
tid int unique -- JAVA 线程ID
);
2、插入10万任务ID
insert into task_pool select generate_series(1,100000);
3、输入JAVA线程ID,返回任务ID。表示这个任务分配给某个JAVA线程ID。
create or replace function set_tid(v_tid int) returns int as $$
declare
res int;
begin
-- set lock_timeout = '10 ms';
update task_pool set tid=$1
where taskid in
(select taskid from task_pool where pg_try_advisory_xact_lock(taskid) and pg_try_advisory_xact_lock($1) and tid is null limit 1)
and pg_try_advisory_xact_lock($1)
returning taskid into res;
return res;
exception when unique_violation then
return -1; -- this task already set other tid
when others then
return -2; -- lock timeout, other session is setting the same taskid.
end;
$$ language plpgsql strict;
4、压测
vi test.sql
\set tid random(1,1000000)
select set_tid(:tid);
5、压测结果,约6秒分配完10万任务。
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
progress: 1.0 s, 42693.2 tps, lat 0.733 ms stddev 0.333
progress: 2.0 s, 41831.7 tps, lat 0.765 ms stddev 0.285
progress: 3.0 s, 38475.0 tps, lat 0.832 ms stddev 1.544
progress: 4.0 s, 39560.5 tps, lat 0.809 ms stddev 0.276
progress: 5.0 s, 36850.0 tps, lat 0.868 ms stddev 0.317
progress: 6.0 s, 32344.5 tps, lat 0.989 ms stddev 0.720
progress: 7.0 s, 16541.2 tps, lat 1.934 ms stddev 0.579
progress: 8.0 s, 17078.0 tps, lat 1.875 ms stddev 0.575
例子3
如果任务ID和JAVA 线程ID都不是预先生成的,那么同样可以使用类似的功能点提高并发和可靠性。
使用pg_try_advisory_xact_lock来提高并发,降低等待。
insert into tbl select $1,$2 where pg_try_advisory_xact_lock($1) and pg_try_advisory_xact_lock($2) returning *;
根据结果判定是否锁定任务和JAVA线程ID