PostgreSQL dblink异步调用实践,跑并行多任务 - 例如开N个并行后台任务创建索引, 开N个后台任务跑若干SQL

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介: 标签PostgreSQL , 后台任务 , DBLINK 异步调用背景使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

标签

PostgreSQL , 后台任务 , DBLINK 异步调用


背景

使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

并行后台任务接口实现

接口效果:

select run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
);  

实现

1、创建dblink插件

create extension if not exists dblink;    

2、创建一个建立连接函数,不报错

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;    

3、创建跑多任务的接口函数

create or replace function run_sqls_parallel(   
  parallels int,    -- 并行度  
  sqls text[],      -- 需要执行的SQLs  
  conn text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- 连接串     
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)  
  app_prefix text := md5(random()::text);        -- application, dblink name prefix   
  i int := 1;       -- 任务ID变量,1累加  
  app_conn_name text;  -- application_name, dblink conn name = app_prefix+i   
  sql text;       -- SQL 元素   
  current_conns int := 0;  -- 当前活跃的异步调用   
begin  
  -- 建立获取实时pg_stat_activity内容连接    
  perform conn(app_prefix_stat,  conn||app_prefix_stat);              
  
  foreach sql in array sqls  
  loop  
    -- 当前是否有空闲异步连接   
    select application_name into app_conn_name from   
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
    as t(application_name text);   
    -- 有空闲异步连接  
    if found then  
      -- 消耗掉上一次异步连接的结果,否则会报错。  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);      
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
      -- 发送异步DBLINK调用      
      perform dblink_send_query(app_conn_name, sql);   
      
    -- 无空闲异步连接  
    else  
      -- 当前已建立的异步连接数  
      select cn into current_conns from   
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);      
      loop  
        -- 达到并行度  
        if current_conns >= parallels then   
          -- 是否有空闲异步连接  
          select application_name into app_conn_name from   
            dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
          as t(application_name text);   
          -- 有  
          if found then  
            -- 消耗掉上一次异步连接的结果,否则会报错。  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
            -- 发送异步DBLINK调用      
            perform dblink_send_query(app_conn_name, sql);   
              
            -- 退出循环  
            exit;  
          -- 没有,等  
          else  
            perform pg_sleep(1);  
            raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
          end if;  
        -- 未达到并行度  
        else  
          -- 建立连接  
          perform conn(app_prefix||i,  conn||app_prefix||i);             -- 建立连接。  
  
          -- 发送异步DBLINK调用      
          perform dblink_send_query(app_prefix||i, sql);   
            
          -- 连接suffix序号 递增  
          i := i+1;  
            
          -- 退出循环  
          exit;  
        end if;  
      end loop;  
    end if;  
  end loop;  
  
  loop  
    -- 当前已建立的异步连接数  
    select cn into current_conns from   
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);      
    if current_conns=0 then   
      raise notice 'whole tasks done.';  
      for app_conn_name in 
        select application_name from   
          dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
        as t(application_name text)
      loop
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);   
      end loop;
      return;  
    else  
      raise notice 'the last % tasks running.', current_conns;  
      perform pg_sleep(1);  
    end if;  
  end loop;  
  
end;  
$$ language plpgsql strict;  

试用

1、运行5条SQL,开2个并行任务

select * from run_sqls_parallel(  
 2, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 30070.275 ms (00:30.070)  

2、运行10个并行任务,跑6条SQL

postgres=# select * from run_sqls_parallel(  
 10, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   

NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 10050.064 ms (00:10.050)  

完全符合预期。

3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务

create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
do language plpgsql $$    
declare    
  tables name[] := array['t1','t2','t3'];     -- 表名  
  n name;   -- 表名  
  x name;   -- 字段名  
  i int;    -- LOOP值  
  sql text;        
  sqls text[];      
  tbs name := 'tbs1';    -- 索引表空间  
begin    
  set maintenance_work_mem='2GB';    
    
  foreach n in array tables loop    
    i := 1;      
    for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped   
    loop    
      -- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引  
      sql := format('create index IF NOT EXISTS idx_%s__%s on %s (%s) tablespace %s', n, i, n, x, tbs);      -- 封装创建索引的SQL    
      sqls := array_append(sqls, sql);   
      i:=i+1;    
    end loop;    
  end loop;    
  
  perform * from run_sqls_parallel(  
    10,   -- 并行度  
    sqls  -- 执行index SQL数组  
  ) as t(a text);   
  
  foreach n in array tables loop    
    execute format('analyze %s', n);     
  end loop;    
end;    
$$;    

完全符合预期。

小结

本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。

接口效果:

select * from run_sqls_parallel (  
  参数1:并行度,   
  参数2:要后台并行执行的SQLs(数组呈现)     
)
as t(a text);

参考

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

《PostgreSQL AB表切换最佳实践 - 提高切换成功率,杜绝雪崩 - 珍藏级》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
2月前
|
存储 SQL 关系型数据库
mysql底层原理:索引、慢查询、 sql优化、事务、隔离级别、MVCC、redolog、undolog(图解+秒懂+史上最全)
mysql底层原理:索引、慢查询、 sql优化、事务、隔离级别、MVCC、redolog、undolog(图解+秒懂+史上最全)
mysql底层原理:索引、慢查询、 sql优化、事务、隔离级别、MVCC、redolog、undolog(图解+秒懂+史上最全)
|
3月前
|
存储 监控 关系型数据库
B-tree不是万能药:PostgreSQL索引失效的7种高频场景与破解方案
在PostgreSQL优化实践中,B-tree索引虽承担了80%以上的查询加速任务,但因多种原因可能导致索引失效,引发性能骤降。本文深入剖析7种高频失效场景,包括隐式类型转换、函数包裹列、前导通配符等,并通过实战案例揭示问题本质,提供生产验证的解决方案。同时,总结索引使用决策矩阵与关键原则,助你让索引真正发挥作用。
209 0
|
5月前
|
SQL 存储 关系型数据库
SQL优化策略与实践:组合索引与最左前缀原则详解
本文介绍了SQL优化的多种方式,包括优化查询语句(避免使用SELECT *、减少数据处理量)、使用索引(创建合适索引类型)、查询缓存、优化表结构、使用存储过程和触发器、批量处理以及分析和监控数据库性能。同时,文章详细讲解了组合索引的概念及其最左前缀原则,即MySQL从索引的最左列开始匹配条件,若跳过最左列,则索引失效。通过示例代码,展示了如何在实际场景中应用这些优化策略,以提高数据库查询效率和系统响应速度。
174 10
|
6月前
|
SQL 索引
【YashanDB知识库】字段加上索引后,SQL查询不到结果
【YashanDB知识库】字段加上索引后,SQL查询不到结果
|
3月前
|
SQL 关系型数据库 PostgreSQL
CTE vs 子查询:深入拆解PostgreSQL复杂SQL的隐藏性能差异
本文深入探讨了PostgreSQL中CTE(公共表表达式)与子查询的选择对SQL性能的影响。通过分析两者底层机制,揭示CTE的物化特性及子查询的优化融合优势,并结合多场景案例对比执行效率。最终给出决策指南,帮助开发者根据数据量、引用次数和复杂度选择最优方案,同时提供高级优化技巧和版本演进建议,助力SQL性能调优。
252 1
|
6月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】yashandb执行包含带oracle dblink表的sql时性能差
【YashanDB知识库】yashandb执行包含带oracle dblink表的sql时性能差
|
7月前
|
SQL 关系型数据库 OLAP
云原生数据仓库AnalyticDB PostgreSQL同一个SQL可以实现向量索引、全文索引GIN、普通索引BTREE混合查询,简化业务实现逻辑、提升查询性能
本文档介绍了如何在AnalyticDB for PostgreSQL中创建表、向量索引及混合检索的实现步骤。主要内容包括:创建`articles`表并设置向量存储格式,创建ANN向量索引,为表增加`username`和`time`列,建立BTREE索引和GIN全文检索索引,并展示了查询结果。参考文档提供了详细的SQL语句和配置说明。
167 2
|
SQL UED
SQL Server并行死锁案例解析
原文:SQL Server并行死锁案例解析 并行执行作为提升查询响应时间,提高用户体验的一种有效手段被大家所熟知,感兴趣的朋友可以看我以前的博客SQL Server优化技巧之SQL Server中的"MapReduce", SQL Server优化器特性-位图过滤(Bitmap),然而正如我一直强调的,任何事物均有利弊,重点在于抉择.
1251 0
|
12月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
407 13

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多