
What is E-MapReduce Spark + ODPS?



Spark + MaxCompute



Accessing MaxCompute from Spark


This section describes how to use the E-MapReduce SDK to read and write MaxCompute data from Spark.


  1. Initialize an OdpsOps object. In Spark, MaxCompute data is accessed through the OdpsOps class. Create one as follows:

     import com.aliyun.odps.TableSchema
     import com.aliyun.odps.data.Record
     import org.apache.spark.aliyun.odps.OdpsOps
     import org.apache.spark.{SparkContext, SparkConf}

     object Sample {
       def main(args: Array[String]): Unit = {
         // == Step-1 ==
         val accessKeyId = "<accessKeyId>"
         val accessKeySecret = "<accessKeySecret>"
         // Intranet endpoints are used here as an example
         val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
         val conf = new SparkConf().setAppName("Test Odps")
         val sc = new SparkContext(conf)
         val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
         // Calling code goes below
         // == Step-2 ==
         ...
         // == Step-3 ==
         ...
       }
       // == Step-2 ==
       // Method definition 1
       // == Step-3 ==
       // Method definition 2
     }

  2. Load MaxCompute table data into Spark. The readTable method of the OdpsOps object loads a MaxCompute table into Spark as an RDD:

     // == Step-2 ==
     val project = <odps-project>
     val table = <odps-table>
     val numPartitions = 2
     val inputData = odpsOps.readTable(project, table, read, numPartitions)
     inputData.top(10).foreach(println)
     // == Step-3 ==
     ...

The code above also requires a read function, which parses and preprocesses each row of the MaxCompute table:

     def read(record: Record, schema: TableSchema): String = {
       record.getString(0)
     }

This function loads the first column of the MaxCompute table into the Spark runtime as a String.
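
If you need more than one column, the transfer function can return a richer type. The following is a minimal sketch, not from the original post: readPair is a hypothetical name, and it assumes the table's first column is STRING and its second is BIGINT, so adjust the getters to your schema:

     // Hypothetical transfer function returning two columns as a tuple.
     // Assumes column 0 is STRING and column 1 is BIGINT.
     def readPair(record: Record, schema: TableSchema): (String, Long) = {
       val name: String = record.getString(0)  // first column
       val count: Long = record.getBigint(1)   // second column (throws if the value is NULL)
       (name, count)
     }
     // Yields an RDD[(String, Long)]:
     // val pairs = odpsOps.readTable(project, table, readPair, numPartitions)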
  3. Save result data from Spark back to a MaxCompute table. The saveToTable method of the OdpsOps object persists a Spark RDD into MaxCompute:

     val resultData = inputData.map(e => s"$e has been processed.")
     odpsOps.saveToTable(project, table, resultData, write)

The code above also requires a write function, which preprocesses each element before it is written to the MaxCompute table:

     def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
       val r = emptyRecord
       r.set(0, s)
     }

This function writes each RDD element into the first column of the target MaxCompute table.
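
Writing multiple columns follows the same pattern: fill in several positions of the empty Record. Below is a minimal sketch; writePair is a hypothetical name, and it assumes an RDD[(String, Long)] and a table whose first two columns are STRING and BIGINT:

     // Hypothetical transfer function for an RDD[(String, Long)].
     // Assumes column 0 is STRING and column 1 is BIGINT.
     def writePair(pair: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
       emptyRecord.setString(0, pair._1)
       emptyRecord.setBigint(1, pair._2)
     }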
Partition specification syntax
The SDK supports reading and writing partitioned MaxCompute tables. A partition is specified as partition_column=partition_value, with multiple partitions separated by commas. For example, given partition columns pt and ps (a usage sketch follows the list):
  • Read the data in partition pt=1: pt='1'
  • Read the data in partitions pt=1 and ps=2: pt='1',ps='2'
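
The following is a minimal sketch of passing such a specification to readTable. It assumes an overload of readTable that takes the partition specification as an extra argument; check the exact signature in your E-MapReduce SDK version:

     // Hypothetical partitioned read; verify the overload against your SDK.
     val partition = "pt='1',ps='2'"
     val partData = odpsOps.readTable(project, table, partition, read, numPartitions)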


Appendix


For sample code, see:

nicenelly 2017-10-27 16:03:47