使用Hadoop同步Neo4j数据(亿级)

简介: 企业和人物节点数据以及关系数据导入实践,使用hive生成csv文件,进行 apoc.load.csv 操作。

Hadoop环境下进行Neo4j数据(亿级)同步,企业和人物关系数据导入实践

使用场景

  1. 使用远程csv文件进行数据同步,而不是本地csv文件
  2. 使用hdfs协议进行数据传输
  3. 使用hive生成数据文件

环境准备

CDH6

Neo4j==5.4

apoc-5.4.0-extended.jar

apoc-5.4.1-core.jar

APOC

由于APOC依赖于Neo4j的内部API,因此您需要使用匹配APOC版本进行Neo4j安装。确保前两个版本号Neo4j和APOC之间匹配

转到此处查看所有APOC扩展版本,并下载二进制jar以放入$NEO4J_HOME/plugins文件夹中。

将jar文件移动到插件文件夹后,您必须重新启动neo4j`neo4j restart

官方文档说明

https://neo4j.com/labs/apoc/5/installation/

https://neo4j.com/labs/apoc/5/overview/apoc.load/apoc.load.csv/

Neo4j 导入数据的方式对比

导入数据的几种方式有:

  • 使用 Cypher create 语句,为每一条数据写一个 create
  • 使用 Cypher load csv 语句,将数据转成 CSV 格式,通过 LOAD CSV 读取数据
  • 使用 APOC 插件,它提供了很多导入和导出的函数和过程
  • 使用编程语言(Java,Python,JS,C#,Go)导入数据
  • 使用 neo4j-admin 工具导入数据
  • 使用 ETL 工具导入数据

这些方式的效率和难易度各有不同。一般来说:

  • 如果数据量很小(几千条),可以使用 Cypher create 语句或者 APOC 插件
  • 如果数据量较大(几百万条),可以使用 Cypher load csv 语句或者编程语言
  • 如果数据量非常大(上亿条),可以使用 neo4j-admin 工具或者 ETL 工具

使用 Cypher load csv 语句导入数据的过程

1. 使用 Hive 生成对应的 csv 文件数据

  • 注意 hive 表的文件格式
  • hive 表插入数据后,会生成N个文件在 HDFS 目录
  • 遍历HDFS数据文件,进行 load csv 操作
  • 对每条数据生成唯一标识,方便日后进行增量更新
CREATE TABLE `node_company_table`(
  `company_id` string COMMENT '企业唯一标识',
  `company_name` string COMMENT '企业名称'
) COMMENT '节点-企业信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `node_person_table`(
  `person_id` string COMMENT '人物唯一标识',
  `person_name` string COMMENT '人物名称'
) COMMENT '节点-人物信息'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_person_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `person_id` string COMMENT '人物唯一标识'
) COMMENT '关系-企业和人物的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

CREATE TABLE `rel_invest_company_table`(,
  `company_id` string COMMENT '企业唯一标识',
  `relation_company_id` string COMMENT '关联企业唯一标识'
) COMMENT '关系-企业和企业的投资关系'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = ",", -- 字段分隔符
  "quoteChar" = "\"", -- 引号字符
  "escapeChar" = "\\" -- 转义字符
)STORED AS TEXTFILE;

2. 使用 python 读取 HDFS 目录

  • 使用 HDFS WEBHDFS 服务读取数据目录
def get_files(self, dir_path):
    httpfs = f"http://xx.xx.xx.x:14000/webhdfs/v1/
    params = {
        "op": "LISTSTATUS",
        "user.name": "httpfs"
    }
    res = requests.get(url=httpfs + dir_path, params=params)
    file_statuses = res.json()
    return file_statuses['FileStatuses']['FileStatus']

3. 使用 Cypher load csv 语句导入数据

  • apoc.load.csv 支持使用 hdfs 协议数据导入 所以就不再采用 httpfs 协议的方式进行远程 csv 文件导入
下面以导入 company 节点数据为例
def create_company_node():
    dir_path = "hdfs_path/node_company_table/"
    for file in get_files(dir_path):
        cql1 = f"WITH '{hdfs_url}/{dir_path}/{file['pathSuffix']}' AS url CALL apoc.load.csv(url,{{sep:'\001',quoteChar:'\u0000'}}) " \
                "yield lineNo,list return list"
        cql2 = "MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0],company_name: list[1]}"
        cql = f"CALL apoc.periodic.iterate(\"{cql1}\",\"{cql2}\",{{batchSize:1000, iterateList:true, parallel:true}})"
        print(cql)
        with self.driver.session() as session:
            session.run(cql)
事务分批处理

为了处理大文件,CALL ... IN TRANSACTIONS可以和LOAD CSV一起使用,但你必须注意Eager操作可能会破坏这种行为。
在apoc中,你也可以将任何数据源与apoc.periodic.iterate结合起来以实现同样的目的。

CALL apoc.periodic.iterate('
CALL apoc.load.csv({hdfs_file_path}) yield map as row return row
','
CREATE (p:Person) SET p = row
', {batchSize:10000, iterateList:true, parallel:true});
WITH {hdfs_file_path} AS url
CALL apoc.load.csv(url,{sep:'\001'}) yield lineNo,list 
CALL {WITH list MERGE (n:Company {company_id: list[0]}) set n={company_id: list[0], company_name: list[1]}}
IN TRANSACTIONS

总结

  1. 本次实践,导入企业节点约9000万、人物节点1.4亿、投资关系1.6亿,因没进行具体的时效记录,只能估算花费时间约1.5h
  2. 导入数据关系时,因采用merge操作出现了事务锁异常,导致部分batch导入失败,采取了以下措施
  • 优化 cypher 语句,使用 MERGE ON CREATE SET 语句来设置属性,减少不必要的 merge 操作
  • 减小 apoc.periodic.iterate batchSize
  • 执行 cypher 语句时,在脚本上做异常处理,记录异常,并重复执行。
相关文章
|
2天前
|
存储 分布式计算 Hadoop
Hadoop:驭服数据洪流的利器
在当今信息大爆炸的时代,海量数据成为企业决策的重要依据。本文将介绍大规模数据处理框架Hadoop的概念与实践,探讨其在解决大数据应用中的重要性和优势。从分布式计算、高可靠性、扩展性等方面深入剖析Hadoop的工作原理,并结合实例说明如何利用Hadoop来处理海量数据,为读者提供了解和运用Hadoop的基础知识。
|
6月前
|
存储 分布式计算 Hadoop
Hadoop怎么处理数据
Hadoop怎么处理数据
106 0
|
1天前
|
存储 分布式计算 Hadoop
Hadoop数据合并技巧
【5月更文挑战第10天】Hadoop数据合并技巧
20 2
|
2天前
|
存储 SQL 分布式计算
Hadoop数据整合
【5月更文挑战第9天】Hadoop数据整合
17 2
|
2天前
|
存储 分布式计算 Hadoop
【专栏】Hadoop,开源大数据处理框架:驭服数据洪流的利器
【4月更文挑战第28天】Hadoop,开源大数据处理框架,由Hadoop Common、HDFS、YARN和MapReduce组成,提供大规模数据存储和并行处理。其优势在于可扩展性、容错性、高性能、灵活性及社区支持。然而,数据安全、处理速度、系统复杂性和技能短缺是挑战。通过加强安全措施、结合Spark、自动化工具和培训,Hadoop在应对大数据问题中保持关键地位。
|
2天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
|
2天前
|
分布式计算 关系型数据库 Hadoop
使用Sqoop将数据从Hadoop导出到关系型数据库
使用Sqoop将数据从Hadoop导出到关系型数据库
|
2天前
|
存储 Linux
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
112 1
|
2天前
|
分布式计算 Hadoop 大数据
大数据成长之路-- hadoop集群的部署(4)退役旧数据节点
大数据成长之路-- hadoop集群的部署(4)退役旧数据节点
55 0
|
9月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)