想要流畅体验 TDengine 3.0 数据订阅功能?要点都在这里

众所周知,在 TDengine 3.0 中,我们对数据订阅功能进行了全面升级,以便大家可以更加便捷地实时订阅和获取数据的更新,完成实时监控、数据分析和有效报警等工作。在本文中,TDengine 资深研发将以 TDengine 3.0 为对象,为大家介绍数据订阅功能的正确打开方式,给到有需要的人作参考指南,避免走入应用误区。

本文将从 Java Developer 的视角来介绍如何使用 TDengine 3.0 的数据订阅功能。

TDengine 3.0 的版本迭代很快,可能有些配置参数或细节在之后的版本会发生变化,本文对应 TDengine 版本为 3.0.3.0

写在前面

在官方文档里已经有介绍,TDengine 的数据订阅是什么以及如何使用,有需要的朋友可以通过下方链接进入官网查看相关介绍:

  1. 开发指南:https://docs.tdengine.com/taos-sql/
  2. SQL:https://docs.tdengine.com/taos-sql/tmq/
  3. Java 使用数据订阅:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85

总结一下,我理解的数据订阅功能是以“订阅”的方式获取存在于 TDengine 中的数据。一般情况下,“订阅”意味着的业务需求是订阅数据库中的最新数据。“订阅”的流程很简单:(1)在数据库中创建 topic;(2)在应用中消费 topic 的数据。

基本操作:创建

在数据库中创建 topic,使用 SQL 语句 create topic 即可。create topic 这个 SQL 如何写,实际上定义了 topic 对应的数据粒度,包括哪些数据库、超级表、子表、列、行。值得一提的是,TDengine 的 SQL 支持订阅 database、 supertable、subquery 这 3 种模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 这种 SQL 可以直接订阅整个 database;CREATE TOPIC topic_name AS STABLE stb_name 这种 SQL 可以订阅某个超级表;订阅子查询是最普遍的场景。例如:

CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');

上面这个 SQL,订阅了 testdb 数据库中的 meters 超级表,通过 where 子句过滤满足以下条件:location(tag 列)为“北京”或“天津”的子表,且 voltage 超过 220.0 的 ts、voltage、location 的数据。

黄金搭档:流式计算 + 数据订阅

以智能电表的场景为例,如果我想每 10 分钟计算一次电压的平均值,并在平均电压高于 220V 就进行上报。对于这种需求,单纯用 TDengine 的数据订阅功能是不行的,因为 create topic 的子查询不支持聚合查询。这个时候,就需要用 TDengine 的流式计算 + 数据订阅这对黄金搭档了。如下:

CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1 
INTO stb_name 
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters 
WHERE location in ('北京', '天津') 
PARTITION BY location 
INTERVAL(10m);

CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;

上面的 2 条 SQL 中,第一条 SQL 创建了一个 stream:以 location 分组,计算每 10 分钟的“北京”、“天津”的平均电压;用时间窗口的结束 _wend 作为时间戳 ts;avg(voltage) 计算 voltage 平均值;时间窗口的最后一条 last_row(location) 作为标签。同时,这个 stream 以 WINDOW_CLOSE 作为计算窗口的触发模式,过期策略为 IGNORE EXPIRED 1。

第二条 SQL 创建了子查询订阅,用于过滤每 10 分钟平均电压高于 220V 的数据。这样我们就创建了一个可以被消费的 topic,消费到的数据为高于 220V 的 10 分钟平均电压,满足了前面所说的监控场景的需求。

消费 topic:很像 Kafka

在应用中消费 topic 的数据,需要按照各种连接器的 API 来使用,具体使用方式请参考官方文档:https://docs.taosdata.com/。在这里,我只对 TDengine 和订阅消费 topic 的一些配置参数进行梳理。

  1. 连接相关的参数,java connector 中使用 bootstrap.servers 一个参数代替了 td.connect.iptd.connect.port,使用了和 Kafka 一样的参数名。td.connect.usertd.connect.pass 仍然需要设置。
  2. group.id:和 Kafka 一样,多个线程可以共同消费同一个 topic,只要它们使用同一个 group.id。TDengine 的 vgroup 与 Kafka 的 partition 在概念上是对应的。同一个 group.id 中,一个 vgroup 最多只对应一个 consumer。如果 consumer 数量大于 vgroup 的数量,则有些 consumer 消费不到数据。
  3. auto.offset.reset:这个参数和 Kafka 的行为不一样。如果 group.id 为新值,在设置 earliest 时,订阅从头消费数据;设置为 latest 时,从最新数据开始订阅。当 group.id 为已存在的值时,不管 auto.offset.reset 为何值,都会从最后一个 offset 开始,继续消费。
  4. enable.auto.commit:建议设置为 false。开启自动提交 offset,TDengine 的 commit 自动提交机制是轮询提交。
  5. auto.commit.interval.ms:建议不设置。如果 enable.auto.commit 为 true,自动提交 commit 的间隔为 auto.commit.interval.ms 设置的值。
  6. enable.heartbeat.background:建议设置为 true,默认值为 true。如果设置为 false,在应用长时间不主动 poll 数据时,可能会造成当前 consumer 的离线。在 TDengine 的实现上,heartbeat 的 interval 被设置成了 1 秒。
  7. msg.with.table.name:建议设置成 true。在订阅超级表和数据库时添加了 WITH META,应该开启这个设置。例如:订阅为 CREATE TOPIC topic_name WITH META AS STABLE stb 时,配置 msg.with.table.name 为 true,则消费时可以获取到 tableName。

Show U The Code

到此,本文介绍了有关 TDengine3.0 的数据订阅功能的诸多细节。我相信,上面的内容应该可以为你使用数据订阅功能提供一些思路和帮助。但是,对程序员来说,“Talk is cheap. Show me the code”。下面,我列举了一些 Java 的示例代码,供你参考。

  1. subscribeDemo-java

这个 java 工程实现了一个最简单的订阅功能,从 TDengine 中订阅一个 topic ,并将消费到的数据写到文件中。值得一提的是,代码使用 bytebuddy 动态生成了 Java POJO 类和对应的 Deserializer 类。因此,你只需要在 schema.txt 内写好 topic 对应的字段,就可以不写代码,直接订阅不同 topic 的数据了。

链接:https://github.com/taosdata/subscribeDemo-java

  1. SubscribeDemo

这个页面展示了一段最基本的数据订阅的代码。main 方法中,包括了在 TDengine 中创建数据库、表、topic 的操作,并从 topic 中消费数据进行打印。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java

  1. WebsocketSubscribeDemo

这个页面的代码和 SubscribeDemo 相比,仅有的区别是其配置了 td.connect.type 参数为 ws,即:使用 websocket 连接 taosadapter,这样的好处是不用安装客户端。

链接:https://github.com/taosdata/TDengine/blob/main/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java

结语

相信借助本篇文章,你一定能够流畅体验到 TDengine 的数据订阅功能,有需要的读者可以收藏备用。对于更为复杂的应用问题,也欢迎大家加入 TDengine 的开发者交流群(添加小T vx:tdengine),直接向社区技术支持人员寻求帮助。关于 TDengine 3.0 的更多示例代码,请参考:https://github.com/taosdata/TDengine/tree/main/docs/examples