基于Tablestore的一站式物联网存储解决方案-Spark 分析

简介: ## 前言上一章节[《基于Tablestore的一站式物联网存储解决方案-数据操作篇》](https://ata.alibaba-inc.com/articles/213053) 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景

前言

上一章节《基于Tablestore的一站式物联网存储解决方案-数据操作篇》 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景中,会出现一些离线分析的场景,可能在一次任务中会涉及对表格存储数据进行复杂的计算、分析。本文主要介绍如何通过Spark SQL分析表格存储Tablestore中的数据,以及分析结果的可视化展示。

流批计算场景

在共享充电宝场景中,会出现诸多离线分析类的需求。例如:

  • 批计算

    • 每天定时统计各省份的营收情况
    • 计算各个时间段内的充电宝租赁比例。
    • 计算各个厂商的机柜损坏比例。
  • 流计算

    • 实时生成机柜营收报表。并回写到Tablestore表中。

实现方案

这里以表格存储Tablestore作为存储库,通过Spark计算引擎访问表格存储。Spark可对表格存储中的数据进行复杂的计算、分析。对于批计算场景,Tablestore on Spark提供了自动选择索引、分区裁剪、动态指定Split、谓词下推等功能,可降低服务端数据出口量,提升Spark任务执行速度。对于流计算,基于表格存储的通道服务实现,在保证at-least-once语义的基础上,完成Spark流式消费和计算。通道服务Channel与RDD的分区一一绑定,通过扩展数据表分区进而扩展通道Channel数量,完成数据吞吐量的线性扩展。

Spark访问表格存储的方式有E-MapReduce SQL或者DataFrame编程方式。这里以E-MapReduce SQL方式为例,介绍实现步骤与场景解决方案。更多关于Spark访问表格存储的介绍请参考表格存储文档Spark/SparkSQL

实现步骤

创建Spark外部表

  1. 登录阿里云控制台,创建EMR-MapReduce Hadoop集群。创建流程请参考创建集群
  2. 下载emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包,并上传至EMR Header服务器。
  3. 执行命令启动Spark-sql命令行。其中替换为上传的jar包版本。

命令:spark-sql --jars emr-datasources_shaded_2.11-.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  1. 创建Spark外表。

    1. 批计算。

      1. 创建语句请参考参考文档底部附录,这里分别建立cabinet、cabinet_time、order三张外表。
      2. 执行SQL语句。参考结果展示
    2. 流计算。

      1. 执行streaming-sql命令。

  2. 创建order_stream外表、创建order_sink结果表、创建order_stream_view视图。创建脚本见文档地步**附录**  2. 登录Tablestore控制台,创建增量类型通道,并记录TunnelID。通道创建步骤请参考[创建通道](https://help.aliyun.com/document_detail/102491.html?spm=a2c4g.11186623.6.667.3dbc32b51VVnNR)。
  2. 开启流任务,执行SQL语句。参考**结果展示**
AI 代码解读

结果展示

  • 批计算
  1. 计算各个厂商的机柜损坏比例。
select cabinet_manufacturers as cm,sum(cabinet_damage_size)/sum(cabinet_powerbank_size) as damage_percent from cabinet group by cabinet_manufacturers;
AI 代码解读

  1. 统计各个省份的营收总额。
select cabinet_province, sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) from order where order_end_time > 0 group by cabinet_province; 
AI 代码解读

  1. 计算湖北省2021年1月1日中,各个时间点维度上的平均租赁比例。
select from_unixtime(cabinet_state_timestamp/1000, 'yyyy-MM-dd HH:mm:ss') as time, sum(cabinet_powerbank_size-cabinet_available_size)/sum(cabinet_powerbank_size)from cabinet_time where cabinet_state_timestamp >= 1609430400000 and cabinet_state_timestamp <= 1609516799000 and cabinet_province='湖北省' group by cabinet_state_timestamp order by time;  
AI 代码解读

  • 流计算

计算每个机柜的营收总额,并将结果回写到Tablestore的ots_sink表中。

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
select cabinet_ID,sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) as totalPrice,count(cabinet_ID) as orderNum from order_stream_view group by cabinet_ID;
AI 代码解读

回写ots_sink表的结果示例

小结

本章节通过表格存储Tablestore与Spark结合使用,采用Spark SQL的方式,实现了共享充电宝案例的流批计算场景。除了上述的访问方式以外,也可以采用DataFrame编程的方式访问Tablestore中的数据。更多关于Tablestore On Spark的介绍,请参考官网文档Spark/Spark SQL。下一章章节将为大家介绍Data Lake Analytics在Tablestore中的使用。

附录

创建cabinet外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet;
CREATE TABLE cabinet (
cabinet_Md5ID STRING,cabinet_ID STRING,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);
AI 代码解读

创建cabinet_time外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet_time;
CREATE TABLE cabinet_time (
cabinet_Md5ID STRING,cabinet_ID STRING,cabinet_state_timestamp LONG,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据时序表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);
AI 代码解读

创建order外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS order;
CREATE TABLE order (
order_Md5ID STRING,order_ID STRING,
cabinet_ID STRING, cabinet_geo STRING, cabinet_pricePerHour DOUBLE, 
cabinet_province STRING, cabinet_type STRING, order_end_time LONG,
order_isRevert BOOLEAN, order_lose_pay DOUBLE, order_phone STRING,
order_start_time LONG COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);
AI 代码解读

创建order_stream外部表(流计算)

DROP TABLE IF EXISTS order_stream;
CREATE TABLE order_stream
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
catalog='{"columns": {"cabinet_ID": {"type":"string"}, "cabinet_pricePerHour": {"type":"double"},"order_start_time": {"type":"long"}, "order_end_time": {"type":"long"}}}'
);
AI 代码解读

创建order_sink结果表(流计算)

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre结果表名,示例中为‘order_sink’",
catalog='{"columns": {"cabinetID": {"type": "string"},"totalPrice": {"type": "double"}, "orderNum": {"type": "long"}}}'
);
AI 代码解读

创建order_stream_view视图(流计算)

CREATE SCAN order_stream_view ON order_stream USING STREAM
OPTIONS(
tunnel.id="87f0de2c-40ab-4f9d-80ab-630961ebea27", 
maxoffsetsperchannel="10000");
AI 代码解读
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
打赏
0
0
0
0
3025
分享
相关文章
Tablestore深度解析:面向AI场景的结构化数据存储最佳实践
《Tablestore深度解析:面向AI场景的结构化数据存储最佳实践》由阿里云专家团队分享,涵盖Tablestore十年发展历程、AI时代多模态数据存储需求、VCU模式优化、向量检索发布及客户最佳实践等内容。Tablestore支持大规模在线数据存储,提供高性价比、高性能和高可用性,特别针对AI场景进行优化,满足结构化与非结构化数据的统一存储和高效检索需求。通过多元化索引和Serverless弹性VCU模式,助力企业实现低成本、灵活扩展的数据管理方案。
78 12
Tablestore 物联网存储全面升级 -- 分析存储公测
物联网存储功能介绍随着物联网技术的快速发展,物联网已广泛应用于制造业、能源、建筑、医疗、交通、物流仓储等多个领域,物联网的应用能够有效节约资源、提高效率、保障安全以及降低成本,帮助各行业实现可持续发展目标。在物联网场景中根据数据特点进行分类,数据主要包括设备元数据、设备消息数据和设备时序数据三种类型,不同类型数据的存储需求不同。物联网场景中不同类型数据的存储核心需求如下:设备元数据:主要数据为设备
306 0
Tablestore 物联网存储全面升级 -- 分析存储公测
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
349 1
基于物联网平台的车辆时序数据存储实践
物联网平台 + Tablestore 时序表解决车联网中时序数据存储场景、需求。
957 15
基于Tablestore 实现海量订单日志数据存储
从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展。
328 0
基于Tablestore 实现海量订单日志数据存储
Android物联网应用程序开发(智慧城市)—— 购物信息的存储界面开发
Android物联网应用程序开发(智慧城市)—— 购物信息的存储界面开发
448 0
Android物联网应用程序开发(智慧城市)—— 购物信息的存储界面开发
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
280 2
ClickHouse与大数据生态集成:Spark & Flink 实战
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
105 0
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
80 0

物联网

+关注

相关产品

  • 物联网平台
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等