使用EMR DataFrame 流处理 Tablestore

简介: 使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ### 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar

使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。

前提条件

  • 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。

    • Spark相关:spark-core、spark-sql、spark-hive
    • Spark Tablestore connector:emr-tablestore-.jar
    • Tablestore Java SDK:tablestore--jar-with-dependencies.jar

    其中表示相应依赖包的版本号,请以实际为准。

  • 已在表格存储侧创建Source表和在Source表上创建通道,详情请参见概述

快速开始

通过项目样例了解快速使用流计算的操作。

  1. 从GitHub下载项目样例的源码,具体下载路径请参见TableStoreSparkDemo。项目中包含完整的依赖和使用样例,具体的依赖请参见项目中的pom文件。

2.阅读TableStoreSparkDemo项目的README文档,并安装最新版的Spark Tablestore connector和Tablestore Java SDK到本地maven库。

说明
Spark Tablestore connector正式版发布以月为周期,目前最新版尚未正式发布,请先使用项目附带的预览版,正式发布后,本文也会进行更新,敬请期待。预览版和正式版只是版本号的区别,相互兼容,业务代码逻辑无需改动。

3.修改Sample代码。
以StructuredTableStoreAggSQLSample为例,对此示例代码的核心代码说明如下:

  • format("tablestore")表示使用ServiceLoader方式加载Spark Tablestore connector,具体配置请参见项目中的META-INF.services。
  • instanceName、tableName、tunnel.id、endpoint、accessKeyId、accessKeySecret分别表示表格存储的实例名称、数据表名称、通道ID、实例endpoint、阿里云账号的AccessKey ID和AccessKey Secret。
  • catalog是一个JSON串,包含字段名和类型,如下示例中的数据表有UserId(STRING类型)、OrderId(STRING类型)、price(DOUBLE类型)和timestamp(LONG类型)四个字段。
  • maxoffsetsperchannel表示每一个mini-batch中每一个channel(分区)最多读取的数据量,默认值为10000。
    val ordersDF = sparkSession.readStream
      .format("tablestore")
      .option("instance.name", instanceName)
      .option("table.name", tableName)
      .option("tunnel.id", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxoffsetsperchannel", maxOffsetsPerChannel) //默认值为10000。
      .option("catalog", dataCatalog)
      .load()
      .createTempView("order_source_stream_view")

  val dataCatalog: String =
    s"""
       |{"columns": {
       |    "UserId": {"type":"string"},
       |    "OrderId": {"type":"string"},
       |    "price": {"type":"double"},
       |    "timestamp": {"type":"long"}
       | }
       |}""".stripMargin

运行调试

根据需求修改示例代码后,可在本地或者通过Spark集群进行运行调试。以StructuredTableStoreAggSQLSample为例说明调试过程。

  • 本地调试

以Intellij IDEA为例说明。

说明
本文测试使用的环境为Spark 2.4.3、Scala 2.11.7和Java SE Development Kit 8,如果使用中遇到问题,请联系表格存储技术支持。
  • 在系统参数中,配置实例名称、数据表名称、实例endpoint、阿里云账号的AccessKey ID和AccessKey Secret等参数。您也可以自定义参数的加载方式。
  • 选择include dependencies with "provided" scope,单击OK。
  • 运行示例代码程序。

1.png

  • 通过Spark集群调试

以spark-submit方式为例说明。示例代码中的master默认为local[*],在Spark集群上运行时可以去掉,使用spark-submit参数传入。

  • 执行mvn -U clean package命令打包,包的路径为target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar。
  • 上传包到Spark集群的Driver节点,并使用spark-submit提交任务。

    spark-submit --class com.aliyun.tablestore.spark.demo.streaming.StructuredTableStoreAggSQLSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar <ots-instanceName> <ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint> <max-offsets-per-channel>

2.png

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
SQL 存储 分布式计算
使用EMR SQL 批处理Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.al
329 0
|
存储 SQL 分布式计算
使用EMR SQL 流处理 Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.aliyun.com/document_
251 0
使用EMR SQL 流处理 Tablestore
|
存储 JSON 分布式计算
使用EMR DataFrame 批处理 Tablestore
使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ## 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar
290 0
使用EMR DataFrame 批处理 Tablestore
|
NoSQL 分布式计算 Java
通过EMR Spark Streaming实时读取Tablestore数据
本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。 场景设计 随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
3493 0
|
7月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
65 3
|
SQL 存储 弹性计算
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1658 0
|
11天前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
11天前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
36 1
|
11月前
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
636 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
277 0
物联网数据通过规则引擎流转到OTS|学习笔记