OSS数据湖实践——EMR + Flink + OSS案例

本文涉及的产品
对象存储 OSS,20GB 3个月
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 构建基于OSS数据源的EMR大数据计算环境,使用Flink大数据计算引擎,实现简单的大数据分析案例。

本文介绍使用Flink大数据分析引擎,基于EMR,利用OSS云存储数据,实现一个分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通E-MapReduce服务和OSS服务。
• 已完成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件。
• 相关更多配置请参考OSS入门文档。

步骤一:数据上传至oss

hadoop fs -put course2.csv oss://your-bucket-name/
AI 代码解读

步骤二:编写处理代码,及打包


package org.myorg.quickstart

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.TableEnvironment

object OSSExample {

  def main(args: Array[String]) {
    // set up the batch execution environment

    case class Course(Id : Int, Subject : String, Level : String)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val data: DataSet[(Long, String, String)] = env.readCsvFile("oss://your-bucket-name/course.csv")
    val  course = tableEnv.fromDataSet[(Long, String, String)](data, 'id, 'subject, 'level)
    val  counts = course.groupBy("subject, level").select("subject, level, level.count as cnt")
    val  maxcounts = counts.groupBy("subject").select("subject as subject1, cnt.max as cnt1")
    val result = maxcounts.leftOuterJoin(counts, "cnt=cnt1").select("subject, level, cnt")
    result.toDataSet[(String, String, Long)].print()
  }
}
AI 代码解读

IDEA Build -> Build Artifact ->Build 打包为OSSFlinkExample jar包

步骤三:上传jar包到Hadoop 或者OSS

把jar 上传到集群header节点,然后使用以下命令

hadoop fs -put OSSExample.jar oss://your-bucket-name/
AI 代码解读

步骤四:创建FLink作业job,运行作业

1589441726617_2f91171e_9a01_404a_ac74_2e998d9c3d2d

run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c org.myorg.quickstart.OSSExample  ossref://your-bucket-name/OSSFlinkExample.jar
AI 代码解读

步骤五:查看作业运行是否成功及查看运行结果

1589442550574_5b094f4d_cbc1_4c40_baea_b875bff45021
1589442588976_7872ee42_c97a_4082_a071_974c02006c98

总结

通过以上步骤,可以了解spark 处理OSS数据源的整个过程,这将对后续其他任务作业开发带来初步的参考。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
京东物流基于Flink & StarRocks的湖仓建设实践
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
478 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
Dify实践|Dify on DMS+对象存储OSS,实现多副本部署方案
本文介绍了在DMS上部署Dify的详细步骤,用户可选择一键购买资源或基于现有资源部署Dify,需配置RDS PostgreSQL、Redis、AnalyticDB for PostgreSQL等实例,并设置存储路径和资源规格。文中还提供了具体配置参数说明及操作截图,帮助用户顺利完成部署。
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
295 1
Flink CDC + Hologres高性能数据同步优化实践
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
280 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
141 2