本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。
场景设计
随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
而本文使用的Tablestore数据库可以直接利用它的通道服务功能,供Spark Streaming流式消费,进而计算订单的数量及金额,简化了整个流程,具体如下图所示
本文将介绍一个简单的demo,流式统计Tablestore数据表中字段出现的个数。
前提条件
确保将Tablestore实例部署在E-MapReduce集群相同的VPC环境下
准备工作
步骤一 创建Tablestore数据源表
详细开通步骤请参考官方文档,本文demo中所创建出来的表名为SmallTarget,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
为表SmallTarget建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。
技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:
- 全量:对数据表中历史存量数据消费处理
- 增量:对数据表中新增数据消费处理
- 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费
通道服务的详细介绍可查询> Tablestore官网文档。
步骤二 获取相关jar包并上传到hadoop集群
- 获取环境依赖的JAR包。
Jar包 | 获取方法 |
---|---|
emr-tablestore-X.X.X.jar X.X.X: Since 1.9.0+ |
Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore |
tablestore-X.X.X-jar-with-dependencies.jar | 下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar |
- 在集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面。
- 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
- 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
- 上传所有JAR包到emr-header-1节点的某个目录下。
步骤三 运行Spark Streaming作业
- 以一个基于emr demo修改的WordCount为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),本例测试用的JAR包地址,完整的WordCount代码参见附录(后续会合到emr demo项目中)。
- 该样例以Tablestore表作为数据源,统计的是主键PkString。SmallTarget数据表中数据初始时为空,如下图所示。
3. 启动spark streaming,开启一个统计SmallTarget表中通道zengliang(通道名)实时消费数据的一个监听程序。
spark-submit --class com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
各个参数的详细说明如下表所示。
参数 | 参数说明 |
---|---|
com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount | 所要运行的主程序类 |
emr-tablestore-X.X.X-SNAPSHOT.jar | 包含Tablestore source的jar包 |
tablestore-X.X.X-jar-with-dependencies.jar | EMR SDK 相关的Tablestore依赖包 |
examples-X.X.X-shaded.jar | 基于EMR demo修改的包(包含主程序类) |
instance | Tablestore实例名 |
tableName | Tablestore表名 |
tunnelId | Tablestore表的通道Id |
accessKeyId | Tablestore的accessKeyId |
accessKeySecret | Tablestore的秘钥 |
endPoint | Tablestore实例的endPoint |
maxOffsetsPerChannel | Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。 |
catalog | 同步的列名,详见下文Catalog字段说明 |
向SmallTarget数据表插入数据,spark streaming会实时更新,如下面两张图所示。
Catalog字段说明
Catalog为一个Json字符串,"columns"里面指定的是需要读取的列的自定义配置,这些列的键值对最终会被TableStore Source所输出,如下图所示,会读取PkString, PkInt和col1三列,其类型分别为string, long和string。
{
"columns": {
"PkString": {
"col": "PkString",
"type": "string"
},
"PkInt": {
"col": "PkInt",
"type": "long"
},
"col1": {
"col": "col1",
"type": "string"
}
}
}
预定义列说明
除去Catalog字段中的用户自定义列之外,Tablestore Source的输出默认还会加上一些预定义列的值,以供下游系统灵活的使用,预定义字段说明如下表所示:
预定义列名 | 说明 |
---|---|
__ots_record_type__ |
Tablestore行操作类型(PUT, UPDATE, DELETE) |
__ots_record_timestamp__ |
Tablestore行的时间戳,单位为ms |
__ots_column_type_<ColumnName> |
某列的操作类型,其中ColumnName为Catalog字段中定义的列 |
附录
package com.aliyun.emr.example.spark.sql.streaming
import java.util.UUID
import org.apache.spark.sql.SparkSession
object StructuredTableStoreWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 7) {
System.err.println(
"Usage: StructuredTableStoreWordCount <ots-instanceName>" +
"<ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint>" +
"<max-offsets-per-channel> [<checkpoint-location>]"
)
}
val Array(
instanceName,
tableName,
tunnelId,
accessKeyId,
accessKeySecret,
endpoint,
maxOffsetsPerChannel,
_*
) = args
System.out.println(args.toSeq.toString)
val checkpointLocation =
if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession.builder.appName("TableStoreWordCount").master("local[5]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val lines = spark.readStream
.format("tablestore")
.option("instance.name", instanceName)
.option("table.name", tableName)
.option("tunnel.id", tunnelId)
.option("endpoint", endpoint)
.option("access.key.id", accessKeyId)
.option("access.key.secret", accessKeySecret)
.option("maxOffsetsPerChannel", maxOffsetsPerChannel) // default 10000
.option(
"catalog",
"{\"columns\":{\"PkString\":{\"col\":\"PkString\",\"type\":\"string\"},\"PkInt\":{\"col\":\"PkInt\",\"type\":\"long\"}," +
"\"col1\":{\"col\":\"col1\",\"type\":\"string\"}}}"
)
.load()
.selectExpr("PkString", "PkInt", "__ots_record_type__")
.as[(String, Long, String)]
val wordCounts = lines
.flatMap(line => {
System.out.println(s"wordCount line: ${line}")
line._1.split(" ")
})
.groupBy("value")
.count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
query.awaitTermination()
}
}
写在最后
本篇文章我们介绍了如何在E-MapReduce中实时流式的处理Tablestore中的数据,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。