PostgreSQL 高并发任务分配系统 实践

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

标签

PostgreSQL , 高并发消费 , pg_try_advisory_xact_lock , 秒杀 , 任务分配


背景

给任务分配线程ID,或让线程去抢占任务执行,是任务分配系统中的基本需求。

目的是能够快速的消耗掉所有的任务,同又要保证两点:

1、所有任务都被领取。

2、每个任务只能被一个线程领取。

3、每个线程同一时间只能领取一个任务。

实际上在数据库中, 就是一个高并发的,实时更新系统,设计时要尽量避免冲突,提高处理吞吐。

PostgreSQL的UDF,advisory lock是一个很好的功能点,可以实现高并发、高可靠的任务分配。

其中,秒杀例子:

《HTAP数据库 PostgreSQL 场景与性能测试之 30 - (OLTP) 秒杀 - 高并发单点更新》

给任务分配唯一JAVA线程

例子1

功能描述:

有1000个java线程/进程,需要为具体的某个任务选举出一个master,并把选举结果写入到table中,记录任务ID与master线程/线程ID。

或者说:1每个任务的选举都被投票一次;2每个任务都只有一个master。

如果某个线程已经是某个任务的master,那这个线程/进程不参与选举。

1、JAVA线程与任务ID对应关系表

create table java_pool (  
  tid int primary key ,    -- JAVA 线程ID  
  taskid int unique        -- 任务 ID  
);  

2、插入1000个线程ID

insert into java_pool select generate_series(1,1000);  

3、输入任务ID,返回JAVA线程ID,表示这个任务分配给某个JAVA线程ID。

create or replace function set_master(v_taskid int) returns int as $$  
declare  
  res int;   
begin  
  -- set lock_timeout = '10 ms';  
  -- 使用adlock,消除唯一约束时的等待以及更新时的锁等待。  
  update java_pool set taskid=$1  
  where tid in  
  (select tid from java_pool where pg_try_advisory_xact_lock(tid) and pg_try_advisory_xact_lock($1) and taskid is null limit 1)  
  and pg_try_advisory_xact_lock($1)  
  returning tid into res;  
    
  return res;  
  
  exception when unique_violation then  
    return -1;  -- this task already set other tid  
  when others then  
    return -2;  -- lock timeout, other session is setting the same taskid.  
end;  
$$ language plpgsql strict;  

释放TID

update java_pool set taskid=null where tid=? and pg_try_advisory_xact_lock(?) and taskid is not null;  

4、压测

vi test.sql  
  
\set taskid random(100,100000)  
\set tid random(1,1000)  
select set_master(:taskid);  
update java_pool set taskid=null where tid=:tid and pg_try_advisory_xact_lock(:tid) and taskid is not null;  

5、压测结果

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T  120  
  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 6876049  
latency average = 0.558 ms  
latency stddev = 0.508 ms  
tps = 57298.823054 (including connections establishing)  
tps = 57301.470302 (excluding connections establishing)  

例子2

功能描述:

在一个table中,每一行记录了一个任务,需要把每个任务分配一个java执行线程/进程。

总的线程数/进程数多于任务数,并要求在table中记录当前任务分配到的线程/进程ID。

或者说,是多个java线程/进程需要争抢一个任务,需要某个方式实现: 1每个任务都被争抢到;2每个任务只被一个java线程/进程争抢到。

10万任务(已知)

100万线程(ID未知)

1、建表

create table task_pool (  
  taskid int primary key ,    -- 任务ID  
  tid int unique              -- JAVA 线程ID  
);  

2、插入10万任务ID

insert into task_pool select generate_series(1,100000);  

3、输入JAVA线程ID,返回任务ID。表示这个任务分配给某个JAVA线程ID。

create or replace function set_tid(v_tid int) returns int as $$  
declare  
  res int;  
begin  
  -- set lock_timeout = '10 ms';  
  update task_pool set tid=$1  
  where taskid in  
  (select taskid from task_pool where pg_try_advisory_xact_lock(taskid) and pg_try_advisory_xact_lock($1) and tid is null limit 1)  
  and pg_try_advisory_xact_lock($1)  
  returning taskid into res;  
    
  return res;  
  
  exception when unique_violation then  
    return -1;  -- this task already set other tid  
  when others then  
    return -2;  -- lock timeout, other session is setting the same taskid.  
end;  
$$ language plpgsql strict;  

4、压测

vi test.sql  
  
\set tid random(1,1000000)  
select set_tid(:tid);  

5、压测结果,约6秒分配完10万任务。

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
progress: 1.0 s, 42693.2 tps, lat 0.733 ms stddev 0.333  
progress: 2.0 s, 41831.7 tps, lat 0.765 ms stddev 0.285  
progress: 3.0 s, 38475.0 tps, lat 0.832 ms stddev 1.544  
progress: 4.0 s, 39560.5 tps, lat 0.809 ms stddev 0.276  
progress: 5.0 s, 36850.0 tps, lat 0.868 ms stddev 0.317  
progress: 6.0 s, 32344.5 tps, lat 0.989 ms stddev 0.720  
progress: 7.0 s, 16541.2 tps, lat 1.934 ms stddev 0.579  
progress: 8.0 s, 17078.0 tps, lat 1.875 ms stddev 0.575  

例子3

如果任务ID和JAVA 线程ID都不是预先生成的,那么同样可以使用类似的功能点提高并发和可靠性。

使用pg_try_advisory_xact_lock来提高并发,降低等待。

insert into tbl select $1,$2 where pg_try_advisory_xact_lock($1) and pg_try_advisory_xact_lock($2) returning *;  

根据结果判定是否锁定任务和JAVA线程ID
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
8月前
|
消息中间件 缓存 NoSQL
谈谈高并发系统的设计方法论
设计 `高并发` 系统,就是要让该系统保证它 `整体可用` 的同时,能够尽可能多的 `处理很高的并发用户请求`,能够 `承受很大的负载流量冲击`。
809 6
|
8月前
|
缓存 NoSQL 关系型数据库
|
6月前
|
消息中间件 算法 数据库
架构设计篇问题之商城系统高并发写的问题如何解决
架构设计篇问题之商城系统高并发写的问题如何解决
|
3月前
|
Java Go 云计算
Go语言在云计算和高并发系统中的卓越表现
【10月更文挑战第10天】Go语言在云计算和高并发系统中的卓越表现
|
5月前
|
监控 算法 Java
企业应用面临高并发等挑战,优化Java后台系统性能至关重要
随着互联网技术的发展,企业应用面临高并发等挑战,优化Java后台系统性能至关重要。本文提供三大技巧:1)优化JVM,如选用合适版本(如OpenJDK 11)、调整参数(如使用G1垃圾收集器)及监控性能;2)优化代码与算法,减少对象创建、合理使用集合及采用高效算法(如快速排序);3)数据库优化,包括索引、查询及分页策略改进,全面提升系统效能。
59 0
|
6月前
|
消息中间件 缓存 监控
如何设计一个秒杀系统,(高并发高可用分布式集群)
【7月更文挑战第4天】设计一个高并发、高可用的分布式秒杀系统是一个非常具有挑战性的任务,需要从架构、数据库、缓存、并发控制、降级限流等多个维度进行考虑。
166 1
|
6月前
|
设计模式 存储 缓存
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
70 0
|
7月前
|
自然语言处理 关系型数据库 数据库
技术经验解读:【转】PostgreSQL的FTI(TSearch)与中文全文索引的实践
技术经验解读:【转】PostgreSQL的FTI(TSearch)与中文全文索引的实践
90 0
|
8月前
|
算法
【数据结构与算法 11,高并发系统基础篇
【数据结构与算法 11,高并发系统基础篇
|
8月前
|
缓存 负载均衡 网络协议
作者推荐 | 高并发挑战?试试这些架构优化篇技巧,让你的系统焕发新生!
作者推荐 | 高并发挑战?试试这些架构优化篇技巧,让你的系统焕发新生!
445 1

热门文章

最新文章

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版