想要流畅体验 TDengine 3.0 数据订阅功能?要点都在这里

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。

众所周知,在 TDengine 3.0 中,我们对数据订阅功能进行了全面升级,以便大家可以更加便捷地实时订阅和获取数据的更新,完成实时监控、数据分析和有效报警等工作。在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。


本文将从 Java Developer 的视角来介绍如何使用 TDengine 3.0 的数据订阅功能。


TDengine 3.0 的版本迭代很快,可能有些配置参数或细节在之后的版本会发生变化,本文对应 TDengine 版本为 3.0.3.0


写在前面

在官方文档里已经有介绍,TDengine 的数据订阅是什么以及如何使用,有需要的朋友可以通过下方链接进入官网查看相关介绍:


  1. 开发指南:https://docs.tdengine.com/taos-sql/
  2. SQL:https://docs.tdengine.com/taos-sql/tmq/
  3. Java 使用数据订阅:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85


总结一下,我理解的数据订阅功能是以“订阅”的方式获取存在于 TDengine 中的数据。一般情况下,“订阅”意味着的业务需求是订阅数据库中的最新数据。“订阅”的流程很简单:(1)在数据库中创建 topic;(2)在应用中消费 topic 的数据。


基本操作:创建

在数据库中创建 topic,使用 SQL 语句 create topic 即可。create topic 这个 SQL 如何写,实际上定义了 topic 对应的数据粒度,包括哪些数据库、超级表、子表、列、行。值得一提的是,TDengine 的 SQL 支持订阅 database、 supertable、subquery 这 3 种模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 这种 SQL 可以直接订阅整个 database;CREATE TOPIC topic_name AS STABLE stb_name 这种 SQL 可以订阅某个超级表;订阅子查询是最普遍的场景。例如:


CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');


上面这个 SQL,订阅了 testdb 数据库中的 meters 超级表,通过 where 子句过滤满足以下条件:location(tag 列)为“北京”或“天津”的子表,且 voltage 超过 220.0 的 ts、voltage、location 的数据。


黄金搭档:流式计算 + 数据订阅

以智能电表的场景为例,如果我想每 10 分钟计算一次电压的平均值,并在平均电压高于 220V 就进行上报。对于这种需求,单纯用 TDengine 的数据订阅功能是不行的,因为 create topic 的子查询不支持聚合查询。这个时候,就需要用 TDengine 的流式计算 + 数据订阅这对黄金搭档了。如下:


CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1 
INTO stb_name 
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters 
WHERE location in ('北京', '天津') 
PARTITION BY location 
INTERVAL(10m);
CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;


上面的 2 条 SQL 中,第一条 SQL 创建了一个 stream:以 location 分组,计算每 10 分钟的“北京”、“天津”的平均电压;用时间窗口的结束 _wend 作为时间戳 ts;avg(voltage) 计算 voltage 平均值;时间窗口的最后一条 last_row(location) 作为标签。同时,这个 stream 以 WINDOW_CLOSE 作为计算窗口的触发模式,过期策略为 IGNORE EXPIRED 1。


第二条 SQL 创建了子查询订阅,用于过滤每 10 分钟平均电压高于 220V 的数据。这样我们就创建了一个可以被消费的 topic,消费到的数据为高于 220V 的 10 分钟平均电压,满足了前面所说的监控场景的需求。


消费 topic:很像 Kafka

在应用中消费 topic 的数据,需要按照各种连接器的 API 来使用,具体使用方式请参考官方文档:https://docs.taosdata.com/。在这里,我只对 TDengine 和订阅消费 topic 的一些配置参数进行梳理。


  1. 连接相关的参数,java connector 中使用 bootstrap.servers 一个参数代替了 td.connect.iptd.connect.port,使用了和 Kafka 一样的参数名。td.connect.usertd.connect.pass 仍然需要设置。
  2. group.id:和 Kafka 一样,多个线程可以共同消费同一个 topic,只要它们使用同一个 group.id。TDengine 的 vgroup 与 Kafka 的 partition 在概念上是对应的。同一个 group.id 中,一个 vgroup 最多只对应一个 consumer。如果 consumer 数量大于 vgroup 的数量,则有些 consumer 消费不到数据。
  3. auto.offset.reset:这个参数和 Kafka 的行为不一样。如果 group.id 为新值,在设置 earliest 时,订阅从头消费数据;设置为 latest 时,从最新数据开始订阅。当 group.id 为已存在的值时,不管 auto.offset.reset 为何值,都会从最后一个 offset 开始,继续消费。
  4. enable.auto.commit:建议设置为 false。开启自动提交 offset,TDengine 的 commit 自动提交机制是轮询提交。
  5. auto.commit.interval.ms:建议不设置。如果 enable.auto.commit 为 true,自动提交 commit 的间隔为 auto.commit.interval.ms 设置的值。
  6. enable.heartbeat.background:建议设置为 true,默认值为 true。如果设置为 false,在应用长时间不主动 poll 数据时,可能会造成当前 consumer 的离线。在 TDengine 的实现上,heartbeat 的 interval 被设置成了 1 秒。
  7. msg.with.table.name:建议设置成 true。在订阅超级表和数据库时添加了 WITH META,应该开启这个设置。例如:订阅为 CREATE TOPIC topic_name WITH META AS STABLE stb 时,配置 msg.with.table.name 为 true,则消费时可以获取到 tableName。


Show U The Code

到此,本文介绍了有关 TDengine3.0 的数据订阅功能的诸多细节。我相信,上面的内容应该可以为你使用数据订阅功能提供一些思路和帮助。但是,对程序员来说,“Talk is cheap. Show me the code”。下面,我列举了一些 Java 的示例代码,供你参考。


  1. subscribeDemo-java

这个 java 工程实现了一个最简单的订阅功能,从 TDengine 中订阅一个 topic ,并将消费到的数据写到文件中。值得一提的是,代码使用 bytebuddy 动态生成了 Java POJO 类和对应的 Deserializer 类。因此,你只需要在 schema.txt 内写好 topic 对应的字段,就可以不写代码,直接订阅不同 topic 的数据了。

链接:https://github.com/taosdata/subscribeDemo-java


  1. SubscribeDemo

这个页面展示了一段最基本的数据订阅的代码。main 方法中,包括了在 TDengine 中创建数据库、表、topic 的操作,并从 topic 中消费数据进行打印。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java


  1. WebsocketSubscribeDemo

这个页面的代码和 SubscribeDemo 相比,仅有的区别是其配置了 td.connect.type 参数为 ws,即:使用 websocket 连接 taosadapter,这样的好处是不用安装客户端。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java


结语

相信借助本篇文章,你一定能够流畅体验到 TDengine 的数据订阅功能,有需要的读者可以收藏备用。对于更为复杂的应用问题,也欢迎大家直接向社区技术支持人员寻求帮助。关于 TDengine 3.0 的更多示例代码,请参考:https://github.com/taosdata/TDengine/tree/main/docs/examples

目录
相关文章
|
7月前
|
小程序 API Android开发
【社区每周】基础库更新;小游戏增加流量运营相关文档(11月第四期)
【社区每周】基础库更新;小游戏增加流量运营相关文档(11月第四期)
119 11
|
7月前
|
搜索推荐 数据管理 数据挖掘
解码2024年项目管理系统:排行榜背后的功能与特色解析
2024年十大项目管理工具:Zoho Projects以其专业成熟度领先,适合跨部门协作和进度跟踪;Nifty适合初创公司,界面直观,响应快速;Quickbase面向处理大量信息的团队,提供定制化解决方案;WorkOtter专为中大型企业资源管理和汇报设计;Asana适合大型协作团队,任务管理和沟通高效;Monday.com高度可定制,适合复杂项目管理;Smartsheet结合电子表格功能,适合流程多变的团队;Adobe Workfront针对复杂项目和自动化需求;ClickUp是一站式工作平台,功能多样;Trello则以简洁看板适合小团队和个人。考虑团队规模、项目复杂度和个性化需求来选工具
79 1
|
7月前
|
存储 缓存 数据安全/隐私保护
说一说你对移动应用中的离线模式的实现。
【4月更文挑战第2天】移动应用的离线模式允许用户在无网情况下仍能部分使用应用,依赖于数据缓存和本地存储。应用在联网时缓存关键数据,离线时从本地读取。数据同步通过延迟策略在重连时完成,敏感信息加密存储并定期备份。开发者还需关注用户体验、性能优化及错误处理,确保离线模式的无缝衔接和稳定性。
268 1
|
7月前
|
安全
哈希竞猜游戏系统开发玩法详情/功能步骤/需求设计/流程方案/源码程序
Developing a hash guessing game system can provide a fun gaming experience. The following are possible gameplay and rules for your reference:
直播网站源码社区功能部署开发:连接世界的互动形式!
直播网站源码社区功能如何去实现from flask import Flask, request app = Flask(__name__) posts = [] @app.route('/post', methods=['POST'])
直播网站源码社区功能部署开发:连接世界的互动形式!
【项目实战典型案例】14.课程推送页面整理-增加定时功能
【项目实战典型案例】14.课程推送页面整理-增加定时功能
|
存储 小程序 JavaScript
借助云开发实现小程序朋友圈的发布与展示
借助云开发实现小程序朋友圈的发布与展示
123 0
|
缓存 UED
语音直播系统,清理缓存功能的设计细节
语音直播系统,清理缓存功能的设计细节
|
UED 异构计算
语音直播平台源码,关于开发优化的几点建议
语音直播平台源码,关于开发优化的几点建议
|
Android开发 iOS开发
直播间搭建过程中较为核心的三个步骤
不知道大家平时有没有看游戏直播的习惯,我偶尔会看。目前游戏直播通常会分成网游和手游两大类,最明显的区别就是一个需要电脑一个需要手机。但这些在开发者眼中看来,直播间搭建的过程都是比较一致的。不过其中也存在需要特殊注意的几个步骤,接下来跟大家简单分享一下。
下一篇
DataWorks