使用EMR DataFrame 批处理 Tablestore

本文涉及的产品
表格存储 Tablestore,50G 2个月
简介: 使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ## 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar

使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。

前提条件

  • 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。

    • Spark相关:spark-core、spark-sql、spark-hive

    • Spark Tablestore connector:emr-tablestore-.jar

    • Tablestore Java SDK:tablestore--jar-with-dependencies.jar

    其中表示相应依赖包的版本号,请以实际为准。

  • 已在表格存储侧创建数据表,详情请参见概述

快速开始

通过项目样例了解快速使用批计算的操作。

  1. 从GitHub下载项目样例的源码,具体下载路径请参见TableStoreSparkDemo。项目中包含完整的依赖和使用样例,具体的依赖请参见项目中的pom文件。

  2. 阅读TableStoreSparkDemo项目的README文档,并安装最新版的Spark Tablestore connector和Tablestore Java SDK到本地maven库。

    说明:Spark Tablestore connector正式版发布以月为周期,目前最新版尚未正式发布,请先使用项目附带的预览版,正式发布后,本文也会进行更新,敬请期待。预览版和正式版只是版本号的区别,相互兼容,业务代码逻辑无需改动。

  3. 修改Sample代码
    以TableStoreBatchSample为例,对此示例代码的核心代码说明如下:

    • format("tablestore")表示使用ServiceLoader方式加载Spark Tablestore connector,具体配置请参见项目中的META-INF.services。

    • instanceName、tableName、endpoint、accessKeyId、accessKeySecret分别表示表格存储的实例名称、数据表名称、实例endpoint、阿里云账号的AccessKey ID和AccessKey Secret。

    • catalog是一个json串,包含字段名和类型,如下示例中的数据表有salt(Long类型)、UserId(String类型)、OrderId(String类型)、price(Double类型)和timestamp(Long类型)五个字段。最新版本中支持使用Schema方式替换catalog的配置,请根据实际选择。

    • split.size.mbs表示每个Split的切分大小,默认值为100,单位为MB,可不配置。此值越小产生的Split会越多,对应Spark的Task也会越多。

    val df = sparkSession.read
    .format("tablestore")
    .option("instance.name", instanceName)
    .option("table.name", tableName)
    .option("endpoint", endpoint)
    .option("access.key.id", accessKeyId)
    .option("access.key.secret", accessKeySecret)
    .option("split.size.mbs", 100)
    .option("catalog", dataCatalog)
    // 最新版本支持使用Schema方式替换catalog的配置。
    //.schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")
    .load()
    
    val dataCatalog: String =
      s"""
         |{"columns": {
         |    "salt": {"type":"long"},
         |    "UserId": {"type":"string"},
         |    "OrderId": {"type":"string"},
         |    "price": {"type":"double"},
         |    "timestamp": {"type":"long"}
         | }
         |}""".stripMargin

运行调试

根据需求修改示例代码后,可在本地或者通过Spark集群进行运行调试。以TableStoreBatchSample为例说明调试过程。

本地调试

以Intellij IDEA为例说明。

说明:本文测试使用的环境为Spark 2.4.3、Scala 2.11.7和Java SE Development Kit 8,如果使用中遇到问题,请联系表格存储技术支持。

  1. 在系统参数中,配置实例名称、数据表名称、实例endpoint、阿里云账号的AccessKey ID和AccessKey Secret等参数。您也可以自定义参数的加载方式。

  2. 选择include dependencies with "provided" scope,单击OK。

  3. 运行示例代码程序。

    image.png

通过Spark集群调试

以spark-submit方式为例说明。示例代码中的master默认为local[*],在Spark集群上运行时可以去掉,使用spark-submit参数传入。

  1. 执行mvn -U clean package命令打包,包的路径为target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar。

  2. 上传包到Spark集群的Driver节点,并使用spark-submit提交任务。

    spark-submit --class com.aliyun.tablestore.spark.demo.batch.TableStoreBatchSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar <ots-instanceName> <ots-tableName> <access-key-id> <access-key-secret> <ots-endpoint>
    

    image.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 JSON 分布式计算
使用EMR DataFrame 流处理 Tablestore
使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ### 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar
466 0
使用EMR DataFrame 流处理 Tablestore
|
SQL 存储 JSON
使用EMR 批处理的最佳实践-谓词下推
批计算中的多元索引查询方式可以自定义谓词下推配置。目前只能设置与Long、String类型的列做大小比较的谓词是否下推。 ## 背景信息 谓词下推适用于当多元索引中多字段过滤的中间结果数据量较大,则中间结果的合并较为耗时的场景。此时可以将某些字段的过滤从存储层(表格存储)提到计算层(Spark)处理,提高查询效率。 例如`select * from table where a =
386 0
|
SQL 存储 分布式计算
使用EMR SQL 批处理Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.al
355 0
|
NoSQL 分布式计算 Java
通过EMR Spark Streaming实时读取Tablestore数据
本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。 场景设计 随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
3555 0
|
5月前
|
分布式计算 大数据 MaxCompute
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
|
5月前
|
分布式计算 测试技术 调度
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
|
3月前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
本文介绍了阿里云EMR StarRocks在数据湖分析领域的应用,涵盖StarRocks的数据湖能力、如何构建基于Paimon的实时湖仓、StarRocks与Paimon的最新进展及未来规划。文章强调了StarRocks在极速统一、简单易用方面的优势,以及在数据湖分析加速、湖仓分层建模、冷热融合及全链路ETL等场景的应用。
342 8
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
|
3月前
|
SQL 存储 缓存
降本60% ,阿里云 EMR StarRocks 全新发布存算分离版本
阿里云 EMR Serverless StarRocks 现已推出全新存算分离版本,该版本不仅基于开源 StarRocks 进行了全面优化,实现了存储与计算解耦架构,还在性能、弹性伸缩以及多计算组隔离能力方面取得了显著进展。
431 6
|
3月前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
讲师焦明烨介绍了StarRocks的数据湖能力,如何使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓,StarRocks与Paimon的最新进展及未来规划。
151 3
|
4月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
187 3
阿里云 EMR Serverless Spark 版正式开启商业化