Apache IoTDB开发系统整合之TsFile-Spark-Connector

简介: TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。

TsFile-Spark-Connector用户指南

1. TsFile-Spark-Connector简介

TsFile-Spark-Connector 实现了 Spark 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Spark读取,写入和查询Tsfile。

使用此连接器,您可以

  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Spark 中
  • 将特定目录中的所有文件从本地文件系统或HDFS加载到Spark中
  • 将数据从 Spark 写入 TsFile

2. 系统要求

Spark Version Scala Version Java Version TsFile
2.4.3 2.11.8 1.8 0.10.0

注意:有关如何下载和使用 TsFile 的更多信息,请参阅以下链接:https://github.com/apache/incubator-iotdb/tree/master/tsfile.

3. 快速入门

本地模式

在本地模式下使用 TsFile-Spark-Connector 启动 Spark:

  1. ./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar

注意:

  • is the real path of your spark-shell.
  • Multiple jar packages are separated by commas without any spaces.

分布式模式

在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即 Spark 集群通过 spark-shell 连接):

  1. . /<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar --master spark://ip:7077

4. Data Type Correspondence

TsFile data type SparkSQL data type
BOOLEAN BooleanType
INT32 IntegerType
INT64 LongType
FLOAT FloatType
DOUBLE DoubleType
TEXT StringType


5. 模式推理

显示 TsFile 的方式取决于架构。以以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。这三项测量的基本信息如下:

Name Type Encode
status Boolean PLAIN
temperature Float RLE
hardware Text PLAIN

TsFile 中的现有数据如下:

time root.ln.wf02.wt02.temperature root.ln.wf02.wt02.status root.ln.wf02.wt02.hardware root.ln.wf01.wt01.temperature root.ln.wf01.wt01.status root.ln.wf01.wt01.hardware
1 null true null 2.2 true null
2 null false aaa 2.2 null null
3 null null null 2.1 true null
4 null true bbb null null null
5 null null null null false null
6 null null ccc null

您也可以使用窄表形式,如下所示:(您可以看到第 6 部分有关如何使用窄格式)

time device_name status hardware temperature
1 root.ln.wf02.wt01 true null 2.2
1 root.ln.wf02.wt02 true null null
2 root.ln.wf02.wt01 null null 2.2
2 root.ln.wf02.wt02 false aaa null
3 root.ln.wf02.wt01 true null 2.1
4 root.ln.wf02.wt02 true bbb null
5 root.ln.wf02.wt01 false null null
6 root.ln.wf02.wt02 null ccc null

6. Scala API

注意:请记住提前分配必要的读取和写入权限。

示例 1:从本地文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("test.tsfile", true)
  5. narrow_df.show

示例 2:从 Hadoop文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  5. narrow_df.show

示例 3:从特定目录读取

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
  3. df.show

注 1:目前不支持目录中所有 TsFiles 的全局时间排序。

注 2:同名测量应具有相同的架构。

示例 4:宽格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 5:窄格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 6:以宽格式写入

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output")
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
  7. newDf.show

示例 6:以窄格式书写

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output", true)
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
  7. newDf.show

附录 A:架构推理的旧设计

显示 TsFile 的方式与 TsFile 架构有关。以以下 TsFile 结构为例:TsFile 的架构中有三个度量:状态、温度和硬件。这三个测量的基本信息如下:

名字 类型 编码
地位 布尔 平原
温度 RLE
硬件 发短信 平原

测量基本信息

文件中的现有数据如下所示:

delta_object:root.ln.wf01.wt01 delta_object:root.ln.wf02.wt02 delta_object:root.sgcc.wf03.wt01
地位 温度 硬件 地位 地位 温度
时间 价值 时间 价值 时间 价值 时间 价值 时间 价值 时间 价值
1 1 2.2 2 “啊” 1 2 3 3.3
3 2 2.2 4 “咔嚓” 2 3 6 6.6
5 3 2.1 6 “抄送” 4 4 8 8.8
7 4 2.0 8 “爹” 5 6 9 9.9

一组时间序列数据

有两种方法可以显示它:

默认方式

将创建两列来存储设备的完整路径:time(LongType)和delta_object(StringType)。

  • time:时间戳,长类型
  • delta_object: Delta_object ID, 字符串类型

接下来,为每个度量创建一个列来存储特定数据。SparkSQL 表结构如下:

时间(长型) delta_object(字符串类型) 状态(布尔类型) 温度(浮子型) 硬件(字符串类型)
1 根.ln.wf01.wt01 2.2
1 根.ln.wf02.wt02
2 根.ln.wf01.wt01 2.2
2 根.ln.wf02.wt02 “啊”
2 根.sgcc.wf03.wt01
3 根.ln.wf01.wt01 2.1
3 根.sgcc.wf03.wt01 3.3
4 根.ln.wf01.wt01 2.0
4 根.ln.wf02.wt02 “咔嚓”
4 根.sgcc.wf03.wt01
5 根.ln.wf01.wt01
5 根.ln.wf02.wt02
5 根.sgcc.wf03.wt01
6 根.ln.wf02.wt02 “抄送”
6 根.sgcc.wf03.wt01 6.6
7 根.ln.wf01.wt01
8 根.ln.wf02.wt02 “爹”
8 根.sgcc.wf03.wt01 8.8
9 根.sgcc.wf03.wt01 9.9

展开delta_object列

将设备列按“.”展开为多个列,忽略根目录“root”。方便进行更丰富的聚合操作。如果用户想使用这种显示方式,需要在表创建语句中设置参数“delta_object_name”(参考本手册第5.5节中的示例1),如本例中参数“delta_object_name”设置为“root.device.turbine”。路径层数需要一对一。此时,将为设备路径的每个层(“根”层除外)创建一列。列名是参数中的名称,值是设备相应层的名称。接下来,将为每个测量创建一个列来存储特定数据。

那么 SparkSQL 表结构如下:

time(LongType) group(StringType) field(StringType) device(StringType) status(BooleanType) temperature(FloatType) hardware(StringType)
1 ln wf01 wt01 True 2.2 null
1 ln wf02 wt02 True null null
2 ln wf01 wt01 null 2.2 null
2 ln wf02 wt02 False null “aaa”
2 sgcc wf03 wt01 True null null
3 ln wf01 wt01 True 2.1 null
3 sgcc wf03 wt01 True 3.3 null
4 ln wf01 wt01 null 2.0 null
4 ln wf02 wt02 True null “bbb”
4 sgcc wf03 wt01 True null null
5 ln wf01 wt01 False null null
5 ln wf02 wt02 False null null
5 sgcc wf03 wt01 True null null
6 ln wf02 wt02 null null “ccc”
6 sgcc wf03 wt01 null 6.6 null
7 ln wf01 wt01 True null null
8 ln wf02 wt02 null null “ddd”
8 sgcc wf03 wt01 null 8.8 null
9 sgcc wf03 wt01 null 9.9

TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。请注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不保证结果的正确性。

写入过程是将数据帧写入为一个或多个 TsFiles。默认情况下,需要包含两列:时间和delta_object。其余列用作度量。

相关文章
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
149 7
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
337 2
|
5月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
142 1
|
2月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
95 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
3月前
|
数据处理 Apache 数据库
将 Python UDF 部署到 Apache IoTDB 的详细步骤与注意事项
【10月更文挑战第21天】将 Python UDF 部署到 Apache IoTDB 中需要一系列的步骤和注意事项。通过仔细的准备、正确的部署和测试,你可以成功地将自定义的 Python UDF 应用到 Apache IoTDB 中,为数据处理和分析提供更灵活和强大的支持。在实际操作过程中,要根据具体情况进行调整和优化,以确保实现最佳的效果。还可以结合具体的代码示例和实际部署经验,进一步深入了解和掌握这一过程。
40 2
|
3月前
|
存储 物联网 数据处理
如何使用 Apache IoTDB UDF
【10月更文挑战第21天】使用 Apache IoTDB 的 UDF 可以为用户提供更大的灵活性和扩展性,帮助用户更好地处理和分析物联网数据。通过合理编写和使用 UDF,用户可以充分发挥 IoTDB 的潜力,实现更复杂、更高效的数据处理和分析任务。
77 2
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
89 1
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
5月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
5月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
当今社会,物联网技术的发展带来了许多繁琐的挑战,尤其是在数据库管理系统领域,比如实时整合海量数据、处理流中的事件以及处理数据的安全性。例如,应用于智能城市的基于物联网的交通传感器可以实时生成大量的交通数据。据估计,未来5年,物联网设备的数量将达数万亿。物联网产生大量的数据,包括流数据、时间序列数据、RFID数据、传感数据等。要有效地管理这些数据,就需要使用数据库。数据库在充分处理物联网数据方面扮演着非常重要的角色。因此,适当的数据库与适当的平台同等重要。由于物联网在世界上不同的环境中运行,选择合适的数据库变得非常重要。 原创文字,IoTDB 社区可进行使用与传播 一、什么是IoTDB 我
216 9
Apache IoTDB进行IoT相关开发实践

推荐镜像

更多