关于 TDengine 3.0 数据订阅,你需要知道这些

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: TDengine 3.0 对数据订阅功能又进行了优化升级,本文将详细介绍其语法规则,方便开发者及企业使用。

小T导读:为了帮助应用实时获取写入时序数据库Time Series DatabaseTDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统就不需要再集成如 Kafka 一般的消息队列产品,从而简化系统设计的复杂度,降低运维成本。TDengine 3.0 对数据订阅功能又进行了优化升级,本文将详细介绍其语法规则,方便开发者及企业使用。


与 Kafka 一样,应用 TDengine 时你也需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而数据的过滤与预处理是交给 TDengine 来完成,有效地减少传输的数据量与应用的复杂度。


消费者订阅 topic 后(一个消费者可以订阅多个 topic),可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group),一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度;但不同消费者组中的消费者即使消费同一个 topic,也并不共享消费进度。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下也能确保 at least once 消费。


为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小:


  • WAL_RETENTION_PERIOD:为了数据订阅消费,需要 WAL 日志文件额外保留的最大时长策略。WAL 日志清理,不受订阅客户端消费状态影响。单位为 s,默认为 3600,表示在 WAL 保留最近 3600 秒的数据,用户可以根据数据订阅的需要修改这个参数为适当值。
  • WAL_RETENTION_SIZE:为了数据订阅消费,需要 WAL 日志文件额外保留的最大累计大小策略。单位为 KB,默认为 0,表示累计大小无上限。


通过以上方式,我们将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,因此不推荐保留太长时间,一般来说建议不超过几天)。对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。

为了方便大家上手实操,下文将对 TDengine 数据订阅相关语法进行详细解读。

写入数据

首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:

  1. DROP DATABASE IF EXISTS tmqdb;
  2. CREATE DATABASE tmqdb;
  3. CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
  4. CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0,"subtable0");
  5. CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1,"subtable1");      
  6. INSERT INTO tmqdb.ctb0 VALUES(now,0,0,'a0')(now+1s,0,0,'a00');
  7. INSERT INTO tmqdb.ctb1 VALUES(now,1,1,'a1')(now+1s,11,11,'a11');

创建 topic

TDengine 使用 SQL 创建如下所示 topic(topic 创建个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个):

  1. CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHEREc1>1;

TMQ 支持以下多种订阅类型:

列订阅

  1. CREATE TOPIC topic_name as subquery

通过 SELECT 语句订阅(包括 SELECT *,或 SELECT ts, c1 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。但需要注意的是:

  • 该类型 TOPIC 一旦创建则订阅数据的结构确定;
  • 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY);
  • 若发生表结构变更,新增的列不出现在结果中。

超级表订阅

  1. CREATE TOPIC topic_name AS STABLE stb_name

与 SELECT * from stbName 订阅的区别是:

  • 不会限制用户的表结构变更。
  • 返回的是非结构化的数据:返回数据的结构会随超级表的表结构变化而变化。
  • with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于 taosx 做超级表迁移。
  • where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是 tag 或 tbname,where 条件里可以用函数,用来过滤 tag,但是不能是聚合函数,因为子表 tag 值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅 0 个子表)。
  • 返回数据不包含标签。

数据库订阅

  1. CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;

通过该语句可创建一个包含数据库所有表数据的订阅,with meta 参数可选,同上。

创建消费者

订阅 topics

一个 consumer 支持同时订阅多个 topic。以 Java 为例:

  1. List<String> topics =newArrayList<>();
  2. topics.add("tmq_topic");
  3. consumer.subscribe(topics);

消费

在 Java 语言下如何对 TMQ 消息进行消费,代码示意如下:

  1. while(running){
  2.  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
  3.    for(Meters meter : meters){
  4.      processMsg(meter);
  5.    }    
  6. }

结束消费

消费结束后,应当取消订阅。

  1. /* 取消订阅 */
  2. tmq_unsubscribe(tmq);
  3. /* 关闭消费者对象 */
  4. tmq_consumer_close(tmq);

删除 topic

如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 topic 才能被删除。

  1. /* 删除 topic */
  2. DROP TOPIC topic_name;

状态查看

1、topics:查询已经创建的 topic

  1. SHOW TOPICS;

2、consumers:查询 consumer 的状态及其订阅的 topic

  1. SHOW CONSUMERS;

3、subscriptions:查询 consumer 与 vgroup 之间的分配关系

  1. SHOW SUBSCRIPTIONS;

写在最后

受文章篇幅所限,本文只分享了部分语法的具体实现,需要了解相关设置及更多语言的代码示例,可以进入 TDengine 官网查询数据订阅的相关文档。对于更为复杂的应用问题,也欢迎大家加入 TDengine 的开发者交流群,直接向社区技术支持人员寻求帮助。

目录
相关文章
|
8月前
|
消息中间件 存储 数据库
RocketMQ 流数据库解析:如何实现一体化流处理?
RocketMQ 5.0 是一款云原生的消息中间件,旨在覆盖更多业务场景。它针对国内企业在数字化转型中面临的多场景消息处理需求,提供了一体化的解决方案。
112028 21
|
8月前
|
开发框架 监控 前端开发
实时数据更新与Apollo:探索GraphQL订阅
实时数据更新与Apollo:探索GraphQL订阅
|
2月前
|
消息中间件 数据采集 监控
高级应用:利用DataHub构建实时数据流处理系统
【10月更文挑战第23天】在大数据时代,实时数据处理的需求日益增长。无论是金融交易、物联网设备监控,还是社交媒体分析,实时数据流处理系统都扮演着至关重要的角色。作为阿里云提供的实时数据同步服务,DataHub为开发者提供了一种高效、可靠的方式来构建实时数据流处理系统。本文将从个人的角度出发,探讨如何利用DataHub构建实时数据流处理系统,包括配置实时数据采集、与流处理引擎集成、实施数据流的实时分析和处理,以及确保系统的高可用性和扩展性。
125 5
|
4月前
|
存储 Go API
使用GoFrame连接和操作TDengine时序数据库
通过使用GoFrame框架和TDengine Go驱动,我们可以方便地连接和操作TDengine时序数据库。无论是插入、查询还是分析时序数据,都可以通过简单的API调用来实现。GoFrame提供了强大的Web开发功能,结合TDengine的高性能时序数据存储和查询能力,可以构建高效、可扩展的时序数据应用。
|
2月前
|
消息中间件 SQL API
TDengine 数据订阅 vs. InfluxDB 数据订阅:谁更胜一筹?
在时序数据的应用场景中,数据的实时消费和处理能力成为衡量数据库性能和可用性的重要指标。TDengine 和 InfluxDB 作为时序数据库(Time Series Database)中的佼佼者,在数据订阅方面各有特点。但从架构设计、灵活性和系统负载上看,TDengine 提供了更加全面且高效的解决方案。
69 2
|
8月前
|
消息中间件 存储 物联网
|
消息中间件 运维 物联网
一文告诉你为什么时序场景下 TDengine 数据订阅比 Kafka 好
在本文中,TDengine 研发人员详细揭秘了 TDengine 数据订阅的流程和具体实现。
299 0
EMQ
|
SQL 存储 人工智能
EMQX+HStreamDB 实现物联网流数据高效持久化
本文将具体介绍如何通过EMQX规则引擎将数据持久化到HStreamDB流数据库,实现MQTT数据流的存储与实时处理。
EMQ
418 0
EMQX+HStreamDB 实现物联网流数据高效持久化
|
消息中间件 存储 运维
消息队列Kafka「检索组件」重磅上线!
本文对消息队列 Kafka「检索组件」进行详细介绍,首先通过对消息队列使用过程中的痛点问题进行介绍,然后针对痛点问题提出相应的解决办法,并对关键技术技术进行解读,旨在帮助大家对消息队列 Kafka「检索组件」的特点及使用方式更加熟悉,以期可以帮助大家更有效的解决在消息排查过程中遇到的痛点问题。
574 0
消息队列Kafka「检索组件」重磅上线!
EMQ
|
存储 消息中间件 运维
EMQX Cloud更新:数据集成新增 HStreamDB & Tablestore
EMQX Cloud版本更新,新增HStreamDB和阿里云Tablestore两个外部数据库扩展支持。
EMQ
148 0
EMQX Cloud更新:数据集成新增 HStreamDB & Tablestore