基于Tablestore的一站式物联网存储解决方案-Spark 分析

简介: ## 前言上一章节[《基于Tablestore的一站式物联网存储解决方案-数据操作篇》](https://ata.alibaba-inc.com/articles/213053) 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景

前言

上一章节《基于Tablestore的一站式物联网存储解决方案-数据操作篇》 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景中,会出现一些离线分析的场景,可能在一次任务中会涉及对表格存储数据进行复杂的计算、分析。本文主要介绍如何通过Spark SQL分析表格存储Tablestore中的数据,以及分析结果的可视化展示。

流批计算场景

在共享充电宝场景中,会出现诸多离线分析类的需求。例如:

  • 批计算

    • 每天定时统计各省份的营收情况
    • 计算各个时间段内的充电宝租赁比例。
    • 计算各个厂商的机柜损坏比例。
  • 流计算

    • 实时生成机柜营收报表。并回写到Tablestore表中。

实现方案

这里以表格存储Tablestore作为存储库,通过Spark计算引擎访问表格存储。Spark可对表格存储中的数据进行复杂的计算、分析。对于批计算场景,Tablestore on Spark提供了自动选择索引、分区裁剪、动态指定Split、谓词下推等功能,可降低服务端数据出口量,提升Spark任务执行速度。对于流计算,基于表格存储的通道服务实现,在保证at-least-once语义的基础上,完成Spark流式消费和计算。通道服务Channel与RDD的分区一一绑定,通过扩展数据表分区进而扩展通道Channel数量,完成数据吞吐量的线性扩展。

Spark访问表格存储的方式有E-MapReduce SQL或者DataFrame编程方式。这里以E-MapReduce SQL方式为例,介绍实现步骤与场景解决方案。更多关于Spark访问表格存储的介绍请参考表格存储文档Spark/SparkSQL

实现步骤

创建Spark外部表

  1. 登录阿里云控制台,创建EMR-MapReduce Hadoop集群。创建流程请参考创建集群
  2. 下载emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包,并上传至EMR Header服务器。
  3. 执行命令启动Spark-sql命令行。其中替换为上传的jar包版本。

命令:spark-sql --jars emr-datasources_shaded_2.11-.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  1. 创建Spark外表。

    1. 批计算。

      1. 创建语句请参考参考文档底部附录,这里分别建立cabinet、cabinet_time、order三张外表。
      2. 执行SQL语句。参考结果展示
    2. 流计算。

      1. 执行streaming-sql命令。

  2. 创建order_stream外表、创建order_sink结果表、创建order_stream_view视图。创建脚本见文档地步**附录**。
  2. 登录Tablestore控制台,创建增量类型通道,并记录TunnelID。通道创建步骤请参考[创建通道](https://help.aliyun.com/document_detail/102491.html?spm=a2c4g.11186623.6.667.3dbc32b51VVnNR)。
  2. 开启流任务,执行SQL语句。参考**结果展示**。

结果展示

  • 批计算
  1. 计算各个厂商的机柜损坏比例。
select cabinet_manufacturers as cm,sum(cabinet_damage_size)/sum(cabinet_powerbank_size) as damage_percent from cabinet group by cabinet_manufacturers;

  1. 统计各个省份的营收总额。
select cabinet_province, sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) from order where order_end_time > 0 group by cabinet_province; 

  1. 计算湖北省2021年1月1日中,各个时间点维度上的平均租赁比例。
select from_unixtime(cabinet_state_timestamp/1000, 'yyyy-MM-dd HH:mm:ss') as time, sum(cabinet_powerbank_size-cabinet_available_size)/sum(cabinet_powerbank_size)from cabinet_time where cabinet_state_timestamp >= 1609430400000 and cabinet_state_timestamp <= 1609516799000 and cabinet_province='湖北省' group by cabinet_state_timestamp order by time;  

  • 流计算

计算每个机柜的营收总额,并将结果回写到Tablestore的ots_sink表中。

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
select cabinet_ID,sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) as totalPrice,count(cabinet_ID) as orderNum from order_stream_view group by cabinet_ID;

回写ots_sink表的结果示例

小结

本章节通过表格存储Tablestore与Spark结合使用,采用Spark SQL的方式,实现了共享充电宝案例的流批计算场景。除了上述的访问方式以外,也可以采用DataFrame编程的方式访问Tablestore中的数据。更多关于Tablestore On Spark的介绍,请参考官网文档Spark/Spark SQL。下一章章节将为大家介绍Data Lake Analytics在Tablestore中的使用。

附录

创建cabinet外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet;
CREATE TABLE cabinet (
cabinet_Md5ID STRING,cabinet_ID STRING,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建cabinet_time外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet_time;
CREATE TABLE cabinet_time (
cabinet_Md5ID STRING,cabinet_ID STRING,cabinet_state_timestamp LONG,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据时序表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS order;
CREATE TABLE order (
order_Md5ID STRING,order_ID STRING,
cabinet_ID STRING, cabinet_geo STRING, cabinet_pricePerHour DOUBLE, 
cabinet_province STRING, cabinet_type STRING, order_end_time LONG,
order_isRevert BOOLEAN, order_lose_pay DOUBLE, order_phone STRING,
order_start_time LONG COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order_stream外部表(流计算)

DROP TABLE IF EXISTS order_stream;
CREATE TABLE order_stream
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
catalog='{"columns": {"cabinet_ID": {"type":"string"}, "cabinet_pricePerHour": {"type":"double"},"order_start_time": {"type":"long"}, "order_end_time": {"type":"long"}}}'
);

创建order_sink结果表(流计算)

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre结果表名,示例中为‘order_sink’",
catalog='{"columns": {"cabinetID": {"type": "string"},"totalPrice": {"type": "double"}, "orderNum": {"type": "long"}}}'
);

创建order_stream_view视图(流计算)

CREATE SCAN order_stream_view ON order_stream USING STREAM
OPTIONS(
tunnel.id="87f0de2c-40ab-4f9d-80ab-630961ebea27", 
maxoffsetsperchannel="10000");
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
9天前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
|
2月前
|
存储 安全 物联网
物联网设备的安全挑战与解决方案
【5月更文挑战第31天】随着物联网技术的发展,大量设备联网带来严重安全挑战。设备计算能力有限,易受黑客攻击;多样性和复杂性增加管理难度;环境暴露及用户安全意识薄弱也是问题。解决方法包括:增强设备身份认证、定期更新软件、实施网络隔离和访问控制、加密数据以及建立安全监测机制。Python 示例展示了数据加密方法。提升用户安全意识同样关键。综合施策,强化安全研究,保障物联网设备安全,促进行业健康发展。重视物联网安全,打造安全可靠的数字环境。
33 0
|
2月前
|
物联网 PHP 区块链
区块链和物联网解决方案实用指南(二)(3)
区块链和物联网解决方案实用指南(二)
23 0
|
2月前
|
JSON 物联网 PHP
区块链和物联网解决方案实用指南(二)(2)
区块链和物联网解决方案实用指南(二)
18 0
|
2月前
|
物联网 API PHP
区块链和物联网解决方案实用指南(二)(1)
区块链和物联网解决方案实用指南(二)
21 0
|
9月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
80 3
|
SQL 存储 弹性计算
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1688 0
|
2月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
2月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
42 1
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
723 0

热门文章

最新文章

相关产品

  • 物联网平台