基于PolarDB的图分析:通过DTS将其它数据库的数据表同步到PolarDB的图

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文介绍了使用DTS任务将数据从MySQL等数据源实时同步到PolarDB-PG的图数据库中的步骤.

内容简介和数据格式需求

在图数据库的应用场景中,部分客户会选择先将数据写入到其它数据库中,再进一步将数据同步到图数据库中进行图查询。

PolarDB-PostgreSQL兼容版(简称PolarDB-PG)有着强大的一库统管能力,鼓励用户将多模异构的数据统一放在PolarDB-PG中进行处理。同时,也可以单独使用其多模的能力。

本文以MySQL数据源为例,介绍了通过DTS任务将MySQL写入的数据,通过搭建实时同步链路,同步到PolarDB-PG所管理的图数据库的全过程。

这里我们要求我们的写入的节点和边数据有一列在其类型(label)中唯一的,小于2^48的id。而边数据除唯一id之外,还有两列分别指定其起点和终点对应的节点的id。

对于没有唯一id、或者唯一id不为整数类型的节点和边,通常可以添加一列serial类型的列作为唯一id。如果没有唯一id的帮助,我们将无法同步对数据的更改和删除。这列唯一id可以在数据表中使用serial的特性自动生成而不必手动插入,同时可以选择不将此列id加入到图。

样例场景

我们开通一个PolarDB-MySQL实例和一个PolarDB-PG实例。在PolarDB-MySQL实例上,我们将新增的数据写入到表中,通过DTS任务同步到PolarDB-PostgreSQL的图数据库中。

假设我们要同步的图数据由三部分组成:两个点表A和B,记录图上的点,各自有一个唯一id;一个边表C,记录其起点在A中和终点在B中的唯一ID。要在PolarDB-PostgreSQL中建立一张名为gra的图,包含A,B两种类型的节点和C这种类型的边。

在PolarDB-MySQL中的表定义为

CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_C(id integer, id_a integer, id_b integer);

对于MySQL数据库,在使用DTS前,需要开启binlog功能。


之后,需要先在目标数据库库创建图。首先创建插件

CREATE EXTENSION age;
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname>
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';

然后,使用SQL语句创建图、点和边

SELECT create_graph('gra');
SELECT create_vlabel('gra', 'label_a');
SELECT create_vlabel('gra', 'label_b');
SELECT create_elabel('gra', 'edge_c');


通过DTS将数据同步到PolarDB-PG

DTS控制台中,选择数据同步项目,并选择创建任务

在创建界面,选择源库为MySQL,目标库为PostgreSQL,并均选择 “专线/VPN网关/智能网关”方式接入,

然后根据实际的实例地区,vpc网段,集群地址,端口,用户名和密码。

之后在同步对象选择中,选择对应数据库下,TABLE栏目中的raw_A, raw_B, raw_C三张表。之后一直确认,等待同步完成。


通过触发器将数据同步到图

首先创建如下辅助函数:

CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint
AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
language SQL IMMUTABLE STRICT PARALLEL SAFE;



CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
  sql := $$

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;






CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
  sql := $$

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
  $$;

  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;

然后只需要执行

select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');

即可构建从同步表到图中的触发器。这里your_schema_name需要替换成raw_a等数据表所在的schema,可以通过

\d+ table_name

命令查看,通常为public或用户名。(注意:请统一使用小写,大小写敏感)


上述触发器只能同步增量数据,对于有存量数据的表,需要在创建上述触发器之后执行

INSERT INTO "gra"."label_a" (id, properties) SELECT _sync_public_raw_a_row_to_id(raw_A.id), _sync_public_raw_a_row_to_properties(raw_A) FROM raw_A;
INSERT INTO "gra"."label_b" (id, properties) SELECT _sync_public_raw_b_row_to_id(raw_B.id), _sync_public_raw_b_row_to_properties(raw_B) FROM raw_B;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT _sync_public_raw_c_row_to_id(raw_C.id), _sync_public_raw_a_row_to_id(raw_C.id_a), _sync_public_raw_b_row_to_id(raw_C.id_b), _sync_public_raw_c_row_to_properties(raw_C) FROM raw_C;



测试验证

在MySQL中,先向同步的表中插入测试数据:

INSERT INTO raw_a values(1,1,1,'2000-01-01');
INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
INSERT INTO raw_c values(1,1,1);

然后使用cypher语言进行图查询,验证数据插入成功:

SELECT * FROM cypher('gra', $$
MATCH (v)
RETURN v
$$) as (v agtype);
------
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
 {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
------
 {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge

接着验证修改属性

-- 在源库上
UPDATE raw_a SET name = '2' WHERE id = 1;

在图中使用SQL进行查询,看到边的属性已被修改

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v:label_a {id:1})
RETURN v
$$) as (v agtype);
-----
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex

最后验证删除数据

-- 在源库上
DELETE FROM raw_c WHERE id = 1;

看到图中的数据也同步地删除了

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
-----

补充和注意事项

  1. 在搭建同步链路的过程中不可以写入数据,否则这部分数据会无法导入图。
  2. 在搭建完成DTS链路后,不能再修改同步表的数据结构(如增加、删除列等操作)。否则可能导致后续无法同步。
  3. 上述辅助函数会将全部的列作为属性加入到图的属性中,如果希望调整加入到图中的属性,可以修改形如_sync_<表名>_row_to_properties函数的定义(可以通过psql的\df+命令等方式查看到函数的sql定义),其常规定义如下面所示,需要修改加入的列或对列的值进行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如如果希望将id_a和id_b两列进行拼接,可以改为select val.id_a::text || val.id_b::text AS id
CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
LANGUAGE SQL;
  1. 在写入时,对于一条边,需要确保其两侧的点已经插入,再写入这条边,否则可能造成图数据库在查询时因为找不到对应的点而产生错误
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
1月前
|
Cloud Native 关系型数据库 分布式数据库
让PolarDB更了解您--PolarDB云原生数据库核心功能体验馆
让PolarDB更了解您——PolarDB云原生数据库核心功能体验馆,由阿里云数据库产品事业部负责人宋震分享。内容涵盖PolarDB技术布局、开源进展及体验馆三大部分。技术布局包括云计算加速数据库演进、数据处理需求带来的变革、软硬协同优化等;开源部分介绍了兼容MySQL和PostgreSQL的两款产品;体验馆则通过实际操作让用户直观感受Serverless、无感切换、SQL2Map等功能。
106 7
|
4天前
|
存储 关系型数据库 分布式数据库
PolarDB 开源基础教程系列 8 数据库生态
PolarDB是一款开源的云原生分布式数据库,源自阿里云商业产品。为降低使用门槛,PolarDB携手伙伴打造了完整的开源生态,涵盖操作系统、芯片、存储、集成管控、监控、审计、开发者工具、数据同步、超融合计算、ISV软件、开源插件、人才培养、社区合作及大型用户合作等领域。通过这些合作伙伴,PolarDB提供了丰富的功能和服务,支持多种硬件和软件环境,满足不同用户的需求。更多信息请访问[PolarDB开源官方网站](https://openpolardb.com/home)。
35 4
|
28天前
|
存储 关系型数据库 分布式数据库
PolarDB PostgreSQL版:商业数据库替换与企业上云首选
PolarDB PostgreSQL版是商业数据库替换与企业上云的首选。其技术架构实现存储计算分离,具备极致弹性和扩展性,支持Serverless、HTAP等特性。产品在弹性、性能、成本优化和多模处理方面有显著提升,如冷热数据自动分层、Ganos多模引擎等。已在汽车、交通、零售等行业成功应用,典型案例包括小鹏汽车、中远海科等,帮助企业大幅降低运维成本并提高业务效率。
46 13
|
28天前
|
容灾 关系型数据库 分布式数据库
PolarDB分布式版:与云融合的分布式数据库发展新阶段
PolarDB分布式版标志着分布式数据库与云融合的新阶段。它经历了三个发展阶段:从简单的分布式中间件,到一体化分布式架构,再到云原生分布式数据库。PolarDB充分利用云资源的弹性、高性价比、高可用性和隔离能力,解决了大规模数据扩展性问题,并支持多租户场景和复杂事务处理。零售中台的建设背景包括国家数字化转型战略及解决信息孤岛问题,采用分布式数据库提升高可用性和性能,满足海量订单处理需求。展望未来,零售中台将重点提升容灾能力、优化资源利用并引入AI技术,以实现更智能的服务和更高的业务连续性。
|
30天前
|
关系型数据库 分布式数据库 数据库
瑶池数据库大讲堂|PolarDB HTAP:为在线业务插上实时分析的翅膀
瑶池数据库大讲堂介绍PolarDB HTAP,为在线业务提供实时分析能力。内容涵盖MySQL在线业务的分析需求与现有解决方案、PolarDB HTAP架构优化、针对分析型负载的优化(如向量化执行、多核并行处理)及近期性能改进和用户体验提升。通过这些优化,PolarDB HTAP实现了高效的数据处理和查询加速,帮助用户更好地应对复杂业务场景。
|
28天前
|
运维 关系型数据库 分布式数据库
阿里云PolarDB:引领云原生数据库创新发展
阿里云PolarDB引领云原生数据库创新,2024云栖大会将分享其最新发展及在游戏行业的应用。PolarDB凭借弹性、高可用性、多写技术等优势,支持全球80多个站点,服务1万多家企业。特别是针对游戏行业,PolarDB助力Funplus等公司实现高效运维、成本优化和业务扩展。通过云原生能力,PolarDB推动游戏业务的全球化部署与快速响应,提升用户体验并保障数据安全。未来,PolarDB将继续探索AI、多云管理等前沿技术,为用户提供更智能的数据基础设施。
|
1月前
|
关系型数据库 分布式数据库 数据库
1月17日|阿里云云谷园区,PolarDB V2.0技术沙龙,畅聊国产数据库
为了助力国产化项目顺利推进,阿里云邀请企业开发者和数据库负责人到云谷园区,与PolarDB V2.0技术专家面对面交流。扫描海报二维码报名,我们将根据信息为您申请入园。欢迎参与,共同探讨PolarDB的最新技术和应用!
|
1月前
|
关系型数据库 Serverless 分布式数据库
瑶池数据库微课堂 | PolarDB Serverless弹性&价格力观测
瑶池数据库微课堂介绍阿里云PolarDB Serverless的弹性与性价比优势。通过瑶池解决方案体验馆,用户可免费实操,直观感受Serverless的秒级弹性及超高性价比。内容涵盖Serverless概念、操作步骤、压测演示及性能曲线分析,展示PolarDB在不同负载下的自动扩展能力。适合希望了解云数据库弹性和成本效益的技术人员。
|
1月前
|
关系型数据库 OLAP 分布式数据库
瑶池数据库微课堂|PolarDB/RDS+ADB Zero-ETL:一种免费、易用、高效的数据同步方式
瑶池数据库微课堂介绍阿里云PolarDB/RDS与ADB的Zero-ETL功能,实现免费、易用、高效的数据同步。内容涵盖OLTP与OLAP的区别、传统ETL存在的问题及Zero-ETL的优势(零成本、高效同步),并演示了从RDS MySQL到AnalyticDB MySQL的具体操作步骤。未来将优化和迭代此功能,提供更好的用户体验。

相关产品

  • 云原生数据库 PolarDB