基于Tablestore的一站式物联网存储解决方案-Spark 分析

简介: ## 前言上一章节[《基于Tablestore的一站式物联网存储解决方案-数据操作篇》](https://ata.alibaba-inc.com/articles/213053) 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景

前言

上一章节《基于Tablestore的一站式物联网存储解决方案-数据操作篇》 为大家介绍了如何读写表格存储Tablestore中的数据。可以看到,无论是主键读写还是索引查询,都属于在线实时查询的场景。这些场景都要求某个查询或某个任务的服务响应时间极低(秒级别甚至毫秒级别)。然而,在共享充电宝场景中,会出现一些离线分析的场景,可能在一次任务中会涉及对表格存储数据进行复杂的计算、分析。本文主要介绍如何通过Spark SQL分析表格存储Tablestore中的数据,以及分析结果的可视化展示。

流批计算场景

在共享充电宝场景中,会出现诸多离线分析类的需求。例如:

  • 批计算

    • 每天定时统计各省份的营收情况
    • 计算各个时间段内的充电宝租赁比例。
    • 计算各个厂商的机柜损坏比例。
  • 流计算

    • 实时生成机柜营收报表。并回写到Tablestore表中。

实现方案

这里以表格存储Tablestore作为存储库,通过Spark计算引擎访问表格存储。Spark可对表格存储中的数据进行复杂的计算、分析。对于批计算场景,Tablestore on Spark提供了自动选择索引、分区裁剪、动态指定Split、谓词下推等功能,可降低服务端数据出口量,提升Spark任务执行速度。对于流计算,基于表格存储的通道服务实现,在保证at-least-once语义的基础上,完成Spark流式消费和计算。通道服务Channel与RDD的分区一一绑定,通过扩展数据表分区进而扩展通道Channel数量,完成数据吞吐量的线性扩展。

Spark访问表格存储的方式有E-MapReduce SQL或者DataFrame编程方式。这里以E-MapReduce SQL方式为例,介绍实现步骤与场景解决方案。更多关于Spark访问表格存储的介绍请参考表格存储文档Spark/SparkSQL

实现步骤

创建Spark外部表

  1. 登录阿里云控制台,创建EMR-MapReduce Hadoop集群。创建流程请参考创建集群
  2. 下载emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包,并上传至EMR Header服务器。
  3. 执行命令启动Spark-sql命令行。其中替换为上传的jar包版本。

命令:spark-sql --jars emr-datasources_shaded_2.11-.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
  1. 创建Spark外表。

    1. 批计算。

      1. 创建语句请参考参考文档底部附录,这里分别建立cabinet、cabinet_time、order三张外表。
      2. 执行SQL语句。参考结果展示
    2. 流计算。

      1. 执行streaming-sql命令。

  2. 创建order_stream外表、创建order_sink结果表、创建order_stream_view视图。创建脚本见文档地步**附录**。
  2. 登录Tablestore控制台,创建增量类型通道,并记录TunnelID。通道创建步骤请参考[创建通道](https://help.aliyun.com/document_detail/102491.html?spm=a2c4g.11186623.6.667.3dbc32b51VVnNR)。
  2. 开启流任务,执行SQL语句。参考**结果展示**。

结果展示

  • 批计算
  1. 计算各个厂商的机柜损坏比例。
select cabinet_manufacturers as cm,sum(cabinet_damage_size)/sum(cabinet_powerbank_size) as damage_percent from cabinet group by cabinet_manufacturers;

  1. 统计各个省份的营收总额。
select cabinet_province, sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) from order where order_end_time > 0 group by cabinet_province; 

  1. 计算湖北省2021年1月1日中,各个时间点维度上的平均租赁比例。
select from_unixtime(cabinet_state_timestamp/1000, 'yyyy-MM-dd HH:mm:ss') as time, sum(cabinet_powerbank_size-cabinet_available_size)/sum(cabinet_powerbank_size)from cabinet_time where cabinet_state_timestamp >= 1609430400000 and cabinet_state_timestamp <= 1609516799000 and cabinet_province='湖北省' group by cabinet_state_timestamp order by time;  

  • 流计算

计算每个机柜的营收总额,并将结果回写到Tablestore的ots_sink表中。

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
select cabinet_ID,sum(ceiling((order_end_time-order_start_time)/3600000)*cabinet_pricePerHour) as totalPrice,count(cabinet_ID) as orderNum from order_stream_view group by cabinet_ID;

回写ots_sink表的结果示例

小结

本章节通过表格存储Tablestore与Spark结合使用,采用Spark SQL的方式,实现了共享充电宝案例的流批计算场景。除了上述的访问方式以外,也可以采用DataFrame编程的方式访问Tablestore中的数据。更多关于Tablestore On Spark的介绍,请参考官网文档Spark/Spark SQL。下一章章节将为大家介绍Data Lake Analytics在Tablestore中的使用。

附录

创建cabinet外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet;
CREATE TABLE cabinet (
cabinet_Md5ID STRING,cabinet_ID STRING,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建cabinet_time外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS cabinet_time;
CREATE TABLE cabinet_time (
cabinet_Md5ID STRING,cabinet_ID STRING,cabinet_state_timestamp LONG,
cabinet_available_size LONG, cabinet_damage_size LONG, cabinet_geo STRING, 
cabinet_isonline STRING, cabinet_location STRING, cabinet_manufacturers STRING,
cabinet_overhaul_time LONG, cabinet_powerPercent DOUBLE, cabinet_powerbank_size LONG,
cabinet_pricePerHour DOUBLE, cabinet_province STRING, cabinet_type STRING COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre元数据时序表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order外部表,同时连接多元索引(批计算)

DROP TABLE IF EXISTS order;
CREATE TABLE order (
order_Md5ID STRING,order_ID STRING,
cabinet_ID STRING, cabinet_geo STRING, cabinet_pricePerHour DOUBLE, 
cabinet_province STRING, cabinet_type STRING, order_end_time LONG,
order_isRevert BOOLEAN, order_lose_pay DOUBLE, order_phone STRING,
order_start_time LONG COMMENT "geo stored in string format"
)
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
search.index.name="这里填写Tablestore多元索引名",
max.split.count=64,
push.down.range.long = false,
push.down.range.string = false
);

创建order_stream外部表(流计算)

DROP TABLE IF EXISTS order_stream;
CREATE TABLE order_stream
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre订单表名",
catalog='{"columns": {"cabinet_ID": {"type":"string"}, "cabinet_pricePerHour": {"type":"double"},"order_start_time": {"type":"long"}, "order_end_time": {"type":"long"}}}'
);

创建order_sink结果表(流计算)

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="这里填Tablestore实例vpc地址",
access.key.id="这里填写access key",
access.key.secret="这里填写access secret",
instance.name="这里填写Tablestore实例名",
table.name="这里填写Tablesotre结果表名,示例中为‘order_sink’",
catalog='{"columns": {"cabinetID": {"type": "string"},"totalPrice": {"type": "double"}, "orderNum": {"type": "long"}}}'
);

创建order_stream_view视图(流计算)

CREATE SCAN order_stream_view ON order_stream USING STREAM
OPTIONS(
tunnel.id="87f0de2c-40ab-4f9d-80ab-630961ebea27", 
maxoffsetsperchannel="10000");
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
1月前
|
传感器 机器学习/深度学习 存储
物联网设备精细化管理系统解决方案
随着科技的进步,物联网技术作为新一代信息技术的核心部分,正在深刻改变各行业的生产和管理方式。其在资产管理、智慧城市、能源管理和智慧医疗等多个领域的广泛应用,不仅提高了运营效率,还促进了资源优化配置和精细化管理。本文详细介绍了物联网的基础概念及其在设备精细化管理系统中的具体应用方案,展示了如何通过智能感知层建设、数据处理分析平台以及精细化管理应用,实现设备的实时监控、预测性维护和能耗管理等功能,从而帮助企业提升竞争力,降低成本,并推动社会向更智能化、绿色化的方向发展。
74 2
物联网设备精细化管理系统解决方案
|
23天前
|
存储 监控 物联网
医疗物联网设备精细化管理系统解决方案
华汇数据智慧医院物联网管理系统解决方案是一种集物联网、云计算、大数据和人工智能等先进技术于一体的综合性解决方案,旨在提升医院的运营效率、医疗质量和患者满意度。
61 3
|
26天前
|
存储 边缘计算 物联网
阿里云物联网平台:推动万物互联的智能化解决方案
随着物联网技术的快速发展,阿里云物联网平台为企业提供了一体化的解决方案,包括设备接入、数据管理和智能应用等核心功能。平台支持海量设备接入、实时数据采集与存储、边缘计算,并具备大规模设备管理、高安全性和开放生态等优势。广泛应用于智能制造、智慧城市和智能家居等领域,助力企业实现数字化转型。
128 5
|
2月前
|
存储 安全 物联网
.NET 跨平台工业物联网网关解决方案
【9月更文挑战第28天】本文介绍了利用 .NET 构建跨平台工业物联网网关的解决方案。通过 .NET Core 和多种通信协议(如 MQTT 和 Modbus),实现工业设备的高效接入和数据采集。系统架构包括设备接入层、数据处理层、通信层、应用层和数据库层,确保数据的准确采集、实时处理和安全传输。此外,还详细阐述了设备身份认证、数据加密及安全审计等机制,确保系统的安全性。该方案适用于不同操作系统和工业环境,具备高度灵活性和扩展性。
|
21天前
|
存储 供应链 物联网
探索未来:区块链、物联网与虚拟现实技术的融合与创新
【10月更文挑战第15天】本文深入探讨了新兴技术如区块链、物联网(IoT)和虚拟现实(VR)的发展趋势及其在现代社会的应用。通过分析这些技术的独特属性和它们如何相互补充,我们揭示了一个由高度互联、智能化和沉浸式体验定义的未来图景。文章不仅讨论了这些技术当前的挑战,还展望了它们在未来可能带来的转变,旨在为读者提供对这些令人兴奋的技术趋势的全面理解。
|
21天前
|
安全 物联网 区块链
未来已来:探索区块链技术、物联网与虚拟现实的融合趋势
【10月更文挑战第15天】 在数字化浪潮中,区块链、物联网(IoT)和虚拟现实(VR)技术正引领着一场革命。本文将深入探讨这三种技术的发展趋势和相互融合的潜力,以及它们如何共同塑造我们的未来。我们将从基本概念入手,逐步揭示这些技术如何影响经济、社会和日常生活,同时提供具体应用场景以展示其变革力量。
|
8天前
|
供应链 物联网 区块链
未来已来:区块链技术、物联网与虚拟现实的融合与创新
【10月更文挑战第28天】在数字化浪潮的推动下,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正逐步渗透至我们的日常生活中。本文将探讨这些技术的发展趋势,以及它们如何相互融合,创造出前所未有的应用场景。我们将通过实际案例,展示这些技术如何改变工业、医疗、教育和娱乐等多个领域。最后,我们将展望这些技术未来的发展方向,以及它们可能带来的社会变革。
33 12
|
2天前
|
供应链 物联网 区块链
未来已来:探索区块链、物联网与虚拟现实技术的融合趋势与实践应用
【10月更文挑战第34天】随着科技的迅猛发展,新兴技术如区块链、物联网(IoT)和虚拟现实(VR)正逐步渗透到我们的生活中,不仅改变着我们的生活方式,还在重塑全球的经济结构。本文将深入探讨这些技术的发展现状、相互之间的融合趋势以及在实际应用中的创新场景。我们将通过具体案例分析,揭示这些技术如何共同作用,推动社会向更加智能、互联的方向发展。
13 3
|
3天前
|
传感器 监控 物联网
物联网与虚拟现实:未来技术趋势与应用
随着科技的不断进步,新兴技术如物联网(IoT)和虚拟现实(VR)正在逐步改变我们的生活、工作以及娱乐方式。本文旨在探讨这些前沿技术的发展趋势及其在多个行业的潜在应用场景,分析其对社会发展的深远影响,并对未来的发展方向进行展望。通过详细分析,本文揭示了物联网和虚拟现实如何共同推动社会进步,并带来创新和可能性。
|
10天前
|
传感器 存储 运维
智能物联网:LoRaWAN技术在低功耗广域网中的应用
【10月更文挑战第26天】本文详细介绍了LoRaWAN技术的基本原理、应用场景及实际应用示例。LoRaWAN是一种低功耗、长距离的网络层协议,适用于智能城市、农业、工业监控等领域。文章通过示例代码展示了如何使用LoRaWAN传输温湿度数据,并强调了其在物联网中的重要性和广阔前景。
36 6

热门文章

最新文章

相关产品

  • 物联网平台
  • 下一篇
    无影云桌面