基于Tablestore的一站式物联网存储解决方案-Spark 分析

简介: ## 前言上一章节[《基于Tablestore的一站式物联网存储解决方案-数据操作篇》](https://ata.alibaba-inc.com/articles/213053) 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景

前言

上一章节《基于Tablestore的一站式物联网存储解决方案-数据操作篇》 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景中,会出现一些离线分析的场景,可能在一次任务中会涉及对表格存储数据进行复杂的计算、分析。本文主要介绍如何通过Spark SQL分析表格存储Tablestore中的数据,以及分析结果的可视化展示。

流批计算场景

在共享充电宝场景中,会出现诸多离线分析类的需求。例如:

  • 批计算

    • 每天定时统计各省份的营收情况
    • 计算各个时间段内的充电宝租赁比例。
    • 计算各个厂商的机柜损坏比例。
  • 流计算

    • 实时生成机柜营收报表。并回写到Tablestore表中。

实现方案

这里以表格存储Tablestore作为存储库,通过Spark计算引擎访问表格存储。Spark可对表格存储中的数据进行复杂的计算、分析。对于批计算场景,Tablestore on Spark提供了自动选择索引、分区裁剪、动态指定Split、谓词下推等功能,可降低服务端数据出口量,提升Spark任务执行速度。对于流计算,基于表格存储的通道服务实现,在保证at-least-once语义的基础上,完成Spark流式消费和计算。通道服务Channel与RDD的分区一一绑定,通过扩展数据表分区进而扩展通道Channel数量,完成数据吞吐量的线性扩展。

Spark访问表格存储的方式有E-MapReduce SQL或者DataFrame编程方式。这里以E-MapReduce SQL方式为例,介绍实现步骤与场景解决方案。更多关于Spark访问表格存储的介绍请参考表格存储文档Spark/SparkSQL

实现步骤

创建Spark外部表

  1. 登录阿里云控制台,创建EMR-MapReduce Hadoop集群。创建流程请参考创建集群
  2. 下载emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包,并上传至EMR Header服务器。
  3. 执行命令启动Spark-sql命令行。其中替换为上传的jar包版本。

命令:spark-sql --jars emr-datasources_shaded_2.11-.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  1. 创建Spark外表。

    1. 批计算。

      1. 创建语句请参考参考文档底部附录,这里分别建立cabinet、cabinet_time、order三张外表。
      2. 执行SQL语句。参考结果展示
    2. 流计算。

      1. 执行streaming-sql命令。

  2. 创建order_stream外表、创建order_sink结果表、创建order_stream_view视图。创建脚本见文档地步**附录**。
  2. 登录Tablestore控制台,创建增量类型通道,并记录TunnelID。通道创建步骤请参考[创建通道](https://help.aliyun.com/document_detail/102491.html?spm=a2c4g.11186623.6.667.3dbc32b51VVnNR)。
  2. 开启流任务,执行SQL语句。参考**结果展示**。

结果展示

  • 批计算
  1. 计算各个厂商的机柜损坏比例。
select cabinet_manufacturers as cm,sum(cabinet_damage_size)/sum(cabinet_powerbank_size) as damage_percent from cabinet group by cabinet_manufacturers;

  1. 统计各个省份的营收总额。
select cabinet_province, sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) from order where order_end_time > 0 group by cabinet_province; 

  1. 计算湖北省2021年1月1日中,各个时间点维度上的平均租赁比例。
select from_unixtime(cabinet_state_timestamp/1000, 'yyyy-MM-dd HH:mm:ss') as time, sum(cabinet_powerbank_size-cabinet_available_size)/sum(cabinet_powerbank_size)from cabinet_time where cabinet_state_timestamp >= 1609430400000 and cabinet_state_timestamp <= 1609516799000 and cabinet_province='湖北省' group by cabinet_state_timestamp order by time;  

  • 流计算

计算每个机柜的营收总额,并将结果回写到Tablestore的ots_sink表中。

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
select cabinet_ID,sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) as totalPrice,count(cabinet_ID) as orderNum from order_stream_view group by cabinet_ID;

回写ots_sink表的结果示例

小结

本章节通过表格存储Tablestore与Spark结合使用,采用Spark SQL的方式,实现了共享充电宝案例的流批计算场景。除了上述的访问方式以外,也可以采用DataFrame编程的方式访问Tablestore中的数据。更多关于Tablestore On Spark的介绍,请参考官网文档Spark/Spark SQL。下一章章节将为大家介绍Data Lake Analytics在Tablestore中的使用。

附录

创建cabinet外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet;
CREATE TABLE cabinet (
cabinet_Md5ID STRING,cabinet_ID STRING,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建cabinet_time外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet_time;
CREATE TABLE cabinet_time (
cabinet_Md5ID STRING,cabinet_ID STRING,cabinet_state_timestamp LONG,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据时序表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS order;
CREATE TABLE order (
order_Md5ID STRING,order_ID STRING,
cabinet_ID STRING, cabinet_geo STRING, cabinet_pricePerHour DOUBLE, 
cabinet_province STRING, cabinet_type STRING, order_end_time LONG,
order_isRevert BOOLEAN, order_lose_pay DOUBLE, order_phone STRING,
order_start_time LONG COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order_stream外部表(流计算)

DROP TABLE IF EXISTS order_stream;
CREATE TABLE order_stream
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
catalog='{"columns": {"cabinet_ID": {"type":"string"}, "cabinet_pricePerHour": {"type":"double"},"order_start_time": {"type":"long"}, "order_end_time": {"type":"long"}}}'
);

创建order_sink结果表(流计算)

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre结果表名,示例中为‘order_sink’",
catalog='{"columns": {"cabinetID": {"type": "string"},"totalPrice": {"type": "double"}, "orderNum": {"type": "long"}}}'
);

创建order_stream_view视图(流计算)

CREATE SCAN order_stream_view ON order_stream USING STREAM
OPTIONS(
tunnel.id="87f0de2c-40ab-4f9d-80ab-630961ebea27", 
maxoffsetsperchannel="10000");
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
11天前
|
监控 安全 物联网
Java基于物联网技术的智慧工地解决方案源代码
应用先进的大数据、物联网、云计算等数字化技术,融合施工运营管理规范和技术标准,建构支撑施工和运营的一体化平台是投资、施工和运营单位能力建设的关键。应用企业架构、设计思维和软件工程方法,深入分析施工和运营技术特性与管理体系,研究开发基于大数据技术的智慧工地信息一体化平台,智慧工地管理平台是依托物联网、互联网建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。
94 2
|
11天前
|
物联网 定位技术
【技术探讨】一种多节点5Km(1.2M bps速率)实时Sub-G无线通信的物联网通讯解决方案
针对在高速公路上货车行驶过程中收集5公里范围内的GPS定位数据,上报云服务器端,最终实时显示每一辆货车的运行轨迹,用户的项目需求如下:200辆货车(无线从站节点),要求很高的实时性,每秒发5包,每个GPS定位数据报文30个字节,这样200辆车同时上报每秒需要发送30K的字节(200 x 5 x 3 0 =30K字节),30K字节 x 8bit=240 k bps速率。
|
8月前
|
运维 监控 安全
物联网行业解决方案(五)
物联网行业解决方案(五)
395 2
|
8月前
|
传感器 人工智能 监控
物联网行业解决方案(四)
物联网行业解决方案(四)
330 1
|
3天前
|
存储 供应链 物联网
未来技术纵横:区块链、物联网与虚拟现实的融合革新
【5月更文挑战第23天】 随着科技的迅猛发展,新兴技术正不断突破传统的边界,推动着社会向更加智能和互联的方向演进。本文将聚焦于区块链技术、物联网(IoT)以及虚拟现实(VR)这三个技术领域,探讨它们各自的发展趋势以及在不同应用场景中的交叉融合和创新应用。通过分析这些技术的互动和集成,揭示它们如何共同塑造未来的数字世界,并讨论在实际应用中面临的挑战及其解决方案。
|
1天前
|
安全 物联网 区块链
未来技术浪潮:区块链、物联网和虚拟现实的革新之路
【5月更文挑战第25天】 随着第四次工业革命的到来,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正不断重塑我们的世界。区块链技术以其去中心化和不可篡改的特性为数据安全和交易透明性提供了新范式。物联网通过使设备互联互通,优化了自动化系统的运行效率和生活品质。而虚拟现实则在娱乐、教育和医疗等领域创造了沉浸式体验。本文将深入探讨这些技术的发展趋势,并分析它们在不同应用场景中的实际作用与潜力。
|
1天前
|
监控 安全 物联网
未来交织:区块链、物联网与虚拟现实的技术融合与应用探索
【5月更文挑战第25天】 随着科技的迅猛发展,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正逐渐渗透到我们生活的方方面面。这些技术的独立发展已经引起了广泛关注,然而它们之间的相互作用及融合潜力更是值得深入探讨。本文将聚焦于这些技术在未来可能的发展趋势,以及它们在不同应用场景中如何相互促进、共同进化。我们将分析区块链技术在确保数据安全、提供去中心化信任机制方面的关键作用;物联网在连接设备、优化资源管理方面的革命性影响;以及虚拟现实在提升用户体验、扩展现实边界方面的潜力。通过综合考量这些技术的特点和互补性,文章旨在为读者提供一个关于新兴技术趋势与应用的综合视角。
|
2天前
|
供应链 搜索推荐 物联网
新兴技术趋势与应用:区块链、物联网、虚拟现实的未来发展
随着科技的不断进步,新兴技术如区块链、物联网、虚拟现实等正在逐渐改变我们的生活。本文将探讨这些技术的发展趋势和应用场景,以期为读者提供一个全面了解这些新兴技术的机会。
|
2天前
|
供应链 安全 物联网
未来技术的融合潮流:区块链、物联网与虚拟现实的革新之旅
【5月更文挑战第23天】 随着科技的不断进步,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正在重塑我们的世界。本文将深入探讨这些技术的发展趋势,并分析它们在不同应用场景中的作用。区块链技术以其独特的去中心化特性和数据不可篡改性,为金融交易和供应链管理带来了革命性的变革。物联网通过智能设备的互联互通,优化了城市管理和家庭生活。而虚拟现实技术则在娱乐、教育和医疗等领域提供了沉浸式体验。这些技术的融合预示着一个全新的技术时代的到来,它们将共同构建一个更加智能、安全和互动的未来。
|
3天前
|
安全 物联网 区块链
未来技术的融合潮流:区块链、物联网与虚拟现实的革新之路
【5月更文挑战第23天】随着科技不断进步,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正在逐渐改变我们的生活和工作方式。这些技术不仅各自独立发展,更在相互融合中展现出前所未有的潜力和应用前景。本文将深入探讨这些技术的发展趋势,并分析它们在不同领域的结合应用,揭示它们如何共同塑造一个更加智能、互联和沉浸式的未来。

相关产品

  • 物联网平台