前言
在上篇文章中,我们分析了Spark Operator内部的机制,今天我们会讨论一个在大数据领域中最重要的话题 - 存储。大数据已经无声无息的融入了每个人的生活中。大到旅游买房,小到外卖打车,都可以看到通过大数据提供数据分析、数据推荐、数据决策的使用场景。大数据要想能够更准确地协助决策,需要在数据多维度、数据完备性等方面有较高要求。可预知的在未来,数据的量级会越来越大,特别是随着5G时代的到来,数据的吞吐量级成指数的增长,数据的维度与来源会越来越多,数据的种类也会变得越来越异质化,对大数据平台也带来新的挑战。成本低、存得多、读写快成为大数据存储的三大问题,而今天我们就会针对这三大问题进行探讨。
容器化大数据的计算与存储
计算和存储分离是大数据领域被大家讨论过很多次的问题了,通常我们会通过如下几个角度来看这个问题:
- 硬件限制:机器的带宽成倍数的增长,但是磁盘的速度基本不变,从而造成数据本地读写优势减弱。
- 计算成本:计算和存储的量级不匹配,可能造成算力的大量浪费,独立计算资源可以节约成本。
- 存储成本:集中式存储可以在保证更高SLA的前提下降低存储成本,使得自建数据仓库的优势减少。
这三大问题,随着容器时代的到来也变得愈发的凸显。我们知道在kubernetes中,Pod是运行在底层的资源池上,而Pod所需要的存储是通过PV或者PVC的方式动态分配与挂载的,从某种意义来讲,容器本身的架构就是计算与存储分离的。那么使用了存储与计算分离方式的大数据容器集群会带来哪些变化与优势呢?
- 成本更低
通常在阿里云上建立一个Spark大数据平台的时候,首先会选择D系列的机器,在上面搭建HDFS、Hadoop等一系列的基础组件,然后再将Spark等作业任务通过Yarn进行调度,跑在这个集群之上。D系列的内网带宽范围是3Gbps-20Gbps,默认可以绑定(4-28块) 5.5T的本地盘。因为在云上,云盘的IO和网络的IO是共享的,而本地盘的IO是独立的,因此D系列+本地盘的IO性能会比同规格传统机型+云盘的性能更好。
但是在实际生产中,我们会发现存储的数据对着时间变得越来越多,而由于数据具有一定的时效性,造成单位时间的算力与存储的增长是不相匹配的,这个时候就会带来了成本的浪费。那么如果我们使用计算和存储分离的思想,使用外部存储,例如OSS、Nas或者DFS(阿里云HDFS产品),会有哪些变化呢?
首先,我们屏蔽由于存储的IO差异造成的影响,先都使用远程的DFS作为文件存储。然后我们选择了ecs.ebmhfg5.2xlarge(8C32G6Gbps)和ecs.d1ne.2xlarge (8C32G6Gbps) 两款分别面向计算和大数据场景规格配置相同的热门机型,进行了比对。
ecs.ebmhfg5.2xlarge(8C32G)的测试结果
ecs.d1ne.2xlarge (8C32G)的测试结果
通过Hibench我们可粗略的估算,在假定IO性能基本一致的场景下,ecs.ebmhfg5.2xlarge会比ecs.d1ne.2xlarge计算性能高30%左右,而成本上ecs.ebmhfg5.2xlarge会比ecs.d1ne.2xlarge低25%左右。
也就是说如果单单只看计算的能力,是可以有更高效、更节省的机型选择的。当存储和计算分离后,我们可以从存储和计算两个维度分开去估算所需的用量,在机型上可以更多的考虑高主频计算能力较强ECS,而存储上可以使用OSS或者DFS,存储成本也相较本地存储更为低廉。此外通常D系列的机型都是1:4的CPU内存比,随着大数据作业的场景越来越丰富,1:4的CPU内存比也不完全是最佳的配比,当存储与计算分离后,我们可以根据业务的类型选择合适的计算资源,甚至可以在一个计算资源池中维护多种计算资源,从而提高资源使用率。
数据存储的SLA和计算任务的SLA也是完全不同的,存储上是无法忍受宕机或者中断的,但是对于计算任务而言,本身已经被切割为子任务了,单个子任务的异常只需重试即可,那么进一步就可以使用类似竞价实例这种成本更低的资源来作为计算任务运行时环境,实现成本的进一步优化。
此外容器最大的特点就是弹性,通过弹性的能力,容器可以在短时间内获得超远原本自身几十甚至上百倍的计算资源,而计算任务完成后又自动释放掉。目前阿里云容器服务提供autoscaler进行节点级别的弹性伸缩,可以做到在1分半内伸缩500台节点。传统的计算与存储耦合的场景下,存储是阻碍弹性的一大障碍,而存储和计算分离后,就可以针对近乎无状态的计算来实现弹性的能力,实现真正的按需使用、按量消费。
- 存的更多
使用外部存储后,我们不止存储量级上可以做到近乎无限,而且可以有更多的选择。在本文开头位置,我们已经提到了大数据时代的到来,将引入更多维度、更异质化的数据。而这也对数据存储的方式与类型也带来了更多的挑战。单纯的HDFS、Hbase、Kafka等数据存储与链路将无法满足我们的需求。例如从IoT设备采集的数据更倾向于使用时序存储进行离线、从应用上下游的产生的数据更倾向于存放在结构化数据库中,数据的来源与链路会越来越多,大数据平台的底层基础设施与依赖就会越变越多。在云上,阿里云提供了多种类型的存储服务,可以满足各种大数据处理的场景。除了传统的HDFS、Hbase、kafka、OSS、Nas、CPFS等存储,还包包含MNS、TSDB、OAS(冷数据归档)等等。使用存储服务可以让大数据平台更关注在业务的开发,而不是底层基础架构的运维。不但能够存的更多,还可以存的更好、存的更省。
- 读写更快
从某种角度来讲,读写更快是不可能的,因为独立本地盘可以通过挂足够盘并行的方式进行提升的,但是要注意的问题在于,当我们通过MR进行任务切割后,每个子任务的瓶颈是否还是在磁盘IO上,大部分情况下答案是否定。上面我们测试的ECS规格内网的带宽已经可以到达6Gbps,如果全部网络带宽都换算成磁盘的IO的话,这个量级的数据吞吐IO相比8C32G的算力而言是冗余的,所以此处我们提到的读写更快是指在IO冗余的前提下提升读写速度的方式。OSS是阿里云上提供的对象存储,读取不同单个文件的IO是并行的,也就是说如果你的业务场景是大量中小型文件的并行读取,例如在Spark中读写目录的方式,那么此时IO的读写速度近似是线性增强的。如果依然希望使用HDFS的开发者,阿里云也提HDFS存储服务,提供了大量存储与查询的优化,和传统的自建的HDFS相比有50%左右的提升。
阿里云容器服务的存储方案
阿里云容器服务在多个维度多个层次满足大数据处理中的需求。开发者可以根据不同的业务场景和IO的新更能指标要求,选择合适自己的存储方式。
大量小文件存储
OSS是面向这个场景的最佳使用方式,在容器中可以使用两种方式操作OSS,一种是将OSS挂载为一个文件系统,一种是直接在Spark中使用SDK来操作。第一种方案在大数据的场景下是非常不适用的,特别是文件比较多的场景,如果没有类似SmartFS的优化手段,会带来很大的时延与不一致性。而使用SDK的方式则非常直接简单,只需将相应的Jar放在CLASSPATH下即可,可以参考如下代码,直接处理OSS中的文件内容。
package com.aliyun.emr.example
object OSSSample extends RunLocally {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println(
"""Usage: bin/spark-submit --class OSSSample examples-1.0-SNAPSHOT-shaded.jar <inputPath> <numPartition>
|
|Arguments:
|
| inputPath Input OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b.txt
| numPartitions the number of RDD partitions.
|
""".stripMargin)
System.exit(1)
}
val inputPath = args(0)
val numPartitions = args(1).toInt
val ossData = sc.textFile(inputPath, numPartitions)
println("The top 10 lines are:")
ossData.top(10).foreach(println)
}
override def getAppName: String = "OSS Sample"
}
另外针对Spark SQL的场景,阿里云也提供了https://yq.aliyun.com/articles/593910">oss-select的方式进行支持,可以通过SparkSQL的方式对单文件检索和查询。代码仓库地址。特别注意:当使用Spark Operator的方式进行任务执行是,需要在Driver Pod与Exector Pod的CLASSPATH下预置好相应的Jar包。
OSS的方式主要面向单个文件在百兆之下,文件数目比较多的场景优化较好,数据存储是几种常见存储中最便宜的,支持冷热数据的分离,主要面向读多写少或者不写的场景。
HDFS文件存储
阿里云上新推出了DFS服务,可以像在Hadoop分布式文件系统 (Hadoop Distributed File System) 中管理和访问数据。无需对现有大数据分析应用做任何修改,即可使用具备无限容量及性能扩展、单一命名空间、多共享、高可靠和高可用等特性的分布式文件系统。
DFS服务兼容HDFS协议,开发者只需将相应的调用Jar包放置在Driver Pod与Exector Pod的CLASSPATH中即可,调用时可以如下的方式进行调用。
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "dfs://f-5d68cc61ya36.cn-beijing.dfs.aliyuncs.com:10290/logdata/ab.log"
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
DFS服务的方式主要是面向高IO读写的热数据场景,价格会高于OSS存储,但低于Nas以及其他结构化存储。对于已经习惯了HDFS的开发者而言,是最佳的的方案。在所有的存储方案中,目前IO性能最佳,同容量场景,IO优于本地盘。
常规文件存储
OSS的方式对于某些场景而言,数据的上传与传输依赖SDK,操作会略显不便。那么Nas也是一种备选的方案,Nas的本身的协议是强一致性的,开发者可以像操作本地文件的方式,读写数据。使用方式如下:
- 首先在容器服务控制台创建Nas相关的存储PV与PVC。
- 然后在Spark Operator的定义中声明所使用的PodVolumeClain。
apiVersion: "sparkoperator.k8s.io/v1alpha1"
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v2.4.0"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
restartPolicy:
type: Never
volumes:
- name: pvc-nas
persistentVolumeClaim:
claimName: pvc-nas
driver:
cores: 0.1
coreLimit: "200m"
memory: "512m"
labels:
version: 2.4.0
serviceAccount: spark
volumeMounts:
- name: "pvc-nas"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 2.4.0
volumeMounts:
- name: "pvc-nas"
mountPath: "/tmp"
当然对于Kubernetes比较熟悉的开发者,同样可以使用动态存储的方式直接挂载。具体文档地址如下。
Nas存储的方式在Spark的场景下用途比较少,主要是因为在IO方面与HDFS有一定差距,在存储价格方面比OSS也贵了不少。不过对于需要复用一些data workflow的产生结果,且IO要求要求不是特别高的场景,Nas的使用还是非常简单的。
其他存储结构
在Spark Streaming的场景中,我们还经常使用例如mns或者kafka,有的时候也会使用Elasticsearch与Hbase等等。这些在阿里云上面也都有对应的服务支持,开发者可以通过这些云服务的集成与使用,将精力更多的放在数据开发上。
最后
本文主要和大家探讨了当容器遇到大数据后,改如何通过存储与计算分离的方式,降低资源的使用成本,通过不同的场景,选择合适的存储方式,实现云原生的大数据容器化计算。在下一篇文章中,我们会为大家介绍如何通过弹性的方式,在存储和计算分离的场景下,实现计算资源池的成本节约。