OSS数据湖实践——EMR + Flink + OSS案例

本文涉及的产品
对象存储 OSS,20GB 3个月
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 构建基于OSS数据源的EMR大数据计算环境,使用Flink大数据计算引擎,实现简单的大数据分析案例。

本文介绍使用Flink大数据分析引擎,基于EMR,利用OSS云存储数据,实现一个分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通E-MapReduce服务和OSS服务。
• 已完成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件。
• 相关更多配置请参考OSS入门文档。

步骤一:数据上传至oss

hadoop fs -put course2.csv oss://your-bucket-name/

步骤二:编写处理代码,及打包


package org.myorg.quickstart

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.TableEnvironment

object OSSExample {

  def main(args: Array[String]) {
    // set up the batch execution environment

    case class Course(Id : Int, Subject : String, Level : String)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val data: DataSet[(Long, String, String)] = env.readCsvFile("oss://your-bucket-name/course.csv")
    val  course = tableEnv.fromDataSet[(Long, String, String)](data, 'id, 'subject, 'level)
    val  counts = course.groupBy("subject, level").select("subject, level, level.count as cnt")
    val  maxcounts = counts.groupBy("subject").select("subject as subject1, cnt.max as cnt1")
    val result = maxcounts.leftOuterJoin(counts, "cnt=cnt1").select("subject, level, cnt")
    result.toDataSet[(String, String, Long)].print()
  }
}

IDEA Build -> Build Artifact ->Build 打包为OSSFlinkExample jar包

步骤三:上传jar包到Hadoop 或者OSS

把jar 上传到集群header节点,然后使用以下命令

hadoop fs -put OSSExample.jar oss://your-bucket-name/

步骤四:创建FLink作业job,运行作业

1589441726617_2f91171e_9a01_404a_ac74_2e998d9c3d2d

run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c org.myorg.quickstart.OSSExample  ossref://your-bucket-name/OSSFlinkExample.jar

步骤五:查看作业运行是否成功及查看运行结果

1589442550574_5b094f4d_cbc1_4c40_baea_b875bff45021
1589442588976_7872ee42_c97a_4082_a071_974c02006c98

总结

通过以上步骤,可以了解spark 处理OSS数据源的整个过程,这将对后续其他任务作业开发带来初步的参考。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
25天前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
59 0
|
25天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
107 0
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
152 15
|
9天前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
42 9
|
10天前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验
|
11天前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
29 4
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
519 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
440 13
Flink CDC 在新能源制造业的实践
|
25天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
40 0
|
25天前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
36 0