OSS数据湖实践——EMR + Flink + OSS案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
简介: 构建基于OSS数据源的EMR大数据计算环境,使用Flink大数据计算引擎,实现简单的大数据分析案例。

本文介绍使用Flink大数据分析引擎,基于EMR,利用OSS云存储数据,实现一个分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通E-MapReduce服务和OSS服务。
• 已完成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件。
• 相关更多配置请参考OSS入门文档。

步骤一:数据上传至oss

hadoop fs -put course2.csv oss://your-bucket-name/

步骤二:编写处理代码,及打包


package org.myorg.quickstart

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.TableEnvironment

object OSSExample {

  def main(args: Array[String]) {
    // set up the batch execution environment

    case class Course(Id : Int, Subject : String, Level : String)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val data: DataSet[(Long, String, String)] = env.readCsvFile("oss://your-bucket-name/course.csv")
    val  course = tableEnv.fromDataSet[(Long, String, String)](data, 'id, 'subject, 'level)
    val  counts = course.groupBy("subject, level").select("subject, level, level.count as cnt")
    val  maxcounts = counts.groupBy("subject").select("subject as subject1, cnt.max as cnt1")
    val result = maxcounts.leftOuterJoin(counts, "cnt=cnt1").select("subject, level, cnt")
    result.toDataSet[(String, String, Long)].print()
  }
}

IDEA Build -> Build Artifact ->Build 打包为OSSFlinkExample jar包

步骤三:上传jar包到Hadoop 或者OSS

把jar 上传到集群header节点,然后使用以下命令

hadoop fs -put OSSExample.jar oss://your-bucket-name/

步骤四:创建FLink作业job,运行作业

1589441726617_2f91171e_9a01_404a_ac74_2e998d9c3d2d

run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c org.myorg.quickstart.OSSExample  ossref://your-bucket-name/OSSFlinkExample.jar

步骤五:查看作业运行是否成功及查看运行结果

1589442550574_5b094f4d_cbc1_4c40_baea_b875bff45021
1589442588976_7872ee42_c97a_4082_a071_974c02006c98

总结

通过以上步骤,可以了解spark 处理OSS数据源的整个过程,这将对后续其他任务作业开发带来初步的参考。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
1月前
|
SQL 安全 Serverless
活动实践 | 基于EMR StarRocks实现游戏玩家画像和行为分析
基于阿里云EMR Serverless StarRocks,利用其物化视图和DLF读写Paimon等能力,构建游戏玩家画像和行为分析平台。通过收集、处理玩家行为日志,最终以报表形式展示分析结果,帮助业务人员决策。
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
543 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
1月前
|
Serverless BI
有奖实践,基于EMR StarRocks实现游戏玩家画像和行为分析
阿里云EMR-StarRocks联合镜舟科技,基于EMR-StarRocks实现游戏实时湖仓分析,免费试用物化视图、Paimon写入查询等新能力,前45位赢取StarRocks定制T恤、Lamy钢笔,小米充电宝,阿里云拍拍灯等活动礼品,前500位均可获得创意马克杯。
90 3
|
5月前
|
分布式计算 大数据 MaxCompute
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
|
5月前
|
分布式计算 测试技术 调度
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
|
5月前
|
SQL 测试技术 流计算
EMR Remote Shuffle Service实践问题之Leader节点变化导致的中断如何解决
EMR Remote Shuffle Service实践问题之Leader节点变化导致的中断如何解决
|
5月前
|
缓存
EMR Remote Shuffle Service实践问题之Mapper的首次PushData请求如何解决
EMR Remote Shuffle Service实践问题之Mapper的首次PushData请求如何解决
|
5月前
|
存储 分布式计算 对象存储
EMR Remote Shuffle Service实践问题之混合Cosco和Zeus的设计如何解决
EMR Remote Shuffle Service实践问题之混合Cosco和Zeus的设计如何解决
|
5月前
|
存储 RDMA
EMR Remote Shuffle Service实践问题之改进Shuffle性能如何解决
EMR Remote Shuffle Service实践问题之改进Shuffle性能如何解决
|
4月前
|
SQL 存储 NoSQL
阿里云 EMR StarRocks 在七猫的应用和实践
本文整理自七猫资深大数据架构师蒋乾老师在 《阿里云 x StarRocks:极速湖仓第二季—上海站》的分享。
328 2