PostgreSQL dblink异步调用实践,跑并行多任务 - 例如开N个并行后台任务创建索引, 开N个后台任务跑若干SQL

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云数据库 RDS SQL Server,基础系列 2核4GB
简介: 标签PostgreSQL , 后台任务 , DBLINK 异步调用背景使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

标签

PostgreSQL , 后台任务 , DBLINK 异步调用


背景

使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

并行后台任务接口实现

接口效果:

select run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
);  

实现

1、创建dblink插件

create extension if not exists dblink;    

2、创建一个建立连接函数,不报错

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;    

3、创建跑多任务的接口函数

create or replace function run_sqls_parallel(   
  parallels int,    -- 并行度  
  sqls text[],      -- 需要执行的SQLs  
  conn text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- 连接串     
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)  
  app_prefix text := md5(random()::text);        -- application, dblink name prefix   
  i int := 1;       -- 任务ID变量,1累加  
  app_conn_name text;  -- application_name, dblink conn name = app_prefix+i   
  sql text;       -- SQL 元素   
  current_conns int := 0;  -- 当前活跃的异步调用   
begin  
  -- 建立获取实时pg_stat_activity内容连接    
  perform conn(app_prefix_stat,  conn||app_prefix_stat);              
  
  foreach sql in array sqls  
  loop  
    -- 当前是否有空闲异步连接   
    select application_name into app_conn_name from   
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
    as t(application_name text);   
    -- 有空闲异步连接  
    if found then  
      -- 消耗掉上一次异步连接的结果,否则会报错。  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);      
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
      -- 发送异步DBLINK调用      
      perform dblink_send_query(app_conn_name, sql);   
      
    -- 无空闲异步连接  
    else  
      -- 当前已建立的异步连接数  
      select cn into current_conns from   
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);      
      loop  
        -- 达到并行度  
        if current_conns >= parallels then   
          -- 是否有空闲异步连接  
          select application_name into app_conn_name from   
            dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
          as t(application_name text);   
          -- 有  
          if found then  
            -- 消耗掉上一次异步连接的结果,否则会报错。  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
            -- 发送异步DBLINK调用      
            perform dblink_send_query(app_conn_name, sql);   
              
            -- 退出循环  
            exit;  
          -- 没有,等  
          else  
            perform pg_sleep(1);  
            raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
          end if;  
        -- 未达到并行度  
        else  
          -- 建立连接  
          perform conn(app_prefix||i,  conn||app_prefix||i);             -- 建立连接。  
  
          -- 发送异步DBLINK调用      
          perform dblink_send_query(app_prefix||i, sql);   
            
          -- 连接suffix序号 递增  
          i := i+1;  
            
          -- 退出循环  
          exit;  
        end if;  
      end loop;  
    end if;  
  end loop;  
  
  loop  
    -- 当前已建立的异步连接数  
    select cn into current_conns from   
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);      
    if current_conns=0 then   
      raise notice 'whole tasks done.';  
      for app_conn_name in 
        select application_name from   
          dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
        as t(application_name text)
      loop
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);   
      end loop;
      return;  
    else  
      raise notice 'the last % tasks running.', current_conns;  
      perform pg_sleep(1);  
    end if;  
  end loop;  
  
end;  
$$ language plpgsql strict;  

试用

1、运行5条SQL,开2个并行任务

select * from run_sqls_parallel(  
 2, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 30070.275 ms (00:30.070)  

2、运行10个并行任务,跑6条SQL

postgres=# select * from run_sqls_parallel(  
 10, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   

NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 10050.064 ms (00:10.050)  

完全符合预期。

3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务

create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
do language plpgsql $$    
declare    
  tables name[] := array['t1','t2','t3'];     -- 表名  
  n name;   -- 表名  
  x name;   -- 字段名  
  i int;    -- LOOP值  
  sql text;        
  sqls text[];      
  tbs name := 'tbs1';    -- 索引表空间  
begin    
  set maintenance_work_mem='2GB';    
    
  foreach n in array tables loop    
    i := 1;      
    for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped   
    loop    
      -- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引  
      sql := format('create index IF NOT EXISTS idx_%s__%s on %s (%s) tablespace %s', n, i, n, x, tbs);      -- 封装创建索引的SQL    
      sqls := array_append(sqls, sql);   
      i:=i+1;    
    end loop;    
  end loop;    
  
  perform * from run_sqls_parallel(  
    10,   -- 并行度  
    sqls  -- 执行index SQL数组  
  ) as t(a text);   
  
  foreach n in array tables loop    
    execute format('analyze %s', n);     
  end loop;    
end;    
$$;    

完全符合预期。

小结

本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。

接口效果:

select * from run_sqls_parallel (  
  参数1:并行度,   
  参数2:要后台并行执行的SQLs(数组呈现)     
)
as t(a text);

参考

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

《PostgreSQL AB表切换最佳实践 - 提高切换成功率,杜绝雪崩 - 珍藏级》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
2月前
|
SQL 存储 关系型数据库
如何巧用索引优化SQL语句性能?
本文从索引角度探讨了如何优化MySQL中的SQL语句性能。首先介绍了如何通过查看执行时间和执行计划定位慢SQL,并详细解析了EXPLAIN命令的各个字段含义。接着讲解了索引优化的关键点,包括聚簇索引、索引覆盖、联合索引及最左前缀原则等。最后,通过具体示例展示了索引如何提升查询速度,并提供了三层B+树的存储容量计算方法。通过这些技巧,可以帮助开发者有效提升数据库查询效率。
166 2
|
2月前
|
SQL 数据采集 自然语言处理
NL2SQL之DB-GPT-Hub<详解篇>:text2sql任务的微调框架和基准对比
NL2SQL之DB-GPT-Hub<详解篇>:text2sql任务的微调框架和基准对比
|
2月前
|
SQL 运维
Doris同一个SQL任务,前一天执行成功,第二天执行失败
Doris 动态分区 插入数据 同样的代码隔天运行一个成功一个失败
|
2月前
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
|
2月前
|
SQL 关系型数据库 MySQL
如何确认SQL用了索引:详细技巧与方法
在数据库管理中,索引是提高SQL查询性能的重要手段
|
3月前
|
SQL 存储 索引
SQL Server的Descending Indexes降序索引
【9月更文挑战第21天】在SQL Server中,降序索引允许指定列的排序顺序为降序,可显著优化涉及降序排序的查询性能,特别是在复合索引中。通过创建降序索引,可以更高效地满足特定业务需求,如按交易时间降序获取最新记录。然而,使用时需考虑查询频率、数据分布及维护成本,以确保最佳性能。
|
2月前
|
SQL 存储 关系型数据库
SQL默认索引是什么:深入解析与技巧
在SQL数据库中,索引是一种用于提高查询性能的重要数据结构
|
2月前
|
SQL 存储 关系型数据库
SQL默认索引是什么
在SQL数据库中,索引是一种用于提高查询性能的数据结构
|
2月前
|
SQL 关系型数据库 MySQL
如何确认SQL用了索引
在数据库管理和优化过程中,确认SQL查询是否使用了索引是一个至关重要的步骤
|
2月前
|
SQL 关系型数据库 MySQL
如何确认SQL查询是否使用了索引:详细步骤与技巧
在数据库管理和优化中,确认SQL查询是否有效利用了索引是提升性能的关键步骤

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版