基于PolarDB的图分析:通过DTS将其它数据库的数据表同步到PolarDB的图

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 本文介绍了使用DTS任务将数据从MySQL等数据源实时同步到PolarDB-PG的图数据库中的步骤.

内容简介和数据格式需求

在图数据库的应用场景中,部分客户会选择先将数据写入到其它数据库中,再进一步将数据同步到图数据库中进行图查询。

PolarDB-PostgreSQL兼容版(简称PolarDB-PG)有着强大的一库统管能力,鼓励用户将多模异构的数据统一放在PolarDB-PG中进行处理。同时,也可以单独使用其多模的能力。

本文以MySQL数据源为例,介绍了通过DTS任务将MySQL写入的数据,通过搭建实时同步链路,同步到PolarDB-PG所管理的图数据库的全过程。

这里我们要求我们的写入的节点和边数据有一列在其类型(label)中唯一的,小于2^48的id。而边数据除唯一id之外,还有两列分别指定其起点和终点对应的节点的id。

对于没有唯一id、或者唯一id不为整数类型的节点和边,通常可以添加一列serial类型的列作为唯一id。如果没有唯一id的帮助,我们将无法同步对数据的更改和删除。这列唯一id可以在数据表中使用serial的特性自动生成而不必手动插入,同时可以选择不将此列id加入到图。

样例场景

我们开通一个PolarDB-MySQL实例和一个PolarDB-PG实例。在PolarDB-MySQL实例上,我们将新增的数据写入到表中,通过DTS任务同步到PolarDB-PostgreSQL的图数据库中。

假设我们要同步的图数据由三部分组成:两个点表A和B,记录图上的点,各自有一个唯一id;一个边表C,记录其起点在A中和终点在B中的唯一ID。要在PolarDB-PostgreSQL中建立一张名为gra的图,包含A,B两种类型的节点和C这种类型的边。

在PolarDB-MySQL中的表定义为

CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_C(id integer, id_a integer, id_b integer);

对于MySQL数据库,在使用DTS前,需要开启binlog功能。


之后,需要先在目标数据库库创建图。首先创建插件

CREATE EXTENSION age;
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname>
SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';

然后,使用SQL语句创建图、点和边

SELECT create_graph('gra');
SELECT create_vlabel('gra', 'label_a');
SELECT create_vlabel('gra', 'label_b');
SELECT create_elabel('gra', 'edge_c');


通过DTS将数据同步到PolarDB-PG

DTS控制台中,选择数据同步项目,并选择创建任务

在创建界面,选择源库为MySQL,目标库为PostgreSQL,并均选择 “专线/VPN网关/智能网关”方式接入,

然后根据实际的实例地区,vpc网段,集群地址,端口,用户名和密码。

之后在同步对象选择中,选择对应数据库下,TABLE栏目中的raw_A, raw_B, raw_C三张表。之后一直确认,等待同步完成。


通过触发器将数据同步到图

首先创建如下辅助函数:

CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint
AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
language SQL IMMUTABLE STRICT PARALLEL SAFE;



CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
  sql := $$

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
  $$;
  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;






CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
  column_names TEXT;
  sql TEXT;
BEGIN
  SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
  sql := $$

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;

CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
    RETURN OLD;
  END IF;
  RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();

ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
  $$;

  EXECUTE sql;
  RETURN true;
END;
$outer$
LANGUAGE plpgsql;

然后只需要执行

select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');

即可构建从同步表到图中的触发器。这里your_schema_name需要替换成raw_a等数据表所在的schema,可以通过

\d+ table_name

命令查看,通常为public或用户名。(注意:请统一使用小写,大小写敏感)


上述触发器只能同步增量数据,对于有存量数据的表,需要在创建上述触发器之后执行

INSERT INTO "gra"."label_a" (id, properties) SELECT _sync_public_raw_a_row_to_id(raw_A.id), _sync_public_raw_a_row_to_properties(raw_A) FROM raw_A;
INSERT INTO "gra"."label_b" (id, properties) SELECT _sync_public_raw_b_row_to_id(raw_B.id), _sync_public_raw_b_row_to_properties(raw_B) FROM raw_B;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT _sync_public_raw_c_row_to_id(raw_C.id), _sync_public_raw_a_row_to_id(raw_C.id_a), _sync_public_raw_b_row_to_id(raw_C.id_b), _sync_public_raw_c_row_to_properties(raw_C) FROM raw_C;



测试验证

在MySQL中,先向同步的表中插入测试数据:

INSERT INTO raw_a values(1,1,1,'2000-01-01');
INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
INSERT INTO raw_c values(1,1,1);

然后使用cypher语言进行图查询,验证数据插入成功:

SELECT * FROM cypher('gra', $$
MATCH (v)
RETURN v
$$) as (v agtype);
------
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
 {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
------
 {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge

接着验证修改属性

-- 在源库上
UPDATE raw_a SET name = '2' WHERE id = 1;

在图中使用SQL进行查询,看到边的属性已被修改

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v:label_a {id:1})
RETURN v
$$) as (v agtype);
-----
 {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex

最后验证删除数据

-- 在源库上
DELETE FROM raw_c WHERE id = 1;

看到图中的数据也同步地删除了

-- 在图中
SELECT * FROM cypher('gra', $$
MATCH (v)-[e]->(v2)
RETURN e
$$) as (e agtype);
-----

补充和注意事项

  1. 在搭建同步链路的过程中不可以写入数据,否则这部分数据会无法导入图。
  2. 在搭建完成DTS链路后,不能再修改同步表的数据结构(如增加、删除列等操作)。否则可能导致后续无法同步。
  3. 上述辅助函数会将全部的列作为属性加入到图的属性中,如果希望调整加入到图中的属性,可以修改形如_sync_<表名>_row_to_properties函数的定义(可以通过psql的\df+命令等方式查看到函数的sql定义),其常规定义如下面所示,需要修改加入的列或对列的值进行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如如果希望将id_a和id_b两列进行拼接,可以改为select val.id_a::text || val.id_b::text AS id
CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
LANGUAGE SQL;
  1. 在写入时,对于一条边,需要确保其两侧的点已经插入,再写入这条边,否则可能造成图数据库在查询时因为找不到对应的点而产生错误
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
1月前
|
Cloud Native 关系型数据库 MySQL
免费体验!高效实现自建 MySQL 数据库平滑迁移至 PolarDB-X
PolarDB-X 是阿里云推出的云原生分布式数据库,支持PB级存储扩展、高并发访问与数据强一致,助力企业实现MySQL平滑迁移。现已开放免费体验,点击即享高效、稳定的数据库升级方案。
免费体验!高效实现自建 MySQL 数据库平滑迁移至 PolarDB-X
|
3月前
|
存储 关系型数据库 分布式数据库
喜报|阿里云PolarDB数据库(分布式版)荣获国内首台(套)产品奖项
阿里云PolarDB数据库管理软件(分布式版)荣获「2024年度国内首版次软件」称号,并跻身《2024年度浙江省首台(套)推广应用典型案例》。
|
4月前
|
关系型数据库 分布式数据库 数据库
再获殊荣,阿里云PolarDB数据库蝉联SIGMOD最佳论文奖
内存池化技术新突破,阿里云PolarDB蝉联SIGMOD最佳论文奖
|
1月前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
5月前
|
Cloud Native 关系型数据库 分布式数据库
阿里云PolarDB与沃趣科技携手打造一体化数据库解决方案,助推国产数据库生态发展
阿里云瑶池数据库与沃趣科技将继续深化合作,共同推动国产数据库技术的持续创新与广泛应用,为行业生态的繁荣注入更强劲的技术动力。
阿里云PolarDB与沃趣科技携手打造一体化数据库解决方案,助推国产数据库生态发展
|
3月前
|
关系型数据库 分布式数据库 数据库
阿里云PolarDB数据库蝉联SIGMOD最佳论文奖
阿里云PolarDB凭借全球首创基于CXL Switch的分布式内存池技术,在SIGMOD 2025上荣获工业赛道“最佳论文奖”,连续两年蝉联该顶会最高奖项。其创新架构PolarCXLMem打破传统RDMA技术瓶颈,性能提升2.1倍,并已落地应用于内存池化场景,推动大模型推理与多模态存储发展,展现CXL Switch在高速互联中的巨大潜力。
阿里云PolarDB数据库蝉联SIGMOD最佳论文奖
|
4月前
|
Cloud Native 关系型数据库 分布式数据库
客户说|知乎基于阿里云PolarDB,实现最大数据库集群云原生升级
近日,知乎最大的风控业务数据库集群,基于阿里云瑶池数据库完成了云原生技术架构的升级。此次升级不仅显著提升了系统的高可用性和性能上限,还大幅降低了底层资源成本。
|
5月前
|
存储 Cloud Native 关系型数据库
PolarDB开源:云原生数据库的架构革命
本文围绕开源核心价值、社区运营实践和技术演进路线展开。首先解读存算分离架构的三大突破,包括基于RDMA的分布式存储、计算节点扩展及存储池扩容机制,并强调与MySQL的高兼容性。其次分享阿里巴巴开源治理模式,涵盖技术决策、版本发布和贡献者成长体系,同时展示企业应用案例。最后展望技术路线图,如3.0版本的多写多读架构、智能调优引擎等特性,以及开发者生态建设举措,推荐使用PolarDB-Operator实现高效部署。
305 3
|
5月前
|
SQL 关系型数据库 分布式数据库
PolarDB开源数据库入门教程
PolarDB是阿里云推出的云原生数据库,基于PostgreSQL、MySQL和Oracle引擎构建,具备高性能、高扩展性和高可用性。其开源版采用计算与存储分离架构,支持快速弹性扩展和100%兼容PostgreSQL/MySQL。本文介绍了PolarDB的安装方法(Docker部署或源码编译)、基本使用(连接数据库、创建表等)及高级特性(计算节点扩展、存储自动扩容、并行查询等)。同时提供了性能优化建议和监控维护方法,帮助用户在生产环境中高效使用PolarDB。
1784 21

相关产品

  • 云原生数据库 PolarDB