开发者社区> 琸然> 正文

通过EMR Spark Streaming实时读取Tablestore数据

简介: 本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。 场景设计 随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
+关注继续查看

本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。

场景设计

随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
而本文使用的Tablestore数据库可以直接利用它的通道服务功能,供Spark Streaming流式消费,进而计算订单的数量及金额,简化了整个流程,具体如下图所示
1

本文将介绍一个简单的demo,流式统计Tablestore数据表中字段出现的个数。

前提条件

确保将Tablestore实例部署在E-MapReduce集群相同的VPC环境下

准备工作

步骤一 创建Tablestore数据源表

详细开通步骤请参考官方文档,本文demo中所创建出来的表名为SmallTarget,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
2

为表SmallTarget建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。

3

技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

  • 全量:对数据表中历史存量数据消费处理
  • 增量:对数据表中新增数据消费处理
  • 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费

通道服务的详细介绍可查询> Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群

  • 获取环境依赖的JAR包。
Jar包 获取方法
emr-tablestore-X.X.X.jar
X.X.X: Since 1.9.0+
Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar 下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar
  • 集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面。
  • 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
  • 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
  • 上传所有JAR包到emr-header-1节点的某个目录下。

步骤三 运行Spark Streaming作业

  1. 以一个基于emr demo修改的WordCount为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),本例测试用的JAR包地址,完整的WordCount代码参见附录(后续会合到emr demo项目中)。
  2. 该样例以Tablestore表作为数据源,统计的是主键PkString。SmallTarget数据表中数据初始时为空,如下图所示。
    4

3. 启动spark streaming,开启一个统计SmallTarget表中通道zengliang(通道名)实时消费数据的一个监听程序。

spark-submit --class com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

各个参数的详细说明如下表所示。

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见下文Catalog字段说明

向SmallTarget数据表插入数据,spark streaming会实时更新,如下面两张图所示。
5
6

Catalog字段说明

Catalog为一个Json字符串,"columns"里面指定的是需要读取的列的自定义配置,这些列的键值对最终会被TableStore Source所输出,如下图所示,会读取PkString, PkInt和col1三列,其类型分别为string, long和string。

{
  "columns": {
    "PkString": {
      "col": "PkString",
      "type": "string"
    },
    "PkInt": {
      "col": "PkInt",
      "type": "long"
    },
    "col1": {
      "col": "col1",
      "type": "string"
    }
  }
}

预定义列说明

除去Catalog字段中的用户自定义列之外,Tablestore Source的输出默认还会加上一些预定义列的值,以供下游系统灵活的使用,预定义字段说明如下表所示:

预定义列名 说明
__ots_record_type__ Tablestore行操作类型(PUT, UPDATE, DELETE)
__ots_record_timestamp__ Tablestore行的时间戳,单位为ms
__ots_column_type_<ColumnName> 某列的操作类型,其中ColumnName为Catalog字段中定义的列

附录

package com.aliyun.emr.example.spark.sql.streaming

import java.util.UUID

import org.apache.spark.sql.SparkSession

object StructuredTableStoreWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        "Usage: StructuredTableStoreWordCount <ots-instanceName>" +
          "<ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint>" +
          "<max-offsets-per-channel> [<checkpoint-location>]"
      )
    }

    val Array(
    instanceName,
    tableName,
    tunnelId,
    accessKeyId,
    accessKeySecret,
    endpoint,
    maxOffsetsPerChannel,
    _*
    ) = args

    System.out.println(args.toSeq.toString)

    val checkpointLocation =
      if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession.builder.appName("TableStoreWordCount").master("local[5]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val lines = spark.readStream
      .format("tablestore")
      .option("instance.name", instanceName)
      .option("table.name", tableName)
      .option("tunnel.id", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxOffsetsPerChannel", maxOffsetsPerChannel) // default 10000
      .option(
        "catalog",
        "{\"columns\":{\"PkString\":{\"col\":\"PkString\",\"type\":\"string\"},\"PkInt\":{\"col\":\"PkInt\",\"type\":\"long\"}," +
          "\"col1\":{\"col\":\"col1\",\"type\":\"string\"}}}"
      )
      .load()
      .selectExpr("PkString", "PkInt", "__ots_record_type__")
      .as[(String, Long, String)]

    val wordCounts = lines
      .flatMap(line => {
        System.out.println(s"wordCount line: ${line}")
        line._1.split(" ")
      })
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }
}

写在最后

本篇文章我们介绍了如何在E-MapReduce中实时流式的处理Tablestore中的数据,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
image

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
74 0
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
143 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
492 0
使用EMR DataFrame 流处理 Tablestore
使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ### 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar
354 0
使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖
本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。 ## 架构介绍 表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。 表格存储支持多种数据库模型
583 0
使用EMR SQL 批处理Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.al
237 0
使用EMR SQL 流处理 Tablestore
通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。 ## 前提条件 - 已创建E-MapReduce Hadoop集群。具体操作,请参见[创建集群](https://help.aliyun.com/document_
185 0
海量结构化数据解决方案-表格存储场景解读
数据是驱动业务创新的最核心的资产。不同类型的数据如非结构化数据(视频、图片等)、结构化数据(订单、轨迹),面向不同业务的使用要求需要选择适合的存储引擎,能够真正发挥数据的价值。针对于海量的非强事务的海量结构化/半结构数据,表格存储一站式解决。这里详细解读该适合场景的使用解读。
3950 0
阿里云物联网平台数据转发到表格存储(Table Store)示例参考
本文主要结合物模型的结构体类型属性数据,演示payLoad的设置及规则引擎的配置。
896 0
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1270 0
+关注
琸然
上去就是一梭子代码。
文章
问答
来源圈子
更多
阿里云存储基于飞天盘古2.0分布式存储系统,产品包括对象存储OSS、块存储Block Storage、共享文件存储NAS、表格存储、日志存储与分析、归档存储及混合云存储等,充分满足用户数据存储和迁移上云需求,连续三年跻身全球云存储魔力象限四强。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
玩转 Tablestore 入门与实战
立即下载
TableStore在社交类场景下的应用
立即下载
一站式结构化数据存储Tablestore实战手册
立即下载