阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

标签

PostgreSQL , oss对象存储 , 阿里云RDS PG , 并行写 , dblink , 异步调用 , 异步任务监控 , OSS外部表 , 数据传输


背景

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 数据并行导出到OSS》

本文为从OSS并行导入数据到数据库中。

请先阅读:

RDS PG OSS 外部表文档1

RDS PG OSS 外部表文档2

原文

https://www.atatech.org/articles/98990

一.准备工作

首先,创建我们要用到的插件。

create extension dblink;  
create extension oss_fdw;  

二.创建异步存储过程

异步数据装载的准备工作,获取oss文件列表

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)  
  RETURNS bool AS  
$BODY$  
DECLARE  
    t_exist  int;  
    curs1 refcursor;  
    r   record;  
    filepath text;  
    fileindex int8;  
    s1 text;  
    s2 text;  
    s3 text;  
    c int = 0;  
    s4 text;  
    s5 text;  
    ss4 text;  
    ss5 text;  
    sql text;  
BEGIN  
    create table if not exists oss_fdw_load_status(id BIGSERIAL primary key, filename text, size int8, rows int8 default 0, status int default 0);  
  
    select count(*) into t_exist from oss_fdw_load_status;  
  
    if t_exist != 0 then  
        RAISE NOTICE 'oss_fdw_load_status not empty';  
        return false;  
    end if;  
  
    -- 通过 oss_fdw_list_file 函数,把外部表 t_from 匹配的 OSS 中的文件列到表中  
    insert into oss_fdw_load_status (filename, size) select name,size from oss_fdw_list_file(t_from) order by size desc;  
  
    select count(*) into t_exist from oss_fdw_load_status;  
    if t_exist = 0 then  
        RAISE NOTICE 'oss_fdw_load_status empty,not task found';  
        return false;  
    end if;  
  
    return true;  
END;  
$BODY$  
    LANGUAGE plpgsql;  

数据装载的工作函数

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)  
  RETURNS bool AS  
$BODY$  
DECLARE  
    t_exist  int;  
    curs1 refcursor;  
    r   record;  
    filepath text;  
    fileindex int8;  
    s1 text;  
    s2 text;  
    s3 text;  
    c int = 0;  
    s4 text;  
    s5 text;  
    ss4 text;  
    ss5 text;  
    sql text;  
    db text;  
    user text;  
BEGIN  
    select count(*) into t_exist from oss_fdw_load_status;  
    if t_exist = 0 then  
        RAISE NOTICE 'oss_fdw_load_status empty';  
        return false;  
    end if;  
  
    s4 = 'oss_loader';  
    s5 = 'idle';  
    ss4 = '''' || s4 ||'''';  
    ss5 = '''' || s5 ||'''';  
    sql = 'select count(*) from pg_stat_activity where application_name = ' || ss4 || ' and state != ' || ss5;  
  
    select current_database() into db;  
    select current_user into user;  
  
    -- 通过游标,不断获取单个任务  
    OPEN curs1 FOR SELECT id, filename FROM oss_fdw_load_status order by id;  
    loop  
        fetch curs1 into r;  
        if not found then  
            exit;  
        end if;  
        fileindex = r.id;  
        filepath = r.filename;  
  
        s1 = '''' || t_from ||'''';  
        s2 = '''' || t_to ||'''';  
        s3 = '''' || filepath ||'''';  
  
        LOOP  
            -- 查看当前正在工作的任务数,过达到并发数就在这里等待  
            select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
            IF c < num_work THEN  
                EXIT;  
            END IF;  
            RAISE NOTICE 'current runing % loader', c;  
            perform pg_sleep(1);  
        END LOOP;  
  
        -- 通过 DBLINK 创建异步任务  
        perform dis_conn('oss_loader_'||fileindex);  
        perform dblink_connect('oss_loader_'||fileindex, 'dbname='||db ||' user='||user || ' application_name=oss_loader' || ' password='||pass);  
        perform dblink_send_query('oss_loader_'||fileindex, format('  
            begin;  
            select rds_oss_fdw_load_single_file(%s,%s,%s,%s);  
            end;'  
            , fileindex, s1, s2, s3)  
        );  
        RAISE NOTICE 'runing loader task % filename %',fileindex, filepath;  
    end loop;  
    close curs1;  
  
    -- 任务分配完成,等待所有任务完成  
    LOOP  
        select a into c from dblink('dbname='||db ||' user='||user || ' password='||pass ,sql)as t(a int);  
        IF c = 0 THEN  
            EXIT;  
        END IF;  
        RAISE NOTICE 'current runing % loader', c;  
        perform pg_sleep(1);  
    END LOOP;  
  
    return true;  
END;  
$BODY$  
    LANGUAGE plpgsql;  

单个文件的数据装载函数(设置不同的会话参数oss_fdw.rds_read_one_file,可以读取不同的OSS文件,用完重置)

CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)  
  RETURNS void AS  
$BODY$  
DECLARE  
    rowscount int8 = 0;  
    current text;  
    sql text;  
BEGIN  
    -- 配置 GUC 参数,指定要导入的 OSS 上的文件  
    perform set_config('oss_fdw.rds_read_one_file',filepath,true);  
    select current_setting('oss_fdw.rds_read_one_file') into current;  
    RAISE NOTICE 'begin load %', current;  
  
    -- 通过动态 SQL 导入数据  
    EXECUTE 'insert into '|| t_to || ' select * from ' || t_from;  
    GET DIAGNOSTICS rowscount = ROW_COUNT;  
  
    -- 导入完成后,把结果保存到状态表中  
    RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;  
    update oss_fdw_load_status set rows = rowscount,status = 1 where id = taskid;  
    return;  
  
EXCEPTION  
    when others then  
    RAISE 'run rds_oss_fdw_load_single_file with error';  
END;  
$BODY$  
    LANGUAGE plpgsql;  

关闭连接不报错函数

create or replace function dis_conn(name) returns void as $$    
declare    
begin    
  perform dblink_disconnect($1);    
  return;    
exception when others then    
  return;    
end;    
$$ language plpgsql strict;    

三.使用函数装载数据

1、 创建本地表(目标表)结构

2、 将包含数据的文件写入OSS

3、 在RDS PG中创建OSS外部表

4、 准备需要并行导入的列表

select rds_oss_fdw_load_data_prepare('oss_table','lineitem');  

执行后,会看到表 oss_fdw_load_status 中,保存了准备导入的所有文件列表,用户可以做适当的删减定制。

4、 数据装载

 select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');  

函数 rds_oss_fdw_load_data_execute 会等待数据导入的完成才返回。

5、 查询状态
期间,我们可以通过下列 SQL 查看正在工作的异步会话状态

 select application_name, state, pid,query, now() - xact_start as xact  from pg_stat_activity where state != 'idle' and application_name='oss_loader' order by xact desc;  

6、 管理状态

同时,我们也可以随时中断数据导入工作

select pg_terminate_backend(pid),application_name, state ,query from pg_stat_activity where state != 'idle' and pid != pg_backend_pid() and application_name='oss_loader';  

7、 查看进度

我们也很容易看到整个数据装载的进度(单位 MB)

select  
(  
select sum(size)/1024/1024 as complete from oss_fdw_load_status where status = 1  
)a,  
(  
select sum(size)/1024/1024 as full from oss_fdw_load_status  
)b;  

8、 性能

使用 TPCC 100GB的数据进行装载测试,耗时 10 分钟,平均 170MB/S

select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');  
  
select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'123456Zwj');  
  
select sum(size)/1024/1024 from oss_fdw_load_status;  
      ?column?        
--------------------  
 22561.919849395752  
(1 row)  
  
select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';  
 pg_size_pretty   
----------------  
 101 GB  
(1 row)  

性能极限扩展

为了提高本地加载速度,用户可以这么做:

1、目标表选择UNLOGGED TABLE,注意如果选择unlogged table,那么数据库崩溃后unlogged table的数据会被清除,并且请记住备库看不到unlogged table的数据。

除非你的数据是定期全量覆盖的,否则不建议用unlogged table来加速。

create unlogged table xxx (xx xx);  

2、选择多个目标表

由于单个目标表,在INDEX LOCK,在EXTEND BLOCK方面都有一定的局限性,为了达到极限,可以使用多个目标表。例如每一批OSS文件对应一个本地表分区。

-- 并行  
insert into tbl1 select * from oss_tbl1;  
insert into tbl2 select * from oss_tbl2;  
.....  
insert into tblx select * from oss_tblx;  

3、导入前关闭目标表的表级autovacuum

autovacuum会影响导入性能,因为它要消耗一定的IO。

alter table tbl_dict set (autovacuum_enabled =off);  
  
alter table tbl_dict set (toast.autovacuum_enabled =off);  

4、导入后再开启目标表的autovacuum,收集统计信息

alter table tbl_dict set (autovacuum_enabled =on);  
  
alter table tbl_dict set (toast.autovacuum_enabled =on);  

5、后创建索引(可以并行)

索引可以同时创建(单个表的多个索引可以同时创建,不会相互锁等待。多个表的多个索引也可以同时创建),如果创建索引过程中不需要执行DML,建议不要开启concurrently选项,否则建议开启。

同时创建,可以提高资源利用率,达到快速完成数据导入加索引创建的目标。

云端相关产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

RDS PG OSS 外部表文档1

RDS PG OSS 外部表文档2

HDB PG OSS 外部表文档

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 数据并行导出到OSS》

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
4月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
2月前
|
安全 关系型数据库 数据库
阿里云RDS PostgreSQL版支持 PG17,还不来体验?
PostgreSQL被誉为最先进的开源数据库,具有强大的扩展性和灵活架构。9月26日,社区官方正式发布了PostgreSQL 17.0版本,在性能、逻辑复制、开发者体验等方面进行了优化。阿里云RDS PostgreSQL 版已支持 PostgreSQL 17.0,并在社区17.0基础上,进行了安全、成本、可运维性等多方面提升,增加多种内核特性及插件特性。
|
3月前
|
弹性计算 关系型数据库 MySQL
新一期陪跑班开课啦!阿里云专家手把手带你体验RDS通用云盘核心能力
本次课程将手把手带领用户创建一个云数据库RDS MySQL(通用云盘),并通过云服务器ECS对RDS MySQL实例进行压测,体验IO加速和IO突发带来的性能提升;并通过DMS执行DDL,将数据归档到OSS,再结合云盘缩容,体验数据归档带来的成本优势。
|
3月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
3月前
|
关系型数据库 数据库 数据安全/隐私保护
"告别繁琐!Python大神揭秘:如何一键定制阿里云RDS备份策略,让数据安全与效率并肩飞,轻松玩转云端数据库!"
【8月更文挑战第14天】在云计算时代,数据库安全至关重要。阿里云RDS提供自动备份,但标准策略难以适应所有场景。传统手动备份灵活性差、管理成本高且恢复效率低。本文对比手动备份,介绍使用Python自定义阿里云RDS备份策略的方法,实现动态调整备份频率、集中管理和智能决策,提升备份效率与数据安全性。示例代码演示如何创建自动备份任务。通过自动化与智能化备份管理,支持企业数字化转型。
78 2
|
4月前
|
关系型数据库 Serverless 数据库
函数计算产品使用问题之如何访问阿里云的RDS
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 运维 安全
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
174 2
|
4月前
|
存储 API 开发工具
阿里云OSS
【7月更文挑战第19天】阿里云OSS
155 1
|
4月前
|
人工智能 对象存储
【阿里云AI助理】自家产品提供错误答案。阿里云OSS 资源包类型: 下行流量 地域: 中国内地通用 下行流量包规格: 300 GB 套餐: 下行流量包(中国内地) ,包1年。那么这个是每月300GB,1年是3600GB的流量;还是1年只有300GB的流量?
自家产品提供错误答案。阿里云OSS 资源包类型: 下行流量 地域: 中国内地通用 下行流量包规格: 300 GB 套餐: 下行流量包(中国内地) ,包1年。那么这个是每月300GB,1年是3600GB的流量;还是1年只有300GB的流量?
122 1

相关产品

  • 云数据库 RDS
  • 云数据库 RDS PostgreSQL 版
  • 云数据库 RDS MySQL 版