通过EMR Spark Streaming实时读取Tablestore数据

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。 场景设计 随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。

本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。

场景设计

随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
而本文使用的Tablestore数据库可以直接利用它的通道服务功能,供Spark Streaming流式消费,进而计算订单的数量及金额,简化了整个流程,具体如下图所示
1

本文将介绍一个简单的demo,流式统计Tablestore数据表中字段出现的个数。

前提条件

确保将Tablestore实例部署在E-MapReduce集群相同的VPC环境下

准备工作

步骤一 创建Tablestore数据源表

详细开通步骤请参考官方文档,本文demo中所创建出来的表名为SmallTarget,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
2

为表SmallTarget建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。

3

技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

  • 全量:对数据表中历史存量数据消费处理
  • 增量:对数据表中新增数据消费处理
  • 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费

通道服务的详细介绍可查询> Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群

  • 获取环境依赖的JAR包。
Jar包 获取方法
emr-tablestore-X.X.X.jar
X.X.X: Since 1.9.0+
Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar 下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar
  • 集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面。
  • 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
  • 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
  • 上传所有JAR包到emr-header-1节点的某个目录下。

步骤三 运行Spark Streaming作业

  1. 以一个基于emr demo修改的WordCount为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),本例测试用的JAR包地址,完整的WordCount代码参见附录(后续会合到emr demo项目中)。
  2. 该样例以Tablestore表作为数据源,统计的是主键PkString。SmallTarget数据表中数据初始时为空,如下图所示。
    4

3. 启动spark streaming,开启一个统计SmallTarget表中通道zengliang(通道名)实时消费数据的一个监听程序。

spark-submit --class com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

各个参数的详细说明如下表所示。

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见下文Catalog字段说明

向SmallTarget数据表插入数据,spark streaming会实时更新,如下面两张图所示。
5
6

Catalog字段说明

Catalog为一个Json字符串,"columns"里面指定的是需要读取的列的自定义配置,这些列的键值对最终会被TableStore Source所输出,如下图所示,会读取PkString, PkInt和col1三列,其类型分别为string, long和string。

{
  "columns": {
    "PkString": {
      "col": "PkString",
      "type": "string"
    },
    "PkInt": {
      "col": "PkInt",
      "type": "long"
    },
    "col1": {
      "col": "col1",
      "type": "string"
    }
  }
}

预定义列说明

除去Catalog字段中的用户自定义列之外,Tablestore Source的输出默认还会加上一些预定义列的值,以供下游系统灵活的使用,预定义字段说明如下表所示:

预定义列名 说明
__ots_record_type__ Tablestore行操作类型(PUT, UPDATE, DELETE)
__ots_record_timestamp__ Tablestore行的时间戳,单位为ms
__ots_column_type_<ColumnName> 某列的操作类型,其中ColumnName为Catalog字段中定义的列

附录

package com.aliyun.emr.example.spark.sql.streaming

import java.util.UUID

import org.apache.spark.sql.SparkSession

object StructuredTableStoreWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        "Usage: StructuredTableStoreWordCount <ots-instanceName>" +
          "<ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint>" +
          "<max-offsets-per-channel> [<checkpoint-location>]"
      )
    }

    val Array(
    instanceName,
    tableName,
    tunnelId,
    accessKeyId,
    accessKeySecret,
    endpoint,
    maxOffsetsPerChannel,
    _*
    ) = args

    System.out.println(args.toSeq.toString)

    val checkpointLocation =
      if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession.builder.appName("TableStoreWordCount").master("local[5]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val lines = spark.readStream
      .format("tablestore")
      .option("instance.name", instanceName)
      .option("table.name", tableName)
      .option("tunnel.id", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxOffsetsPerChannel", maxOffsetsPerChannel) // default 10000
      .option(
        "catalog",
        "{\"columns\":{\"PkString\":{\"col\":\"PkString\",\"type\":\"string\"},\"PkInt\":{\"col\":\"PkInt\",\"type\":\"long\"}," +
          "\"col1\":{\"col\":\"col1\",\"type\":\"string\"}}}"
      )
      .load()
      .selectExpr("PkString", "PkInt", "__ots_record_type__")
      .as[(String, Long, String)]

    val wordCounts = lines
      .flatMap(line => {
        System.out.println(s"wordCount line: ${line}")
        line._1.split(" ")
      })
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }
}

写在最后

本篇文章我们介绍了如何在E-MapReduce中实时流式的处理Tablestore中的数据,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
image

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
25天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
45 3
|
22天前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
本文介绍了阿里云EMR StarRocks在数据湖分析领域的应用,涵盖StarRocks的数据湖能力、如何构建基于Paimon的实时湖仓、StarRocks与Paimon的最新进展及未来规划。文章强调了StarRocks在极速统一、简单易用方面的优势,以及在数据湖分析加速、湖仓分层建模、冷热融合及全链路ETL等场景的应用。
219 2
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
|
15天前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
讲师焦明烨介绍了StarRocks的数据湖能力,如何使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓,StarRocks与Paimon的最新进展及未来规划。
83 3
|
3月前
|
存储 分布式计算 Java
|
3月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
236 4
|
3月前
|
存储 缓存 分布式计算
|
3月前
|
SQL 存储 分布式计算
|
3月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
48 1
|
3月前
|
存储 缓存 数据管理
阿里云EMR数据湖文件系统问题之JindoFS数据孤岛的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFS数据孤岛的问题如何解决
|
存储 分布式计算 Spark
Spark会把数据都载入到内存么?
这篇文章算是个科普贴。如果已经熟悉Spark的就略过吧。
1881 0