想要流畅体验 TDengine 3.0 数据订阅功能?要点都在这里

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。

众所周知,在 TDengine 3.0 中,我们对数据订阅功能进行了全面升级,以便大家可以更加便捷地实时订阅和获取数据的更新,完成实时监控、数据分析和有效报警等工作。在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。


本文将从 Java Developer 的视角来介绍如何使用 TDengine 3.0 的数据订阅功能。


TDengine 3.0 的版本迭代很快,可能有些配置参数或细节在之后的版本会发生变化,本文对应 TDengine 版本为 3.0.3.0


写在前面

在官方文档里已经有介绍,TDengine 的数据订阅是什么以及如何使用,有需要的朋友可以通过下方链接进入官网查看相关介绍:


  1. 开发指南:https://docs.tdengine.com/taos-sql/
  2. SQL:https://docs.tdengine.com/taos-sql/tmq/
  3. Java 使用数据订阅:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85


总结一下,我理解的数据订阅功能是以“订阅”的方式获取存在于 TDengine 中的数据。一般情况下,“订阅”意味着的业务需求是订阅数据库中的最新数据。“订阅”的流程很简单:(1)在数据库中创建 topic;(2)在应用中消费 topic 的数据。


基本操作:创建

在数据库中创建 topic,使用 SQL 语句 create topic 即可。create topic 这个 SQL 如何写,实际上定义了 topic 对应的数据粒度,包括哪些数据库、超级表、子表、列、行。值得一提的是,TDengine 的 SQL 支持订阅 database、 supertable、subquery 这 3 种模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 这种 SQL 可以直接订阅整个 database;CREATE TOPIC topic_name AS STABLE stb_name 这种 SQL 可以订阅某个超级表;订阅子查询是最普遍的场景。例如:


CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');


上面这个 SQL,订阅了 testdb 数据库中的 meters 超级表,通过 where 子句过滤满足以下条件:location(tag 列)为“北京”或“天津”的子表,且 voltage 超过 220.0 的 ts、voltage、location 的数据。


黄金搭档:流式计算 + 数据订阅

以智能电表的场景为例,如果我想每 10 分钟计算一次电压的平均值,并在平均电压高于 220V 就进行上报。对于这种需求,单纯用 TDengine 的数据订阅功能是不行的,因为 create topic 的子查询不支持聚合查询。这个时候,就需要用 TDengine 的流式计算 + 数据订阅这对黄金搭档了。如下:


CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1 
INTO stb_name 
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters 
WHERE location in ('北京', '天津') 
PARTITION BY location 
INTERVAL(10m);
CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;


上面的 2 条 SQL 中,第一条 SQL 创建了一个 stream:以 location 分组,计算每 10 分钟的“北京”、“天津”的平均电压;用时间窗口的结束 _wend 作为时间戳 ts;avg(voltage) 计算 voltage 平均值;时间窗口的最后一条 last_row(location) 作为标签。同时,这个 stream 以 WINDOW_CLOSE 作为计算窗口的触发模式,过期策略为 IGNORE EXPIRED 1。


第二条 SQL 创建了子查询订阅,用于过滤每 10 分钟平均电压高于 220V 的数据。这样我们就创建了一个可以被消费的 topic,消费到的数据为高于 220V 的 10 分钟平均电压,满足了前面所说的监控场景的需求。


消费 topic:很像 Kafka

在应用中消费 topic 的数据,需要按照各种连接器的 API 来使用,具体使用方式请参考官方文档:https://docs.taosdata.com/。在这里,我只对 TDengine 和订阅消费 topic 的一些配置参数进行梳理。


  1. 连接相关的参数,java connector 中使用 bootstrap.servers 一个参数代替了 td.connect.iptd.connect.port,使用了和 Kafka 一样的参数名。td.connect.usertd.connect.pass 仍然需要设置。
  2. group.id:和 Kafka 一样,多个线程可以共同消费同一个 topic,只要它们使用同一个 group.id。TDengine 的 vgroup 与 Kafka 的 partition 在概念上是对应的。同一个 group.id 中,一个 vgroup 最多只对应一个 consumer。如果 consumer 数量大于 vgroup 的数量,则有些 consumer 消费不到数据。
  3. auto.offset.reset:这个参数和 Kafka 的行为不一样。如果 group.id 为新值,在设置 earliest 时,订阅从头消费数据;设置为 latest 时,从最新数据开始订阅。当 group.id 为已存在的值时,不管 auto.offset.reset 为何值,都会从最后一个 offset 开始,继续消费。
  4. enable.auto.commit:建议设置为 false。开启自动提交 offset,TDengine 的 commit 自动提交机制是轮询提交。
  5. auto.commit.interval.ms:建议不设置。如果 enable.auto.commit 为 true,自动提交 commit 的间隔为 auto.commit.interval.ms 设置的值。
  6. enable.heartbeat.background:建议设置为 true,默认值为 true。如果设置为 false,在应用长时间不主动 poll 数据时,可能会造成当前 consumer 的离线。在 TDengine 的实现上,heartbeat 的 interval 被设置成了 1 秒。
  7. msg.with.table.name:建议设置成 true。在订阅超级表和数据库时添加了 WITH META,应该开启这个设置。例如:订阅为 CREATE TOPIC topic_name WITH META AS STABLE stb 时,配置 msg.with.table.name 为 true,则消费时可以获取到 tableName。


Show U The Code

到此,本文介绍了有关 TDengine3.0 的数据订阅功能的诸多细节。我相信,上面的内容应该可以为你使用数据订阅功能提供一些思路和帮助。但是,对程序员来说,“Talk is cheap. Show me the code”。下面,我列举了一些 Java 的示例代码,供你参考。


  1. subscribeDemo-java

这个 java 工程实现了一个最简单的订阅功能,从 TDengine 中订阅一个 topic ,并将消费到的数据写到文件中。值得一提的是,代码使用 bytebuddy 动态生成了 Java POJO 类和对应的 Deserializer 类。因此,你只需要在 schema.txt 内写好 topic 对应的字段,就可以不写代码,直接订阅不同 topic 的数据了。

链接:https://github.com/taosdata/subscribeDemo-java


  1. SubscribeDemo

这个页面展示了一段最基本的数据订阅的代码。main 方法中,包括了在 TDengine 中创建数据库、表、topic 的操作,并从 topic 中消费数据进行打印。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java


  1. WebsocketSubscribeDemo

这个页面的代码和 SubscribeDemo 相比,仅有的区别是其配置了 td.connect.type 参数为 ws,即:使用 websocket 连接 taosadapter,这样的好处是不用安装客户端。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java


结语

相信借助本篇文章,你一定能够流畅体验到 TDengine 的数据订阅功能,有需要的读者可以收藏备用。对于更为复杂的应用问题,也欢迎大家直接向社区技术支持人员寻求帮助。关于 TDengine 3.0 的更多示例代码,请参考:https://github.com/taosdata/TDengine/tree/main/docs/examples

目录
相关文章
|
1天前
|
存储 编解码 数据处理
云端问道第4期实践教学——多媒体数据存储与分发方案部署演示
该文档详细介绍了阿里云一键部署和手动部署多媒体数据存储与分发方案的步骤。一键部署通过资源编排服务(ROS)实现自动化,涵盖注册账号、开通服务、创建OSS Bucket、配置CDN加速及绑定IMM等功能,简化了复杂操作。手动部署则更细致地展示了每个配置环节,包括网络规划、资源创建、域名绑定、CDN配置、证书加密及最终的验证与清理,确保用户对整个流程有清晰理解。两种方式均以OSS为核心,支持数据上传、转码处理和加速分发,保障高效稳定的用户体验。
|
8月前
|
存储 缓存 数据安全/隐私保护
说一说你对移动应用中的离线模式的实现。
【4月更文挑战第2天】移动应用的离线模式允许用户在无网情况下仍能部分使用应用,依赖于数据缓存和本地存储。应用在联网时缓存关键数据,离线时从本地读取。数据同步通过延迟策略在重连时完成,敏感信息加密存储并定期备份。开发者还需关注用户体验、性能优化及错误处理,确保离线模式的无缝衔接和稳定性。
283 1
|
8月前
|
搜索推荐 数据管理 数据挖掘
解码2024年项目管理系统:排行榜背后的功能与特色解析
2024年十大项目管理工具:Zoho Projects以其专业成熟度领先,适合跨部门协作和进度跟踪;Nifty适合初创公司,界面直观,响应快速;Quickbase面向处理大量信息的团队,提供定制化解决方案;WorkOtter专为中大型企业资源管理和汇报设计;Asana适合大型协作团队,任务管理和沟通高效;Monday.com高度可定制,适合复杂项目管理;Smartsheet结合电子表格功能,适合流程多变的团队;Adobe Workfront针对复杂项目和自动化需求;ClickUp是一站式工作平台,功能多样;Trello则以简洁看板适合小团队和个人。考虑团队规模、项目复杂度和个性化需求来选工具
83 1
|
存储 小程序 JavaScript
借助云开发实现小程序朋友圈的发布与展示
借助云开发实现小程序朋友圈的发布与展示
125 0
|
Android开发 iOS开发
直播间搭建过程中较为核心的三个步骤
不知道大家平时有没有看游戏直播的习惯,我偶尔会看。目前游戏直播通常会分成网游和手游两大类,最明显的区别就是一个需要电脑一个需要手机。但这些在开发者眼中看来,直播间搭建的过程都是比较一致的。不过其中也存在需要特殊注意的几个步骤,接下来跟大家简单分享一下。
|
安全
一对一聊天源码,语音聊天的优劣势分析
一对一聊天源码,语音聊天的优劣势分析
366 0
|
Web App开发
移动端基础事件和交互(未完待续)
移动端基础事件和交互
1290 0
|
程序员
直播间搭建一定要加入的三个功能,机制介绍与优势分析
直播系统中比较火的几个功能是主播连麦功能、守护功能和拼手气红包功能,恰巧了解到直播间搭建的程序员朋友也刚好研究出了这三个功能,加入到了直播系统里,下面为大家剖析一下这三个功能的功能机制功能特点。
|
Windows
直播系统中使用SEI传输用户自定义数据方案讨论
      在直播系统中,除了直播音视频之外,有时候还想从主播端发布文本信息等,这些信息可以不通过视频传输通道发送给用户播放端,但如果传输的数据想和视频保持精准同步,那最好的办法就是这些信息和视频数据打包在一起传输, 通过h264 sei方式就可以把数据放入h264 Access Unit中传输。
1642 0