使用Hadoop同步Neo4j数据(亿级)

简介: 企业和人物节点数据以及关系数据导入实践,使用hive生成csv文件,进行 apoc.load.csv 操作。

Hadoop环境下进行Neo4j数据(亿级)同步,企业和人物关系数据导入实践

使用场景

  1. 使用远程csv文件进行数据同步,而不是本地csv文件
  2. 使用hdfs协议进行数据传输
  3. 使用hive生成数据文件

环境准备

CDH6

Neo4j==5.4

apoc-5.4.0-extended.jar

apoc-5.4.1-core.jar

APOC

由于APOC依赖于Neo4j的内部API,因此您需要使用匹配APOC版本进行Neo4j安装。确保前两个版本号Neo4j和APOC之间匹配

转到此处查看所有APOC扩展版本,并下载二进制jar以放入$NEO4J_HOME/plugins文件夹中。

将jar文件移动到插件文件夹后,您必须重新启动neo4j`neo4j restart

官方文档说明

https://neo4j.com/labs/apoc/5/installation/

https://neo4j.com/labs/apoc/5/overview/apoc.load/apoc.load.csv/

Neo4j 导入数据的方式对比

导入数据的几种方式有:

  • 使用 Cypher create 语句,为每一条数据写一个 create
  • 使用 Cypher load csv 语句,将数据转成 CSV 格式,通过 LOAD CSV 读取数据
  • 使用 APOC 插件,它提供了很多导入和导出的函数和过程
  • 使用编程语言(Java,Python,JS,C#,Go)导入数据
  • 使用 neo4j-admin 工具导入数据
  • 使用 ETL 工具导入数据

这些方式的效率和难易度各有不同。一般来说:

  • 如果数据量很小(几千条),可以使用 Cypher create 语句或者 APOC 插件
  • 如果数据量较大(几百万条),可以使用 Cypher load csv 语句或者编程语言
  • 如果数据量非常大(上亿条),可以使用 neo4j-admin 工具或者 ETL 工具

使用 Cypher load csv 语句导入数据的过程

1. 使用 Hive 生成对应的 csv 文件数据

  • 注意 hive 表的文件格式
  • hive 表插入数据后,会生成N个文件在 HDFS 目录
  • 遍历HDFS数据文件,进行 load csv 操作
  • 对每条数据生成唯一标识,方便日后进行增量更新
CREATE TABLE `node_company_table`(
  `company_id` string COMMENT '企业唯一标识',
  `company_name` string COMMENT '企业名称'
) COMMENT '节点-企业信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `node_person_table`(
  `person_id` string COMMENT '人物唯一标识',
  `person_name` string COMMENT '人物名称'
) COMMENT '节点-人物信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_person_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `person_id` string COMMENT '人物唯一标识'
) COMMENT '关系-企业和人物的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `relation_company_id` string COMMENT '关联企业唯一标识'
) COMMENT '关系-企业和企业的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

2. 使用 python 读取 HDFS 目录

  • 使用 HDFS WEBHDFS 服务读取数据目录
def get_files(self, dir_path):
    httpfs = f"http://xx.xx.xx.x:14000/webhdfs/v1/
    params = {
        "op": "LISTSTATUS",
        "user.name": "httpfs"
    }
    res = requests.get(url=httpfs + dir_path, params=params)
    file_statuses = res.json()
    return file_statuses['FileStatuses']['FileStatus']

3. 使用 Cypher load csv 语句导入数据

  • apoc.load.csv 支持使用 hdfs 协议数据导入 所以就不再采用 httpfs 协议的方式进行远程 csv 文件导入
下面以导入 company 节点数据为例
def create_company_node():
    dir_path = "hdfs_path/node_company_table/"
    for file in get_files(dir_path):
        cql1 = f"WITH '{hdfs_url}/{dir_path}/{file['pathSuffix']}' AS url CALL apoc.load.csv(url,{{sep:'\001',quoteChar:'\u0000'}}) " \
                "yield lineNo,list return list"
        cql2 = "MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0],company_name: list[1]}"
        cql = f"CALL apoc.periodic.iterate(\"{cql1}\",\"{cql2}\",{{batchSize:1000, iterateList:true, parallel:true}})"
        print(cql)
        with self.driver.session() as session:
            session.run(cql)
事务分批处理

为了处理大文件,CALL ... IN TRANSACTIONS可以和LOAD CSV一起使用,但你必须注意Eager操作可能会破坏这种行为。
在apoc中,你也可以将任何数据源与apoc.periodic.iterate结合起来以实现同样的目的。

CALL apoc.periodic.iterate('
CALL apoc.load.csv({hdfs_file_path}) yield map as row return row
','
CREATE (p:Person) SET p = row
', {batchSize:10000, iterateList:true, parallel:true});
WITH {hdfs_file_path} AS url
CALL apoc.load.csv(url,{sep:'\001'}) yield lineNo,list 
CALL {WITH list MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0], company_name: list[1]}}
IN TRANSACTIONS

总结

  1. 本次实践,导入企业节点约9000万、人物节点1.4亿、投资关系1.6亿,因没进行具体的时效记录,只能估算花费时间约1.5h
  2. 导入数据关系时,因采用merge操作出现了事务锁异常,导致部分batch导入失败,采取了以下措施
  • 优化 cypher 语句,使用 MERGE ON CREATE SET 语句来设置属性,减少不必要的 merge 操作
  • 减小 apoc.periodic.iterate batchSize
  • 执行 cypher 语句时,在脚本上做异常处理,记录异常,并重复执行。
相关文章
|
2月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
37 1
|
19天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
56 4
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
90 3
|
2月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
43 2
|
2月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
70 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
105 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
51 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
57 0
|
5月前
|
分布式计算 Hadoop