颠覆大数据分析之Spark为Shark所提供的扩展
译者:黄经业 购书
在Spark的RDD上执行SQL查询遵循的是传统并行数据库的三步流程:
- 查询解析
- 逻辑计划的生成
- 将逻辑计划映射为物理的执行计划
Shark使用Hive查询编译器来进行查询语句的解析。它会生成一棵抽象语法树,然后再将它转化成一个逻辑计划。Shark中逻辑计划的生成方式也类似于Hive中的。但两者的物理计划的生成方式则不尽相同。Hive中的物理计划是一系列的MR作业,而Shark中的则是分阶段RDD转换的一个有向无环图。由于Shark的高工作负荷的这个性质(通常在Hive中机器学习及用户定义函数(UDF)都很常见),因此在编译期很难获取到物理查询计划。对于新数据而言的确是这样的(之前还未被加载到Shark中)。值得注意的是, Hive和Shark都经常用来查询这类数据。因此,Shark引入了一个叫部分有向无环图执行(Partial DAG Execution,PDE)的概念。
部分DAG执行
这项技术会基于运行时收集的数据来生成查询语句的执行计划,而不是在编译期就去生成查询的物理执行计划。收集的数据包括分区大小,倾斜检测的记录条数,哪些记录是频繁出现的,以及RDD分区中数据分布的粗略的直方图。Spark在洗牌阶段之前会将map输出存储到内存中——之后reduce任务会通过MOT组件来使用这些数据。Shark的第一个改动是收集了指定分区以及全局环境的数据。另一个修改则是使得DAG可以在运行时根据所收集的数据来进行改变。必须注意的是,Shark是基于单个节点上的查询优化方法来构建的,它使用了PDE的概念来结合本地优化器进行查询的全局优化。
数据收集以及后续的DAG的修改对Shark实现分布式的连接操作至关重要。它提供了两种类型的连接操作:洗牌连接(shuffle join)以及映射/广播连接。广播连接是通过将小表发送到所有的节点来实现的,在这里它会和大表中不相交的分区进行本地合并。而在洗牌连接中,两张表都会根据连接的主键来进行哈希分区。广播连接只有在表比较小的时候才会比较高效——从这读者可以看到,为什么在Shark的动态查询优化中这些数据统计是如此重要了。Shark中数据统计用于优化查询的另一种方式就是通过检查分区的大小来合并较小分区以决定归约器的数量或者说并行度。
列内存存储
Spark中默认会将RDD存储为JVM内存中的反序列化的Java对象。这样的好处就是对JVM而言它们天生就是可用的,这加快了访问的速度。这样做的缺点就是无法在JVM内存中创建大量对象。读者应当时刻牢记,随着Java堆中对象数量的上升,垃圾回收器(GC)收集的时候就会越长。因此Shark(Muthukumar和Janakiram 2006)实现了列存储,所有基础类型的列都会创建一个对象来存储,而对于复杂类型,它会使用字节数组。这极大地减少了内存中的对象数量也提高了GC及Shark的性能。同时和Spark原生的实现相比,它还提高了空间的使用率。[1]
分布式数据加载
Shark使用Spark执行器来加载数据,但会对它进行定制化。每一张表都会进行分区,每一个分区都会由一个单独的Spark任务来进行加载。这个任务会独立决定是否进行压缩(这列是否需要压缩,如果需要的话用何种技术进行压缩——它是字典编码还是游程长度编码[Abadi 等2006])。每个分区还会单独保存压缩后的结果元数据。然而必须注意的是,世系图并不需要存储压缩元数据,这个会在重构RDD的时候进行计算。结果表明,和Hadoop相比,Shark加载数据到内存中的速度要更快,同时将数据加载到HDFS中的吞吐量也和Hadoop的一样。
完全分区智能连接(Full Partition-Wise Join)
正如在传统数据库中所了解到的那样,完全分区智能连接可以通过根据连接列来将两张表分区的方式来实现。尽管Hadoop并不支持这样的协同分区,Shark通过在数据定义中使用”distribute by”子句实现了这点。当连接两张一致分区的表时,Shark会创建Spark的map任务以避免使用昂贵的洗牌操作,从而获得了更高的运行效率。
分区修剪
正如在传统数据库中那样,分区修剪指的是优化器在构建分区访问列表时通过分析SQL中的where和from子句,删除掉不必要的分区。Shark还通过存储在分区元数据中的范围值(range value)和非重复值(distinct value, 对应枚举类型)增强了数据加载过程中的数据统计,以便在运行时指导分区修剪决策——这个过程又被Shark团队称之为映射修剪。
机器学习的支持
Shark的一个关键的独特卖点(Unique Selling Points ,USP)在于它能够支持机器学习算法。之所以能够实现这个是因为Shark允许在返回查询结果的同时还顺便返回代表执行计划的RDD对象。这说明用户可以初始化这个RDD上的操作——这点非常关键,因为它使得Spark RDD的能力可以为Shark查询所用。需要注意的是机器学习算法能够在Spark RDD上实现,Kraska等人所开发的MLbase库或本书的后续章节都会介绍到这点。
[1] 分代GC常用于现代的JVM。一类回收被称为minor collection,它是用来将分区中存活的对象拷贝到存活区(suvivor space)以及持久代中。剩下的对象会被回收掉。另一种是stop-the-world的major collection,它会对老生代进行压缩。