Spark + MaxCompute
Accessing MaxCompute from Spark
This section describes how to use the E-MapReduce SDK to read and write MaxCompute data in Spark.
Initialize an OdpsOps object. In Spark, MaxCompute data operations are performed through the OdpsOps class. Create an OdpsOps object as follows:

    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkContext, SparkConf}

    object Sample {
      def main(args: Array[String]): Unit = {
        // == Step-1 ==
        val accessKeyId = "<accessKeyId>"
        val accessKeySecret = "<accessKeySecret>"
        // Internal network endpoints are used as an example
        val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
        val conf = new SparkConf().setAppName("Test Odps")
        val sc = new SparkContext(conf)
        val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
        // Calling code follows
        // == Step-2 ==
        ...
        // == Step-3 ==
        ...
      }

      // == Step-2 ==
      // Method definition 1
      // == Step-3 ==
      // Method definition 2
    }

Load table data from MaxCompute into Spark. The readTable method of the OdpsOps object loads a MaxCompute table into Spark as an RDD, as shown below:

    // == Step-2 ==
    val project = "<odps-project>"
    val table = "<odps-table>"
    val numPartitions = 2
    val inputData = odpsOps.readTable(project, table, read, numPartitions)
    inputData.top(10).foreach(println)
    // == Step-3 ==
    ...

In the code above, you also need to define a read function that parses and preprocesses the MaxCompute table data, as shown below:

    def read(record: Record, schema: TableSchema): String = {
      record.getString(0)
    }

This function loads the first column of the MaxCompute table into the Spark runtime environment.
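
A read function does not have to stop at the first column. The sketch below shows one way to pull two columns into a tuple and replace null cells with defaults. It makes several assumptions that are not stated in this section: that readTable accepts a function returning a type other than String, and that the table has a STRING column at index 0 and a BIGINT column at index 1. `ReadableRecord` is a hypothetical stand-in that mirrors only the `getString(idx)` and `getBigint(idx)` getters of the SDK's Record so the example runs without the SDK. A real read function would also take the `schema: TableSchema` parameter.

```scala
// Hypothetical stand-in for com.aliyun.odps.data.Record, reduced to the two
// getters used here. It exists only so the sketch runs without the SDK.
trait ReadableRecord {
  def getString(idx: Int): String
  def getBigint(idx: Int): java.lang.Long
}

object ReadSketch {
  // Assumed table layout: column 0 is a STRING, column 1 is a BIGINT.
  // Null cells are mapped to defaults so downstream code never sees null.
  def read(record: ReadableRecord): (String, Long) = {
    val name = Option(record.getString(0)).getOrElse("")
    val count = Option(record.getBigint(1)).map(_.longValue).getOrElse(0L)
    (name, count)
  }

  // In-memory record used for a local check.
  def fakeRecord(name: String, count: java.lang.Long): ReadableRecord =
    new ReadableRecord {
      def getString(idx: Int): String = if (idx == 0) name else null
      def getBigint(idx: Int): java.lang.Long = if (idx == 1) count else null
    }

  def main(args: Array[String]): Unit = {
    println(read(fakeRecord("a", 3L)))
    println(read(fakeRecord(null, null)))
  }
}
```

Handling nulls inside the read function keeps the resulting RDD free of null values, which simplifies later transformations.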
Save the result data from Spark to a MaxCompute table. The saveToTable method of the OdpsOps object persists a Spark RDD to MaxCompute.

    val resultData = inputData.map(e => s"$e has been processed.")
    odpsOps.saveToTable(project, table, resultData, write)

In the code above, you also need to define a write function that preprocesses the data before it is written to the MaxCompute table, as shown below:

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
      val r = emptyRecord
      r.set(0, s)
    }

This function writes each row of the RDD to the first column of the corresponding MaxCompute table.
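
The same pattern extends to rows with more than one field. The sketch below fills two columns of an empty record from a tuple. It assumes that saveToTable accepts an RDD whose element type matches the first parameter of the write function, and that the target table has a STRING column at index 0 and a BIGINT column at index 1; neither is confirmed by this section. `WritableRecord` is a hypothetical stand-in that mirrors only `set(idx, value)` of the SDK's Record (plus a `get` for checking the result) so the example runs without the SDK.

```scala
// Hypothetical stand-in for com.aliyun.odps.data.Record, reduced to
// set(idx, value) and a getter for inspection. Not part of the SDK.
class WritableRecord(columnCount: Int) {
  private val cells = new Array[Any](columnCount)
  def set(idx: Int, value: Any): Unit = { cells(idx) = value }
  def get(idx: Int): Any = cells(idx)
}

object WriteSketch {
  // Assumed table layout: column 0 is a STRING, column 1 is a BIGINT.
  // The function mutates the empty record it receives and returns nothing,
  // following the shape of the write function shown above.
  def write(row: (String, Long), emptyRecord: WritableRecord): Unit = {
    emptyRecord.set(0, row._1)
    emptyRecord.set(1, java.lang.Long.valueOf(row._2))
  }

  def main(args: Array[String]): Unit = {
    val record = new WritableRecord(2)
    write(("a", 3L), record)
    println(s"${record.get(0)}, ${record.get(1)}")
  }
}
```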
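Partition parameter format for partitioned tables
The SDK supports reading and writing MaxCompute partitioned tables. A partition is written as partition column name=partition value, and multiple partitions are separated by commas. For example, given the partition columns pt and ps:
- To read table data where partition pt is 1: pt='1'
- To read table data where partition pt is 1 and partition ps is 2: pt='1',ps='2'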
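
When partition values are computed at run time, the partition string can be assembled from column and value pairs. The following is a minimal sketch of the format described above; the helper name `partitionSpec` is hypothetical and not part of the SDK, and the values are assumed to contain no quote characters. How the resulting string is passed to the read and write methods is not shown in this section.

```scala
object PartitionSpecSketch {
  // Builds "col1='v1',col2='v2'" from ordered (column, value) pairs.
  // Assumes values contain no single quotes; no escaping is applied.
  def partitionSpec(parts: Seq[(String, String)]): String =
    parts.map { case (column, value) => s"$column='$value'" }.mkString(",")

  def main(args: Array[String]): Unit = {
    println(partitionSpec(Seq("pt" -> "1")))              // pt='1'
    println(partitionSpec(Seq("pt" -> "1", "ps" -> "2"))) // pt='1',ps='2'
  }
}
```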
Appendix
For sample code, see: