开发者社区> 阿里云存储服务> 正文

通过EMR Spark Streaming实时读取Tablestore数据

简介: 本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。场景设计随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。

本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。

场景设计

随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
而本文使用的Tablestore数据库可以直接利用它的通道服务功能,供Spark Streaming流式消费,进而计算订单的数量及金额,简化了整个流程,具体如下图所示
1

本文将介绍一个简单的demo,流式统计Tablestore数据表中字段出现的个数。

前提条件

确保将Tablestore实例部署在E-MapReduce集群相同的VPC环境下

准备工作

步骤一 创建Tablestore数据源表

详细开通步骤请参考官方文档,本文demo中所创建出来的表名为SmallTarget,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
2

为表SmallTarget建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。

3

技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

  • 全量:对数据表中历史存量数据消费处理
  • 增量:对数据表中新增数据消费处理
  • 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费

通道服务的详细介绍可查询> Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群

  • 获取环境依赖的JAR包。
Jar包 获取方法
emr-tablestore-X.X.X.jar
X.X.X: Since 1.8.0+
Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar 下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar
  • 集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面。
  • 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
  • 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
  • 上传所有JAR包到emr-header-1节点的某个目录下。

步骤三 运行Spark Streaming作业

  1. 以一个基于emr demo修改的WordCount为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),本例测试用的JAR包地址,完整的WordCount代码参见附录(后续会合到emr demo项目中)。
  2. 该样例以Tablestore表作为数据源,统计的是主键PkString。SmallTarget数据表中数据初始时为空,如下图所示。
    4

3. 启动spark streaming,开启一个统计SmallTarget表中通道zengliang(通道名)实时消费数据的一个监听程序。

spark-submit --class com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

各个参数的详细说明如下表所示。

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见下文Catalog字段说明

向SmallTarget数据表插入数据,spark streaming会实时更新,如下面两张图所示。
5
6

Catalog字段说明

Catalog为一个Json字符串,"columns"里面指定的是需要读取的列的自定义配置,这些列的键值对最终会被TableStore Source所输出,如下图所示,会读取PkString, PkInt和col1三列,其类型分别为string, long和string。

{
  "columns": {
    "PkString": {
      "col": "PkString",
      "type": "string"
    },
    "PkInt": {
      "col": "PkInt",
      "type": "long"
    },
    "col1": {
      "col": "col1",
      "type": "string"
    }
  }
}

预定义列说明

除去Catalog字段中的用户自定义列之外,Tablestore Source的输出默认还会加上一些预定义列的值,以供下游系统灵活的使用,预定义字段说明如下表所示:

预定义列名 说明
__ots_record_type__ Tablestore行操作类型(PUT, UPDATE, DELETE)
__ots_record_timestamp__ Tablestore行的时间戳,单位为ms
__ots_column_type_<ColumnName> 某列的操作类型,其中ColumnName为Catalog字段中定义的列

附录

package com.aliyun.emr.example.spark.sql.streaming

import java.util.UUID

import org.apache.spark.sql.SparkSession

object StructuredTableStoreWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        "Usage: StructuredTableStoreWordCount <ots-instanceName>" +
          "<ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint>" +
          "<max-offsets-per-channel> [<checkpoint-location>]"
      )
    }

    val Array(
      instanceName,
      tableName,
      tunnelId,
      accessKeyId,
      accessKeySecret,
      endpoint,
      maxOffsetsPerChannel,
      _*
    ) = args

    System.out.println(args.toSeq.toString)

    val checkpointLocation =
      if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession.builder.appName("TableStoreWordCount").master("local[5]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val lines = spark.readStream
      .format("tablestore")
      .option("ots.instance", instanceName)
      .option("ots.table", tableName)
      .option("ots.tunnel", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxOffsetsPerChannel", maxOffsetsPerChannel) // default 10000
      .option(
        "catalog",
        "{\"columns\":{\"PkString\":{\"col\":\"PkString\",\"type\":\"string\"},\"PkInt\":{\"col\":\"PkInt\",\"type\":\"int\"}," +
          "\"col1\":{\"col\":\"col1\",\"type\":\"string\"}}}"
      )
      .load()
      .selectExpr("PkString", "PkInt", "__ots_record_type__")
      .as[(String, Integer, String)]

    val wordCounts = lines
      .flatMap(line => {
        System.out.println(s"wordCount line: ${line}")
        line._1.split(" ")
      })
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }
}

写在最后

本篇文章我们介绍了如何在E-MapReduce中实时流式的处理Tablestore中的数据,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
image

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
上一篇:基于Tablestore Tunnel的数据复制实战 下一篇:Tablestore+Delta Lake(快速开始)
阿里云存储服务
使用钉钉扫一扫加入圈子
+ 订阅

阿里云存储基于飞天盘古2.0分布式存储系统,产品多种多样,充分满足用户数据存储和迁移上云需求。

官方博客
链接