从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。

小T导读:本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。

在当前的物联网行业,TDengine 已成为国内占有率最高的开源时序数据库,并在开源物联网平台中扮演着至关重要的角色。接下来,我们将分几个模块详细探讨联犀物联网平台如何与 TDengine 深度结合,并进一步提升 TDengine 的扩展性,推动其在物联网场景中的应用潜力。

物模型如何结合 TDengine?

物模型介绍
现实世界由众多真实存在的物理设备组成,我们可以将这些设备称之为“物”。物联网的目标是通过网络将这些“物”连接在一起,并将其数字化为云端服务或资源,从而实现智能化应用。因此,在物联网构建的数字世界中,首先需要对“物”进行清晰、统一的定义,明确其功能和能提供的服务资源。
ICA 联盟从产品层面对“物”进行了功能建模,提出了统一的“物的抽象模型”和“物的描述语言”(TSL,Things Specification Language)。物的抽象模型描述了“设备是什么”以及“设备能做什么”,包括物的状态、档案信息和功能定义等方面。
1.PNG

在物模型中,最为关键的是属性(property)。以智能电灯为例,其状态具有二元性:开启或关闭。用户可以通过控制操作轻松在这两种状态之间切换。此外,一些智能电灯还具备更多高级功能,允许用户根据个人需求调整亮度、颜色和色温等参数。
智能设备的属性通常具备读写能力,这意味着应用程序不仅可以读取设备的当前状态,还可以修改属性来调整设备行为。例如,在环境监测设备中,应用程序可以读取温度和湿度数据,并根据需要调整参数,以适应不同的环境条件。

物模型如何结合 TDengine?

物模型超级表与产品的关系
在联犀物联网平台中,存在两种物模型:公共物模型和产品物模型。公共物模型是由多个产品共同定义的共享物模型,而产品物模型则为每个特定产品设备单独定义。定义物模型时,通常会有以下两种操作:

  • 产品物模型:每类特定产品设备使用一个超级表来定义。
  • 通用物模型:只创建一个超级表,后续的产品可以按需引入该物模型,产品下的设备则直接使用这个超级表。
    TDengine 支持灵活的数据模型设计,既可以使用多列模型,也可以选择单列模型。多列模型相当于将多个字段存储在同一张超级表中,通常在写入和存储效率上表现较优。而在某些情况下,例如数据采集点的种类和数量经常变化时,单列模型则可能更为适用,因为它简化了应用程序的设计和管理,允许独立管理和扩展每个物理量的超级表。
    综上所述,TDengine 提供了灵活的数据模型选项,用户可以根据具体需求和应用场景选择最适合的模型。无论是采用窄表设计、单列模型还是多列模型,最终目的是为了优化性能和简化管理复杂性。

物模型和表分为三种对应关系

  1. 普通类型: 根据物模型的数据类型,映射单列模式的超级表。物模型定义示例如下图所示。

2.PNG
示例 SQL 如下:

CREATE STABLE IF NOT EXISTS model_custom_property_00b_int (ts timestamp,param BIGINT) 
  TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));

相关参数说明:

3.PNG

param 物模型和 TDengine 表结构的定义对照如下:
4.png
下面是完整的建表语句:

CREATE STABLE IF NOT EXISTS model_custom_property_00b_int (ts timestamp,param BIGINT) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_float (ts timestamp,param DOUBLE) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_enum (ts timestamp,param SMALLINT) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_timestamp (ts timestamp,param TIMESTAMP) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_bool (ts timestamp,param BOOL) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_string (ts timestamp,param BINARY(5000)) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));

对应的每个设备创建的普通表如下:

CREATE TABLE IF NOT EXISTS device_property_00b_device1_int USING model_custom_property_00b_int  TAGS('00b','device1','int');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_enum USING model_custom_property_00b_enum  TAGS('00b','device1','enum');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_bool USING model_custom_property_00b_bool  TAGS('00b','device1','bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_string USING model_custom_property_00b_string  TAGS('00b','device1','string');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_float USING model_custom_property_00b_float  TAGS('00b','device1','float');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_timestamp USING model_custom_property_00b_timestamp  TAGS('00b','device1','timestamp');

2.结构体类型:拥有多个字段并将物模型进行整体抽象,映射为多列模式的超级表。物模型定义示例如下图所示。

5.png
示例 SQL 如下:


CREATE STABLE IF NOT EXISTS model_custom_property_00b_struct (ts timestamp, latitude DOUBLE,longitude DOUBLE) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));

相关参数说明:
39.jpg
对应的每个设备创建的普通表如下:


CREATE TABLE IF NOT EXISTS device_property_00b_device1_struct USING model_custom_property_00b_struct  TAGS('00b','device1','struct');

3.数组类型: 数组类型在物联网平台中较为特殊,传统平台中的数组无法单独操作某一位。例如,如果想单独修改开关10的状态,必须传递完整的数组(如:[0,1,1,0,1,0,1,1,0,1])来进行控制,这种方式并不符合现实世界的需求。联犀则对数组进行了扩展,支持下角标访问。比如,要修改开关 10 的状态,只需传递 "switch.10": 1 即可。接下来,让我们来看一下联犀是如何处理这种数据结构的。物模型定义示例如下图所示。
6.png
示例 SQL 如下:

 CREATE STABLE IF NOT EXISTS model_custom_property_00b_switchg (ts timestamp,param BOOL) TAGS (product_id BINARY(50),device_name BINARY(50),_num BIGINT,property_type BINARY(50));

相关参数说明:
40.jpg
可以看到,数组类型的定义与简单类型相似,区别在于它额外添加了一个 _num 字段,用来标识数组的下标。这种设计的好处在于,即使数组长度达到 1000,依然只需定义一个超级表。虽然每个设备的普通表需要定义 1000 个,但由于是由一个超级表进行管理,整体管理变得更加简便。接下来,我们来看看每个设备创建的普通表是什么样的。


CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_0 USING model_custom_property_00b_switchg  TAGS('00b','device1',0,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_1 USING model_custom_property_00b_switchg  TAGS('00b','device1',1,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_2 USING model_custom_property_00b_switchg  TAGS('00b','device1',2,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_3 USING model_custom_property_00b_switchg  TAGS('00b','device1',3,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_4 USING model_custom_property_00b_switchg  TAGS('00b','device1',4,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_5 USING model_custom_property_00b_switchg  TAGS('00b','device1',5,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_6 USING model_custom_property_00b_switchg  TAGS('00b','device1',6,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_7 USING model_custom_property_00b_switchg  TAGS('00b','device1',7,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_8 USING model_custom_property_00b_switchg  TAGS('00b','device1',8,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_9 USING model_custom_property_00b_switchg  TAGS('00b','device1',9,'bool');

如何发挥 TDengine 的性能?

在设备上报信息时,如果在线设备达到 120 万,每个设备每 10 分钟上报一次数据,那么每秒钟大约会有 2000 条消息需要处理。每条消息不仅需要入库一条调试日志,还需要记录一条属性历史数据。通过大量性能测试和优化,我们发现,突破 2000 QPS 是非常具有挑战性的。深入分析了 TDengine 的体系架构后,我们实施了以下优化措施:

  1. 从 HTTP 切换到 WebSocket:TDengine 从 3.x 版本开始支持 WebSocket,我们顺势升级,经过测试,性能和稳定性都有所提升,但在高并发的情况下,系统资源消耗依然较大。
  2. 改进同步操作为异步操作:通过将同步操作转为异步处理,显著提高了整体性能。
  3. SQL 批量插入优化:TDengine 支持单条 SQL 语句插入多条数据,甚至跨表插入,这大大提升了写入性能。
    官方语法如下:

、INSERT INTO
    tb_name
        [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
        [(field1_name, ...)]
        VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
    [tb2_name
        [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
        [(field1_name, ...)]
        VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
    ...];

INSERT INTO tb_name [(field1_name, ...)] subquery

官方示例如下:


INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
            d21002 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:34.255', 10.15, 217, 0.33)
            d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);

联犀利用这一语法实现了异步操作,操作流程如下:

  1. 设备插入数据时,首先生成类似 d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31) 格式的插入语句。
  2. 将生成的 SQL 语句放入 Golang 的 Channel 中。
  3. 多个异步入库协程监听 Channel,并从中取出 SQL 语句。当执行间隔超过 1 秒或 SQL 数量达到上限时,协程会将这些数据合并成完整的 SQL 批量插入数据库。

由于设备上报数据的整个流程不涉及磁盘操作,整个过程速度非常快。即使在未完全优化的情况下,经过测试,单机配置为 8 核 16GB 的服务器,能够稳定支持每秒 7000 并发请求,且保持毫秒级的低延迟,并无任何错误。
7.png
关键代码如下:


type Td struct {
    *sql.DB
}

type ExecArgs struct {
    Query string
    Args  []any
}

var (
    td         = Td{}
    once       = sync.Once{}
    insertChan = make(chan ExecArgs, 1000)
)

const (
    asyncExecMax = 200 //异步执行sql最大数量
    asyncRunMax  = 40
)

func NewTDengine(DataSource conf.TSDB) (TD *Td, err error) {
    once.Do(func() {
        if DataSource.Driver == "" {
            DataSource.Driver = "taosWS"
        }
        td.DB, err = sql.Open(DataSource.Driver, DataSource.DSN)
        if err != nil {
            return
        }
        td.DB.SetMaxIdleConns(50)
        td.DB.SetMaxOpenConns(50)
        td.DB.SetConnMaxIdleTime(time.Hour)
        td.DB.SetConnMaxLifetime(time.Hour)
        _, err = td.Exec("create database if not exists ithings;")
        if err != nil {
            return
        }
        for i := 0; i < asyncRunMax; i++ {
            utils.Go(context.Background(), func() {
                td.asyncInsertRuntime()
            })
        }
    })
    if err != nil {
        logx.Errorf("TDengine 初始化失败,err:%v", err)
    }
    return &td, err
}

func (t *Td) asyncInsertRuntime() {
    r := rand.Intn(1000)
    tick := time.Tick(time.Second/2 + time.Millisecond*time.Duration(r))
    execCache := make([]ExecArgs, 0, asyncExecMax*2)
    exec := func() {
        if len(execCache) == 0 {
            return
        }
        sql, args := t.genInsertSql(execCache...)
        var err error
        for i := 3; i > 0; i-- { //三次重试
            _, err = t.Exec(sql, args...)
            if err == nil {
                break
            }
        }
        if err != nil {
            logx.Error(err)
        }
        execCache = execCache[0:0] //清空切片
    }
    for {
        select {
        case _ = <-tick:
            exec()
        case e := <-insertChan:
            execCache = append(execCache, e)
            if len(execCache) > asyncExecMax {
                exec()
            }
        }
    }

}

func (t *Td) AsyncInsert(query string, args ...any) {
    insertChan <- ExecArgs{
        Query: query,
        Args:  args,
    }
}
func (t *Td) genInsertSql(eas ...ExecArgs) (query string, args []any) {
    qs := make([]string, 0, len(eas))
    as := make([]any, 0, len(eas))
    for _, e := range eas {
        qs = append(qs, e.Query)
        as = append(as, e.Args...)
    }
    return fmt.Sprintf("insert into %s;", strings.Join(qs, " ")), as
}

TDengine 查询

数据插入数据库后,我们开始着手进行数据查询和展示。得益于 TDengine 支持多种丰富的数据聚合方式,数据分析变得更加高效和便捷。
ORM 设计
灵活的查询方式离不开 ORM 框架的支持。然而,TDengine 的官方库并未提供 ORM 框架,开源社区也没有专门为 TDengine 开发的 ORM 框架。为了填补这一空白,联犀基于知名 ORM 框架 Squirrel(https://github.com/Masterminds/squirrel)进行了扩展,以支持 TDengine 的语法。以下是 ORM 的示例:


func (d *DeviceDataRepo) getPropertyArgFuncSelect(
    ctx context.Context,
    filter msgThing.FilterOpt) (sq.SelectBuilder, error) {
    schemaModel, err := d.getSchemaModel(ctx, filter.ProductID)
    if err != nil {
        return sq.SelectBuilder{}, err
    }
    p, ok := schemaModel.Property[filter.DataID]
    if !ok {
        return sq.SelectBuilder{}, errors.Parameter.AddMsgf("dataID:%s not find", filter.DataID)
    }
    var (
        sql sq.SelectBuilder
    )

    if p.Define.Type == schema.DataTypeStruct {
        sql = sq.Select("FIRST(ts) AS ts", d.GetSpecsColumnWithArgFunc(p.Define.Specs, filter.ArgFunc))
    } else {
        sql = sq.Select("FIRST(ts) AS ts", fmt.Sprintf("%s(param) as param", filter.ArgFunc))
    }
    if filter.Interval != 0 {
        sql = sql.Interval("?a", filter.Interval) //TDengine特有语法
    }
    if len(filter.Fill) > 0 {
        sql = sql.Fill(filter.Fill)//TDengine特有语法
    }
    return sql, nil
}

灵活的查询接口

借助上述的 ORM 底层实现,我们才得以实现灵活的查询接口,下面是查询接口示例。
请求参数:
8.png
回复参数:

9.png

TDengine 链路追踪

最后,我们还需要解决链路追踪的问题:我们需要记录每个 SQL 的执行耗时,并能够与业务链路 ID 打通,确保可以通过日志追溯到具体的 SQL 执行者和执行过程。虽然官方驱动较为底层,但它支持通过 context 进行传递。联犀的链路追踪同样基于 context 传递,因此我们只需在执行 SQL 时记录相关日志即可。无论是采用 HTTP、WebSocket 还是 CGo 连接方式,日志都统一记录在 driver-go/taosWS/connection.go 文件中。联犀打印的日志如下:
10.png

结语

通过以上一系列工作,我们成功地实现了 TDengine 在建模、数据写入、查询和运维等各个环节的无缝衔接,并与物联网业务完美融合。为了感谢 TDengine 对我们产品的支持与帮助,我们特撰此文,希望能为相关从业人士提供一些借鉴与启发。

附录

联犀开源地址: https://gitee.com/unitedrhino
TDengine ORM 定制: https://gitee.com/unitedrhino/squirrel
TDengine 官方驱动定制: https://gitee.com/unitedrhino/driver-go

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
3天前
|
存储 安全 数据管理
时序数据库TDengine 与中移软件达成兼容性互认证,推动虚拟化云平台与时序数据库的深度融合
在数字化转型和智能化升级的浪潮下,企业对数据的需求日益增长,尤其是在物联网、大数据和实时分析等领域。随着设备数量的激增,时序数据的管理和处理变得愈发复杂,企业亟需高效、稳定的数据解决方案来应对这一挑战。时序数据库作为专门处理时间序列数据的工具,正逐渐成为各行业数字化转型的重要支撑。
21 4
|
3天前
|
人工智能 物联网 大数据
解密时序数据库的未来:TDengine Open Day技术沙龙精彩回顾
在数字化时代,开源已成为推动技术创新和知识共享的核心力量,尤其在数据领域,开源技术的涌现不仅促进了行业的快速发展,也让更多的开发者和技术爱好者得以参与其中。随着物联网、工业互联网等技术的广泛应用,时序数据库的需求愈发强烈,开源的兴起更是为这一技术的创新与普及提供了强有力的支持。
16 3
|
3天前
|
运维 监控 Cloud Native
云原生之运维监控实践:使用 taosKeeper 与 TDinsight 实现对 时序数据库TDengine 服务的监测告警
在数字化转型的过程中,监控与告警功能的优化对保障系统的稳定运行至关重要。本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品之一,详细介绍了如何利用 TDengine、taosKeeper 和 TDinsight 实现对 TDengine 服务的状态监控与告警功能。作者通过容器化安装 TDengine 和 Grafana,演示了如何配置 Grafana 数据源、导入 TDinsight 仪表板、以及如何设置告警规则和通知策略。欢迎大家阅读。
18 0
|
3月前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
93 3
|
11天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
39 3
|
11天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
41 3
|
11天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
53 2
|
24天前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
174 15
|
18天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
25天前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。

热门文章

最新文章