TDengine中订阅的用途和用法

本文将介绍TDengine Database订阅功能的使用场景、使用方法和一些限制,并与InfluxDB的订阅功能进行简单的对比。本文的预期读者是基于TDengine开发各种应用的软件开发人员。

什么是订阅?

订阅,是一种数据查询方式,其特点为:客户端执行一个查询语句后,可以增量形式,不断收到新到达服务端的、符合查询条件的数据。订阅的实现模型有两种,一种是“推”,即服务器主动将数据发到客户端;另一种是“拉”,即客户端主动向服务器请求数据。两种方式各有优缺点,这里不做详细的对比,只是说明一下,TDengine Database使用的是“拉”模型。

什么时候需要使用订阅?

为了便于用户程序消费TDengine Database中的数据,TDengine实现了基于SQL的数据查询语法,并提供了丰富的聚合函数,这种方式的优势已在多个实际案例中得到了体现。但由于时序数据的特点,单纯的直接数据查询并不能满足用户程序的需求,比如:我们管理着一批温度测量设备,希望当某个设备检测到的温度超过限制(比如80°C)后能得到通知并进行一些处理时,肯定会先为所有的设备建立一张超级表:

create database test;
use test;
create table devices (ts timestamp, temperature float) tags(id int);

并为每个设备创建一张子表:

create table device1 using devices tags(1);
create table device2 using devices tags(2);
...

这种设计满足了设备管理的需求,但如何满足温度监测的需求呢?如果仅使用普通的查询,有两种方法:一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:

select * from device1 where ts > last_timestamp1 and temperature > 80;
select * from device2 where ts > last_timestamp2 and temperature > 80;
...

这确实可行,但随着设备数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当设备数增长到一定的程度,系统就无法承受了。

另一种方法是对超级表进行查询。这样,无论有多少设备,都只需一次查询:

select * from devices where ts > last_timestamp and temperature > 80;

但是,如何选择 last_timestamp 就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同设备的数据到达TDengine的时间也会有差异。所以,如果我们在查询中使用最慢的那台设备的数据的时间戳作为 last_timestamp ,就可能重复读入其它设备的数据;如果使用最快的设备的时间戳,其它设备的数据就可能被漏掉。

TDengine的订阅功能为上面这个问题提供了一个彻底的解决方案。

如何使用TDengine中的订阅功能?

TDengine的API中,与订阅相关的主要有以下三个:

  • taos_subscribe
  • taos_consume
  • taos_unsubscribe

这三个API的具体说明请见《C/C++数据订阅接口》,下面结合一个示例,介绍下其使用方法,完整的示例代码可以在这里找到。

首先是创建订阅:

TAOS_SUB* tsub = NULL;
if (async) {
  // create an asynchronized subscription, the callback function will be called every 1s
  tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
  // create an synchronized subscription, need to call 'taos_consume' manually
  tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}

TDengine中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数async的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用 taos_consume来拉取数据,而异步则由API在内部的另一个线程中调用taos_consume,然后把拉取到的数据交给回调函数 subscribe_callback去处理。

参数taos是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。

参数sql是查询语句,可以在其中使用where子句指定过滤条件。回到开头的例子,如果我们只想订阅设备温度超过 80°C 时的数据,可以这样写:

select * from devices where temperature > 80;

注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:

select * from devices where ts > now - 1d and temperature > 80;

订阅的topic实际上是它的名字,因为订阅功能是在客户端API中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。

如果名topic的订阅不存在,参数restart没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个topic时,restart就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果restarttrue(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且restartfalse(0),用户程序就不会读到之前已经读取的数据了。

taos_subscribe的最后一个参数是以毫秒为单位的轮询周期。在同步模式下,如过前后两次调用taos_consume的时间间隔小于此时间,taos_consume会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。

taos_subscribe的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。

订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:

if (async) {
  getchar();
} else while(1) {
  TAOS_RES* res = taos_consume(tsub);
  if (res == NULL) {
    printf("failed to consume data.");
    break;
  } else {
    print_result(res, blockFetch);
    getchar();
  }
}

这里是一个while循环,用户每按一次回车键就调用一次taos_consume,而taos_consume的返回值是查询到的结果集,与taos_use_result完全相同,例子中使用这个结果集的代码是函数print_result

void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW row = NULL;
  int num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int nRows = 0;
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    for (int i = 0; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);puts(temp);
      nRows++;
    }
  }
  printf("%d rows consumed.\n", nRows);
}

其中的 taos_print_row 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。而异步模式下,消费订阅到的数据则显得更为简单:

void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}

当要结束一次数据订阅时,需要调用taos_unsubscribe:

taos_unsubscribe(tsub, keep);

其第二个参数,用于决定是否在客户端保留订阅的进度信息,如果大家还记得前面说过“订阅功能是在客户端API中实现的”的话,应该可以猜到,如果这个参数是false(0),那无论下次调用taos_subscribe的时的restart参数是什么,订阅都只能重新开始了。另外,进度信息的保存位置是{DataDir}/subscribe/,这个目录下,每个订阅有一个与其topic同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。

代码介绍完毕,我们来看一下实际的运行效果。假设:

  • 示例代码已经下载到本地
  • TDengine 也已经在同一台机器上安装好
  • 已经按照本文开头的脚本创建数据库、超级表和一些子表

则可以在示例代码所在目录执行以下命令来编译并启动示例程序:

$ make
$ ./subscribe -sql='select * from devices where temperature > 80;'

示例程序启动后,打开另一个终端窗口,启动 TDengine 的 shell 向 device1 插入一条温度为 90 °C 的数据:

$ taos
> use test;
> insert into device1 values(0, 90);

这时,因为温度超过了 80 °C ,您应该可以看到示例程序将它输出到了屏幕上。您可以继续插入一些数据观察示例程序的输出。

用作消息队列

本文开头的例子,是用订阅实现了一个报警监控的功能,但其实订阅也可以用在其它场景中,比如:消息队列。

应用程序可以订阅数据库某些表的内容,同一个表也可以被多个应用订阅,一旦表有新的记录,应用将立即得到通知。这样,再把数据插入看做Publish操作,用户完全可以把TDengine作为一个消息队列中间件来使用。

所以,当下次面对需要使用Kafka的场景时,不妨先考虑下TDengine,因为TDengine除了安装包超小、运维超简单的优点外,还有一个Kafka不具备的功能——数据过滤:可以在查询语句中指定过滤条件,保证读到的数据都是有用的,不用再在代码中手写过滤逻辑了。

与InfluxDB的对比

概念上说,InfluxDB的订阅和TDengine的订阅区别很大,我们可以认为订阅在InfluxDB中更像一种数据同步机制,而TDengine中的订阅则是一种数据查询机制:

  • InfluxDB将收到的数据实时推送给其它节点,TDengine通过轮询的方式拉取数据,InfluxDB具有更好的实时性。
  • InfluxDB中只能订阅全部数据,TDengine中可以指定数据过滤条件。
  • InfluxDB中只能订阅当前时间之后的数据,TDengine中可以在订阅中读到历史数据。

所以,两相对比,InfluxDB的优势是实时性,而TDengine则以稍微牺牲实时性为代价提供了更强大的功能。

限制条件

下面是一些TDengine订阅功能的局限,大家需要在使用中注意。

  • 订阅的查询语句只能是 select 语句,只能查询原始数据(不支持聚合函数),只能按时间正序查询数据。
  • 在满足应用需求的情况下,请尽量将轮询周期设置的大一些,否则会对系统性能造成影响。
  • 暂不支持乱序数据,用户程序可能读不到使用import方式插入的数据。
  • 如果用户程序异常退出或没有正确调用taos_unsubscribe,进度信息可能会有错误,这时,后续的同名订阅可能读到之前已经读过的数据。