前言
Spark自从2014年1.2版本发布以来,已成为大数据计算的通用组件。网上介绍Spark的资源也非常多,但是不利于用户快速入门,所以本文主要通从用户的角度来介绍Spark,让用户能快速的认识Spark,知道Spark是什么、能做什么、怎么去做。
具体的概念可以参考spark社区的相关文章
Spark是什么
摘用官网的定义:
Spark是一个快速的、通用的分布式计算系统。
提供了高级API,如:Java、Scala、Python和R。
同时也支持高级工具,如:Spark SQL处理结构化数据、MLib处理机器学习、GraphX用于图计算、Spark Streming用于流数据处理。
也就是说Spark提供了灵活的、丰富接口的大数据处理能力。下图是Spark的模块图:
用户使用的SQL、Streaming、MLib、GraphX接口最终都会转换成Spark Core分布式运行。
目前用户用的比较多的是SQL和Streaming,这里先主要介绍下这两个。
Spark SQL
Spark SQL是Spark提供的SQL接口,用户使用Spark SQL可以像使用传统数据库一样使用SQL。例如:创建表、删除表、查询表、join表等。连接到Spark SQL后可以做如下操作。
# 在Spark中创建一个表:test_parquet,表的存储文件格式为:parquet
create table test_parquet(
id int,
name string,
value double
) using parquet;
此命令运行完毕后,Spark系统会在hdfs上创建一个名称为test_parquet的目录,例如/user/hive/warehouse/test_parquet/。
然后往Spark表中插入数据。
# 往Spark表:test_parquet插入数据
insert into test_parquet values(1001, 'name1001', 95.49);
insert into test_parquet values(1002, 'name1002', 73.25);
insert into test_parquet values(1003, 'name1003', 25.65);
insert into test_parquet values(1004, 'name1004', 23.39);
insert into test_parquet values(1005, 'name1005', 8.64);
insert into test_parquet values(1006, 'name1006', 52.60);
insert into test_parquet values(1007, 'name1007', 42.16);
insert into test_parquet values(1008, 'name1008', 85.39);
insert into test_parquet values(1009, 'name1009', 7.22);
insert into test_parquet values(1010, 'name1010', 10.43);
插入数据的步骤运行后,Spark会在hdfs目录:/user/hive/warehouse/test_parquet/中创建一些列后缀为.parquet的文件,如下:
/user/hive/warehouse/test_parquet/part-00000-49fefdf2-2ef0-4b7d-9414-6ac52e0390cb-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-52106855-8dd8-4f3a-8746-3025cf4898ea-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-89d738e9-754b-44b0-abd3-f2dd91cd0389-c000.snappy.parquet
/user/hive/warehouse/test_parquet/part-00000-bce8efef-13ef-42e2-a6c7-8611e21e931a-c000.snappy.parquet
插入完成后开始查询数据。
select * from test_parquet
查询数据的过程是Spark并行从hdfs系统上拉取每个Parquet,然后在Spark中并行计算。
这里只是简单列举了Spark 创建Parquet表的过程,Spark也可以支持读取其他格式的表,例如对接数据库等。需要了解可参考:Spark Connectors。
Spark SQL除了对用户提供了SQL命令的接口,也提供了API接口。Datasets(DataFrames),例如使用API创建Parquet如下:
//spark 读取json格式文件,返回一个DataFrames
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames保存为Parquet格式文件。
peopleDF.write.parquet("people.parquet")
//读取Parquet文件,返回DataFrames
val parquetFileDF = spark.read.parquet("people.parquet")
// DataFrames注册成一个Parquet表
parquetFileDF.createOrReplaceTempView("parquetFile")
//使用SQL查询Parquet表
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
//打印数据
namesDF.map(attributes => "Name: " + attributes(0)).show()
Spark Streaming
Spark Streaming是流式处理系统,可以处理流式数据。下面用个例子说明Streaming的过程。
Spark Streaming可以对接Kafka。假如kafka产生的数据格式为:
id values time
id001 98.2 1560414467
id002 99.2 1560414468
id001 87.2 1560414469
现在业务需要每分钟从Kafka读取一批数据,对数进行信息补齐,因为kafka拿到的数据只有id信息,用户想补齐name信息。
假如具有id、name信息的表存储在Phoenix中。
这样就可以通过Spark Streaming来完成这些业务诉求。在Spark的业务处理逻辑中拿到kafka的数据后,使用id关联Phoenix表拿到name信息,然再写入到其他数据库。
例如此业务的Spark Streaming的业务逻辑代码如下:
//scala代码样例
val words = messages.transform { rdd =>
rdd.map {line =>
println(s"==0== words = $line")
//逗号分隔
val words = line.value().split(",")
words
}
}.foreachRDD { lineArray =>
lineArray.foreachPartition { dataPartition =>
val connectionPool = Phoenix5xConnectionPool(queryServerAddress)
val phoenixConn = connectionPool.borrowObject()
val statment = phoenixConn.createStatement()
var i = 0
while (dataPartition.hasNext) {
val kv = dataPartition.next()
//关联id、获取name信息。
val rs = statment.executeQuery(s"select name from $phoenixTableName where id = ${kv(0)}")
val name = rs.getString(1)
//把结果写入到数据库
statment.execute(s"upsert into $resultTable values('${kv(0)}','${kv(1)}', ${kv(2)}, '$name')")
i = i + 1
if (i % batchSize == 0) {
phoenixConn.commit()
}
}
phoenixConn.commit()
connectionPool.returnObject(phoenixConn)
}
}
Spark Streming对接Kafka可参考:Spark对接kafkak快速入门。Spark Streming对接Phoenix代码可参考:SparkStreming,SparkSQL。
Spark适合做什么
先看下Spark在当前常用的BigData业务架构中的位置。
下图是常用的BigData 大数据组件Spark+HBase+Cassandra+ES(Solor),这些组件组合可覆盖BigData 95%以上的业务场景。
图中数据BigData分4个层次,由上到下分别为:
业务系统层:一般是直接面向用户的业务系统。
计算层:Spark的分布式计算。
数据库层:HBase+Cassandra数据库提供实时查询的能力。
存储层:HDFS或者OSS。
这里主要介绍下计算层Spark。
Spark计算层会把数据从数据库、列式存储(数仓)中拉去到Spark中进行分布式计算。我们把Spark打开看下是如何分布式计算的。先看下Spark运行时候的部署结构。
所以当Spark拉取数据库、数仓数据时会并行拉取到每个Executor做并行运算。
例如Spark SQL中查询表的例子,以及Spark Streming的中处理批数据的例子,Spark运算时是每个Executor并处理数据的,Executor处理数据的逻辑是由用户编码控制的,例如用户写的SQL语句,调用API写的业务代码等。
那么Spark适合什么样的计算呢?
下图列出了Spark 和HBase数据库各自适用的场景(摘自 HBase和Spark对比):
对比项目 | Phoenix(HBase) | Spark |
---|---|---|
SQL复杂度 | 简单查询, 必须命中索引 且 命中后 返回的数据较少,如果是join,则join任意一则返回的数据量在10w以下,且另一侧必须命中索引。 为了保障集群稳定性,一些复杂的sql及耗时的sql会被平台拒绝运行。 | 全部支持执行完成,支持Spark 映射到Phoenix,做到Spark在简单SQL查询能到Phoenix同样的性能,不过Spark定位为 分析的场景,与Phoenix 纯TP有本质的区别 |
集群 | HBase共享一个集群,本质是HBase提供的SQL | Spark需要单独购买的集群,Spark集群运算不影响其它数据库 |
并发 | 单机 1w-5w左右 | Spark最高不超过100 |
延迟 | 延迟在ms级别,一些命中较多的数据的sql会到 秒 | 一般延迟在300ms以上,大部分sql需要秒,分钟,甚至小时 |
数据Update | Phoenix支持 | Spark不支持 |
支持业务 | 在线业务 | 离线业务 或者 准在线业务 |
举个例子说明下上面每一项对应的场景,如下图:
此图描述的是用户登录手机淘宝,淘宝根据用户的ID信息在淘宝首页推荐商品这样的一个流程。我们看下每个流程中哪些场景适合Phoenix、Spark。
1、 获取用户的推荐列表。
用户登录后,手机淘宝要根据用户的ID从“用户推荐商品列表 user_reco_list”这个表中获取信息。SQL语句可能是这样的:
select * from user_reco_list where user_id = 'user0001' and time='2019-06-22'
这个SQL的特点如下:
- 简单:只有select *,没有join、group by。
- 有关键字过滤:user_id = 'user0001'。
- 返回的结果集少(大概返回几十行)。
- user_reco_list表数据量很大(百亿级别)
- 并发量很大,可能同时会有上万个用户同时登录。
- 低时延:用户一登录要立刻显示推荐。
类似这种特点的业务查询就适合使用在线数据Phoenix。
2、 统计用户的浏览记录。
“用户推荐商品列表 user_reco_list”中数据是怎么来的呢?是从用户的浏览记录、购买记录、加入购物车记录等信息统计而来的。后台任务每天凌晨从用户的记录中进行大量的统计分析,然后把结果写入“用户推荐商品列表 user_reco_list”。SQL语句可能是这的:
select sum(click) as counts, user_id, reco_id from user_scan_list group by user_id, reco_id where times>= '2018-06-30' and times<'2018-12-30'
这个SQL的特点如下:
- 统计分析:有sum、group by。
- 查询时间范围大:times的时间范围要半年,即扫描的数据量大。
- 返回的结果集大,可能返回百万级。
- user_scan_list表数据量很大,百亿级别、千亿级等。
- 并发量小、每天凌晨计算一次。
- 高时延:计算结果可能要分钟级别、甚至小时级别。
类似这种特点的业务查询就适合使用离线数仓:Spark 列存(Parquet)。
通过上面的例子大概可以认识到哪些场景适合Spark,哪些适合Phoenix。Spark和Phoenix相互配合,解决大数据的问题。
Spark如何建数仓
那Spark如何建数仓呢?本质就是把数据导入到Spark,使用Spark的列式存储文件格式(例如parquet)存储数据,使用Spark完成批处理、离线分析的业务。
例如在Spark创建一个以天为分区的明细表:
#创建parquet格式的分区表
create table test_parquet_pt(
id int,
name string,
value double,
dt string
) using parquet
partitioned by(dt);
#导入数据到Spark表:test_parquet_pt。
#例如从Phoenix表中增量导入1天的数据量。
insert into test_parquet_pt select * from phoenix_table where dt>='2019-01-02' and dt<'2019-01-03';
#分析Spark表
select avg(value) from test_parquet_pt group by dt;
上面只是个简单的实例,下面举例几个实际的业务场景。
先看下面的一个典型的业务场景。
上图是一个典型的复杂分析及查询系统。数据流程由图可见:
- 数据由APP、传感器、商业系统等客户的业务系统产生发送到Kafka系统。
- Spark Streming 对接kafka周期读取数据入库到在线数据库HBase/Phoenix,用户的运营系统实时查询在线数据库。
- HBase/Phoenix数据库周期同步到Spark数仓做离线计算。离线计算的结果写回HBase/Phoenix或者其他业务数据库。
上面是一个常用的方案。Spark创建数仓也有客户对数仓进行分层,例如下图:
客户把数仓分为四层:操作数据、公共明细、公共汇总、应用数据。每一层的数据由上一层汇聚、统计计算得来,每一层的数据应用于不同的业务场景。此场景的明细可参考: HBase+Spark构建游戏大数据平台。
小结
本文只是对Spark做个入门的介绍,更详细的资料可参考:Spark社区资料。X-Pack Spark请参考: X-Pack Spark分析引擎。