单机每秒百万记录、毫秒级延迟,TDengine 3.0 流处理引擎全面解析

在 8 月 13 日的 TDengine 开发者大会上,TDengine 引擎开发工程师刘继聪带来题为《TDengine 3.0 流处理引擎——单机每秒百万记录,毫秒级延迟》的主题演讲,详细阐述了 TDengine 3.0 全新的流式计算引擎技术实现原理。本文即根据此演讲整理而成。

点击【这里】查看完整演讲视频

全新的流处理引擎以及数据订阅

流是源源不断写入 TDengine 中的数据,但作为原始数据的流无法产生更多价值。我们需要对流进行过滤、分流、变换,产生新的流写入 TDengine 或推送给用户程序,这就是 TDengine 流处理引擎的使命。在 TDengine 中创建一个流,即是定义一个变换,并在流的 source 中数据到来时按照一定规则自动触发,并写入 sink。

TDengine Database

我们首先来看一下 TDengine 中流的几个例子:

  • 作为标量函数的流,对超级表中的每个子表各自运算,写入新的子表
CREATE STREAM power_stream
        INTO power_stream_output_stb AS 
        SELECT 
              ts, 
              concat_ws(".", location, TBNAME) AS meter_location,
              current * voltage * cos(phase) AS active_power,
              current * voltage * sin(phase) AS reactive_power 
          FROM meters 
          WHERE voltage > 100
          PARTITION BY TBNAME

通过 CREATE STREAM 定义了流的名称,紧接着是定义流的 sink,在 INTO 后面紧跟的是一个超级表,这个超级表在创建流时会被同时自动创建出来。紧接着,是一个 SELECT 查询,定义了流的变换。这里是一个带过滤的标量函数计算。

创建流语句的最后是一个 PARTITION 语句:PARTITION BY TBNAME,表示流的分区如何划分,不同分区的数据将被写入超级表中的不同子表。

  • 以滑动窗口,聚合超级表中所有子表的流
CREATE STREAM current_stream
        TRIGGER AT_ONCE
        INTO current_stream_output_stb AS
        SELECT
                _wstart as start,
                _wend as end,
                max(current) as max_current
        FROM meters
        WHERE voltage <= 220
       INTEVAL (5s) SLIDING (1s);

我们来看第二个例子,这个例子中是一个滑动窗口的聚合。在 create stream 之后,通过 TRIGGER AT_ONCE 指定了流的触发模式,AT_ONCE 模式表示当数据写入,立即触发计算并推送结果,而无需等待窗口关闭。

  • 以会话窗口session window,对超级表中的子表各自聚合
CREATE STREAM current_stream
      TRIGGER WINDOW_CLOSE
      WATERMARK 60s
      INTO current_stream_output_stb AS
      SELECT
                _wstart as start,
                _wend as end,
                avg(current) as avg_current
       FROM meters
       SESSION(ts, 30s)
       PARTITION BY TBNAME

第三个例子是会话窗口(session window)。流除了支持翻滚窗口(tumble window)和滑动窗口(sliding window, hop window)外,还支持会话窗口和状态窗口(state window),窗口的定义与 TDengine 中的普通查询完全一致。在这里,我们再次引入了 PARTITION BY TBNAME 子句,表示每个子表独立计算会话窗口并将结果写入目的表。

从上面三种触发方式的具体实现中我们可以看到,创建流有几个重要的元素:

  1. 流的源表及其变换:这是一个 SELECT 查询
  2. 流的目的表:这是一个自动创建的超级表
  3. 流的分区:根据不同的 PARTITION 自动创建子表,并写入对应的子表中去。

定义一个流就是定义一个数据变换,数据的变换与 ETL 在 TDengine 内部完成。

TDengine Database
CREATE STREAM 定义的流是 TDengine 内部的流

在上面的例子中,实时流推送到了 TDengine 中,却并没有直接推送到应用。这是因为,全新的范式带来了全新的解决方案,但是也会带来应用改造的额外开销。如果应用需要去处理 TDengine 推送的实时流,那么将带来更加高昂的成本;反之它可以保留原有的 Query 获取结果模式,例如,将复杂耗时的查询改为对流聚合结果的简单查询,最大程度地利用流带来的便利。

但有些时候应用仍然需要去低延迟地获取数据,比如在实现监控报警与异常检测时,这种情况下流就需要真正到达应用, TDengine 提供了数据订阅功能来满足这种需求。

从流的角度,我们也可以重新理解 TDengine 提供的数据订阅功能,数据订阅的目的是将写入的数据以流的方式推送到消费者中去,由于应用的处理能力有限,流需要被持久化并按需读取,对应到 Kafka 中就是消息可以无限堆积的特性。

TDengine Database
数据订阅就是从 TDengine 延伸到应用的流

TDengine 中落盘的数据流就是 WAL,我们会把写入的数据物化,即持久化落盘下去,这也就是消息队列中的存储。从这点出发,我们将 WAL 改造成一个真正的存储引擎,提供灵活可配置的删除与文件切换策略,并建立索引,再对接查询引擎。

数据订阅使用 CREATE TOPIC 语法从 WAL 中产生数据流,提供类似消息队列的接口,既可以订阅用户创建的表,又可以订阅流的 SINK 表。在具体操作时,标量函数、过滤可以从 WAL 中提取数据并变换,这样的变换其实就是产生一个实时流,然后将其推送到应用中去。

流引擎实现原理

前面讲解了 TDengine 的流引擎是什么,以及数据订阅与流之间的关系,下面我们来看一下 TDengine 的流处理如何实现。当然,这里面细节很多,内容很杂,由于时间关系,我只会挑出 3 个最重要的部分,也是我觉得最有意思的部分来讲解,那就是“事件驱动”、“增量计算”与“乱序处理”。

事件驱动

我们前面已经讲过,流式计算和连续查询最大的区别之一在于,流式计算能够支持事件驱动,也就是每一条数据的到来都会触发计算,这样的特性让我们能够对标量函数进行计算,从而实现数据的清洗与预处理;并且能够对窗口聚合提供 AT_ONCE 触发模式,不再需要等待窗口关闭,从而支持会话窗口与状态窗口。事件驱动执行的承担者是 Stream Task,我们先来看一下 Stream Task 如何部署。

TDengine Database
标量计算、partition by tbname 聚合

第一个例子,是标量函数与 partition by tbname 的聚合,来自 source DB 的每一个 vnode 都各自的进行聚合,并且分发到 target DB 中,由 target DB 中的 Stream Task 负责将数据写入对应子表。

如图所示,流是可以跨越 DB 的,而不同 DB 代表不同数据保存生命周期,Source DB 的 3 个 Stream Task 代表着部署在其中的三个 vnode。在进行标量计算、partition by tbname 聚合时,数据可以不经过聚合节点,直接在 Source DB 的 vnode 里经过 Stream Task 完成聚合,再发送到 target DB。

TDengine Database

超级表聚合第二个例子,是一个分布式的聚合,将超级表中所有子表聚合到一起。它需要部署一个聚合 Stream Task,来汇总来自各个 vnode 的数据。在具体实现上,数据在源 vnode 处进行一级聚合,一级聚合的数据会被推送到二级节点进行二级聚合,而聚合的结果则根据 trigger 模式按需推送到 target DB。

那么,这两级聚合分别是指什么呢?在增量计算部分,我们会详细讲解。不过在此之前,我们先来放大看一看 Stream Task 内部的具体结构。

TDengine Database

每一个流都由多个 Stream Task 构成,而每个 Stream Task 都包含了一个 Input Queue 与 Output Queue。在执行时,流式计算框架会将 Output Queue 中的数据分发到下游的 Stream Task 中去,并通知流的执行调度器,调度空闲的流线程触发计算。

而 Stream Task 内部,具体计算的执行者是一系列有状态的流算子。在创建流时,SQL 被解析成语法树,planner 将语法树拆分成多个的 pipeline,而每个 pipeline 就是多个串联起来的流算子 。我们可以看到,从计划的层面来看,流计划的最大的区别在于去掉了 Exchange Operator,将所有的 pipeline 单机化,pipeline 与 pipeline 之间采用 push 模式进行数据交换。在 push 模式下,我们能够对语法树自底向上地执行,并逐级触发,这样不仅最大程度地减少了流执行过程中阻塞,并且减少了无效的执行调度,因为流不再需要当事件到来时首先调度起父节点的 Stream Task,向子节点的 Stream Task 拉取数据。

而流算子是有状态的算子,在 Stream Task 中有流的状态下存储后端,当内存中的状态数据过大时,会溢出到硬盘。

有状态增量计算

流计算根据函数的不同性质,可以分成很多种,比如 invertible、holistic 等等,这里我们只讨论对于 incremental 的计算是如何实现的。

TDengine Database

我们没有必要先引入一系列复杂的数学公式或代数结构,只需用一个最简单的计算平均值的例子来展示增量计算的过程。

对于左边的图,数据 1、2、3,平均值计算为 2,当新的数据到来时,假设为 4,那么 4 是无法与结果 2 进行增量计算的。如果要增量计算,那么我们需要提取出一个状态向量,记录数据的 Sum 与 Count。状态向量被维持在算子的状态存储中,当新的数据到来,新数据被直接映射成状态空间中的向量,而状态空间中的向量定义了合成运算,最终得到一个新的状态向量。当需要得到最终结果时,根据状态向量计算出最终结果,如上图的 10 / 4 = 2.5。

我们抽象一下上述过程,将上述的“将原始数据映射到状态空间中”定义为 Lift,“将状态空间中的向量合成”定义为 Combine,“将从状态向量中提取结果”定义为 Lower,就得到了增量计算的 3 个基本原语:Lift、Combine 与 Lower。状态向量占用的内存是恒定的,当数据被聚合之后会被释放,因此,内存的占用不再与数据量正相关,而只与开启的窗口数据相关,因此能够在实现大窗口下的高吞吐量的聚合,而不会导致内存的暴涨。

TDengine Database

对于上述过程以及实际常用函数如何拆分,大家可以参考 VLDB 2015 年 General incremental sliding-window aggregation [1] 这篇论文,以及一些后续的工作。

TDengine Database

这时,我们就能够明白前面讲到的分布式的两级聚合到底是什么了:

  • 批量插入的数据,会在数据插入的 vnode 首先执行 Lift 与 Combine 操作
  • 对于跨多个 vnode 的聚合,会在随机选择一个 vnode 部署聚合 stream task,将第一级聚合的 state 再次 combine
  • 根据 trigger 模式的不同按需执行 lower
  • 两级增量聚合降低了数据传输的量,将 CPU 密集的计算分散到各个节点中去

乱序处理

为了实现在乱序等多种场景下的正确性,TDengine 3.0 中的流式计算采用了以事件时间为基准的处理模式,而 Watermark 即是对于乱序容忍的上界,想要理解乱序数据的处理,我们首先需要了解 Watermark。

TDengine Database

在上图中,纵轴表示墙上时钟,即真实时间。横轴表示对应 T1、T2、T3 时刻到达 TDengine 中的数据。蓝色点表示最新插入的数据,Watermark 就是沿着这个时间轴往过去的方向去推移,用最后到达的事件时间减去 Watermark 时间,得到时间 T = latest event time – watermark 。所有结束时间早于 T 的窗口都会被关闭。这些窗口已经超出了乱序容忍的上界,我们认为它们不会再有数据插入,可以安全关闭。

触发模式是 WINDOW_CLOSE、MAX_DELAY 的数据这时会被推送。而在 AT_ONCE 模式下,窗口关闭与结果推送无关,只与内存释放有关,因为内存是有限的,而数据流是无界的。因此,对于 WINDOW CLOSE 或 MAX DELAY 触发模式,Watermark 的选择是结果的实时性与正确性之间的 trade-off。在数据可能有乱序的情况下,提前关闭窗口意味着还未聚合所有的结果,就推送了数据,而为了得到更多的正确性,往往就要牺牲实时性,这也就是将窗口的关闭根据 Watermark 来延迟。

而对于 AT ONCE 的触发模式,因为不会再有数据源源不断推送的问题,Watermark 更重要的功能是让窗口打开与关闭处在动态的平衡中,让“用有限的内存来处理无界的数据流与不断新增的窗口”成为可能。在实际状态存储上,TDengine 3.0 已经实现了内存与硬盘两级,超过内存的可以被外溢到硬盘中去,对于状态存储,后续我们还会进一步进行完善。

即使定义了 Watermark,对于乱序仍然超过 Watermark 的数据如何处理呢?我们提供了两种策略,直接丢弃或从 TSDB(Time-Series Database) 从拉取并重新计算,分别对应 IGNORE EXPIRED 1 与 IGNORE EXPIRED 0。不过从 TSDB 中拉取数据重新计算只适用于少量乱序的情况,因为它会带来处理速度的降低。

性能指标:单机百万吞吐,毫秒级延迟

虽然我将性能指标作为了今天演讲的副标题,它是一切新应用场景的基石。但性能指标又并不是一个特别值得看中的东西:我们希望用户最不用关心的就是性能,因为我们的性能能够满足绝大多数场景的需求。为此,我们想要验证的是,在一个普通的机器上、每秒百万行数据写入的情况下,TDengine 3.0 仍然可以做到毫秒级的延迟。

我们后续会将 benchmark 完善发布并让用户能更简单地使用并验证。

TDengine Database

我们的性能测试主要会去验证以下几个方面:

  • 测试流对写入性能的影响:对于有无流情况下的写入延迟与吞吐量
  • 测试流在大写入吞吐下的结果延迟
  • 分别验证标量函数、每个子表各自聚合、多 vgroup 超级表聚合等几个主要场景

标量函数变换

create stream perf_stream into perf_db2.output_streamtb as select ts,abs(c1),char_length(tbname),cast(c1 as binary(16)),timezone(),now from perf_db1.stb partition by tbname;
TDengine Database

在 100 个子表的条件下测试标量函数计算,有无流对写入吞吐的影响几乎不大。在每秒写入 200 万行数据的情况下,流的结果延迟大概在几毫秒;数据从客户端写入到 TSDB 到流式计算引擎算出最终结果的延迟,大约在几毫秒到十几毫秒。

超级表中每个子表各自聚合 : partition by tbname

create stream if not exists perf_stream trigger at_once into perf_db2.output_streamtb as select _wstart as start, min(c1),max(c2), sum(c3), avg(c0), count(c3), first(c0), last(c1), now from perf_db1.stb partition by tbname interval(1s);
TDengine Database

我们选择了 min、max、sum、avg、count、first、last 作为测试基准的聚合函数,设置滑动窗口的时间长度是一秒,分别测试了子表数目在 10、100、1k、10k、100k 几个节点的流结果延迟数据,跨度在几毫秒到十几毫秒。

多 vnode 超级表聚合

create stream if not exists perf_stream trigger at_once watermark 30000s into perf_db2.output_streamtb as select _wstart as start, min(c1),max(c2), sum(c3), avg(c0), count(c3), first(c0), last(c1), now from perf_db1.stb interval(1s);

taosBenchmark config: 子表数目 1k,timestamp_step = 110,线程数 100,batch_size = 100, interlace = 100(影响乱序程度)

TDengine Database

这种场景是典型的分布式的两级聚合。在分布式两级聚合操作中,要聚合不同子表中的写入数据,因此可能会存在一定的乱序,我们在 taosBenchmark 中设置了 interlace 参数来控制数据的乱序程度。watermark 设置为 30000s,这会在一定程度增大内存占用,但可以减少因为乱序触发的扫盘。我们可以看到,流结果的延迟仍然维持在几十毫秒。

这个基准测试验证了 3.0 流式计算引擎性能基本上可以达到要求,当然,目前 TDengine 的流式计算引擎还是一个年轻的引擎,我们仍然在做着大量的性能优化,以及更多实际场景的验证。

Roadmap

接下来 TDengine 3.0 流式计算引擎的优化工作将分为以下几方面:

  • 更全面的 SQL 支持:Join / Fill / Group by / 子查询等
  • 更完善的流状态管理,使用户在 AT ONCE 模式下不再需要关心 Watermark
  • 更灵活的 partition 机制:partition by column / 表达式
  • 多聚合节点;独立部署、存算分离的流式计算节点:SNODE
  • 可配置的 checkpoint
  • Benchmark 完善,端到端的延迟指标、P99 延迟指标

虽然列举了这些后续的工作,但真正决定流式计算处理引擎发展的,其实是 TDengine 的用户和社区开发者。我们希望大家都能真正用上 TDengine 3.0 的流式计算引擎,能在开源社区中给它贡献代码,我们也会多多聆听来自客户以及社区的实时反馈。

Reference

[1] Tangwongsan, K., Hirzel, M., Schneider, S., & Wu, K. L. (2015). General incremental sliding-window aggregation. Proceedings of the VLDB Endowment, 8(7), 702-713.