在数据库中跑后台长任务

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

标签

PostgreSQL , dblink , 长任务


背景

如果业务上需要在数据库中跑LONG SQL,并且不希望跑的过程中因为窗口断开,导致数据库任务用户主动cancel query。有什么方法?

使用DBLINK异步调用是不错的方法,相当于数据库内部建立了连接在后台跑。

方法

1、创建任务表,方便观察任务状态

create table tbl_task (  
  id serial8 primary key,  -- 任务ID  
  client_info jsonb,       -- 客户端描述(usename, datname, search_path, client_addr, client_port)  
  sql text,                -- SQL信息  
  start_time timestamp,    -- 开始时间  
  end_time timestamp default clock_timestamp(),  -- 结束时间  
  info text                -- 描述信息  
);  

2、创建dblink 插件

create extension dblink;  

3、创建生成dblink连接的函数,重复创建不报错。

create or replace function conn(    
  name,   -- dblink名字    
  text    -- 连接串,URL    
) returns void as $$      
declare      
begin      
  perform dblink_connect($1, $2);     
  return;      
exception when others then      
  return;      
end;      
$$ language plpgsql strict;    

4、创建异步调用封装函数

create or replace function run_task(    
  sql text,       -- 要执行的SQL  
  info text,             -- 任务描述  
  conn_name name  default 'link_for_task', -- dblink 名字  
  conn text  default format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task', '127.0.0.1', current_setting('port'), current_user, current_database())  ,    -- 连接串   
  client_info jsonb   default format('{"client_addr":"%s", "client_pot":"%s", "search_path":"%s", "usename":"%s", "datname":"%s"}', inet_client_addr(), inet_client_port(), replace(current_setting('search_path'),'"','\"'), current_user, current_database())::jsonb   -- 客户端信息  
) returns void as $$     
declare    
begin    
  perform conn(conn_name,  conn);             -- 连接。       
    -- perform dblink_get_result(conn_name);    -- 消耗掉上一次异步连接的结果,否则会报错。      
      
  -- 发送异步DBLINK调用    
  perform dblink_send_query(conn_name, sql||format('; insert into tbl_task(client_info,sql,start_time,info) values (%L, %L, %L, %L);', client_info, sql, clock_timestamp(),info ));      
end;    
$$ language plpgsql strict;    

5、调用异步调用封装函数

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call'       -- 任务描述  
);  

6、查看当前正在跑的后台任务

postgres=# select * from pg_stat_activity where application_name='run_task';  
-[ RECORD 1 ]----+---------------------------------------------------------------------------------------------  
datid            | 13285  
datname          | postgres  
pid              | 1510  
usesysid         | 10  
usename          | postgres  
application_name | run_task  
client_addr      | 127.0.0.1  
client_hostname  |   
client_port      | 55088  
backend_start    | 2018-06-21 18:04:20.964586+08  
xact_start       |   
query_start      | 2018-06-21 18:04:20.967177+08  
state_change     | 2018-06-21 18:04:20.969363+08  
wait_event_type  | Client  
wait_event       | ClientRead  
state            | idle  
backend_xid      |   
backend_xmin     |   
query            | select count(*) from test; insert into tbl_task(client_info,sql,start_time,info) values (E'{"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\\"$user\\", public"}', 'select count(*) from test', '2018-06-21 18:04:20.967118+08', 'test dblink async call')  
backend_type     | client backend  

7、查看任务状态

postgres=# select * from tbl_task;  
-[ RECORD 1 ]------------------------------------------------------------------------------------------------------------------------  
id          | 1  
client_info | {"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\"$user\", public"}  
sql         | select count(*) from test  
start_time  | 2018-06-21 18:07:39.60439  
end_time    | 2018-06-21 18:07:47.331041  
info        | test dblink async call  

如果是数据库普通用户调用,不支持trust认证

如果是普通用户,请使用密码认证,同时请务必保障pg_hba.conf使用的是密码认证。

比如阿里云RDS PPAS的用户:

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call' ,      -- 任务描述  
  'link_task', 
  format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task, password=%s', '127.0.0.1', current_setting('port'), current_user, current_database(), '当前用户密码')  
);  

如果是阿里云RDS PG 9.4的用户,内核做过修改,请使用如下方法

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call' ,      -- 任务描述  
  'link_task', 
  format('user=%s dbname=%s application_name=run_task, password=%s', current_user, current_database(), '密码')  
);  

注意

1、DBLINK异步连接会占用连接,算连接数的。

2、单个DBLINK连接,如果异步调用的SQL没有执行完,不能发起第二次请求。

NOTICE:  could not send query: another command is already in progress

那么你需要看看这个DBLINK的后台任务是否还在执行(通过前面查询pg_stat_activity的方法, 执行完state状态为idle),如果执行完了,那么执行以下SQL取一下结果,就可以继续使用这个DBLINK NAME发送异步请求了。

select * from dblink_get_result('link_for_task') as t(id text);

或者,你可以不用等这个DBLINK的异步任务执行完,马上想发起另一个异步任务,那么你需要使用一个新的DBLINK NAME。

select run_task(
  $$select count(*) from test where c1='abc'$$,   -- 要RUN的SQL
  'test dblink async call',       -- 任务描述
  'new_dblink_name'   -- 有别于前面已使用的DBLINK NAME
);

3、单个DBLINK连接,如果异步调用的SQL执行完了,调用dblink_get_result后,才能发起第二次请求。

NOTICE:  could not send query: another command is already in progress

4、单个DBLINK连接,如果异步调用的SQL没有执行完,调用dblink_get_result时,会等待异步调用执行完,才会有返回。等待过程中堵塞当前调用dblink_get_result的会话。

5、调用DBLINK异步接口的会话如果断开了,那么它发起的dblink异步调用后台任务执行完成后,连接会自动释放。

6、如果SQL中包含单引号,可以使用转义的写法,也可以使用没有符号的写法,不需要转义。

select run_task(
  $$select count(*) from test where c1='abc'$$,   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

select run_task(
  'select count(*) from test where c1=''abc''',   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

select run_task(
  E'select count(*) from test where c1=\'abc\'',   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

美元符号内可以输入任意个字符,成对出现即可。

select run_task(
  $_qqq_$select count(*) from test where c1='abc'$_qqq_$,   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

参考

https://www.postgresql.org/docs/10/static/dblink.html

《PostgreSQL 批量导入性能 (采用dblink 异步调用)》

目录
相关文章
|
1月前
|
关系型数据库 MySQL Java
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
65 0
|
6月前
|
SQL 数据库 开发工具
实时计算 Flink版产品使用合集之数据库中有新增索引,同步任务没有报错,索引的变动是否有影响
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
11天前
|
缓存 NoSQL 数据库
运用云数据库 Tair 构建缓存为应用提速,完成任务得苹果音响、充电套装等好礼!
本活动将带大家了解云数据库 Tair(兼容 Redis),通过体验构建缓存以提速应用,完成任务,即可领取罗马仕安卓充电套装,限量1000个,先到先得。邀请好友共同参与活动,还可赢取苹果 HomePod mini、小米蓝牙耳机等精美好礼!
|
6月前
|
DataWorks 关系型数据库 调度
DataWorks操作报错合集之DataWorks 数据库同步任务中,如果遇到表情符报错的现象,怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
65 0
|
3月前
|
存储 SQL 数据库
【计算机三级数据库技术】第8章 数据库后台编程技术--附思维导图
本文介绍了数据库后台编程的关键技术,包括存储过程、用户定义函数、触发器和游标,并附有思维导图。
32 1
|
3月前
|
运维 前端开发 Serverless
中后台前端开发问题之降低数据库使用门槛和运维成本如何解决
中后台前端开发问题之降低数据库使用门槛和运维成本如何解决
30 0
|
3月前
|
运维 关系型数据库 MySQL
在Linux中,MySQL数据库日常运维中涉及哪些关键任务?
在Linux中,MySQL数据库日常运维中涉及哪些关键任务?
|
6月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之从Oracle数据库同步数据时,checkpoint恢复后无法捕获到任务暂停期间的变更日志,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 关系型数据库 数据库
实时计算 Flink版操作报错合集之在使用RDS数据库作为源端,遇到只能同步21个任务,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。