Apache IoTDB开发系统整合之Spark IoTDB Connecter

简介: 以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。

version

The versions required for Spark and Java are as follow:

Spark Version Scala Version Java Version TsFile
2.4.3 2.11 1.8 0.10.0

install

mvn clean scala:compile compile install.

1. maven dependency

  1. <dependency>
  2. <groupId>org.apache.iotdb</groupId>
  3. <artifactId>spark-iotdb-connector</artifactId>
  4. <version>0.10.0</version>
  5. </dependency>

2. spark-shell user guide

  1. spark-shell --jars spark-iotdb-connector-0.10.0.jar,iotdb-jdbc-0.10.0-jar-with-dependencies.jar
  2. import org.apache.iotdb.spark.db._
  3. val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").load
  4. df.printSchema()
  5. df.show()

如果要对RDD进行分区,可以执行以下操作

  1. spark-shell --jars spark-iotdb-connector-0.10.0.jar,iotdb-jdbc-0.10.0-jar-with-dependencies.jar
  2. import org.apache.iotdb.spark.db._
  3. val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").
  4. option("lowerBound", [lower bound of time that you want query(include)]).option("upperBound", [upper bound of time that you want query(include)]).
  5. option("numPartition", [the partition number you want]).load
  6. df.printSchema()
  7. df.show()

3. 模式推理

以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。这三项测量的基本信息如下:

Name Type Encode
status Boolean PLAIN
temperature Float RLE
hardware Text PLAIN

The existing data in the TsFile is as follows:

device:root.ln.wf01.wt01 device:root.ln.wf02.wt02
status temperature hardware status
time value time value time value time value
1 True 1 2.2 2 “aaa” 1 True
3 True 2 2.2 4 “bbb” 2 False
5 False 3 2.1 6 “ccc” 4 True

The wide(default) table form is as follows:

time root.ln.wf02.wt02.temperature root.ln.wf02.wt02.status root.ln.wf02.wt02.hardware root.ln.wf01.wt01.temperature root.ln.wf01.wt01.status root.ln.wf01.wt01.hardware
1 null true null 2.2 true null
2 null false aaa 2.2 null null
3 null null null 2.1 true null
4 null true bbb null null null
5 null null null null false null
6 null null ccc null null null

You can also use narrow table form which as follows: (You can see part 4 about how to use narrow form)

time device_name status hardware temperature
1 root.ln.wf02.wt01 true null 2.2
1 root.ln.wf02.wt02 true null null
2 root.ln.wf02.wt01 null null 2.2
2 root.ln.wf02.wt02 false aaa null
3 root.ln.wf02.wt01 true null 2.1
4 root.ln.wf02.wt02 true bbb null
5 root.ln.wf02.wt01 false null null
6 root.ln.wf02.wt02 null ccc null

4. 宽表和窄表之间的转换

从宽到窄

  1. import org.apache.iotdb.spark.db._
  2. val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root where time < 1100 and time > 1000").load
  3. val narrow_df = Transformer.toNarrowForm(spark, wide_df)

从窄到宽

  1. import org.apache.iotdb.spark.db._
  2. val wide_df = Transformer.toWideForm(spark, narrow_df)

5. Java 用户指南

  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. import org.apache.spark.sql.SparkSession;
  4. import org.apache.iotdb.spark.db.*
  5. public class Example {
  6. public static void main(String[] args) {
  7. SparkSession spark = SparkSession
  8. .builder()
  9. .appName("Build a DataFrame from Scratch")
  10. .master("local[*]")
  11. .getOrCreate();
  12. Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
  13. .option("url","jdbc:iotdb://127.0.0.1:6667/")
  14. .option("sql","select * from root").load();
  15. df.printSchema();
  16. df.show();
  17. Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df)
  18. narrowTable.show()
  19. }
  20. }
相关文章
|
6天前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
48 0
|
6天前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
6天前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
168 0
|
6天前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
6天前
|
网络安全 API Apache
如何在win系统部署Apache服务并实现无公网ip远程访问
如何在win系统部署Apache服务并实现无公网ip远程访问
|
6天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
6天前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
57 2
|
6天前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
6天前
|
SQL 分布式计算 数据处理
Apache Spark简介与历史发展
Apache Spark简介与历史发展
|
2天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
38 0

推荐镜像

更多