PostgreSQL 相似搜索分布式架构设计与实践 - dblink异步调用与多机并行(远程 游标+记录 UDF实例)

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

标签

PostgreSQL , 多表并行 , 多机并行 , dblink , 异步调用 , 相似搜索


背景

背景请参考如下:

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

当需要进行相似搜索的数据量大于单机处理能力时,我们需要水平拆分来提高搜索能力。

或者可以使用阿里云的PolarDB for PG的产品(类似ORACLE RAC,支持增加计算节点)。比水平分库的好处是数据是共享存储的,不需要拆分。

回到水平分库的场景,如果我们把数据库拆成了多个,那么,如何让查询并行起来呢?

用DBLINK异步调用,可以让查询并行起来。架构设计如下:

pic

实际上采用DBLINK异步调用实现并行的例子很多:

《PostgreSQL dblink异步调用实现 并行hash分片JOIN - 含数据交、并、差 提速案例》

进入正题,下面是一个DEMO,按部就班的演示如何使用异步调用实现多库并行相似搜索。

DEMO

1、我们这里使用本地的4个DB来代表远程数据库,这4个DB完全可以安装到远程。这里只是为了测试方便。

本地库名:

postgres  

远程库名:

db0  
db1  
db2  
db3  

2、首先需要创建用户和测试DB

create role test login encrypted password 'secret';  
create database db0 with owner test;  
create database db1 with owner test;  
create database db2 with owner test;  
create database db3 with owner test;  

3、在本地某个库中创建dblink插件

create extension dblink;  

4、创建连接远程库的SERVER

CREATE SERVER db0 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db0');  
CREATE SERVER db1 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db1');  
CREATE SERVER db2 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db2');  
CREATE SERVER db3 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db3');  

5、配置连接远程库的用户密码(用户密码都是远程库的,可不是本地的哦,你想用本地用户连远程库,没门)

CREATE USER MAPPING FOR postgres SERVER db0 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db1 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db2 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db3 OPTIONS (user 'test', password 'secret');  

远程库操作

在所有远程库上创建测试表,灌入测试数据,创建相似搜索函数。(注意下面的脚本需要调整好对应调度dbname)

1、必要的插件(注意下面的脚本需要调整好对应调度dbname)

\c db3 postgres  
create extension pg_trgm;    
create extension dblink;  

2、主表和相似搜索依赖的索引(注意下面的脚本需要调整好对应调度dbname)

\c db3 test  
create unlogged table tbl(id int primary key, info text);    
create index idx_tbl_info on tbl using gin (info gin_trgm_ops);  
    
-- alter table tbl set (parallel_workers =64);    

3、创建分区(本文仅做测试,真正的分区表用法请参考: 《PostgreSQL 11 分区表用法及增强 - 增加HASH分区支持 (hash, range, list)》 )

do language plpgsql $$    
declare    
begin    
  for i in 0..63    
  loop    
    execute format('drop table if exists tbl%s ', i);    
    execute format('create unlogged table tbl%s (like tbl including all) inherits(tbl)', i);    
    -- 提前设置好表级并行度,方便后面做并行测试    
    -- execute format('alter table tbl%s set (parallel_workers =64)', i);    
  end loop;    
end;    
$$;    

4、创建连接函数

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;     

5、创建生成随机函数的函数

-- 生成随机汉字符串      
create or replace function gen_hanzi(int) returns text as $$      
declare      
  res text;      
begin      
  if $1 >=1 then      
    select string_agg(chr(19968+(random()*20901)::int), '') into res from generate_series(1,$1);      
    return res;      
  end if;      
  return null;      
end;      
$$ language plpgsql strict;      

6、写入测试数据,随机文本(注意下面的脚本需要调整好对应调度dbname)

do language plpgsql $$    
declare    
  dbname name := 'db3';  
begin    
  for i in 0..63    
  loop    
    perform conn('link'||i,  'hostaddr=127.0.0.1 user=test password=secret dbname='||dbname);     
    perform dblink_send_query('link'||i, format('insert into tbl%s select generate_series(1, 15625), gen_hanzi(64); analyze tbl%s;', i, i));    
  end loop;    
end;    
$$;    

7、创建相似搜索用到的UDF

create or replace function get_res(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof record as $$    
declare    
  lim float4 := 1;    
begin    
  -- 判定  
  if not ($3 <= 1 and $3 > 0) then   
    raise notice '$3 must >0 and <=1';  
    return;  
  end if;  
    
  if not ($4 > 0 and $4 < 1) then  
    raise notice '$4 must >0 and <=1';  
    return;  
  end if;  
  loop    
    -- 设置相似度阈值    
    perform set_limit(lim);    
        
    -- 查看当前阈值下,有没有相似记录    
    perform similarity(info, $1) as sml, * from tbl where info % $1 limit 1;    
        
    -- 如果有,则返回N条    
    if found then    
      return query select similarity(info, $1) as sml, * from tbl where info % $1 order by sml desc limit $2;    
      return;    
    end if;    
    
    -- 否则继续,降低阈值    
    -- 当阈值小于0.3时,不再降阈值搜索,认为没有相似。    
    if lim < $3 then    
      return;    
    else    
      lim := lim - $4;    
    end if;    
  end loop;    
end;    
$$ language plpgsql strict;    

本地库操作

创建建立远程连接的函数

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;        

返回游标

1、定义UDF1 - 返回游标(如果返回记录数很多,建议使用游标,因为PLPGSQL是需要等所有记录都拿到才会开始返回,返回记录的话RT会较高)

例子

create or replace function get_res_cursor(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof refcursor as $$    
declare    
  i int := 1;  
  ref refcursor[];    
  res refcursor;   
  dbname name[] := array['db0', 'db1', 'db2', 'db3'];  -- 定义集群  
  db name;  
begin  
  foreach db in array dbname  
  loop   
    ref[i] := 'link'||i;  
    res := ref[i];  
    perform conn('link'||i,  db);         
    perform dblink_open('link'||i, 'link'||i, format('select * from get_res(%L, %s, %s, %s) as t(sml real, id int, info text)', $1, $2, $3, $4));    
    return next res;  
    i := i+1;  
  end loop;  
end;  
$$ language plpgsql strict;  

使用例子

postgres=# begin;  
BEGIN  
Time: 0.045 ms  
postgres=# select * from get_res_cursor('怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂', 10, 0.1, 0.05);  
 get_res_cursor   
----------------  
 link1  
 link2  
 link3  
 link4  
(4 rows)  
  
Time: 18.624 ms  
postgres=# select * from dblink_fetch('link1','link1',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 219.972 ms  
postgres=# select * from dblink_fetch('link1','link1',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 0.252 ms  
postgres=# select * from dblink_fetch('link2','link2',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 215.891 ms  
postgres=# select * from dblink_fetch('link3','link3',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 215.188 ms  
postgres=# select * from dblink_fetch('link4','link4',10) as t(sml real, id int, info text);  
   sml    | id |                                                               info                                                                 
----------+----+----------------------------------------------------------------------------------------------------------------------------------  
 0.779412 |  1 | 递陊怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂轪氐洚重銄懟諔  
(1 row)  
  
Time: 106.692 ms  

返回记录

1、定义UDF2 - 返回记录(注意,建议限制返回的条数,因为PLPGSQL是需要等所有记录都拿到才会开始返回)

例子

create or replace function get_res_record(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof record as $$    
declare    
  i int;  
  ref refcursor[];    
  res refcursor;   
  dbname name[] := array['db0', 'db1', 'db2', 'db3'];  -- 定义集群  
  db name;  
begin  
  i := 1;  
  foreach db in array dbname  
  loop   
    perform conn('link'||i,  db);     
    perform 1 from dblink_get_result('link'||i) as t(sml real, id int, info text);       
    perform dblink_send_query('link'||i, format('select * from get_res(%L, %s, %s, %s) as t(sml real, id int, info text)', $1, $2, $3, $4));    
    i := i+1;  
  end loop;  
  
  i := 1;  
  foreach db in array dbname  
  loop   
    return query SELECT * FROM dblink_get_result('link'||i) as t(sml real, id int, info text);     
    i := i+1;  
  end loop;  
end;  
$$ language plpgsql strict;  

使用例子

postgres=# select * from get_res_record('怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂', 10, 0.77, 0.4) as (sml real, id int, info text);  
   sml    | id |                                                               info                                                                 
----------+----+----------------------------------------------------------------------------------------------------------------------------------  
 0.779412 |  1 | 递陊怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂轪氐洚重銄懟諔  
(1 row)  
  
Time: 32.329 ms  

小结

使用本文提到的方法,你就可以将多个PostgreSQL当成一个PostgreSQL来使用,实现并行相似搜索的线性扩容。

性能指标,详见:

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

使用dblink异步调用,实现相似文本搜索的横向线性扩展,性能不衰减。

参考

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

《PostgreSQL dblink异步调用实现 并行hash分片JOIN - 含数据交、并、差 提速案例》

https://www.postgresql.org/docs/10/static/dblink.html

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
2月前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
3年前的云栖大会,我们发布分布式云容器平台ACK One,随着3年的发展,很高兴看到ACK One在混合云,分布式云领域帮助到越来越多的客户,今天给大家汇报下ACK One 3年来的发展演进,以及如何帮助客户解决分布式领域多云多集群管理的挑战。
阿里云容器服务 ACK One 分布式云容器企业落地实践
|
3月前
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
96 1
|
3月前
|
机器学习/深度学习 人工智能 负载均衡
【AI大模型】分布式训练:深入探索与实践优化
在人工智能的浩瀚宇宙中,AI大模型以其惊人的性能和广泛的应用前景,正引领着技术创新的浪潮。然而,随着模型参数的指数级增长,传统的单机训练方式已难以满足需求。分布式训练作为应对这一挑战的关键技术,正逐渐成为AI研发中的标配。
200 5
|
3月前
|
存储 Kubernetes 监控
深入浅出分布式事务:理论与实践
在数字化时代的浪潮中,分布式系统如同星辰大海般浩瀚而深邃。本文将带你航行于这片星辰大海,探索分布式事务的奥秘。我们将从事务的基本概念出发,逐步深入到分布式事务的核心机制,最后通过一个实战案例,让你亲自体验分布式事务的魅力。让我们一起揭开分布式事务的神秘面纱,领略其背后的科学与艺术。
85 1
|
3月前
|
Go API 数据库
[go 面试] 分布式事务框架选择与实践
[go 面试] 分布式事务框架选择与实践
|
3月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
94 0
|
3月前
|
机器学习/深度学习 TensorFlow 数据处理
分布式训练在TensorFlow中的全面应用指南:掌握多机多卡配置与实践技巧,让大规模数据集训练变得轻而易举,大幅提升模型训练效率与性能
【8月更文挑战第31天】本文详细介绍了如何在Tensorflow中实现多机多卡的分布式训练,涵盖环境配置、模型定义、数据处理及训练执行等关键环节。通过具体示例代码,展示了使用`MultiWorkerMirroredStrategy`进行分布式训练的过程,帮助读者更好地应对大规模数据集与复杂模型带来的挑战,提升训练效率。
82 0
|
3月前
|
存储 负载均衡 中间件
构建可扩展的分布式数据库:技术策略与实践
【8月更文挑战第3天】构建可扩展的分布式数据库是一个复杂而具有挑战性的任务。通过采用数据分片、复制与一致性模型、分布式事务管理和负载均衡与自动扩展等关键技术策略,并合理设计节点、架构模式和网络拓扑等关键组件,可以构建出高可用性、高性能和可扩展的分布式数据库系统。然而,在实际应用中还需要注意解决数据一致性、故障恢复与容错性以及分布式事务的复杂性等挑战。随着技术的不断发展和创新,相信分布式数据库系统将在未来发挥更加重要的作用。
|
3月前
|
消息中间件 存储 Kafka
微服务实践之分布式定时任务
微服务实践之分布式定时任务
|
4月前
|
存储 关系型数据库 MySQL
深度评测:PolarDB-X 开源分布式数据库的优势与实践
本文对阿里云开源分布式数据库 PolarDB-X 进行了详细评测。PolarDB-X 以其高性能、强可用性和出色的扩展能力在云原生数据库市场中脱颖而出。文章首先介绍了 PolarDB-X 的核心产品优势,包括金融级高可靠性、海量数据处理能力和高效的混合负载处理能力。随后,分析了其分布式架构设计,包括计算节点、存储节点、元数据服务和日志节点的功能分工。评测还涵盖了在 Windows 平台通过 WSL 环境部署 PolarDB-X 的过程,强调了环境准备和工具安装的关键步骤。使用体验方面,PolarDB-X 在处理分布式事务和实时分析时表现稳定,但在网络问题和性能瓶颈上仍需优化。最后,提出了改进建
7023 2

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 下一篇
    无影云桌面