Spark Core读取ES的分区问题分析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢?可想的具体关系可能是以下两种:1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。2).ES支持游标查询,那么是不是也可以对比较大ES 索引的分片进行拆分成多个RDD分区呢?那么下面浪尖带着大家翻一下源码看看具体情况。

写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?

稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢?

可想的具体关系可能是以下两种:

1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。

2).ES支持游标查询,那么是不是也可以对比较大ES 索引的分片进行拆分成多个RDD分区呢?

那么下面浪尖带着大家翻一下源码看看具体情况。

1.Spark Core读取ES

ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version = 2.7.1
hadoop22Version = 2.2.0
spark13Version = 1.6.2
spark20Version = 2.3.0
浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

a,导入整个elasticsearch-hadoop包

  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>7.1.1</version>
</dependency>

b,只导入spark模块的包

  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>7.1.1</version>
</dependency>

浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkrdd {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
conf.set(ConfigurationOptions.ES_PORT, "9200")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")

// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)

conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

val sc = new SparkContext(conf)
import org.elasticsearch.spark._

sc.esRDD("posts").foreach(each=>{
  each._2.keys.foreach(println)
})
sc.esJsonRDD("posts").foreach(each=>{
  println(each._2)
})

sc.stop()

}
}
可以看到Spark Core读取RDD主要有两种形式的API:

a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]
b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

RDD[(String, String)]
虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。

要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。

2.源码分析

首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。

废话少说直接找到ScalaEsRDD,发现gePartitions是在游戏转让父类实现的,方法内容如下:

override def getPartitions: Array[Partition] = {

esPartitions.zipWithIndex.map { case(esPartition, idx) =>
  new EsPartition(id, idx, esPartition)
}.toArray

}
esPartitions是一个lazy型的变量:

@transient private[spark] lazy val esPartitions = {

RestService.findPartitions(esCfg, logger)

}
这种声明原因是什么呢?

lazy+transient的原因大家可以考虑一下。

RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。

final List partitions;
// 5.x及以后版本 同时没有配置es.input.max.docs.per.partition
if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {

 partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);

} else {

 partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);

}
a).findSlicePartitions

这个方法其实就是在5.x及以后的ES版本,同时配置了

es.input.max.docs.per.partition
以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

long numDocs;
if (readResource.isTyped()) {

numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);

} else {

numDocs = client.countIndexShard(index, Integer.toString(shardId), query);

}
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {

PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));

}
实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。

这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。

b).findShardPartitions方法

这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
partitions.add(partition);
3.总结

以上就是Spark Core读取ES数据的时候分片和RDD分区的对应关系分析,默认情况下是一个es 索引分片对应Spark RDD的一个分区。假如分片数过大,且ES版本在5.x及以上,可以配置参数

es.input.max.docs.per.partition
进行拆分。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
189 1
Spark快速大数据分析PDF下载读书分享推荐
|
7月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
146 0
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
177 2
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
109 0
|
2月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
61 0
|
2月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
63 0
|
4月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23729 42
|
4月前
|
分布式计算 并行计算 数据处理
|
7月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56611 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用