Hadoop环境下进行Neo4j数据(亿级)同步,企业和人物关系数据导入实践
使用场景
- 使用远程csv文件进行数据同步,而不是本地csv文件
- 使用hdfs协议进行数据传输
- 使用hive生成数据文件
环境准备
CDH6
Neo4j==5.4
apoc-5.4.0-extended.jar
apoc-5.4.1-core.jar
APOC
由于APOC依赖于Neo4j的内部API,因此您需要使用匹配的APOC版本进行Neo4j安装。确保前两个版本号在Neo4j和APOC之间匹配。
转到此处查看所有APOC扩展版本,并下载二进制jar以放入$NEO4J_HOME/plugins
文件夹中。
将jar文件移动到插件文件夹后,您必须重新启动neo4j`neo4j restart
官方文档说明
https://neo4j.com/labs/apoc/5/installation/
https://neo4j.com/labs/apoc/5/overview/apoc.load/apoc.load.csv/
Neo4j 导入数据的方式对比
导入数据的几种方式有:
- 使用 Cypher create 语句,为每一条数据写一个 create
- 使用 Cypher load csv 语句,将数据转成 CSV 格式,通过 LOAD CSV 读取数据
- 使用 APOC 插件,它提供了很多导入和导出的函数和过程
- 使用编程语言(Java,Python,JS,C#,Go)导入数据
- 使用 neo4j-admin 工具导入数据
- 使用 ETL 工具导入数据
这些方式的效率和难易度各有不同。一般来说:
- 如果数据量很小(几千条),可以使用 Cypher create 语句或者 APOC 插件
- 如果数据量较大(几百万条),可以使用 Cypher load csv 语句或者编程语言
- 如果数据量非常大(上亿条),可以使用 neo4j-admin 工具或者 ETL 工具
使用 Cypher load csv 语句导入数据的过程
1. 使用 Hive 生成对应的 csv 文件数据
- 注意 hive 表的文件格式
- hive 表插入数据后,会生成N个文件在 HDFS 目录
- 遍历HDFS数据文件,进行 load csv 操作
- 对每条数据生成唯一标识,方便日后进行增量更新
CREATE TABLE `node_company_table`(
`company_id` string COMMENT '企业唯一标识',
`company_name` string COMMENT '企业名称'
) COMMENT '节点-企业信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",", -- 字段分隔符
"quoteChar" = "\"", -- 引号字符
"escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;
CREATE TABLE `node_person_table`(
`person_id` string COMMENT '人物唯一标识',
`person_name` string COMMENT '人物名称'
) COMMENT '节点-人物信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",", -- 字段分隔符
"quoteChar" = "\"", -- 引号字符
"escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;
CREATE TABLE `rel_invest_company_person_table`(,
`company_id` string COMMENT '企业唯一标识',
`person_id` string COMMENT '人物唯一标识'
) COMMENT '关系-企业和人物的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",", -- 字段分隔符
"quoteChar" = "\"", -- 引号字符
"escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;
CREATE TABLE `rel_invest_company_table`(,
`company_id` string COMMENT '企业唯一标识',
`relation_company_id` string COMMENT '关联企业唯一标识'
) COMMENT '关系-企业和企业的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = ",", -- 字段分隔符
"quoteChar" = "\"", -- 引号字符
"escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;
2. 使用 python 读取 HDFS 目录
- 使用 HDFS WEBHDFS 服务读取数据目录
def get_files(self, dir_path):
httpfs = f"http://xx.xx.xx.x:14000/webhdfs/v1/
params = {
"op": "LISTSTATUS",
"user.name": "httpfs"
}
res = requests.get(url=httpfs + dir_path, params=params)
file_statuses = res.json()
return file_statuses['FileStatuses']['FileStatus']
3. 使用 Cypher load csv 语句导入数据
- apoc.load.csv 支持使用 hdfs 协议数据导入 所以就不再采用 httpfs 协议的方式进行远程 csv 文件导入
下面以导入 company 节点数据为例
def create_company_node():
dir_path = "hdfs_path/node_company_table/"
for file in get_files(dir_path):
cql1 = f"WITH '{hdfs_url}/{dir_path}/{file['pathSuffix']}' AS url CALL apoc.load.csv(url,{{sep:'\001',quoteChar:'\u0000'}}) " \
"yield lineNo,list return list"
cql2 = "MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0],company_name: list[1]}"
cql = f"CALL apoc.periodic.iterate(\"{cql1}\",\"{cql2}\",{{batchSize:1000, iterateList:true, parallel:true}})"
print(cql)
with self.driver.session() as session:
session.run(cql)
事务分批处理
为了处理大文件,CALL ... IN TRANSACTIONS
可以和LOAD CSV
一起使用,但你必须注意Eager操作可能会破坏这种行为。
在apoc中,你也可以将任何数据源与apoc.periodic.iterate
结合起来以实现同样的目的。
CALL apoc.periodic.iterate('
CALL apoc.load.csv({hdfs_file_path}) yield map as row return row
','
CREATE (p:Person) SET p = row
', {batchSize:10000, iterateList:true, parallel:true});
WITH {hdfs_file_path} AS url
CALL apoc.load.csv(url,{sep:'\001'}) yield lineNo,list
CALL {WITH list MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0], company_name: list[1]}}
IN TRANSACTIONS
总结
- 本次实践,导入企业节点约9000万、人物节点1.4亿、投资关系1.6亿,因没进行具体的时效记录,只能估算花费时间约1.5h
- 导入数据关系时,因采用
merge
操作出现了事务锁异常,导致部分batch
导入失败,采取了以下措施
- 优化 cypher 语句,使用 MERGE ON CREATE SET 语句来设置属性,减少不必要的 merge 操作
- 减小 apoc.periodic.iterate batchSize
- 执行 cypher 语句时,在脚本上做异常处理,记录异常,并重复执行。