Rust +时序数据库 TDengine:打造高性能时序数据处理利器

简介: TDengine 是一款专为物联网、车联网、工业互联网等时序数据场景优化设计的开源时序数据库,支持高并发写入、高效查询及流式计算,通过“一个数据采集点一张表”与“超级表”的概念显著提升性能。Rust 作为一门系统级编程语言,近年来在数据库、嵌入式系统、分布式服务等领域迅速崛起,以其内存安全、高性能著称,与 TDengine 的高效特性天然契合,适合构建高可靠、高性能的数据处理系统。

1. 引言:为什么选择 TDengine 与 Rust?

TDengine 是一款专为物联网、车联网、工业互联网等时序数据场景优化设计的开源时序数据库,支持高并发写入、高效查询及流式计算,通过“一个数据采集点一张表”与“超级表”的概念显著提升性能。
Rust 作为一门系统级编程语言,近年来在数据库、嵌入式系统、分布式服务等领域迅速崛起,以其内存安全、高性能著称,与 TDengine 的高效特性天然契合,适合构建高可靠、高性能的数据处理系统。

2. TDengine Rust 连接器的设计与架构

TDengine Rust 连接器的主要目标是提供一个高效、安全且易于使用的接口,让开发者能够通过 Rust 语言与 TDengine 数据库进行高效的交互。连接器的设计充分考虑了 Rust 语言的优势,如内存安全、并发处理和高性能,同时确保与 TDengine 数据库之间的通信可靠且高效。
主要模块间的架构如下图:
15.png
解释:

  1. taos-query 模块作为核心,负责定义公共接口和数据结构。
  2. taos-optin 模块和 taos-ws 模块分别实现了这些公共接口,负责原生连接和 WebSocket 连接的业务逻辑实现。
  3. taos 模块封装了 taos-optin 和 taos-ws 模块中的实现,同时将 taos-query 模块所定义的公共接口和数据结构予以暴露。
  4. 最终,用户只需依赖 taos 模块,即可轻松使用整个系统,而无需了解内部复杂的实现细节。

3. 快速入门:从安装到代码开发

1.在 Cargo.toml 中添加以下内容:

[dependencies]
anyhow = "1.0.96"
chrono = "0.4.39"
serde = "1.0.217"
taos = "0.12.3"
tokio = "1.43.0"
AI 代码解读

2.代码演示,数据写入与查询:

use chrono::{DateTime, Local};
use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Establish native connection
    let dsn = "taos://localhost:6030";
    let taos = TaosBuilder::from_dsn(dsn)?.build().await?;

    let db = "power";

    // Prepare the database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS {db}"),
        format!("CREATE DATABASE {db}"),
        format!("USE {db}"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // Create a super table
        "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
         TAGS (group_id INT, location varchar(20))",
        // Create a subtable
        "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
        // Write one record at a time
        "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
        // Write Null values
        "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
        // Automatically create table on insert
        "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
        // Write multiple records at once
        "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
    ]).await?;
    assert_eq!(inserted, 6);

    let mut result = taos.query("SELECT * FROM meters").await?;

    for field in result.fields() {
        println!("Get the field: {}", field.name());
    }
    println!();

    // Query option 1, using row stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("Get the value of {}: {}", name, value);
        }
        println!()
    }

    // Query option 2, deserialize using serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // Deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // Deserialize float to f32
        current: Option<f32>,
        // Deserialize int to i32
        voltage: Option<i32>,
        // Deserialize float to f32
        phase: Option<f32>,
        // Deserialize int to i32
        group_id: i32,
        // Deserialize varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("SELECT * FROM meters")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    println!("Get records: {:?}", records);

    Ok(())
}
AI 代码解读

4. 高级功能与最佳实践

4.1 连接池优化:提升高并发性能
频繁创建和销毁数据库连接会带来显著的开销,尤其是在高并发场景(如物联网设备高频上报数据)中。通过连接池复用连接,可减少 TCP 握手、认证等重复操作,提升整体吞吐量。
代码示例:
```use taos::*;

[tokio::main]

async fn main() -> anyhow::Result<()> {
// Create a connection pool
let dsn = "taos://localhost:6030";
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
let taos = pool.get().await?;

let db = "power";

// Prepare the database
taos.exec_many([
    format!("DROP DATABASE IF EXISTS {db}"),
    format!("CREATE DATABASE {db}"),
    format!("USE {db}"),
])
.await?;

let inserted = taos.exec_many([
    // Create a super table
    "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
     TAGS (group_id INT, location varchar(20))",
    // Create a subtable
    "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
    // Write one record at a time
    "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
    // Write Null values
    "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
    // Automatically create table on insert
    "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
    // Write multiple records at once
    "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
]).await?;

assert_eq!(inserted, 6);

Ok(())
AI 代码解读

4.2 原生连接与 WebSocket 连接
对于原生连接和 WebSocket 连接这两种方式,除了建立连接所使用的 DSN 不同外,其余接口调用没有差异。
代码示例:

suse taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // let dsn = "taosws://localhost:6041"; // WebSocket connection
    let dsn = "taos://localhost:6030"; // Native connection
    let _taos = TaosBuilder::from_dsn(dsn)?.build().await?;
    Ok(())
}
AI 代码解读

4.3 同步接口和异步接口
TDengine Rust 连接器的接口分为同步接口和异步接口。通常情况下,同步接口基于异步接口实现,二者的方法签名,除了异步接口多了 async 关键字外,基本一致。
异步接口的代码示例可查阅第 3 章节,同步接口的代码示例如下:

use taos::sync::*;

fn main() -> anyhow::Result<()> {
    // Establish native connection
    let dsn = "taos://localhost:6030";
    let taos = TaosBuilder::from_dsn(dsn)?.build()?;

    let db = "power";

    // Prepare the database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS {db}"),
        format!("CREATE DATABASE {db}"),
        format!("USE {db}"),
    ])?;

    let inserted = taos.exec_many([
        // Create a super table
        "CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
         TAGS (group_id INT, location varchar(20))",
        // Create a subtable
        "CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
        // Write one record at a time
        "INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
        // Write Null values
        "INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
        // Automatically create table on insert
        "INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
        // Write multiple records at once
        "INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
    ])?;

    assert_eq!(inserted, 6);

    Ok(())
}
AI 代码解读

5. 结语:开源社区的星辰大海

TDengine Rust 连接器的成长史,是一个典型的技术民主化故事——从社区萌芽到官方支持,从边缘工具到核心生态。它证明了两个事实:

  1. 开源协作的效率:在数据库领域,社区贡献者的“需求反哺”比闭门造车更能击中痛点。
  2. Rust 的生态潜力:作为系统级编程语言的 Rust,正在时序数据处理这样的垂直领域开辟新战场。
    我们期待你的参与,欢迎提交 PR,一起推动 TDengine Rust 连接器的进步,携手开创更加美好的开源未来!

6. 附录

  1. https://docs.taosdata.com/
  2. https://github.com/taosdata/taos-connector-rust
目录
打赏
0
2
2
0
93
分享
相关文章
百万指标,秒级查询,零宕机——时序数据库 TDengine 在 AIOps 中的硬核实战
本篇文章详细讲述了七云团队在运维平台中如何利用 TDengine 解决海量时序数据存储与查询的实际业务需求。内容涵盖了从数据库选型、方案落地到业务挑战及解决办法的完整过程,特别是分享了升级 TDengine 3.x 时的实战经验,给到有需要的小伙伴参考阅读。
18 1
从 MySQL 到时序数据库 TDengine:Zendure 如何实现高效储能数据管理?
TDengine 助力广州疆海科技有限公司高效完成储能业务的数据分析任务,轻松应对海量功率、电能及输入输出数据的实时统计与分析,并以接近 1 : 20 的数据文件压缩率大幅降低存储成本。此外,taosX 强大的 transform 功能帮助用户完成原始数据的清洗和结构优化,而其零代码迁移能力更实现了历史数据从 TDengine OSS 与 MySQL 到 TDengine 企业版的平滑迁移,全面提升了企业的数据管理效率。本文将详细解读这一实践案例。
16 0
从 MongoDB 到 时序数据库 TDengine,沃太能源实现 18 倍写入性能提升
沃太能源是国内领先储能设备生产厂商,数十万储能终端遍布世界各地。此前使用 MongoDB 存储时序数据,但随着设备测点增加,MongoDB 在存储效率、写入性能、查询性能等方面暴露出短板。经过对比,沃太能源选择了专业时序数据库 TDengine,生产效能显著提升:整体上,数据压缩率超 10 倍、写入性能提升 18 倍,查询在特定场景上也实现了数倍的提升。同时减少了技术架构复杂度,实现了零代码数据接入。本文将对 TDengine 在沃太能源的应用情况进行详解。
17 0
消防行业如何借助时序数据库 TDengine 打造高效的数据监控与分析系统
本篇文章来自“2024,我想和 TDengine 谈谈”征文活动的优秀投稿,深入探讨了如何在消防行业中运用 TDengine 进行业务建模。文章重点介绍了如何通过 TDengine 的超级表、标签设计和高效查询功能,有效管理消防监控系统中的时序数据。作者详细阐述了实时监控、报警系统以及历史数据分析在消防行业中的应用,展示了 TDengine 在数据压缩、保留策略和分布式架构下的强大优势。
18 0
2600 万表流计算分析如何做到? 时序数据库 TDengine 助力数百家超市智能化转型
在生鲜超市的高效运营中,实时数据分析至关重要。万象云鼎的“云鲜生”通过智能秤+网关+软件系统的组合,实现了销售数据的精准管理与优化。而在数据处理方面,TDengine 的流计算能力成为了这一方案的核心支撑。本文详细分享了“云鲜生”如何利用 TDengine 高效存储和分析海量销售数据,在优化超市运营、提升用户体验的同时,解决高基数分组、高并发查询等技术挑战。
37 1
时序数据库 TDengine 化工新签约:存储降本一半,查询提速十倍
化工行业在数字化转型过程中面临数据接入复杂、实时性要求高、系统集成难度大等诸多挑战。福州力川数码科技有限公司科技依托深厚的行业积累,精准聚焦行业痛点,并携手 TDengine 提供高效解决方案。
42 0
docker拉取MySQL后数据库连接失败解决方案
通过以上方法,可以解决Docker中拉取MySQL镜像后数据库连接失败的常见问题。关键步骤包括确保容器正确启动、配置正确的环境变量、合理设置网络和权限,以及检查主机防火墙设置等。通过逐步排查,可以快速定位并解决连接问题,确保MySQL服务的正常使用。
110 82
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
Docker Compose V2 安装常用数据库MySQL+Mongo
以上内容涵盖了使用 Docker Compose 安装和管理 MySQL 和 MongoDB 的详细步骤,希望对您有所帮助。
198 42

热门文章

最新文章