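/** Spark SQL source code analysis series */
This article follows the previous one, "Spark SQL Catalyst Source Code Analysis: Physical Plan", and walks through the implementation details of the physical plan's toRdd.
As we know, a SQL statement is only really executed when you call its collect() method. That call triggers the Spark job, which finally computes the RDD.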
lazy val toRdd: RDD[Row] = executedPlan.execute()
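SparkPlan operators fall into roughly four categories: the basic operators (BasicOperator), plus the somewhat more complex Join, Aggregate, and Sort operators.
As shown in the figure: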

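I. BasicOperator
1.1 Project
Project works roughly as follows: given a list of expressions, Seq[NamedExpression], and an input Row, it evaluates each expression against the row (Expression.eval) and produces a new Row.
Project is implemented by calling child.execute() and then mapPartitions to process each partition.
The function passed to mapPartitions creates one MutableProjection per partition and applies it to every row in that partition.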
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  override def execute() = child.execute().mapPartitions { iter =>
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}
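The definition of MutableProjection shows that it is a process of binding references to a schema and then evaluating:
it converts one Row into another Row whose schema columns are already defined.
If the input Row already has a schema, the Seq[Expression] passed in is bound to that schema as well.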
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema)))

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size)
  def currentValue: Row = mutableRow

  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)
      i += 1
    }
    mutableRow
  }
}
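One consequence of the code above is that apply always returns the same mutableRow instance. The following is a minimal sketch of that behavior using plain Scala collections, not Catalyst classes: Row, Expr, and ReusableProjection are hypothetical stand-ins invented for illustration. It shows why rows must be consumed or copied before the next row is projected.

```scala
object MutableProjectionSketch {
  // Hypothetical stand-ins: a row is an array of values, an expression is a function of a row.
  type Row = Array[Any]
  type Expr = Row => Any

  // Evaluates every expression into ONE reused output buffer.
  final class ReusableProjection(exprs: Seq[Expr]) extends (Row => Row) {
    private val exprArray = exprs.toArray
    private val buffer = new Array[Any](exprArray.length)

    def apply(input: Row): Row = {
      var i = 0
      while (i < exprArray.length) {
        buffer(i) = exprArray(i)(input)
        i += 1
      }
      buffer // the same array instance on every call
    }
  }

  // Roughly "select col0 + 1, col1"; both expressions are made up for illustration.
  val exprs: Seq[Expr] = Seq(r => r(0).asInstanceOf[Int] + 1, r => r(1))
  val input: Seq[Row] = Seq(Array[Any](1, "a"), Array[Any](2, "b"))

  // Copying each projected row before the next one is computed is safe.
  def projectedCopies: List[List[Any]] = {
    val p = new ReusableProjection(exprs)
    input.iterator.map(p).map(_.toList).toList
  }

  // Buffering the rows without copying leaves every element aliasing the last row.
  def projectedAliases: List[List[Any]] = {
    val p = new ReusableProjection(exprs)
    input.iterator.map(p).toList.map(_.toList)
  }
}
```

In this sketch projectedCopies keeps both distinct rows, while projectedAliases ends up with the last row twice. This is presumably why operators such as Limit and Sort call copy() on each row before buffering it.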
1.2 Filter
Filter evaluates the given condition against each input row (eval), which yields a Boolean.
If the condition evaluates to true, the row is kept in its partition; otherwise it is filtered out.
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean])
  }
}
1.3 Sample
Sample calls child.execute(), which returns an RDD, and then calls that RDD's native sample method.
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
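1.4 Union
Union supports the union of multiple subqueries, so its children are passed in as a Seq[SparkPlan].
execute() calls execute() on each child, which yields the result RDD of each select query.
It then merges the results of all subqueries by calling SparkContext's union method.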
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  override def output = children.head.output

  override def execute() = sqlContext.sparkContext.union(children.map(_.execute()))

  override def otherCopyArgs = sqlContext :: Nil
}
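1.5 Limit
The RDD API has a native counterpart to Limit, namely take().
The implementation of Limit, however, distinguishes two cases:
In the first case, limit is the final operator, as in select xxx from yyy limit zzz, and it is invoked through executeCollect. The take method is then used directly in the driver.
In the second case, limit is not the final operator, so further query operators follow it. The limit is then applied inside each partition first, and the results are repartitioned into a single partition to compute the global limit.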
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {

  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit)

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row))
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part)
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2))
  }
}
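The second case, a local limit followed by a global limit, can be sketched with plain Scala collections. This is only an illustration under stated assumptions: Partitions is a hypothetical stand-in for an RDD, and flatten stands in for the shuffle into a single partition.

```scala
object TwoPhaseLimitSketch {
  // Hypothetical stand-in for an RDD: a sequence of partitions.
  type Partitions[A] = Seq[Seq[A]]

  def limit[A](data: Partitions[A], n: Int): Seq[A] = {
    // Phase 1: local limit, so at most n rows survive in each partition.
    val local: Partitions[A] = data.map(_.take(n))
    // Phase 2: gather the survivors into one partition (the shuffle in the real
    // operator) and apply the limit once more to obtain the global result.
    val single: Seq[A] = local.flatten
    single.take(n)
  }
}
```

The local phase bounds the amount of data that has to be moved to n rows per partition. Unlike this sketch, a real shuffle gives no guarantee about which partition's rows arrive first.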
1.6 TakeOrdered
TakeOrdered is a limit N applied to sorted data, typically a limit that follows a sort by operator.
It can be thought of simply as a top-N operator.
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder)

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
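The top-N idea can be sketched on partitioned in-memory data as follows. This is a simplified illustration, not the code of RDD.takeOrdered: partitions are modeled as Seq[Seq[A]], and each step simply sorts and truncates.

```scala
object TopNSketch {
  // Top-N over partitioned data: keep the n smallest rows of each partition,
  // then merge the partial results and keep the n smallest overall.
  def takeOrdered[A](data: Seq[Seq[A]], n: Int)(implicit ord: Ordering[A]): Seq[A] =
    data
      .map(_.sorted.take(n)) // per-partition candidates
      .foldLeft(Seq.empty[A])((acc, part) => (acc ++ part).sorted.take(n))
}
```

Passing a reversed ordering, for example Ordering[Int].reverse, gives the n largest values instead. No partition ever contributes more than n candidates, which keeps the merge step small.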
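1.7 Sort
Sort also relies on the RowOrdering class. child.execute() is followed by mapPartitions, and the rows of each partition are sorted according to the RowOrdering, producing a new ordered collection.
The sorting itself is done by copying the rows of a partition into an Array and calling the Scala sorted method on it. When global is true, the operator requires an OrderedDistribution of its child, so that sorting each partition yields a total order.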
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder)

  override def execute() = attachTree(this, "sort") {
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
        preservesPartitioning = true)
  }

  override def output = child.output
}
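To make the role of the ordering concrete, here is a minimal sketch of a multi-column row ordering and a per-partition sort. SortKey and SimpleRowOrdering are hypothetical simplifications introduced for this example (a column index plus a direction instead of a Catalyst SortOrder expression), and only Int and String columns are handled.

```scala
object RowOrderingSketch {
  type Row = Seq[Any]

  // Hypothetical, simplified sort key: a column index plus a direction.
  final case class SortKey(column: Int, ascending: Boolean)

  // Compares two rows key by key; the first key that differs decides the order.
  final class SimpleRowOrdering(keys: Seq[SortKey]) extends Ordering[Row] {
    def compare(a: Row, b: Row): Int = {
      val it = keys.iterator
      while (it.hasNext) {
        val k = it.next()
        val c = (a(k.column), b(k.column)) match {
          case (x: Int, y: Int)       => Integer.compare(x, y)
          case (x: String, y: String) => x.compareTo(y)
          case (x, y)                 => sys.error(s"unsupported column values: $x, $y")
        }
        if (c != 0) return if (k.ascending) c else -c
      }
      0 // all keys are equal
    }
  }

  // Sorts the rows of one partition, like the function passed to mapPartitions in Sort.
  def sortPartition(rows: Iterator[Row], keys: Seq[SortKey]): Iterator[Row] =
    rows.toArray.sorted(new SimpleRowOrdering(keys)).iterator
}
```

For example, the keys Seq(SortKey(0, ascending = false), SortKey(1, ascending = true)) sort by the first column descending and break ties by the second column ascending.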
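1.8 ExistingRdd
ExistingRdd wraps an RDD that already exists. Its companion object, shown below, converts an RDD of Product objects (such as case classes) into an RDD[Row].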
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }

          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
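The recursive conversion in convertToCatalyst can be tried out with a small, self-contained sketch. It mirrors the shape of the pattern match above but produces plain Vectors instead of Catalyst rows. Address, Person, and the use of Vector are assumptions made for this example.

```scala
object ProductToRowSketch {
  // Hypothetical input types for illustration.
  final case class Address(city: String, zip: Option[String])
  final case class Person(name: String, age: Int, address: Address, tags: Seq[String])

  // Option and Seq are matched before Product on purpose: Some and the List cons
  // cell are themselves Products and would otherwise be turned into rows.
  def convert(a: Any): Any = a match {
    case Some(v)    => convert(v)                             // unwrap a present value
    case None       => null                                   // a missing value becomes null
    case s: Seq[_]  => s.map(convert)                         // convert element by element
    case p: Product => p.productIterator.map(convert).toVector // case class -> nested "row"
    case other      => other                                  // primitives, strings, ...
  }
}
```

Converting Person("Ann", 30, Address("Oslo", None), List("a", "b")) yields a Vector whose third element is itself a Vector holding "Oslo" and null.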
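II. Join Related Operators
HashJoin:
Before going through the join-related operators, it helps to understand HashJoin, a trait defined in joins.scala in the execution package.
The join operators BroadcastHashJoin, LeftSemiJoinHash, and ShuffledHashJoin all implement the HashJoin trait.
The main class diagram is as follows:
The main members of the HashJoin trait are:
buildSide indicates which child, left or right, is used to build the hash table; it acts as the base side of the join.
leftKeys are the join key expressions of the left child, and rightKeys are those of the right child.
left is the physical plan of the left child, and right is the physical plan of the right child.
buildSideKeyGenerator is a Projection that computes the build-side key expressions for a given Row.
streamSideKeyGenerator is a function that returns a MutableProjection, which computes the stream-side key expressions for a given Row.
If buildSide is BuildLeft, the left table is the build side, and the right table that is joined against it is the stream side.
The key operation of HashJoin is joinIterators. Put simply, it joins two tables, treating each of them as an Iterator[Row].
The approach:
1. Iterate over the build side, compute the build keys, and store the rows in a HashMap with entries of the form (buildKeys, ArrayBuffer[Row]).
2. Iterate over the streamed side, compute the streamed key of each row, and look it up in the HashMap to find the matching rows.
3. Produce a JoinedRow that concatenates the two rows.
The code is as follows: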
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  // The build plan is loaded into a hash table; the streamed plan is scanned and probed against it.
  lazy val (buildPlan, streamedPlan) = buildSide match {
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  lazy val (buildKeys, streamedKeys) = buildSide match {
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
  @transient lazy val streamSideKeyGenerator =
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = {
    // Step 1: build a hash table from join key to the build-side rows with that key.
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
    var currentRow: Row = null

    while (buildIter.hasNext) {
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow)
      // Rows whose key contains a null are skipped.
      if (!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy()
      }
    }

    // Step 2: lazily scan the streamed side and probe the hash table row by row.
    new Iterator[Row] {
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Step 3: a single mutable JoinedRow is reused for every output row.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
        }
        currentMatchPosition += 1
        ret
      }

      // Advances the streamed side until a row with at least one match is found.
      // Returns false when the streamed side is exhausted without another match.
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            currentHashMatches = hashTable.get(joinKeys.currentValue)
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
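The build and probe phases of joinIterators can be sketched on plain iterators. This is a simplified illustration, not the Spark implementation: Row is a Vector[Any], and buildKey, streamKey, and buildIsLeft are hypothetical parameters standing in for the key projections and for buildSide.

```scala
import scala.collection.mutable

object HashJoinSketch {
  type Row = Vector[Any]

  // Inner equi-join of two row iterators.
  def hashJoin(
      build: Iterator[Row],
      stream: Iterator[Row],
      buildKey: Row => Any,
      streamKey: Row => Any,
      buildIsLeft: Boolean): Iterator[Row] = {
    // Phase 1: consume the build side into key -> matching rows. Null keys are skipped.
    val table = mutable.HashMap.empty[Any, mutable.ArrayBuffer[Row]]
    build.foreach { row =>
      val k = buildKey(row)
      if (k != null) table.getOrElseUpdate(k, mutable.ArrayBuffer.empty[Row]) += row
    }
    // Phase 2: scan the streamed side lazily and emit one joined row per match.
    stream.flatMap { s =>
      val k = streamKey(s)
      val matches: Iterator[Row] =
        if (k == null) Iterator.empty
        else table.get(k).map(_.iterator).getOrElse(Iterator.empty)
      // Output columns always appear in left-then-right order.
      matches.map(b => if (buildIsLeft) b ++ s else s ++ b)
    }
  }
}
```

Only the build side has to fit in memory; the streamed side is consumed one row at a time. This suggests why a planner would want the smaller input as the build side.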
The implementation of JoinedRow, which joins two Rows:
its copy() method in effect allocates a new Array and merges the values of both rows into it.
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  .........
  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while(i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues)
  }
}
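The listing above elides how apply(i) resolves an index. A plausible reading is that the joined row is a view over the two underlying rows and only copy() materializes a new array. The sketch below illustrates that idea under this assumption; JoinedView and its indexing rule are hypothetical and are not taken from the elided source.

```scala
object JoinedRowSketch {
  // A read-only view over two rows; nothing is copied until copy() is called.
  // Hypothetical simplification: rows are IndexedSeq[Any] rather than Catalyst Rows.
  final class JoinedView(left: IndexedSeq[Any], right: IndexedSeq[Any]) {
    def size: Int = left.size + right.size

    // Assumed rule: indexes below left.size read the left row, the rest read the right row.
    def apply(i: Int): Any = if (i < left.size) left(i) else right(i - left.size)

    // Materializes the view into a fresh array, following the loop in copy() above.
    def copy(): Vector[Any] = {
      val values = new Array[Any](size)
      var i = 0
      while (i < size) {
        values(i) = apply(i)
        i += 1
      }
      values.toVector
    }
  }
}
```

With such a view, producing a join result costs two reference assignments per output row, and the array allocation is paid only by callers that need to keep the row.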
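2.1 LeftSemiJoinHash
Left semi join needs little introduction: it is what early versions of Hive offered in place of IN and EXISTS.
The join keys of the right table are put into a HashSet. The left table is then scanned, and each left row is kept only if its join key is found in the set.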
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
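The semi join logic reduces to "collect the right keys into a set, then filter the left side". A minimal sketch on plain iterators follows. The key extractor parameters are hypothetical, and keys are modeled as Option so that a missing key never matches, which corresponds to the anyNull checks above.

```scala
object LeftSemiJoinSketch {
  // Keeps the left rows whose key occurs on the right. Each left row appears at most
  // once, and no right-hand columns are returned.
  def leftSemiJoin[L, R, K](left: Iterator[L], right: Iterator[R])(
      leftKey: L => Option[K], rightKey: R => Option[K]): Iterator[L] = {
    // Build phase: only the distinct right keys are kept, not the rows.
    val keys: Set[K] = right.flatMap(r => rightKey(r)).toSet
    // Probe phase: a pure filter over the left side.
    left.filter(l => leftKey(l).exists(keys.contains))
  }
}
```

Unlike an inner join, a left row that matches several right rows is still emitted once, which is the behavior IN and EXISTS require.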
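2.2 BroadcastHashJoin
As the name says, this is a broadcast hash join.
It implements an inner hash join. It uses a future from Scala's concurrency library to collect the rows produced by buildPlan and broadcast them asynchronously.
Once the broadcast table is available (execute() waits up to 5 minutes for it), the rows of streamedPlan are matched against the broadcast table.
The implementation combines RDD mapPartitions with joinIterators from HashJoin to produce the join result.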
case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future {
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter)
    }
  }
}
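The pattern of a lazily started future plus a blocking Await can be tried in isolation with scala.concurrent. In the sketch below, collectBuildSide is a hypothetical stand-in for buildPlan.executeCollect(), and a plain Map plays the role of the broadcast table. No Spark is involved.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BroadcastJoinSketch {
  // Hypothetical stand-in for collecting the build side to the driver.
  def collectBuildSide(): Map[Int, String] = Map(1 -> "ann", 2 -> "bob")

  // Started asynchronously on first access, like the lazy broadcastFuture above.
  lazy val buildTableFuture: Future[Map[Int, String]] = Future(collectBuildSide())

  // Every streamed partition probes the same shared table, so the streamed side
  // is never repartitioned.
  def execute(streamedPartitions: Seq[Seq[(Int, String)]]): Seq[Seq[(String, String)]] = {
    val table = Await.result(buildTableFuture, 5.minutes)
    streamedPartitions.map { part =>
      part.flatMap { case (id, item) => table.get(id).map(name => (name, item)) }
    }
  }
}
```

Because the future is a lazy val, the build side is computed at most once even if execute is called several times.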
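2.3 ShuffledHashJoin
ShuffledHashJoin, as the name implies, requires the data to be shuffled. Its outputPartitioning is the partitioning of the left child.
The data of both children is shuffled to satisfy the required distribution, and the RDD zipPartitions method then zips the corresponding partitions of the two children.
The requiredChildDistribution here is ClusteredDistribution, which is matched against HashPartitioning.
The partitioning details are not covered here; see the partitioning file under org.apache.spark.sql.catalyst.plans.physical.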
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}
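Why clustering both children by their join keys makes a per-partition join correct can be shown with a small simulation. The sketch below uses plain Scala collections. partitionByKey is a hypothetical helper that mimics hash partitioning, and zip plays the role of zipPartitions.

```scala
object ShuffledJoinSketch {
  // Routes every row to a partition by the hash of its key, as a hash partitioner would.
  def partitionByKey[K, V](rows: Seq[(K, V)], numPartitions: Int): Vector[Seq[(K, V)]] = {
    val grouped = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty))
  }

  // Once both sides are partitioned the same way, equal keys share a partition index,
  // so the join can run independently on each zipped pair of partitions.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], numPartitions: Int): Seq[(K, A, B)] = {
    val l = partitionByKey(left, numPartitions)
    val r = partitionByKey(right, numPartitions)
    l.zip(r).flatMap { case (lp, rp) =>
      val table = rp.groupBy(_._1) // build side: the right partition
      lp.flatMap { case (k, a) =>
        table.getOrElse(k, Seq.empty).map { case (_, b) => (k, a, b) }
      }
    }
  }
}
```

The result is the same for any number of partitions. Only the order of the output rows depends on it, which is why the sketch is best checked by comparing sets.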
To be continued :)