E-MapReduce中Spark 2.x读写MaxCompute数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: 最新的`aliyun-emapreduce-sdk`将`MaxCompute`数据以`DataSource`的方式接入Spark 2.x,用户可以使用类似Spark 2.x中读写`json/parquet/csv`的方式来访问MaxCompute.

最新的aliyun-emapreduce-sdkMaxCompute数据以DataSource的方式接入Spark 2.x,用户可以使用类似Spark 2.x中读写json/parquet/csv的方式来访问MaxCompute.

0. DataSource

a) DataSource提供了一种插件式的外部数据接入SparkSQL的方式,数据源只要实现相应的DataSource API即可以整合进SparkSQL,它的特点如下:

  • 通过DataSet/DataFrame/sparkSQLText等标准方式来访问数据源
  • SparkSQL引擎优化
  • scala语言接入后,Spark支持的其它语言也可以进行访问,如pyspark等

    Spark 2.x内置支持的数据源:

    • json
    • csv
    • parquet
    • orc
    • text
    • jdbc

    Spark 2.x 访问数据源示例:

    b) 读数据

    val df = spark.read.json("pathToJson")
    
    // 提供schema信息
    val schemaType = StructType(Seq(
      StructField("year", IntegerType, nullable = true))
    spark.read.schema(schemaType)json("pathToJson")
    
    // 带一些参数设置,如csv的分隔符等
    spark.read.option("header", "false").option("sep", ",").csv("pathToCsv"")
    
    // load api, 等同于spark.read.json("pathToLoad")
    spark.read.format("json").load("pathToLoad")
    
    // sql方式访问
    df.createOrReplaceTempView("t")
    spark.sql("select * from t")
    

    c) 写数据

    ```
    val df = Seq(1, 2, 3).toDF("a")
    df.write.json("jsonWritePath")

    // 等同上面写法
    df.write.format("json").save("jsonWritePath")

 // 带参数
 df.write
   .option("header", "true")
   .option("compression", "gZiP")
   .csv("csvWritePath")

 // 路径存在,则覆盖
 df.write.mode("overwrite").json("jsonWritePath")
 **d)**sparkSQLText using DataSource
  spark.sql("create table t(a string) using json")
  spark.sql("insert int table t select 1")
  spark.sql("select * from t")
  ...

#### 1. MaxCompute以DataSource接入Spark 2.x

  如上介绍了DataSource的特点以及读写方式,MaxCompute作为一个数据源,通过E-MapReduce的`aliyun-emapreduce-sdk`也可以通过上述方式来访问。

##### 1.1.  aliyun-emapreduce-sdk

  [Git地址](https://github.com/aliyun/aliyun-emapreduce-sdk) 

  **branch: master-2.x**

#### 1.2 SparkSQL读写MaxCompute

###### a) option参数设置

访问MaxCompute表中的数据,需要一些参数,如下:

parameter | optional | value
------------ | ------------- | ------------
odpsUrl | No  | 
tunnelUrl | No  | 
accessKeySecret | No  | 阿里云accessKeySecret
accessKeyId | No  | 阿里云accessKeyId
project | No  | MaxCompute项目空间
table | No  | MaxCompute表名
numPartitions | Yes  | 表的Partition个数,默认 1
partitionSpec | Yes  | 分区信息,如pt=xxx,多个用逗号分开pt=xxx,dt=xxx
allowCreatNewPartition | Yes  | 分区不存在是否创建,默认 false

######b) 写数据

* MaxCompute中必须已经存在表()),若没有需要去MaxCompute控制台进行创建
* 将`DataFrame`中的数据写入MaxCompute的表中

val df = Seq(("Hello", "E-MapReduce")).toDF("a","b")
df.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl", ")
.option("tunnelUrl"
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("overwrite") //覆盖写
.save()

case class MyClass(a: String, b: String)
val df1 = Seq(MyClass("Hello", "World")).toDF
df1.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl"
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()

// 写分区表 建表:create table t1(a string) partitioned by(b string)
val df2 = Seq("E-MapReduce").toDF("a") // 不包含分区列
df2.write.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t1")
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("allowCreatNewPartition","true") //若分区不存在,是否创建
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.mode("append") //append追加
.save()

**备注:** 
>
DataFrame的列名和类型必须和MaxCompute的表中一致
>
`不支持`spark.write.parititonBy
>
 `不支持`动态分区

* MaxCompute控制台查询表数据进行验证

序号 | a | b
------------ | ------------- | ------------
1 | Hello  | E-MapReduce |
2|Hello | World

###### c) 读数据
* 从上述表中读取数据到`DataFrame`

val df = spark
.read
.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t")
.option("project", "test_odpss")
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.load()

df.show(false)

+-----+-----------+
|a |b |
+-----+-----------+
|Hello|E-MapReduce|
|Hello|World |
+-----+-----------+

// 读出为DataFrame后可进行DataFrame的各种操作,如join
val df1 = Seq(("Hello", "AliYun")).toDF("a", "c")
df.join(df1, "a").show(false)

+-----+-----------+-------+
|a |b |c |
+-----+-----------+-------+
|Hello|E-MapReduce|AliYun|
|Hello|World |AliYun|
+-----+-----------+-------+

// 也可注册为Spark的临时表
df.createOrReplaceTempView("test_t")
spark.sql("select * from test_t").show(false)

df1.createOrReplaceTempView("test_t_1")
spark.sql("select * from test_t join test_t_1 on test_t.a = test_t_1.a ")

// 读分区表 建表:create table t2(a string) partitioned by(b string)
spark.read.format("org.apache.spark.aliyun.maxcompute.datasource")
.option("odpsUrl")
.option("tunnelUrl")
.option("table", "t2") // table t2
.option("project", "test_odpss")
.option("partitionSpec","b='Hello'") // 分区描述
.option("accessKeyId", "your accessKeyId")
.option("accessKeySecret", "your accessKeySecret")
.save()

+-----------+
|a |
+-----------+
|E-MapReduce|
+-----------+

```

d) sparkSQLText
  • 不支持在sparkSQLText直接对MaxCompute表进行相关操作
  • 可以通过上述读数据的方式使用DataFrame注册成临时表的方式,进行相关操作(insert不支持)
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
17天前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
88 14
|
2月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
88 0
|
18天前
|
传感器 人工智能 监控
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
97 14
|
28天前
|
机器学习/深度学习 传感器 监控
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
63 1
|
28天前
|
数据采集 自动驾驶 机器人
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
92 1
|
2月前
|
机器学习/深度学习 监控 大数据
数据当“安全带”:金融市场如何用大数据玩转风险控制?
数据当“安全带”:金融市场如何用大数据玩转风险控制?
99 10
|
2月前
|
机器学习/深度学习 自然语言处理 监控
大数据如何影响新兴市场投资决策?——数据才是真正的风向标
大数据如何影响新兴市场投资决策?——数据才是真正的风向标
61 3
|
2月前
|
机器学习/深度学习 传感器 大数据
大数据真能治堵吗?聊聊交通行业用数据疏通“城市血管”
大数据真能治堵吗?聊聊交通行业用数据疏通“城市血管”
117 4
|
2月前
|
机器学习/深度学习 人工智能 大数据
从数据到决策:政府如何用大数据把事儿办得更明白?
从数据到决策:政府如何用大数据把事儿办得更明白?
78 0
|
3月前
|
SQL 缓存 监控
大数据之路:阿里巴巴大数据实践——实时技术与数据服务
实时技术通过流式架构实现数据的实时采集、处理与存储,支持高并发、低延迟的数据服务。架构涵盖数据分层、多流关联,结合Flink、Kafka等技术实现高效流计算。数据服务提供统一接口,支持SQL查询、数据推送与定时任务,保障数据实时性与可靠性。