最新的aliyun-emapreduce-sdk
将MaxCompute
数据以DataSource
的方式接入Spark 2.x,用户可以使用类似Spark 2.x中读写json/parquet/csv
的方式来访问MaxCompute.
0. DataSource
a) DataSource
提供了一种插件式的外部数据接入SparkSQL的方式,数据源只要实现相应的DataSource API
即可以整合进SparkSQL,它的特点如下:
- 通过DataSet/DataFrame/sparkSQLText等标准方式来访问数据源
- SparkSQL引擎优化
scala语言接入后,Spark支持的其它语言也可以进行访问,如pyspark等
Spark 2.x内置支持的数据源:
- json
- csv
- parquet
- orc
- text
- jdbc
Spark 2.x 访问数据源示例:
b) 读数据
val df = spark.read.json("pathToJson") // 提供schema信息 val schemaType = StructType(Seq( StructField("year", IntegerType, nullable = true)) spark.read.schema(schemaType)json("pathToJson") // 带一些参数设置,如csv的分隔符等 spark.read.option("header", "false").option("sep", ",").csv("pathToCsv"") // load api, 等同于spark.read.json("pathToLoad") spark.read.format("json").load("pathToLoad") // sql方式访问 df.createOrReplaceTempView("t") spark.sql("select * from t")
c) 写数据
```
val df = Seq(1, 2, 3).toDF("a")
df.write.json("jsonWritePath")// 等同上面写法
df.write.format("json").save("jsonWritePath")
// 带参数
df.write
.option("header", "true")
.option("compression", "gZiP")
.csv("csvWritePath")
// 路径存在,则覆盖
df.write.mode("overwrite").json("jsonWritePath")
**d)**sparkSQLText using DataSource
spark.sql("create table t(a string) using json")
spark.sql("insert int table t select 1")
spark.sql("select * from t")
...
#### 1. MaxCompute以DataSource接入Spark 2.x
如上介绍了DataSource的特点以及读写方式,MaxCompute作为一个数据源,通过E-MapReduce的`aliyun-emapreduce-sdk`也可以通过上述方式来访问。
##### 1.1. aliyun-emapreduce-sdk
[Git地址](https://github.com/aliyun/aliyun-emapreduce-sdk)
**branch: master-2.x**
#### 1.2 SparkSQL读写MaxCompute
###### a) option参数设置
访问MaxCompute表中的数据,需要一些参数,如下:
parameter | optional | value
------------ | ------------- | ------------
odpsUrl | No |
tunnelUrl | No |
accessKeySecret | No | 阿里云accessKeySecret
accessKeyId | No | 阿里云accessKeyId
project | No | MaxCompute项目空间
table | No | MaxCompute表名
numPartitions | Yes | 表的Partition个数,默认 1
partitionSpec | Yes | 分区信息,如pt=xxx,多个用逗号分开pt=xxx,dt=xxx
allowCreatNewPartition | Yes | 分区不存在是否创建,默认 false
######b) 写数据
* MaxCompute中必须已经存在表()),若没有需要去MaxCompute控制台进行创建
* 将`DataFrame`中的数据写入MaxCompute的表中
val df = Seq(("Hello", "E-MapReduce")).toDF("a","b")
df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", ")
.option("tunnelUrl"
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("overwrite") //覆盖写
.save()
case class MyClass(a: String, b: String)
val df1 = Seq(MyClass("Hello", "World")).toDF
df1.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl"
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()
// 写分区表 建表:create table t1(a string) partitioned by(b string)
val df2 = Seq("E-MapReduce").toDF("a") // 不包含分区列
df2.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t1")
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("allowCreatNewPartition","true") //若分区不存在,是否创建
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()
**备注:**
>
DataFrame的列名和类型必须和MaxCompute的表中一致
>
`不支持`spark.write.parititonBy
>
`不支持`动态分区
* MaxCompute控制台查询表数据进行验证
序号 | a | b
------------ | ------------- | ------------
1 | Hello | E-MapReduce |
2|Hello | World
###### c) 读数据
* 从上述表中读取数据到`DataFrame`
val df = spark
.read
.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.load()
df.show(false)
+-----+-----------+
|a |b |
+-----+-----------+
|Hello|E-MapReduce|
|Hello|World |
+-----+-----------+
// 读出为DataFrame后可进行DataFrame的各种操作,如join
val df1 = Seq(("Hello", "AliYun")).toDF("a", "c")
df.join(df1, "a").show(false)
+-----+-----------+-------+
|a |b |c |
+-----+-----------+-------+
|Hello|E-MapReduce|AliYun|
|Hello|World |AliYun|
+-----+-----------+-------+
// 也可注册为Spark的临时表
df.createOrReplaceTempView("test_t")
spark.sql("select * from test_t").show(false)
df1.createOrReplaceTempView("test_t_1")
spark.sql("select * from test_t join test_t_1 on test_t.a = test_t_1.a ")
// 读分区表 建表:create table t2(a string) partitioned by(b string)
spark.read.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t2") // table t2
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.save()
+-----------+
|a |
+-----------+
|E-MapReduce|
+-----------+
```
d) sparkSQLText
不支持
在sparkSQLText直接对MaxCompute表
进行相关操作- 可以通过上述读数据的方式使用
DataFrame
注册成临时表的方式,进行相关操作(insert不支持
)