前言
图分析是数据科学中的一个重要领域,其专注于通过图结构来表示数据,并执行各种计算和分析任务。图结构由节点(或称为顶点)和边组成,节点通常代表实体,边表示实体之间的关系。图计算广泛应用于社交网络分析、推荐系统、知识图谱、路径优化等多个领域。
PolarDB PostgreSQL版(下文简称为 PolarDB)是一款阿里云自主研发的云原生关系型数据库产品,100% 兼容 PostgreSQL,高度兼容Oracle语法(公有云版支持Oracle语法);采用基于 Shared-Storage 的存储计算分离架构,具有极致弹性、毫秒级延迟、HTAP 的能力和高可靠、高可用、弹性扩展等企业级数据库特性。同时,PolarDB 具有大规模并行计算能力,可以应对OLTP与OLAP混合负载。
本文介绍的图分析能力,依托阿里云云原生关系型数据库PolarDB PostgreSQL版建设输出。
业务场景
背景信息
保险理赔欺诈一般根据保险提供商所具备的患者、疾病和索赔等数据,分析与被保人相关的理赔单、疾病等实体的关联关系,识别异常理赔记录,发现欺诈团伙。
数据和模型
数据来源自以保险领域公开数据集。
数据包括保险行业基本元素,可以将数据模型抽象为下图:
- 点:投保人(policyholder)、保单(incharge)、理赔(claim)、病人(patient)、疾病(disease)。
- 边:疾病-病人(has_disease)、投保人-理赔(policyholder_of_claim)、保单-理赔(inchagre_of_claim)、病人-理赔(insured_of_claim)、相似理赔(similar_claim)、投保人关联(policyholder_connection)。
- 属性:姓名(name),是否高危(high_risk)、风险分数(risk_score)、疾病名称(disease_name)、相似度(similarity_score)、关联等级(level)、理赔时间(claim_date)、保额(charge)等。
部分数据如下所示:
- POLICYHOLDER(投保人)
"POLICYHOLDER_ID","FNAME","LNAME","RISK_SCORE","HIGH_RISK" PH3068,ADAM,OCHSENBEIN,88,1 PH3069,MALINDA,MEHSERLE,42,0 PH3070,SANDRA,KUHTA,20,0 PH3071,DORA,TAHU,62,1 ...
- CLAIM(理赔)
"CLAIM_ID","CHARGE","CLAIM_DATE","DURATION","INSURED_ID","DIAGNOSIS","PERSON_INCHARGE_ID","TYPE","POLICYHOLDER_ID" C3571,6517.53,2013-08-11 00:00:00,13,"28523",no exception,PI23070,services,PH9507 C3572,49273.65,2017-02-10 00:00:00,3,"1220",no exception,PI21197,services,PH406 C3573,52005.98,2014-06-29 00:00:00,27,"23735",no exception,PI22361,services,PH7911
- POLICYHOLDER 和 CLAIM的关联关系
"CLAIM_ID","POLICYHOLDER_ID" C1528,PH2963 C1529,PH1353 C1530,PH1071
最佳实践
技术实现
PolarDB 图数据库引擎 AGE(A Graph Extension) 是一个为 PostgreSQL系列数据库打造的扩展,旨在增强其处理图数据的能力。AGE 旨在结合关系型数据库与图数据库的优势,提供一个高性能、灵活且易于扩展的解决方案。
AGE主要包含以下特点
- 完全兼容PostgreSQL
AGE 是PolarDB PostgreSQL版的一个扩展,这意味着可以在现有的PolarDB数据库中使用,而无需重新构建数据库。AGE 继承了 PolarDB 的所有强大功能,包括事务、并发控制、以及多种索引和优化技术。
- 统一的图形和关系型查询
AGE 允许同时处理关系型数据和图形数据,可以在同一个查询中混合使用 SQL 和图查询语言。这使得处理复杂的数据模型更加容易和高效。
- 支持 Cypher 查询语言
AGE 支持使用 Cypher 查询语言,这是一种专为图数据库设计的查询语言,语法简单且灵活。为用户提供了一种直观的方式来进行图数据的查询和操作。
- 高性能
通过结合 PolarDB 的优化技术和专为图数据设计的索引,AGE 能够高效地处理大规模图形数据和复杂的图形查询。
如上所述, 借助于AGE强大的能力,PolarDB可以简单、高效地处理各类图查询。
建议配置
为了得到良好的体验,建议使用以下配置:
项目 |
推荐配置 |
PolarDB 版本 |
标准版 兼容PostgreSQL 14 |
CPU |
>16 Core |
内存 |
>64 GB |
磁盘 |
>100GB (AUTOPL) |
版本 |
>2.0.14.23.1 |
数据库准备
数据库中需要预先创建age
扩展
create extension age;
在每个会话执行时,需要设置search_path并执行SQL来加载扩展:
SET search_path = ag_catalog, "$user", public; select * from get_cypher_keywords() limit 0;
如不想在每个会话中设置search_path,可对数据库进行该项操作
ALTER DATABASE xxx SET search_path = ag_catalog, "$user", public;
数据入库
创建图
使用函数create_graph 可以创建一个图,创建时需要指定图的名称。
SELECT create_graph('graph');
插入节点和边
由于下载的数据为csv文件,不包含所需的id信息,因此需要将数据进行转换后进行入库操作。
本文附录中提供了将数据转换为PolarDB中vertex和edge的的python脚本,转换后如下所示:
- POLICYHOLDER(投保人)
SELECT create_vlabel('graph','policyholder'); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3068',fname:'ADAM',lname:'OCHSENBEIN',risk_score:'88',high_risk:'1'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3069',fname:'MALINDA',lname:'MEHSERLE',risk_score:'42',high_risk:'0'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3070',fname:'SANDRA',lname:'KUHTA',risk_score:'20',high_risk:'0'}) $$ ) as (n agtype); ...
- CLAIM(理赔)
- Create vlabel SELECT create_vlabel('graph','claim'); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3571',charge:'6517.53',claim_date:'2013-08-11 00:00:00',duration:'13',insured_id:'28523',diagnosis:'no exception',person_incharge_id:'PI23070',type:'services',policyholder_id:'PH9507'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3572',charge:'49273.65',claim_date:'2017-02-10 00:00:00',duration:'3',insured_id:'1220',diagnosis:'no exception',person_incharge_id:'PI21197',type:'services',policyholder_id:'PH406'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3573',charge:'52005.98',claim_date:'2014-06-29 00:00:00',duration:'27',insured_id:'23735',diagnosis:'no exception',person_incharge_id:'PI22361',type:'services',policyholder_id:'PH7911'}) $$ ) as (n agtype); ...
- POLICYHOLDER 和 CLAIM的关联关系
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1528' AND b.policyholder_id = 'PH2963' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1529' AND b.policyholder_id = 'PH1353' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1530' AND b.policyholder_id = 'PH1071' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1531' AND b.policyholder_id = 'PH8102' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1532' AND b.policyholder_id = 'PH4768' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); ...
将转换后的结果保存为sql文件,配合客户端工具,如psql等可完成数据导入。
使用示例
简单查询
数据统计
- 各种类型节点数量
SELECT count(*) FROM cypher('graph', $$ MATCH (v) RETURN v $$) as (v agtype); count -------- 120567 -- claim SELECT count(*) FROM cypher('graph', $$ MATCH (v:claim) RETURN v $$) as (v agtype); count -------- 100001 -- policyholder SELECT count(*) FROM cypher('graph', $$ MATCH (v:policyholder) RETURN v $$) as (v agtype); count ------- 10006 -- incharge SELECT count(*) FROM cypher('graph', $$ MATCH (v:incharge) RETURN v $$) as (v agtype); count ------- 10001 --disease SELECT count(*) FROM cypher('graph', $$ MATCH (v:disease) RETURN v $$) as (v agtype); count ------- 393 --patient SELECT count(*) FROM cypher('graph', $$ MATCH (v:patient) RETURN v $$) as (v agtype); count ------- 166
过滤查询、排序查询
- 查询理赔单C4377的投保、理赔、被保情况
SELECT 'policyholder_id' as type, policyholder_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(policyholder:policyholder) RETURN policyholder.policyholder_id $$) as (policyholder_id agtype) UNION SELECT 'incharge_id', incharge_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(v:incharge) RETURN v.incharge_id $$) as (incharge_id agtype) UNION SELECT 'patient_id', patient_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(v:patient) RETURN v.patient_id $$) as (patient_id agtype); type | policyholder_id -----------------+----------------- patient_id | "11279" policyholder_id | "PH3759" incharge_id | "PI26607"
通用场景
K阶邻居
- 已知保单C4377为欺诈保单,查询和理赔单C4377有相同理赔病人的理赔单,说明该理赔人有涉嫌骗保的嫌疑。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:patient)<-[]-(c:claim) RETURN c.claim_id $$) as (claim_id agtype); ?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C3679" claim_id | "C96545" claim_id | "C26586" claim_id | "C26754" claim_id | "C87278" claim_id | "C87603" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"
- 查询已知欺诈保单C4377的投保人的社交关系,可以对这些人的理赔情况提前预警。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(a:policyholder)-[r*1..3]->(p:policyholder) RETURN p.policyholder_id $$) as (policyholder_id agtype); ?column? | policyholder_id -----------------+----------------- policyholder_id | "PH52532" policyholder_id | "PH11283" policyholder_id | "PH11328" policyholder_id | "PH1" policyholder_id | "PH5" policyholder_id | "PH512" policyholder_id | "PH1569" policyholder_id | "PH4722" policyholder_id | "PH4731"
路径检索
- 查询投保人PH3759和投保人PH4722的路径,分析投保人之间的关联关系。
SELECT * FROM cypher('graph', $$ MATCH path = (:policyholder {policyholder_id: 'PH3759'})-[r*1..3]->(:policyholder {policyholder_id: 'PH4722'}) RETURN path $$) AS (v agtype); ------- [{"id": 844424930136988, "label": "policyholder", "properties": {"fname": "KURTIS", "lname": "ALKEMA", "high_risk": "1", "risk_score": "78", "policyholder_id": "PH3759"}}::vertex, {"id": 2251799813685487, "label": "RELTYPE", "end_id": 844424930133473, "start_id": 844424930136988, "properties": {"level": "65"}}::edge, {"id": 844424930133473, "label": "policyholder", "properties": {"fname": "TERRA", "lname": "SWARB", "high_risk": "0", "risk_score": "25", "policyholder_id": "PH512"}}::vertex, {"id": 2251799813685546, "label": "RELTYPE", "end_id": 844424930138502, "start_id": 844424930133473, "properties": {"level": "62"}}::edge, {"id": 844424930138502, "label": "policyholder", "properties": {"fname": "VETA", "lname": "SEDLACK", "high_risk": "0", "risk_score": "31", "policyholder_id": "PH1569"}}::vertex, {"id": 2251799813685594, "label": "RELTYPE", "end_id": 844424930136281, "start_id": 844424930138502, "properties": {"level": "92"}}::edge, {"id": 844424930136281, "label": "policyholder", "properties": {"fname": "DEANNA", "lname": "BALSER", "high_risk": "0", "risk_score": "36", "policyholder_id": "PH4722"}}::vertex]::path
共同邻居
- 查询保单C4377和保单C67594的共同邻居,从而找到两个保单的共同投保人。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(:claim {claim_id: 'C67594'}) RETURN p.policyholder_id $$) as (policyholder_id agtype); ?column? | policyholder_id -----------------+----------------- policyholder_id | "PH3759"
协同推荐
- 已知保单C4377为欺诈保单,查找和保单C4377有共同投保人的保单,从而找到欺诈疑似涉诈保单。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(c:claim) RETURN c.claim_id $$) as (claim_id agtype); ?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C96545" claim_id | "C3679" claim_id | "C87603" claim_id | "C26754" claim_id | "C26586" claim_id | "C87278" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"
- 与已知涉诈保单 C4377 相似度最大的保单,返回前20个:
with t as ( SELECT claim_id, replace(trim(both '"' from to_jsonb(properties)::text), '\"', '"') as similarity FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[e]->(c:claim) RETURN properties(e), c.claim_id $$) as (properties agtype, claim_id agtype) ) SELECT claim_id, replace((similarity::jsonb->'similarity_score')::text, '"','')::integer as s from t ORDER BY s DESC LIMIT 20; claim_id | s ----------+---- "C67594" | 13 "C69395" | 13 "C10160" | 13 "C87603" | 13 "C28963" | 13 "C3679" | 13 "C26754" | 13 "C96155" | 13 "C26586" | 13 "C87278" | 13 "C96545" | 13 "C20113" | 8 "C70759" | 8 "C28785" | 8 "C12793" | 8 "C59736" | 8 "C38059" | 8 "C34068" | 8 "C71827" | 8 "C15760" | 8
总结
本文介绍了如何利用阿里云云原生关系型数据库PolarDB PostgreSQL版的图分析能力来进行图数据分析。
PolarDB结合AGE扩展,提供了图数据计算分析的功能, 包括使用Cypher查询语言,高效处理查询图数据。本文以公开的保险数据集为例,示例了在保险理赔场景下,执行图查询来发现异常理赔记录和欺诈团伙:例如,查询与欺诈保单有相同理赔病人的其他保单,或者找出欺诈保单的投保人社交关系,以便进行欺诈预警。PolarDB在关系型数据库的基础上,提供了图分析能力,为企业的统一数据管理和分析,提供了强有力的支撑。
试用体验
欢迎访问PolarDB免费试用页面,选择试用“云原生数据库PolarDB PostgreSQL版”,体验PolarDB的图计算能力
附录
数据转换脚本
import csv import os def convert_vertex_csv(file_path, graph): file_name = os.path.splitext(os.path.basename(file_path))[0].lower() # create vlabel print("------------------------------------------------") print("-- Create vlabel") print("SELECT create_vlabel('{}','{}');".format(graph, file_name)) with open(file_path, 'r') as csvfile: reader = csv.reader(csvfile, delimiter=',') header = next(reader) for row in reader: p = "" for h in header: if p != "": p += "," else: p += "{" p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip()) if p != "": p += "}" print("SELECT * FROM cypher('{}', $$ CREATE (:{} {}) $$ ) as (n agtype);".format(graph, file_name, p)) def convert_edge_csv(file_path, graph, from_type, to_type): file_name = os.path.splitext(os.path.basename(file_path))[0].lower() with open(file_path, 'r') as csvfile: reader = csv.reader(csvfile, delimiter=',') header = next(reader) for row in reader: p = "" for h in header: if (h.endswith("ID")): continue; if p != "": p += "," else: p += "{" p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip()) if p != "": p += "}" print("SELECT * FROM cypher('{0}', $$ MATCH (a:{1}), (b:{2}) WHERE a.{1}_id = '{3}' AND " "b.{2}_id = '{4}' CREATE (a)-[e:RELTYPE {5} ]->(b) RETURN e$$) as (e agtype);".format(graph, from_type, to_type, row[0].strip(), row[1].strip(), p)) def generate_graph_csv(directory, graph): print("------------------------------------------------") print("-- Create graph") print("SELECT create_graph('{}');".format(graph)) print("------------------------------------------------") print("-- Create vertex") convert_vertex_csv(directory + "/POLICYHOLDER.csv", graph) convert_vertex_csv(directory + "/INCHARGE.csv", graph) convert_vertex_csv(directory + "/PATIENT.csv", graph) convert_vertex_csv(directory + "/CLAIM.csv", graph) convert_vertex_csv(directory + "/DISEASE.csv", graph) print("------------------------------------------------") print("-- Create edge") convert_edge_csv(directory + "/POLICYHOLDER_CONNECTION.csv", graph, 'policyholder','policyholder') convert_edge_csv(directory + "/INCHARGE_OF_CLAIM.csv", graph,'claim', 'incharge') convert_edge_csv(directory + "/CLAIM_SIMILARITY.csv", graph, 'claim','claim') convert_edge_csv(directory + "/POLICYHOLDER_OF_CLAIM.csv", graph,'claim', 'policyholder') convert_edge_csv(directory + "/INSURED_OF_CLAIM.csv", graph, 'claim','patient') convert_edge_csv(directory + "/HAS_DISEASE.csv", graph, 'patient','disease') generate_graph_csv("analyzing-insurance-claims-using-ibm-db2-graph-master/data", "graph")