基于Tablestore的一站式物联网存储解决方案-Spark 分析

简介: ## 前言上一章节[《基于Tablestore的一站式物联网存储解决方案-数据操作篇》](https://ata.alibaba-inc.com/articles/213053) 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景

前言

上一章节《基于Tablestore的一站式物联网存储解决方案-数据操作篇》 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景中,会出现一些离线分析的场景,可能在一次任务中会涉及对表格存储数据进行复杂的计算、分析。本文主要介绍如何通过Spark SQL分析表格存储Tablestore中的数据,以及分析结果的可视化展示。

流批计算场景

在共享充电宝场景中,会出现诸多离线分析类的需求。例如:

  • 批计算

    • 每天定时统计各省份的营收情况
    • 计算各个时间段内的充电宝租赁比例。
    • 计算各个厂商的机柜损坏比例。
  • 流计算

    • 实时生成机柜营收报表。并回写到Tablestore表中。

实现方案

这里以表格存储Tablestore作为存储库,通过Spark计算引擎访问表格存储。Spark可对表格存储中的数据进行复杂的计算、分析。对于批计算场景,Tablestore on Spark提供了自动选择索引、分区裁剪、动态指定Split、谓词下推等功能,可降低服务端数据出口量,提升Spark任务执行速度。对于流计算,基于表格存储的通道服务实现,在保证at-least-once语义的基础上,完成Spark流式消费和计算。通道服务Channel与RDD的分区一一绑定,通过扩展数据表分区进而扩展通道Channel数量,完成数据吞吐量的线性扩展。

Spark访问表格存储的方式有E-MapReduce SQL或者DataFrame编程方式。这里以E-MapReduce SQL方式为例,介绍实现步骤与场景解决方案。更多关于Spark访问表格存储的介绍请参考表格存储文档Spark/SparkSQL

实现步骤

创建Spark外部表

  1. 登录阿里云控制台,创建EMR-MapReduce Hadoop集群。创建流程请参考创建集群
  2. 下载emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包,并上传至EMR Header服务器。
  3. 执行命令启动Spark-sql命令行。其中替换为上传的jar包版本。

命令:spark-sql --jars emr-datasources_shaded_2.11-.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  1. 创建Spark外表。

    1. 批计算。

      1. 创建语句请参考参考文档底部附录,这里分别建立cabinet、cabinet_time、order三张外表。
      2. 执行SQL语句。参考结果展示
    2. 流计算。

      1. 执行streaming-sql命令。

  2. 创建order_stream外表、创建order_sink结果表、创建order_stream_view视图。创建脚本见文档地步**附录**。
  2. 登录Tablestore控制台,创建增量类型通道,并记录TunnelID。通道创建步骤请参考[创建通道](https://help.aliyun.com/document_detail/102491.html?spm=a2c4g.11186623.6.667.3dbc32b51VVnNR)。
  2. 开启流任务,执行SQL语句。参考**结果展示**。

结果展示

  • 批计算
  1. 计算各个厂商的机柜损坏比例。
select cabinet_manufacturers as cm,sum(cabinet_damage_size)/sum(cabinet_powerbank_size) as damage_percent from cabinet group by cabinet_manufacturers;

  1. 统计各个省份的营收总额。
select cabinet_province, sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) from order where order_end_time > 0 group by cabinet_province; 

  1. 计算湖北省2021年1月1日中,各个时间点维度上的平均租赁比例。
select from_unixtime(cabinet_state_timestamp/1000, 'yyyy-MM-dd HH:mm:ss') as time, sum(cabinet_powerbank_size-cabinet_available_size)/sum(cabinet_powerbank_size)from cabinet_time where cabinet_state_timestamp >= 1609430400000 and cabinet_state_timestamp <= 1609516799000 and cabinet_province='湖北省' group by cabinet_state_timestamp order by time;  

  • 流计算

计算每个机柜的营收总额,并将结果回写到Tablestore的ots_sink表中。

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
select cabinet_ID,sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) as totalPrice,count(cabinet_ID) as orderNum from order_stream_view group by cabinet_ID;

回写ots_sink表的结果示例

小结

本章节通过表格存储Tablestore与Spark结合使用,采用Spark SQL的方式,实现了共享充电宝案例的流批计算场景。除了上述的访问方式以外,也可以采用DataFrame编程的方式访问Tablestore中的数据。更多关于Tablestore On Spark的介绍,请参考官网文档Spark/Spark SQL。下一章章节将为大家介绍Data Lake Analytics在Tablestore中的使用。

附录

创建cabinet外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet;
CREATE TABLE cabinet (
cabinet_Md5ID STRING,cabinet_ID STRING,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建cabinet_time外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet_time;
CREATE TABLE cabinet_time (
cabinet_Md5ID STRING,cabinet_ID STRING,cabinet_state_timestamp LONG,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据时序表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS order;
CREATE TABLE order (
order_Md5ID STRING,order_ID STRING,
cabinet_ID STRING, cabinet_geo STRING, cabinet_pricePerHour DOUBLE, 
cabinet_province STRING, cabinet_type STRING, order_end_time LONG,
order_isRevert BOOLEAN, order_lose_pay DOUBLE, order_phone STRING,
order_start_time LONG COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order_stream外部表(流计算)

DROP TABLE IF EXISTS order_stream;
CREATE TABLE order_stream
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
catalog='{"columns": {"cabinet_ID": {"type":"string"}, "cabinet_pricePerHour": {"type":"double"},"order_start_time": {"type":"long"}, "order_end_time": {"type":"long"}}}'
);

创建order_sink结果表(流计算)

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre结果表名,示例中为‘order_sink’",
catalog='{"columns": {"cabinetID": {"type": "string"},"totalPrice": {"type": "double"}, "orderNum": {"type": "long"}}}'
);

创建order_stream_view视图(流计算)

CREATE SCAN order_stream_view ON order_stream USING STREAM
OPTIONS(
tunnel.id="87f0de2c-40ab-4f9d-80ab-630961ebea27", 
maxoffsetsperchannel="10000");
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
1月前
|
人工智能 边缘计算 监控
【开源视频联动物联网平台】视频AI智能分析部署方式
【开源视频联动物联网平台】视频AI智能分析部署方式
105 3
|
2月前
|
监控 安全 物联网
Java基于物联网技术的智慧工地解决方案源代码
应用先进的大数据、物联网、云计算等数字化技术,融合施工运营管理规范和技术标准,建构支撑施工和运营的一体化平台是投资、施工和运营单位能力建设的关键。应用企业架构、设计思维和软件工程方法,深入分析施工和运营技术特性与管理体系,研究开发基于大数据技术的智慧工地信息一体化平台,智慧工地管理平台是依托物联网、互联网建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。
80 2
|
3月前
|
物联网 定位技术
【技术探讨】一种多节点5Km(1.2M bps速率)实时Sub-G无线通信的物联网通讯解决方案
针对在高速公路上货车行驶过程中收集5公里范围内的GPS定位数据,上报云服务器端,最终实时显示每一辆货车的运行轨迹,用户的项目需求如下:200辆货车(无线从站节点),要求很高的实时性,每秒发5包,每个GPS定位数据报文30个字节,这样200辆车同时上报每秒需要发送30K的字节(200 x 5 x 3 0 =30K字节),30K字节 x 8bit=240 k bps速率。
|
3月前
|
供应链 NoSQL 物联网
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
物联网已成为面向未来的解决方案的关键组成部分,且其所蕴含的巨大经济价值潜力有待挖掘
1440 0
链接全球数十亿台设备!物联网行业如何应对数据管理、实时分析和供应链优化的挑战?
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:安装主题分析实现【三十】
助力工业物联网,工业大数据之服务域:安装主题分析实现【三十】
22 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:派单主题分析实现【二十九】
助力工业物联网,工业大数据之服务域:派单主题分析实现【二十九】
26 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:回访主题分析【二十八】
助力工业物联网,工业大数据之服务域:回访主题分析【二十八】
19 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:安装主题分析实现【二十七】
助力工业物联网,工业大数据之服务域:安装主题分析实现【二十七】
25 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
28 0
|
4月前
|
物联网 大数据 BI
助力工业物联网,工业大数据之费用事实指标分析及实现【二十四】
助力工业物联网,工业大数据之费用事实指标分析及实现【二十四】
24 0

相关产品

  • 物联网平台