想要流畅体验 TDengine 3.0 数据订阅功能?要点都在这里

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。

众所周知,在 TDengine 3.0 中,我们对数据订阅功能进行了全面升级,以便大家可以更加便捷地实时订阅和获取数据的更新,完成实时监控、数据分析和有效报警等工作。在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。


本文将从 Java Developer 的视角来介绍如何使用 TDengine 3.0 的数据订阅功能。


TDengine 3.0 的版本迭代很快,可能有些配置参数或细节在之后的版本会发生变化,本文对应 TDengine 版本为 3.0.3.0


写在前面

在官方文档里已经有介绍,TDengine 的数据订阅是什么以及如何使用,有需要的朋友可以通过下方链接进入官网查看相关介绍:


  1. 开发指南:https://docs.tdengine.com/taos-sql/
  2. SQL:https://docs.tdengine.com/taos-sql/tmq/
  3. Java 使用数据订阅:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85


总结一下,我理解的数据订阅功能是以“订阅”的方式获取存在于 TDengine 中的数据。一般情况下,“订阅”意味着的业务需求是订阅数据库中的最新数据。“订阅”的流程很简单:(1)在数据库中创建 topic;(2)在应用中消费 topic 的数据。


基本操作:创建

在数据库中创建 topic,使用 SQL 语句 create topic 即可。create topic 这个 SQL 如何写,实际上定义了 topic 对应的数据粒度,包括哪些数据库、超级表、子表、列、行。值得一提的是,TDengine 的 SQL 支持订阅 database、 supertable、subquery 这 3 种模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 这种 SQL 可以直接订阅整个 database;CREATE TOPIC topic_name AS STABLE stb_name 这种 SQL 可以订阅某个超级表;订阅子查询是最普遍的场景。例如:


CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');


上面这个 SQL,订阅了 testdb 数据库中的 meters 超级表,通过 where 子句过滤满足以下条件:location(tag 列)为“北京”或“天津”的子表,且 voltage 超过 220.0 的 ts、voltage、location 的数据。


黄金搭档:流式计算 + 数据订阅

以智能电表的场景为例,如果我想每 10 分钟计算一次电压的平均值,并在平均电压高于 220V 就进行上报。对于这种需求,单纯用 TDengine 的数据订阅功能是不行的,因为 create topic 的子查询不支持聚合查询。这个时候,就需要用 TDengine 的流式计算 + 数据订阅这对黄金搭档了。如下:


CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1 
INTO stb_name 
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters 
WHERE location in ('北京', '天津') 
PARTITION BY location 
INTERVAL(10m);
CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;


上面的 2 条 SQL 中,第一条 SQL 创建了一个 stream:以 location 分组,计算每 10 分钟的“北京”、“天津”的平均电压;用时间窗口的结束 _wend 作为时间戳 ts;avg(voltage) 计算 voltage 平均值;时间窗口的最后一条 last_row(location) 作为标签。同时,这个 stream 以 WINDOW_CLOSE 作为计算窗口的触发模式,过期策略为 IGNORE EXPIRED 1。


第二条 SQL 创建了子查询订阅,用于过滤每 10 分钟平均电压高于 220V 的数据。这样我们就创建了一个可以被消费的 topic,消费到的数据为高于 220V 的 10 分钟平均电压,满足了前面所说的监控场景的需求。


消费 topic:很像 Kafka

在应用中消费 topic 的数据,需要按照各种连接器的 API 来使用,具体使用方式请参考官方文档:https://docs.taosdata.com/。在这里,我只对 TDengine 和订阅消费 topic 的一些配置参数进行梳理。


  1. 连接相关的参数,java connector 中使用 bootstrap.servers 一个参数代替了 td.connect.iptd.connect.port,使用了和 Kafka 一样的参数名。td.connect.usertd.connect.pass 仍然需要设置。
  2. group.id:和 Kafka 一样,多个线程可以共同消费同一个 topic,只要它们使用同一个 group.id。TDengine 的 vgroup 与 Kafka 的 partition 在概念上是对应的。同一个 group.id 中,一个 vgroup 最多只对应一个 consumer。如果 consumer 数量大于 vgroup 的数量,则有些 consumer 消费不到数据。
  3. auto.offset.reset:这个参数和 Kafka 的行为不一样。如果 group.id 为新值,在设置 earliest 时,订阅从头消费数据;设置为 latest 时,从最新数据开始订阅。当 group.id 为已存在的值时,不管 auto.offset.reset 为何值,都会从最后一个 offset 开始,继续消费。
  4. enable.auto.commit:建议设置为 false。开启自动提交 offset,TDengine 的 commit 自动提交机制是轮询提交。
  5. auto.commit.interval.ms:建议不设置。如果 enable.auto.commit 为 true,自动提交 commit 的间隔为 auto.commit.interval.ms 设置的值。
  6. enable.heartbeat.background:建议设置为 true,默认值为 true。如果设置为 false,在应用长时间不主动 poll 数据时,可能会造成当前 consumer 的离线。在 TDengine 的实现上,heartbeat 的 interval 被设置成了 1 秒。
  7. msg.with.table.name:建议设置成 true。在订阅超级表和数据库时添加了 WITH META,应该开启这个设置。例如:订阅为 CREATE TOPIC topic_name WITH META AS STABLE stb 时,配置 msg.with.table.name 为 true,则消费时可以获取到 tableName。


Show U The Code

到此,本文介绍了有关 TDengine3.0 的数据订阅功能的诸多细节。我相信,上面的内容应该可以为你使用数据订阅功能提供一些思路和帮助。但是,对程序员来说,“Talk is cheap. Show me the code”。下面,我列举了一些 Java 的示例代码,供你参考。


  1. subscribeDemo-java

这个 java 工程实现了一个最简单的订阅功能,从 TDengine 中订阅一个 topic ,并将消费到的数据写到文件中。值得一提的是,代码使用 bytebuddy 动态生成了 Java POJO 类和对应的 Deserializer 类。因此,你只需要在 schema.txt 内写好 topic 对应的字段,就可以不写代码,直接订阅不同 topic 的数据了。

链接:https://github.com/taosdata/subscribeDemo-java


  1. SubscribeDemo

这个页面展示了一段最基本的数据订阅的代码。main 方法中,包括了在 TDengine 中创建数据库、表、topic 的操作,并从 topic 中消费数据进行打印。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java


  1. WebsocketSubscribeDemo

这个页面的代码和 SubscribeDemo 相比,仅有的区别是其配置了 td.connect.type 参数为 ws,即:使用 websocket 连接 taosadapter,这样的好处是不用安装客户端。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java


结语

相信借助本篇文章,你一定能够流畅体验到 TDengine 的数据订阅功能,有需要的读者可以收藏备用。对于更为复杂的应用问题,也欢迎大家直接向社区技术支持人员寻求帮助。关于 TDengine 3.0 的更多示例代码,请参考:https://github.com/taosdata/TDengine/tree/main/docs/examples

目录
相关文章
|
1月前
|
存储 缓存 数据安全/隐私保护
说一说你对移动应用中的离线模式的实现。
【4月更文挑战第2天】移动应用的离线模式允许用户在无网情况下仍能部分使用应用,依赖于数据缓存和本地存储。应用在联网时缓存关键数据,离线时从本地读取。数据同步通过延迟策略在重连时完成,敏感信息加密存储并定期备份。开发者还需关注用户体验、性能优化及错误处理,确保离线模式的无缝衔接和稳定性。
19 1
|
2月前
|
机器学习/深度学习 存储 数据库
视觉智能平台常见问题之一直显示视频异步处理如何解决
视觉智能平台是利用机器学习和图像处理技术,提供图像识别、视频分析等智能视觉服务的平台;本合集针对该平台在使用中遇到的常见问题进行了收集和解答,以帮助开发者和企业用户在整合和部署视觉智能解决方案时,能够更快地定位问题并找到有效的解决策略。
18 0
|
2月前
|
安全
哈希竞猜游戏系统开发玩法详情/功能步骤/需求设计/流程方案/源码程序
Developing a hash guessing game system can provide a fun gaming experience. The following are possible gameplay and rules for your reference:
游戏对接广告看视频系统开发详细规则/方案逻辑/步骤逻辑/规则玩法/源码程序
Advertising location and display method: According to the characteristics of the game interface and scene, choose the appropriate advertising location and display method to ensure that the advertisement naturally integrates into the game and does not affect the player's game experience.
【项目实战典型案例】14.课程推送页面整理-增加定时功能
【项目实战典型案例】14.课程推送页面整理-增加定时功能
|
12月前
|
存储 小程序 JavaScript
借助云开发实现小程序朋友圈的发布与展示
借助云开发实现小程序朋友圈的发布与展示
|
iOS开发 MacOS Windows
无影产品动态 | 客户端6.2.0版本发布,操作更顺畅,体验更丝滑
本次更新优化了核心功能模块“云电脑助理”,新增云电脑UI缩放(DPI)配置功能,用户远程协助申请和用户共享协同功能,有效简化操作步骤,提升产品体验。
256 0
无影产品动态 | 客户端6.2.0版本发布,操作更顺畅,体验更丝滑
相亲交友源码,如何开发出便捷高效的消息列表?
相亲交友源码,如何开发出便捷高效的消息列表?
|
缓存 UED
语音直播系统,清理缓存功能的设计细节
语音直播系统,清理缓存功能的设计细节