
Nebula Graph:一个开源的分布式图数据库。欢迎来 GitHub 交流:https://github.com/vesoft-inc/nebula
能力说明:
了解变量作用域、Java类的结构,能够创建带main方法可执行的java应用,从命令行运行java程序;能够使用Java基本数据类型、运算符和控制结构、数组、循环结构书写和运行简单的Java程序。
暂时未有相关云产品技术能力~
阿里云技能认证
详细说明摘要:本文主要介绍 Query 层的整体结构,并通过一条 nGQL 语句来介绍其通过 Query 层的四个主要模块的流程。 一、概述 分布式图数据库 Nebula Graph 2.0 版本相比 1.0 有较大改动,最明显的变化便是,在 1.0 版本中 Query、Storage 和 Meta 模块代码不作区分放在同一个代码仓中,而 Nebula Graph 2.0 开始在架构上先解耦成三个代码仓:nebula-graph、nebula-common 和 nebula-storage,其中 nebula-common 中主要是表达式的定义、函数定义和一些公共接口、nebula-graph 主要负责 Query 模块、nebula-storage 主要负责 Storage 和 Meta 模块。 本文主要介绍 Query 层的整体结构,并通过一条 nGQL 语句来介绍其通过 Query 层的四个主要模块的流程,由于 Nebula Graph 2.0 仍处于开发中,版本变化比较频繁,本文主要针对 2.0 的 nebula-graph 仓中 master 分支的 aea5befd179585c510fb83452cb82276a7756529 版本。 二、框架 Query 层主要框架如下所示: 主要分为 4 个子模块 Parser:词法语法解析模块 Validator:语句校验模块 Planner:执行计划和优化器模块 Executor:执行算子模块 三、代码结构 下面讲下 nebula-graph 的代码层次结构,如下所示 |--src |--context // 校验期和执行期上下文 |--daemons |--executor // 执行算子 |--mock |--optimizer // 优化规则 |--parser // 词法语法分析 |--planner // 执行计划结构 |--scheduler // 调度器 |--service |--util // 基础组件 |--validator // 语句校验 |--vistor 四、一个案例聊 Query 自 Nebula Graph v2.0 起,nGQL 的语法规则已经支持起始点的类型为 string ,正在兼容 1.0 的 int 类型。举个例子: GO FROM "Tim" OVER like WHERE like.likeness > 8.0 YIELD like._dst 上面的一条 nGQL 语句在 Nebula Graph 的 Query 层的数据流如下所示: 主要流程如下: 第一阶段:生成 AST 第一阶段:首先经过 Flex 和 Bison 组成的词法语法解析器模块 Parser 生成对应的 AST, 结构如下: 在此阶段 Parser 会拦截掉不符合语法规则的语句。举个例子,GO "Tim" FROM OVER like YIELD like._dst 这种语法使用错误的语句会在语法解析阶段直接被拦截。 第二阶段:校验 第二阶段:Validator 在 AST 上进行一系列的校验工作,主要工作如下: 元数据信息的校验 在解析 OVER 、 WHERE 和 YIELD 语句时,会查找 Schema,校验 edge、tag 的信息是否存在。或者在 INSERT 数据时校验插入数据类型和 Schema 中的是否一致 上下文引用校验 遇到多语句时,例如:$var = GO FROM "Tim" OVER like YIELD like._dst AS ID; GO FROM $var.ID OVER serve YIELD serve._dst ,Validator 会校验 $var.ID` 首先检查变量 `var` 是否定义,其次再检查属性 `ID` 是否属于变量 `var`, 如果是将 `$var.ID 替换为 $var1.ID` 或者 `$var.IID, 则会校验失败。 类型推断校验 推断表达式的结果属于什么类型,并根据具体的子句,校验类型是否正确。比如 WHERE 子句要求结果是 bool,null 或者 empty 。 *'' 展开** 例如,若输入语句为 GO FROM "Tim" OVER * YIELD like._dst, like.likeness, serve._dst,则在校验 OVER 子句时需要查询 Schema 将 * 展开为所有的边,假如 Schema 中只有 like 和 serve 两条边时,该语句会展开为:GO FROM "Tim" OVER serve, like YIELD like._dst, like.likeness, serve._dst 输入输出校验 遇到 PIPE 语句时,例如:GO FROM "Tim" OVER like YIELD like._dst AS ID | GO FROM $-.ID OVER serve YIELD serve._dst`,Validator 会校验 `$-.ID 由于 ID 在上一条语句中已经定义,则该子句合法,如果是将$-.ID 换为 `$-.a` 而此时 a 未定义,因此该子句非法。 第三阶段:生成可执行计划 第三阶段:经过 Validator 之后会生成一个可执行计划,其中执行计划的数据结构在 src/planner 目录下,其逻辑结构如下: Query 执行流 执行流:该执行计划是一个有向无环图,其中节点间的依赖关系在 Validator 中每个模块的 toPlan() 函数中确定,在这个例子中 Project 依赖 Filter, Filter 依赖 GetNeighbor,依次类推直到 Start 节点为止。 在执行阶段执行器会对每个节点生成一个对应的算子,并且从根节点(这个例子中是 Project 节点)开始调度,此时发现此节点依赖其他节点,就先递归调用依赖的节点,一直找到没有任何依赖的节点(此时为 Start 节点),然后开始执行,执行此节点后,继续执行此节点被依赖的其他节点(此时为 GetNeighbor 节点),一直到根节点为止。 Query 数据流 数据流:每个节点的输入输出也是在 toPlan() 中确定的, 虽然执行的时候会按照执行计划的先后关系执行,但是每个节点的输入并不完全依赖上个节点,可以自行定义,因为所有节点的输入、输出其实是存储在一个哈希表中的,其中 key 是在建立每个节点的时候自己定义的名称,假如哈希表的名字为 ResultMap,在建立 Filter 这个节点时,定义该节点从 ResultMap["GN1"] 中取数据,然后将结果放入 ResultMap["Filter2"] 中,依次类推,将每个节点的输入输出都确定好,该哈希表定义在 nebula-graph 仓下 src/context/ExecutionContext.cpp 中,因为执行计划并不是真正地执行,所以对应哈希表中每个 key 的 value 值都为空(除了开始节点,此时会将起始数据放入该节点的输入变量中),其值会在 Excutor 阶段被计算并填充。 这个例子比较简单,最后会放一个复杂点的例子以便更好地理解执行计划。 第四阶段:执行计划优化 第四阶段:执行计划优化。如果 etc/nebula-graphd.conf 配置文件中 enable_optimizer 设置为 true ,则会对执行计划的优化,例如上边的例子,当开启优化时: 此时会将 Filter 节点融入到 GetNeighbor 节点中,在执行阶段当 GetNeighbor 算子调用 Storage 层的接口获取一个点的邻边的时候,Storage 层内部会直接将不符合条件的边过滤掉,这样就可以极大的减少数据量的传输,俗称过滤下推。 在执行计划中,每个节点直接依赖另外一个节点。为了探索等价的变换和重用计划中相同的部分,会将节点的这种直接依赖关系转换为 OptGroupNode 与 OptGroup 的依赖。每个 OptGroup 中可以包含等价的 OptGroupNode 的集合,每个 OptGroupNode 都包含执行计划中的一个节点,同时 OptGroupNode 依赖的不再是 OptGroupNode 而是 OptGroup,这样从该 OptGroupNode 出发可以根据其依赖 OptGroup 中的不同的 OptGroupNode 拓展出很多等价的执行计划。同时 OptGroup 还可以被不同的 OptGroupNode 共用,节省存储的空间。 目前我们实现的所有优化规则认为是 RBO(rule-based optimization),即认为应用规则后的计划一定比应用前的计划要优。CBO(cost-based optimization) 目前正在同步开发。整个优化的过程是一个"自底向上"的探索过程,即对于每个规则而言,都会由执行计划的根节点(此例中是 Project 节点)开始,一步步向下找到最底层的节点,然后由该节点开始一步步向上探索每个 OptGroup 中的 OptGroupNode 是否匹配该规则,直到整个 Plan 都不能再应用该规则为止,再执行下一个规则的探索。 本例中的优化如下图所示: 例如,当搜索到 Filter 节点时,发现 Filter 节点的子节点是 GetNeighbors,和规则中事先定义的模式匹配成功,启动转换,将 Filter 节点融入到 GetNeighbors 节点中,然后移除掉 Filter 节点,继续匹配下一个规则。 优化的代码在 nebula-graph 仓下 src/optimizer/ 目录下。 第五阶段:执行 第五阶段:最后 Scheduler 会根据执行计划生成对应的执行算子,从叶子节点开始执行,一直到根节点结束。其结构如下: 其中每一个执行计划节点都一一对应一个执行算子节点,其输入输出在执行计划期间已经确定,每个算子只需要拿到输入变量中的值然后进行计算,最后将计算结果放入对应的输出变量中即可,所以只需要从开始节点一步步执行,最后一个算子的结果会作为最终结果返回给用户。 五、实例 下面执行一个最短路径的实例看看执行计划的具体结构,打开 nebula-console, 输入下面语句FIND SHORTEST PATH FROM "YAO MING" TO "Tim Duncan" OVER like, serve UPTO 5 STEPS ,在这条语句前加 EXPLAIN 关键字就可以得到该语句生成的执行计划详细信息: 上图从左到右依次显示执行计划中每个节点的唯一 ID、节点的名称、该节点所依赖的节点 ID、profiling data(执行 profile 命令时的信息)、该节点的详细信息(包括输入输出变量名称,输出结果的列名,节点的参数信息)。 如果想要可视化一点可以在这条语句前加 EXPLAIN format="dot",这时候 nebula-console 会生成 dot 格式的数据,然后打开 Graphviz Online 这个网站将生成的 dot 数据粘贴上去,就可以看到如下结构,该结构对应着执行阶段各个算子的执行流程。 因为最短路径使用了双向广度搜索算法分别从"YAO MING" 和 "Tim Duncan" 两边同时扩展,所以中间的 GetNeighbors、BFSShortest、 Project、 Dedup 分别有两个算子,通过 PassThrough 算子连接输入,由 ConjunctPath 算子拼接路径。然后由 LOOP 算子控制向外扩展的步数,可以看到 DataCollect 算子的输入其实是从 ConjuctPath 算子的输出变量中取值的。 各个算子的信息在 nebula-graph 仓下的 src/executor 目录下。 作者有话说:Hi,我是明泉,是图数据 Nebula Graph 研发工程师,主要工作和数据库查询引擎相关,希望本次的经验分享能给大家带来帮助,如有不当之处也希望能帮忙纠正,谢谢~ 喜欢这篇文章?来来来,给我们的 GitHub 点个 star 表鼓励啦~~ ♂️♀️ [手动跪谢] 交流图数据库技术?交个朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你进交流群~~ 推荐阅读 Nebula 架构剖析系列(二)图数据库的查询引擎设计
摘要:本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采用类似 Flink 提供的 Flink Connector 形式,支持 Flink 读写分布式图数据库 Nebula Graph。 文章首发 Nebula Graph 官网博客:https://nebula-graph.com.cn/posts/nebula-flink-connector/ 在关系网络分析、关系建模、实时推荐等场景中应用图数据库作为后台数据支撑已相对普及,且部分应用场景对图数据的实时性要求较高,如推荐系统、搜索引擎。为了提升数据的实时性,业界广泛应用流式计算对更新的数据进行增量实时处理。为了支持对图数据的流式计算,Nebula Graph 团队开发了 Nebula Flink Connector,支持利用 Flink 进行 Nebula Graph 图数据的流式处理和计算。 Flink 是新一代流批统一的计算引擎,它从不同的第三方存储引擎中读取数据,并进行处理,再写入另外的存储引擎中。Flink Connector 的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。 与外界进行数据交换时,Flink 支持以下 4 种方式: Flink 源码内部预定义 Source 和 Sink 的 API; Flink 内部提供了 Bundled Connectors,如 JDBC Connector。 Apache Bahir 项目中提供连接器 Apache Bahir 最初是从 Apache Spark 中独立出来的项目,以提供不限于 Spark 相关的扩展/插件、连接器和其他可插入组件的实现。 通过异步 I/O 方式。 流计算中经常需要与外部存储系统交互,比如需要关联 MySQL 中的某个表。一般来说,如果用同步 I/O 的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。异步 I/O 则可以并发处理多个请求,提高吞吐,减少延迟。 本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采用类似 Flink 提供的 Flink Connector 形式,支持 Flink 读写分布式图数据库 Nebula Graph。 一、Connector Source Flink 作为一款流式计算框架,它可处理有界数据,也可处理无界数据。所谓无界,即源源不断的数据,不会有终止,实时流处理所处理的数据便是无界数据;批处理的数据,即有界数据。而 Source 便是 Flink 处理数据的数据来源。 Nebula Flink Connector 中的 Source 便是图数据库 Nebula Graph。Flink 提供了丰富的 Connector 组件允许用户自定义数据源来连接外部数据存储系统。 1.1 Source 简介 Flink 的 Source 主要负责外部数据源的接入,Flink 的 Source 能力主要是通过 read 相关的 API 和 addSource 方法这 2 种方式来实现数据源的读取,使用 addSource 方法对接外部数据源时,可以使用 Flink Bundled Connector,也可以自定义 Source。 Flink Source 的几种使用方式如下: 本章主要介绍如何通过自定义 Source 方式实现 Nebula Graph Source。 1.2 自定义 Source 在 Flink 中可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 和 ExecutionEnvironment.createInput(inputFormat) 两种方式来为你的程序添加数据来源。 Flink 已经提供多个内置的 source functions ,开发者可以通过继承 RichSourceFunction来自定义非并行的 source ,通过继承 RichParallelSourceFunction 来自定义并行的 Source 。RichSourceFunction 和 RichParallelSourceFunction 是 SourceFunction 和 RichFunction 特性的结合。 其中SourceFunction 负责数据的生成, RichFunction 负责资源的管理。当然,也可以只实现 SourceFunction 接口来定义最简单的只具备获取数据功能的 dataSource 。 通常自定义一个完善的 Source 节点是通过实现 RichSourceFunction 类来完成的,该类兼具 RichFunction 和 SourceFunction 的能力,因此自定义 Flink 的 Nebula Graph Source 功能我们需要实现 RichSourceFunction 中提供的方法。 1.3 自定义 Nebula Graph Source 实现原理 Nebula Flink Connector 中实现的自定义 Nebula Graph Source 数据源提供了两种使用方式,分别是 addSource 和 createInput 方式。 Nebula Graph Source 实现类图如下: (1)addSource 该方式是通过 NebulaSourceFunction 类实现的,该类继承自 RichSourceFunction 并实现了以下方法: open准备 Nebula Graph 连接信息,并获取 Nebula Graph Meta 服务和 Storage 服务的连接。 close数据读取完成,释放资源。关闭 Nebula Graph 服务的连接。 run开始读取数据,并将数据填充到 sourceContext。 cancel取消 Flink 作业时调用,关闭资源。 (2)createInput 该方式是通过 NebulaInputFormat 类实现的,该类继承自 RichInputFormat 并实现了以下方法: openInputFormat准备 inputFormat,获取连接。 closeInputFormat数据读取完成,释放资源,关闭 Nebula Graph 服务的连接。 getStatistics 获取数据源的基本统计信息。 createInputSplits 基于配置的 partition 参数创建 GenericInputSplit。 getInputSplitAssigner 返回输入的 split 分配器,按原始计算的顺序返回 Source 的所有 split。 open开始 inputFormat 的数据读取,将读取的数据转换 Flink 的数据格式,构造迭代器。 close数据读取完成,打印读取日志。 reachedEnd是否读取完成 nextRecord通过迭代器获取下一条数据 通过 addSource 读取 Source 数据得到的是 Flink 的 DataStreamSource,表示 DataStream 的起点。 通过 createInput 读取数据得到的是 Flink 的 DataSource,DataSource 是一个创建新数据集的 Operator,这个 Operator 可作为进一步转换的数据集。DataSource 可以通过 withParameters 封装配置参数进行其他的操作。 1.4 自定义 Nebula Graph Source 应用实践 使用 Flink 读取 Nebula Graph 图数据时,需要构造 NebulaSourceFunction 和 NebulaOutputFormat,并通过 Flink 的 addSource 或 createInput 方法注册数据源进行 Nebula Graph 数据读取。 构造 NebulaSourceFunction 和 NebulaOutputFormat 时需要进行客户端参数的配置和执行参数的配置,说明如下: 配置项说明: NebulaClientOptions 配置 address,NebulaSource 需要配置 Nebula Graph Metad 服务的地址。 配置 username 配置 password VertexExecutionOptions 配置 GraphSpace 配置要读取的 tag 配置要读取的字段集 配置是否读取所有字段,默认为 false, 若配置为 true 则字段集配置无效 配置每次读取的数据量 limit,默认 2000 EdgeExecutionOptions 配置 GraphSpace 配置要读取的 edge 配置要读取的字段集 配置是否读取所有字段,默认为 false, 若配置为 true 则字段集配置无效 配置每次读取的数据量 limit,默认 2000 // 构造 Nebula Graph 客户端连接需要的参数 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:45500") .build(); // 创建 connectionProvider NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions); // 构造 Nebula Graph 数据读取需要的参数 List<String> cols = Arrays.asList("name", "age"); VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSource") .setTag(tag) .setFields(cols) .setLimit(100) .builder(); // 构造 NebulaInputFormat NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); // 方式 1 使用 createInput 方式注册 Nebula Graph 数据源 DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment() .createInput(inputFormat); // 方式 2 使用 addSource 方式注册 Nebula Graph 数据源 NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment() .addSource(sourceFunction); Nebula Source Demo 编写完成后可以打包提交到 Flink 集群执行。 示例程序读取 Nebula Graph 的点数据并打印,该作业以 Nebula Graph 作为 Source,以 print 作为 Sink,执行结果如下: Source sent 数据为 59,671,064 条,Sink received 数据为 59,671,064 条。 二、Connector Sink Nebula Flink Connector 中的 Sink 即 Nebula Graph 图数据库。Flink 提供了丰富的 Connector 组件允许用户自定义数据池来接收 Flink 所处理的数据流。 2.1 Sink 简介 Sink 是 Flink 处理完 Source 后数据的输出,主要负责实时计算结果的输出和持久化。比如:将数据流写入标准输出、写入文件、写入 Sockets、写入外部系统等。 Flink 的 Sink 能力主要是通过调用数据流的 write 相关 API 和 DataStream.addSink 两种方式来实现数据流的外部存储。 类似于 Flink Connector 的 Source,Sink 也允许用户自定义来支持丰富的外部数据系统作为 Flink 的数据池。 Flink Sink 的使用方式如下: 本章主要介绍如何通过自定义 Sink 的方式实现 Nebula Graph Sink。 2.2 自定义 Sink 在 Flink 中可以使用 DataStream.addSink 和 DataStream.writeUsingOutputFormat 的方式将 Flink 数据流写入外部自定义数据池。 Flink 已经提供了若干实现好了的 Sink Functions ,也可以通过实现 SinkFunction 以及继承 RichOutputFormat 来实现自定义的 Sink。 2.3 自定义 Nebula Graph Sink 实现原理 Nebula Flink Connector 中实现了自定义的 NebulaSinkFunction,开发者通过调用 DataSource.addSink 方法并将 NebulaSinkFunction 对象作为参数传入即可实现将 Flink 数据流写入 Nebula Graph。 Nebula Flink Connector 使用的是 Flink 的 1.11-SNAPSHOT 版本,该版本中已经废弃了使用 writeUsingOutputFormat 方法来定义输出端的接口。 源码如下,所以请注意在使用自定义 Nebula Graph Sink 时请采用 DataStream.addSink 的方式。 /** @deprecated */ @Deprecated @PublicEvolving public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) { return this.addSink(new OutputFormatSinkFunction(format)); } Nebula Graph Sink 实现类图如下: 其中最重要的两个类是 NebulaSinkFunction 和 NebulaBatchOutputFormat。 NebulaSinkFunction 继承自 AbstractRichFunction 并实现了以下方法: open调用 NebulaBatchOutputFormat 的 open 方法,进行资源准备。 close调用 NebulaBatchOutputFormat 的 close 方法,进行资源释放。 invoke是 Sink 中的核心方法, 调用 NebulaBatchOutputFormat 中的 write 方法进行数据写入。 flush调用 NebulaBatchOutputFormat 的 flush 方法进行数据的提交。 NebulaBatchOutputFormat 继承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 继承自 RichOutputFormat,主要实现的方法有: open准备图数据库 Nebula Graph 的 Graphd 服务的连接,并初始化数据写入执行器 nebulaBatchExecutor close提交最后批次数据,等待最后提交的回调结果并关闭服务连接等资源。 writeRecord核心方法,将数据写入 nebulaBufferedRow 中,并在达到配置的批量写入 Nebula Graph 上限时提交写入。Nebula Graph Sink 的写入操作是异步的,所以需要执行回调来获取执行结果。 flush当 bufferRow 存在数据时,将数据提交到 Nebula Graph 中。 在 AbstractNebulaOutputFormat 中调用了 NebulaBatchExecutor 进行数据的批量管理和批量提交,并通过定义回调函数接收批量提交的结果,代码如下: /** * write one record to buffer */ @Override public final synchronized void writeRecord(T row) throws IOException { nebulaBatchExecutor.addToBatch(row); if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) { commit(); } } /** * put record into buffer * * @param record represent vertex or edge */ void addToBatch(T record) { boolean isVertex = executionOptions.getDataType().isVertex(); NebulaOutputFormatConverter converter; if (isVertex) { converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions); } else { converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions); } String value = converter.createValue(record, executionOptions.getPolicy()); if (value == null) { return; } nebulaBufferedRow.putRow(value); } /** * commit batch insert statements */ private synchronized void commit() throws IOException { graphClient.switchSpace(executionOptions.getGraphSpace()); future = nebulaBatchExecutor.executeBatch(graphClient); // clear waiting rows numPendingRow.compareAndSet(executionOptions.getBatch(),0); } /** * execute the insert statement * * @param client Asynchronous graph client */ ListenableFuture executeBatch(AsyncGraphClientImpl client) { String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields()); String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows()); // construct insert statement String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values); // execute insert statement ListenableFuture<Optional<Integer>> execResult = client.execute(exec); // define callback function Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() { @Override public void onSuccess(Optional<Integer> integerOptional) { if (integerOptional.isPresent()) { if (integerOptional.get() == ErrorCode.SUCCEEDED) { LOG.info("batch insert Succeed"); } else { LOG.error(String.format("batch insert Error: %d", integerOptional.get())); } } else { LOG.error("batch insert Error"); } } @Override public void onFailure(Throwable throwable) { LOG.error("batch insert Error"); } }); nebulaBufferedRow.clean(); return execResult; } 由于 Nebula Graph Sink 的写入是批量、异步的,所以在最后业务结束 close 资源之前需要将缓存中的批量数据提交且等待写入操作的完成,以防在写入提交之前提前把 Nebula Graph Client 关闭,代码如下: /** * commit the batch write operator before release connection */ @Override public final synchronized void close() throws IOException { if(numPendingRow.get() > 0){ commit(); } while(!future.isDone()){ try { Thread.sleep(10); } catch (InterruptedException e) { LOG.error("sleep interrupted, ", e); } } super.close(); } 2.4 自定义 Nebula Graph Sink 应用实践 Flink 将处理完成的数据 Sink 到 Nebula Graph 时,需要将 Flink 数据流进行 map 转换成 Nebula Graph Sink 可接收的数据格式。自定义 Nebula Graph Sink 的使用方式是通过 addSink 形式,将 NebulaSinkFunction 作为参数传给 addSink 方法来实现 Flink 数据流的写入。 NebulaClientOptions 配置 address,NebulaSource 需要配置 Nebula Graph Graphd 服务的地址。 配置 username 配置 password VertexExecutionOptions 配置 GraphSpace 配置要写入的 tag 配置要写入的字段集 配置写入的点 ID 所在 Flink 数据流 Row 中的索引 配置批量写入 Nebula Graph 的数量,默认 2000 EdgeExecutionOptions 配置 GraphSpace 配置要写入的 edge 配置要写入的字段集 配置写入的边 src-id 所在 Flink 数据流 Row 中的索引 配置写入的边 dst-id 所在 Flink 数据流 Row 中的索引 配置写入的边 rank 所在 Flink 数据流 Row 中的索引,不配则无 rank 配置批量写入 Nebula Graph 的数量,默认 2000 /// 构造 Nebula Graphd 客户端连接需要的参数 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:3699") .build(); NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); // 构造 Nebula Graph 写入操作参数 List<String> cols = Arrays.asList("name", "age") ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSink") .setTag(tag) .setFields(cols) .setIdIndex(0) .setBatch(20) .builder(); // 写入 Nebula Graph dataSource.addSink(nebulaSinkFunction); Nebula Graph Sink 的 Demo 程序以 Nebula Graph 的 space:flinkSource 作为 Source 读取数据,进行 map 类型转换后 Sink 入 Nebula Graph 的 space:flinkSink,对应的应用场景为将 Nebula Graph 中一个 space 的数据流入另一个 space 中。 三、 Catalog Flink 1.11.0 之前,用户如果依赖 Flink 的 Source/Sink 读写外部数据源时,必须要手动读取对应数据系统的 Schema。比如,要读写 Nebula Graph,则必须先保证明确地知晓在 Nebula Graph 中的 Schema 信息。但是这样会有一个问题,当 Nebula Graph 中的 Schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。 1.11.0 版本后,用户使用 Flink Connector 时可以自动获取表的 Schema。可以在不了解外部系统数据 Schema 的情况下进行数据匹配。 目前 Nebula Flink Connector 中已支持数据的读写,要实现 Schema 的匹配则需要为 Flink Connector 实现 Catalog 的管理。但为了确保 Nebula Graph 中数据的安全性,Nebula Flink Connector 只支持 Catalog 的读操作,不允许进行 Catalog 的修改和写入。 访问 Nebula Graph 指定类型的数据时,完整路径应该是以下格式:<graphSpace>.<VERTEX.tag> 或者 <graphSpace>.<EDGE.edge> 具体使用方式如下: String catalogName = "testCatalog"; String defaultSpace = "flinkSink"; String username = "root"; String password = "nebula"; String address = "127.0.0.1:45500"; String table = "VERTEX.player" // define Nebula catalog Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password); // define Flink table environment StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); tEnv = StreamTableEnvironment.create(bsEnv); // register customed nebula catalog tEnv.registerCatalog(catalogName, catalog); // use customed nebula catalog tEnv.useCatalog(catalogName); // show graph spaces of Nebula Graph String[] spaces = tEnv.listDatabases(); // show tags and edges of Nebula Graph tEnv.useDatabase(defaultSpace); String[] tables = tEnv.listTables(); // check tag player exist in defaultSpace ObjectPath path = new ObjectPath(defaultSpace, table); assert catalog.tableExists(path) == true // get nebula tag schema CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table)); table.getSchema(); Nebula Flink Connector 支持的其他 Catalog 接口请查看 GitHub 代码 NebulaCatalog.java。 四、 Exactly-once Flink Connector 的 Exactly-once 是指 Flink 借助于 checkpoint 机制保证每个输入事件只对最终结果影响一次,在数据处理过程中即使出现故障,也不会存在数据重复和丢失的情况。 为了提供端到端的 Exactly-once 语义,Flink 的外部数据系统也必须提供提交或回滚的方法,然后通过 Flink 的 checkpoint 机制协调。Flink 提供了实现端到端的 Exactly-once 的抽象,即实现二阶段提交的抽象类 TwoPhaseCommitSinkFunction。 想为数据输出端实现 Exactly-once,则需要实现四个函数: beginTransaction在事务开始前,在目标文件系统的临时目录创建一个临时文件,随后可以在数据处理时将数据写入此文件。 preCommit在预提交阶段,关闭文件不再写入。为下一个 checkpoint 的任何后续文件写入启动一个新事务。 commit在提交阶段,将预提交阶段的文件原子地移动到真正的目标目录。二阶段提交过程会增加输出数据可见性的延迟。 abort在终止阶段,删除临时文件。 根据上述函数可看出,Flink 的二阶段提交对外部数据源有要求,即 Source 数据源必须具备重发功能,Sink 数据池必须支持事务提交和幂等写。 Nebula Graph v1.1.0 虽然不支持事务,但其写入操作是幂等的,即同一条数据的多次写入结果是一致的。因此可以通过 checkpoint 机制实现 Nebula Flink Connector 的 At-least-Once 机制,根据多次写入的幂等性可以间接实现 Sink 的 Exactly-once。 要使用 Nebula Graph Sink 的容错性,请确保在 Flink 的执行环境中开启了 checkpoint 配置: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000) // checkpoint every 10000 msecs .getCheckpointConfig() .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); Reference Nebula Source Demo [testNebulaSource]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java Nebula Sink Demo [testSourceSink]:https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java Apache Flink 源码:https://github.com/apache/flink ApacheFlink 零基础入门:https://www.infoq.cn/theme/28 Flink 文档:https://flink.apache.org/flink-architecture.html Flink 实践文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/ flink-connector-jdbc 源码:https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc Flink JDBC Catalog 详解:https://cloud.tencent.com/developer/article/1697913 喜欢这篇文章?来来来,给我们的 GitHub 点个 star 表鼓励啦~~ ♂️♀️ [手动跪谢] 交流图数据库技术?交个朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你进交流群~~
摘要:Nebula Operator 是 Nebula Graph 在 Kubernetes 系统上的自动化部署运维插件。在本文,你将了解到 Nebula Operator 的特性及它的工作原理。 从 Nebula Graph 的架构谈起 Nebula Graph 是一个高性能的分布式开源图数据库,从架构上可以看出,一个完整的 Nebula Graph 集群由三类服务组成,即 Meta Service, Query Service(Computation Layer)和 Storage Service(Storage Layer)。 每类服务都是一个由多副本组件组成的集群,在 Nebula Operator 中,我们分别称这三类组件为: Metad / Graphd / Storaged。 Metad:主要负责提供和存储图数据库的元数据,并承担集群中调度器的角色,指挥存储扩容和数据迁移,leader 变更等运维操作。 Graphd:主要负责处理 Nebula 查询语言语句(nGQL),每个 Graphd 都运行着一个无状态的查询计算引擎,且彼此间无任何通信关系。计算引擎仅从 Metad 集群中读取元信息,并和 Storaged 集群进行交互。同时,它也负责不同客户端的接入和交互。 Storaged:主要负责 Graph 数据存储。图数据被切分成很多的分片 Partition,相同 ID 的 Partition 组成一个 Raft Group,实现多副本一致性。Nebula Graph 默认的存储引擎是 RocksDB 的 Key-Value 存储。 在了解了 Nebula Graph 核心组件的功能后,我们可以得出一些结论: Nebula Graph 在设计上采用了存储计算分离的架构,组件间分层清晰,职责明确,这意味着各个组件都可以根据自身的业务需求进行独立地弹性扩容、缩容,非常适合部署在 Kubernetes 这类容器编排系统上,充分发挥 Nebula Graph 集群的弹性扩缩能力。 Nebula Graph 是一个较为复杂的分布式系统,它的部署和运维操作需要比较深入的领域知识,这带来了颇高的学习成本和负担。即使是部署运行在 Kubernetes 系统之上,有状态应用的状态管理、异常处理等需求,原生的Kubernetes 控制器也不能很好的满足,导致 Nebula Graph 集群不能发挥出它最大的能力。 因此,为了充分发挥 Nebula Graph 原生具备的弹性扩缩、故障转移等能力,也为了降低对 Nebula Graph 集群的运维管理门槛,我们开发了 Nebula Operator。 Nebula Operator 是 Nebula Graph 在 Kubernetes 系统上的自动化部署运维插件,依托于 Kubernetes 自身优秀的扩展机制,我们把 Nebula Graph 运维领域的知识,以 CRD + Controller 的形式全面注入到 Kubernetes 系统中,让 Nebula Graph 成为真正的云原生图数据库。 为了能够更好的理解 Nebula Operator 的工作原理,让我们先回顾一下什么是 Operator 什么是 Nebula Operator Operator 并不是什么很新的概念,早在 2017 年,就有 CoreOS 公司推出了 Etcd Operator。Operator 的初衷是为了扩展 Kubernetes 功能,以更好的管理有状态应用,这得益于 Kubernetes 的两大核心概念:声明式 API 和控制循环(Control Loop)。 我们可以用一段伪代码来描述这一过程。 在集群中声明对象X的期望状态并创建X for { 实际状态 := 获取集群中对象 X 的实际状态 期望状态 := 获取集群中对象 X 的期望状态 if 实际状态 == 期望状态 { 什么都不做 } else { 执行事先规定好的编排动作,将实际状态调协为期望状态 } } 在 Kubernetes 系统内,每一种内置资源对象,都运行着一个特定的控制循环,将它的实际状态通过事先规定好的编排动作,逐步调整为最终的期望状态。 对于 Kubernetes 系统内不存在的资源类型,我们可以通过添加自定义 API 对象的方式注册。常见的方法是使用 CustomResourceDefinition(CRD)和 Aggregation ApiServer(AA)。Nebula Operator 就使用 CRD 注册了一个 "Nebula Cluster" 资源,和一个 "Advanced Statefulset" 资源。 在注册了上述自定义资源之后,我们就可以通过编写自定义控制器的方式来感知自定义资源的状态变化,并按照我们编写的策略和逻辑去自动地运维 Nebula Graph,让集群的实际状态朝着期望状态趋近。这也是 Nebula Operator 降低用户运维门槛的核心原理。 apiVersion: nebula.com/v1alpha1 kind: NebulaCluster metadata: name: nebulaclusters namespace: default spec: graphd: replicas: 1 baseImage: vesoft/nebula-graphd imageVersion: v2-preview-nightly service: type: NodePort externalTrafficPolicy: Cluster storageClaim: storageClassName: fast-disks metad: replicas: 3 baseImage: vesoft/nebula-metad imageVersion: v2-preview-nightly storageClaim: storageClassName: fast-disks storaged: replicas: 3 baseImage: vesoft/nebula-storaged imageVersion: v2-preview-nightly storageClaim: storageClassName: fast-disks schedulerName: nebula-scheduler imagePullPolicy: Always 我们在这里展示了一个简单的 Nebula Cluster 实例,如果你想要扩展 Storaged 的副本数量至 10,你只需要简单修改 .spec.storaged.replicas 参数为 10,剩下的运维操作则由 Nebula Operator 通过控制循环来完成。 至此,想必你已经对 Nebula Graph 和 Operator 有了一个初步的认知,接下来,让我们来列举目前 Nebula Operator 已经具备了哪些能力,让你能更加深刻的体会到使用 Nebula Operator 带来的一些实际好处。 部署、卸载:我们将一整个 Nebula Graph 集群描述成一个 CRD 注册进 ApiServer 中,用户只需提供对应的 CR 文件,Operator 就能快速拉起或者删除一个对应的 Nebula Graph 集群,简化了用户部署、卸载集群的过程。 扩容、缩容:通过在控制循环中调用 Nebula Graph 原生提供的扩缩容接口,我们为 Nebula Operator 封装实现了扩缩容的逻辑,可以通过 yaml 配置进行简单的扩容,缩容,且保证数据的稳定性。 原地升级:我们在 Kubernetes 原生提供的 StatefulSet 基础上为其扩展了镜像原地替换的能力,它节省了 Pod 调度的耗时,并且在升级时,Pod 的位置、资源都不发生变化,极大提高了升级时集群的稳定性和确定性。 故障迁移:Nebula Operator 会内部调用 Nebula Graph 集群提供的接口,动态的感知服务是否正常运行,一旦发现异常,会自动的去做故障迁移操作,并根据错误类型配有对应的容错机制。 WebHook:一个标准的 Nebula Graph 最少需要三个 Metad 副本,如果用户错误地修改了此参数,可能会导致集群不可用,我们会通过 WebHook 的准入控制来检查一些必要参数是否设置正确,并通过变更控制来强制修改一些错误的声明,使集群始终能够稳定运行。 参考资料 Nebula Graph:https://github.com/vesoft-inc/nebula 作者有话说:Hi,我是刘鑫超,图数据库 Nebula Graph 的研发工程师,如果你对此文有疑问,欢迎来我们的 Nebula Graph 论坛交流下心得~~
首发于官方博客:https://nebula-graph.com.cn/posts/debug-nebula-graph-processes-docker/ 摘要:本文以 Nebula Graph 进程为例,讲解如何不破坏原有容器的内容,也不用在其中安装任何的工具包前提下,像在本地一样来调试进程 需求 在开发或者测试过程中,我们经常会用到 vesoft-inc/nebula-docker-compose 这个 repo 下的部署方式,因为当初为了尽可能的压缩每个 Nebula Graph 服务的 docker 镜像的体积,所以开发过程中常用的一切工具都没有安装,甚至连编辑器 VIM 都没有。 这给我们在容器内部定位问题带来一定的难度,因为每次只能去 install 一些工具包,才能开展接下来的工作,甚是费事。其实调试容器内部的进程还有另外一种方式,不需要破坏原有容器的内容,也不用在其中安装任何的工具包就能像在本地一样来调试。 这种技术在 k8s 环境下其实已经挺常用,就是 sidecar 模式。原理也比较朴素就是再起一个容器然后让这个容器跟你要调试的容器共享相同的 pid/network 的 namespace。这样原容器中的进程和网络空间在调试容器中就能“一览无余”,而在调试容器中安装了你想要的一切顺手工具,接下来的舞台就是留于你发挥了。 演示 接下来我就演示一下如何操作: 我们先用上述的 docker-compose 方式在本地部署一套 Nebula Graph 集群,教程见 repo 中的 README。部署好后的结果如下: $ docker-compose up -d Creating network "nebula-docker-compose_nebula-net" with the default driver Creating nebula-docker-compose_metad1_1 ... done Creating nebula-docker-compose_metad2_1 ... done Creating nebula-docker-compose_metad0_1 ... done Creating nebula-docker-compose_storaged2_1 ... done Creating nebula-docker-compose_storaged1_1 ... done Creating nebula-docker-compose_storaged0_1 ... done Creating nebula-docker-compose_graphd_1 ... done $ docker-compose ps Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- nebula-docker-compose_graphd_1 ./bin/nebula-graphd --flag ... Up (health: starting) 0.0.0.0:32907->13000/tcp, 0.0.0.0:32906->13002/tcp, 0.0.0.0:3699->3699/tcp nebula-docker-compose_metad0_1 ./bin/nebula-metad --flagf ... Up (health: starting) 0.0.0.0:32898->11000/tcp, 0.0.0.0:32896->11002/tcp, 45500/tcp, 45501/tcp nebula-docker-compose_metad1_1 ./bin/nebula-metad --flagf ... Up (health: starting) 0.0.0.0:32895->11000/tcp, 0.0.0.0:32894->11002/tcp, 45500/tcp, 45501/tcp nebula-docker-compose_metad2_1 ./bin/nebula-metad --flagf ... Up (health: starting) 0.0.0.0:32899->11000/tcp, 0.0.0.0:32897->11002/tcp, 45500/tcp, 45501/tcp nebula-docker-compose_storaged0_1 ./bin/nebula-storaged --fl ... Up (health: starting) 0.0.0.0:32901->12000/tcp, 0.0.0.0:32900->12002/tcp, 44500/tcp, 44501/tcp nebula-docker-compose_storaged1_1 ./bin/nebula-storaged --fl ... Up (health: starting) 0.0.0.0:32903->12000/tcp, 0.0.0.0:32902->12002/tcp, 44500/tcp, 44501/tcp nebula-docker-compose_storaged2_1 ./bin/nebula-storaged --fl ... Up (health: starting) 0.0.0.0:32905->12000/tcp, 0.0.0.0:32904->12002/tcp, 44500/tcp, 44501/tcp 这时我们分两个场景来演示,一个是进程空间,一个是网络空间。首先我们要先有一个顺手的调试镜像,我们就不自己构建了,从 docker hub 中找个已经打包好的用作演示,后期觉得不够用,我们可以维护一份 nebula-debug 的镜像,安装我们想要的所有调试工具,此处先借用社区内的方案 nicolaka/netshoot。我们先把镜像拉取到本地: $ docker pull nicolaka/netshoot $ docker images REPOSITORY TAG IMAGE ID CREATED SIZE vesoft/nebula-graphd nightly c67fe54665b7 36 hours ago 282MB vesoft/nebula-storaged nightly 5c77dbcdc507 36 hours ago 288MB vesoft/nebula-console nightly f3256c99eda1 36 hours ago 249MB vesoft/nebula-metad nightly 5a78d3e3008f 36 hours ago 288MB nicolaka/netshoot latest 6d7e8891c980 2 months ago 352MB 我们先看看直接执行这个镜像会是什么样: $ docker run --rm -ti nicolaka/netshoot bash bash-5.0# ps PID USER TIME COMMAND 1 root 0:00 bash 8 root 0:00 ps bash-5.0# 上面显示这个容器看不到任何 Nebula Graph 服务进程的内容,那么我们给其加点参数再看看: $ docker run --rm -ti --pid container:nebula-docker-compose_metad0_1 --cap-add sys_admin nicolaka/netshoot bash bash-5.0# ps PID USER TIME COMMAND 1 root 0:03 ./bin/nebula-metad --flagfile=./etc/nebula-metad.conf --daemonize=false --meta_server_addrs=172.28.1.1:45500,172.28.1.2:45500,172.28.1.3:45500 --local_ip=172.28.1.1 --ws_ip=172.28.1.1 --port=45500 --data_path=/data/meta --log_dir=/logs --v=15 --minloglevel=0 452 root 0:00 bash 459 root 0:00 ps bash-5.0# ls -al /proc/1/net/ total 0 dr-xr-xr-x 6 root root 0 Sep 18 07:17 . dr-xr-xr-x 9 root root 0 Sep 18 06:55 .. -r--r--r-- 1 root root 0 Sep 18 07:18 anycast6 -r--r--r-- 1 root root 0 Sep 18 07:18 arp dr-xr-xr-x 2 root root 0 Sep 18 07:18 bonding -r--r--r-- 1 root root 0 Sep 18 07:18 dev ... -r--r--r-- 1 root root 0 Sep 18 07:18 sockstat -r--r--r-- 1 root root 0 Sep 18 07:18 sockstat6 -r--r--r-- 1 root root 0 Sep 18 07:18 softnet_stat dr-xr-xr-x 2 root root 0 Sep 18 07:18 stat -r--r--r-- 1 root root 0 Sep 18 07:18 tcp -r--r--r-- 1 root root 0 Sep 18 07:18 tcp6 -r--r--r-- 1 root root 0 Sep 18 07:18 udp -r--r--r-- 1 root root 0 Sep 18 07:18 udp6 -r--r--r-- 1 root root 0 Sep 18 07:18 udplite -r--r--r-- 1 root root 0 Sep 18 07:18 udplite6 -r--r--r-- 1 root root 0 Sep 18 07:18 unix -r--r--r-- 1 root root 0 Sep 18 07:18 xfrm_stat 这次有点不一样了,我们看到 metad0 的进程了,并且其 pid 还是 1。看到这个进程再想对其做点啥就好办了,比如能不能直接在 gdb 中 attach 它,由于手边没有带 nebula binary 的对应 image,就留给大家私下探索吧。 我们已经看到 pid 空间通过指定 --pid container:<container_name|id> 可以共享了,那么我们接下来看看网络的情况,毕竟有时候需要抓个包,执行如下的命令: bash-5.0# netstat -tulpn Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name 啥也没有,跟预想的有点不一样,我们有 metad0 这个进程不可能一个连接都没有。要想看到这个容器内的网络空间还要再加点参数,像如下方式再启动调试容器: $ docker run --rm -ti --pid container:nebula-docker-compose_metad0_1 --network container:nebula-docker-compose_metad0_1 --cap-add sys_admin nicolaka/netshoot bash bash-5.0# netstat -tulpn Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 172.28.1.1:11000 0.0.0.0:* LISTEN - tcp 0 0 172.28.1.1:11002 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:45500 0.0.0.0:* LISTEN - tcp 0 0 0.0.0.0:45501 0.0.0.0:* LISTEN - tcp 0 0 127.0.0.11:33249 0.0.0.0:* LISTEN - udp 0 0 127.0.0.11:51929 0.0.0.0:* - 这回就跟上面的输出不一样了,加了 --network container:nebula-docker-compose_metad0_1 运行参数后,metad0 容器内的连接情况也能看到了,那么想抓包调试就都可以了。 总结 通过运行另外一个容器,并让其跟想要调试的容器共享 pid/network namespace 是我们能像本地调试的关键。社区里甚至还有人基于上述想法开发了一些小工具进一步方便使用: Docker-debug 推荐阅读 使用 Docker 构建 Nebula Graph 源码
本文由美团 NLP 团队高辰、赵登昌撰写首发于 Nebula Graph 官方论坛:https://discuss.nebula-graph.com.cn/t/topic/1377 1. 前言 近年来,深度学习和知识图谱技术发展迅速,相比于深度学习的“黑盒子”,知识图谱具有很强的可解释性,在搜索推荐、智能助理、金融风控等场景中有着广泛的应用。美团基于积累的海量业务数据,结合使用场景进行充分地挖掘关联,逐步建立起包括美食图谱、旅游图谱、商品图谱在内的近十个领域知识图谱,并在多业务场景落地,助力本地生活服务的智能化。 为了高效存储并检索图谱数据,相比传统关系型数据库,选择图数据库作为存储引擎,在多跳查询上具有明显的性能优势。当前业界知名的图数据库产品有数十款,选型一款能够满足美团实际业务需求的图数据库产品,是建设图存储和图学习平台的基础。我们结合业务现状,制定了选型的基本条件: 开源项目,对商业应用友好 拥有对源代码的控制力,才能保证数据安全和服务可用性。 支持集群模式,具备存储和计算的横向扩展能力 美团图谱业务数据量可以达到千亿以上点边总数,吞吐量可达到数万 qps,单节点部署无法满足存储需求。 能够服务 OLTP 场景,具备毫秒级多跳查询能力 美团搜索场景下,为确保用户搜索体验,各链路的超时时间具有严格限制,不能接受秒级以上的查询响应时间。 具备批量导入数据能力 图谱数据一般存储在 Hive 等数据仓库中。必须有快速将数据导入到图存储的手段,服务的时效性才能得到保证。 我们试用了 DB-Engines 网站上排名前 30 的图数据库产品,发现多数知名的图数据库开源版本只支持单节点,不能横向扩展存储,无法满足大规模图谱数据的存储需求,例如:Neo4j、ArangoDB、Virtuoso、TigerGraph、RedisGraph。经过调研比较,最终纳入评测范围的产品为:NebulaGraph(原阿里巴巴团队创业开发)、Dgraph(原 Google 团队创业开发)、HugeGraph(百度团队开发)。 2. 测试概要 2.1 硬件配置 数据库实例:运行在不同物理机上的 Docker 容器。 单实例资源:32 核心,64GB 内存,1TB SSD 存储。【Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz】 实例数量:3 2.2 部署方案 Nebula v1.0.1 Metad 负责管理集群元数据,Graphd 负责执行查询,Storaged 负责数据分片存储。存储后端采用 RocksDB。 |实例 1 | 实例 2 | 实例 3 ||-|-|-||Metad | Metad | Metad||Graphd | Graphd | Graphd||Storaged[RocksDB] | Storaged[RocksDB] | Storaged[RocksDB]| Dgraph v20.07.0 Zero 负责管理集群元数据,Alpha 负责执行查询和存储。存储后端为 Dgraph 自有实现。 |实例 1 | 实例 2 | 实例 3 ||-|-|-||Zero | Zero | Zero||Alpha | Alpha | Alpha| HugeGraph v0.10.4 HugeServer 负责管理集群元数据和查询。HugeGraph 虽然支持 RocksDB 后端,但不支持 RocksDB 后端的集群部署,因此存储后端采用 HBase。 |实例1 | 实例2 | 实例3 ||-|-|-||HugeServer[HBase]|HugeServer[HBase]|HugeServer[HBase]||JournalNode | JournalNode | JournalNode||DataNode | DataNode | DataNode||NodeManager | NodeManager | NodeManager||RegionServer | RegionServer | RegionServer||ZooKeeper | ZooKeeper | ZooKeeper||NameNode | NameNode[Backup] | -|| -|ResourceManager | ResourceManager[Backup]||HBase Master | HBase Master[Backup] |-| 3. 评测数据集 社交图谱数据集:https://github.com/ldbc011 生成参数:branch=stable, version=0.3.3, scale=1000 实体情况:4 类实体,总数 26 亿 关系情况:19 类关系,总数 177 亿 数据格式:csv GZip 压缩后大小:194 G 4. 测试结果 4.1 批量数据导入 4.1.1 测试说明 批量导入的步骤为:Hive 仓库底层 csv 文件 -> 图数据库支持的中间文件 -> 图数据库。各图数据库具体导入方式如下: Nebula:执行 Spark 任务,从数仓生成 RocksDB 的底层存储 sst 文件,然后执行 sst Ingest 操作插入数据。 Dgraph:执行 Spark 任务,从数仓生成三元组 rdf 文件,然后执行 bulk load 操作直接生成各节点的持久化文件。 HugeGraph:支持直接从数仓的 csv 文件导入数据,因此不需要数仓-中间文件的步骤。通过 loader 批量插入数据。 4.1.2 测试结果 4.1.3 数据分析 Nebula:数据存储分布方式是主键哈希,各节点存储分布基本均衡。导入速度最快,存储放大比最优。 Dgraph:原始 194G 数据在内存 392G 的机器上执行导入命令,8.7h 后 OOM 退出,无法导入全量数据。数据存储分布方式是三元组谓词,同一种关系只能保存在一个数据节点上,导致存储和计算严重偏斜。 HugeGraph:原始 194G 的数据执行导入命令,写满了一个节点 1,000G 的磁盘,造成导入失败,无法导入全量数据。存储放大比最差,同时存在严重的数据偏斜。 4.2 实时数据写入 4.2.1 测试说明 向图数据库插入点和边,测试实时写入和并发能力。 响应时间:固定的 50,000 条数据,以固定 qps 发出写请求,全部发送完毕即结束。取客户端从发出请求到收到响应的 Avg、p99、p999 耗时。 最大吞吐量:固定的 1,000,000 条数据,以递增 qps 发出写请求,Query 循环使用。取 1 分钟内成功请求的峰值 qps 为最大吞吐量。 插入点 Nebula INSERT VERTEX t_rich_node (creation_date, first_name, last_name, gender, birthday, location_ip, browser_used) VALUES ${mid}:('2012-07-18T01:16:17.119+0000', 'Rodrigo', 'Silva', 'female', '1984-10-11', '84.194.222.86', 'Firefox') Dgraph { set { <${mid}> <creation_date> "2012-07-18T01:16:17.119+0000" . <${mid}> <first_name> "Rodrigo" . <${mid}> <last_name> "Silva" . <${mid}> <gender> "female" . <${mid}> <birthday> "1984-10-11" . <${mid}> <location_ip> "84.194.222.86" . <${mid}> <browser_used> "Firefox" . } } HugeGraph g.addVertex(T.label, "t_rich_node", T.id, ${mid}, "creation_date", "2012-07-18T01:16:17.119+0000", "first_name", "Rodrigo", "last_name", "Silva", "gender", "female", "birthday", "1984-10-11", "location_ip", "84.194.222.86", "browser_used", "Firefox") 插入边 Nebula INSERT EDGE t_edge () VALUES ${mid1}->${mid2}:(); Dgraph { set { <${mid1}> <link> <${mid2}> . } } HugeGraph g.V(${mid1}).as('src').V(${mid2}).addE('t_edge').from('src') 4.2.2 测试结果 实时写入 4.2.3 数据分析 Nebula:如 4.1.3 节分析所述,Nebula 的写入请求可以由多个存储节点分担,因此响应时间和吞吐量均大幅领先。 Dgraph:如 4.1.3 节分析所述,同一种关系只能保存在一个数据节点上,吞吐量较差。 HugeGraph:由于存储后端基于 HBase,实时并发读写能力低于 RocksDB(Nebula)和 BadgerDB(Dgraph),因此性能最差。 4.3 数据查询 4.3.1 测试说明 以常见的 N 跳查询返回 ID,N 跳查询返回属性,共同好友查询请求测试图数据库的读性能。 响应时间:固定的 50,000 条查询,以固定 qps 发出读请求,全部发送完毕即结束。取客户端从发出请求到收到响应的 Avg、p99、p999 耗时。 60s 内未返回结果为超时。 最大吞吐量:固定的 1,000,000 条查询,以递增 qps 发出读请求,Query 循环使用。取 1 分钟内成功请求的峰值 qps 为最大吞吐量。 缓存配置:参与测试的图数据库都具备读缓存机制,默认打开。每次测试前均重启服务清空缓存。 N 跳查询返回 ID Nebula GO ${n} STEPS FROM ${mid} OVER person_knows_person Dgraph { q(func:uid(${mid})) { uid person_knows_person { #${n}跳数 = 嵌套层数 uid } } } HugeGraph g.V(${mid}).out().id() #${n}跳数 = out()链长度 N 跳查询返回属性 Nebula GO ${n} STEPS FROM ${mid} OVER person_knows_person YIELDperson_knows_person.creation_date, $$.person.first_name, $$.person.last_name, $$.person.gender, $$.person.birthday, $$.person.location_ip, $$.person.browser_used Dgraph { q(func:uid(${mid})) { uid first_name last_name gender birthday location_ip browser_used person_knows_person { #${n}跳数 = 嵌套层数 uid first_name last_name gender birthday location_ip browser_used } } } HugeGraph g.V(${mid}).out() #${n}跳数 = out()链长度 共同好友查询语句 Nebula GO FROM ${mid1} OVER person_knows_person INTERSECT GO FROM ${mid2} OVER person_knows_person Dgraph { var(func: uid(${mid1})) { person_knows_person { M1 as uid } } var(func: uid(${mid2})) { person_knows_person { M2 as uid } } in_common(func: uid(M1)) @filter(uid(M2)){ uid } } HugeGraph g.V(${mid1}).out().id().aggregate('x').V(${mid2}).out().id().where(within('x')).dedup() 4.3.2 测试结果 N 跳查询返回 ID N 跳查询返回属性 单个返回节点的属性平均大小为 200 Bytes。 共同好友本项未测试最大吞吐量。 4.3.3 数据分析 在 1 跳查询返回 ID「响应时间」实验中,Nebula 和 DGraph 都只需要进行一次出边搜索。由于 DGraph 的存储特性,相同关系存储在单个节点,1 跳查询不需要网络通信。而 Nebula 的实体分布在多个节点中,因此在实验中 DGraph 响应时间表现略优于 Nebula。 在 1 跳查询返回 ID「最大吞吐量」实验中,DGraph 集群节点的 CPU 负载主要落在存储关系的单节点上,造成集群 CPU 利用率低下,因此最大吞吐量仅有 Nebula 的 11%。 在 2 跳查询返回 ID「响应时间」实验中,由于上述原因,DGraph 在 qps=100 时已经接近了集群负载能力上限,因此响应时间大幅变慢,是 Nebula 的 3.9 倍。 在 1 跳查询返回属性实验中,Nebula 由于将实体的所有属性作为一个数据结构存储在单节点上,因此只需要进行【出边总数 Y】次搜索。而 DGraph 将实体的所有属性也视为出边,并且分布在不同节点上,需要进行【属性数量 X * 出边总数 Y】次出边搜索,因此查询性能比 Nebula 差。多跳查询同理。 在共同好友实验中,由于此实验基本等价于 2 次 1 跳查询返回 ID,因此测试结果接近,不再详述。 由于 HugeGraph 存储后端基于 HBase,实时并发读写能力低于 RocksDB(Nebula)和 BadgerDB(Dgraph),因此在多项实验中性能表现均落后于 Nebula 和 DGraph。 5. 结论 参与测试的图数据库中,Nebula 的批量导入可用性、导入速度、实时数据写入性能、数据多跳查询性能均优于竞品,因此我们最终选择了 Nebula 作为图存储引擎。 6. 参考资料 NebulaGraph Benchmark:https://discuss.nebula-graph.com.cn/t/topic/782 NebulaGraph Benchmark 微信团队:https://discuss.nebula-graph.com.cn/t/topic/1013 DGraph Benchmark:https://dgraph.io/blog/tags/benchmark/ HugeGraph Benchmark:https://hugegraph.github.io/hugegraph-doc/performance/hugegraph-benchmark-0.5.6.html TigerGraph Benchmark:https://www.tigergraph.com/benchmark/ RedisGraph Benchmark:https://redislabs.com/blog/new-redisgraph-1-0-achieves-600x-faster-performance-graph-databases/ 本次性能测试系美团 NLP 团队高辰、赵登昌撰写,如果你对本文有任意疑问,欢迎来原贴和作者交流:https://discuss.nebula-graph.com.cn/t/topic/1377
2021年01月
2020年12月
2020年11月
2020年10月
2020年09月
2020年08月