做工业互联网或物联网系统,最基本的需求是展示数据曲线,比如功率曲线,类似于股票的分时图,通常我们会取每分钟内该设备上报的最后一次功率值为这一分钟的功率,如果某一分钟内,设备没有上报,则取上一分钟的功率值,以此类推。举例如下:
得到的分钟曲线:
通常我们会把设备上报的数据先写入Apache Kafka。如果是离线计算场景,可能会考虑把数据写入Hive,然后使用Spark SQL定时读取Hive,再把计算结果写入HBase;如果是实时计算场景,则会使用Apache Flink消费Kafka数据,把结果写入HBase,这种情况下还需要考虑数据乱序和延迟投递计算等问题。
而且,基于传统大数据Hadoop的架构,需要搭建ZooKeeper和HDFS,然后才是Hive和HBase,整个体系维护成本很高。此外,HBase基于键值存储时序数据,会浪费很多空间在同一键值的数据设计架构上面。
以上所举,是物联网设备属性曲线计算场景的其中一个痛点,另外还需要考虑数据增长、数据核对以及数据容灾等特点。
笔者所在的公司,要基于3D打印技术给客户提供整体化解决方案,自然需要对设备的运行状态做持续追踪,需要存储设备的运行数据。这时候我们找到了开源的物联网大数据平台TDengine(https://github.com/taosdata/TDengine)。
参考TDengine Database的文档中SQL的写法,在数据齐全的情况下,可以轻松地用一句SQL解决上面的问题:
select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(value, 0);
为什么类似的SQL,TDengine Database的执行效率可以如此之高呢?
这就在于它的超级表以及子表,针对单个设备的数据,TDengine设计了按照时间连续存储的特性。而事实上,业务系统在使用物联网数据的时候,无论是即时查询还是离线分析,存在读取单个设备的一个连续时间段数据的特点。
假设,我们要存储设备的温度与湿度,我们可以设计超级表如下:
create stable if not exists s_device (ts TIMESTAMP,
temperature double,
humidity double
) TAGS (device_sn BINARY(1000));
实际使用中,例如针对设备’d1’和’d2’的数据执行插入的SQL如下:
insert into s_device_d1 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d1') values (1623157875000, 35.34, 80.24);
insert into s_device_d2 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d2') values (1623157891000, 29.63, 79.48);
搜索设备’d1’某个时间段的数据,其SQL如下:
select * from s_device where device_sn = 'd1' and ts > 1623157871000 and ts < 1623157890000 ;
假设统计过去7天的平均温度曲线,每小时1个点:
select avg(temperature) temperature from s_device where device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime} interval(1h)
TDengine还提供了很多聚合函数,类似上面的计算1分钟连续曲线的last和fill,以及其他常用的sum和max等。
在和应用程序结合的过程中,我们选用MyBatis这种灵活易上手的ORM框架,例如,针对上面的数据表’s_device’,我们先定义entity :
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.sql.Timestamp;
/**
* @author: DaLuo
* @date: 2021/06/25
* @description:
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName(value = "s_device")
public class TestSuperDeviceEntity {
private Timestamp ts;
private Float temperature;
private Float humidity;
@TableField(value = "device_sn")
private String device_sn ;
}
再定义 mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hg.device.kafka.tdengine.entity.TestSuperDeviceEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.sql.Timestamp;
import java.util.List;
/**
* @author: DaLuo
* @date: 2021/06/25
* @description:
*/
@Mapper
public interface TestSuperDeviceMapper extends BaseMapper<TestSuperDeviceEntity> {
/**
* 单个插入
* @param entity
* @return
*/
@Insert({
"INSERT INTO 's_device_${entity.deviceSn}' (ts ,temperature, humidity ) ",
"USING s_device (device_sn) TAGS (#{entity.deviceSn}) ",
"VALUES (#{entity.ts}, #{entity.temperature}, #{entity.humidity})"
})
int insertOne(@Param(value = "entity") TestSuperDeviceEntity entity);
/**
* 批量插入
* @param entities
* @return
*/
@Insert({
"<script>",
"INSERT INTO ",
"<foreach collection='list' item='item' separator=' '>",
"'s_device_${item.deviceSn}' (ts ,temperature, humidity) USING s_device (device_sn) TAGS (#{item.deviceSn}) ",
"VALUES (#{item.ts}, #{item.temperature}, #{item.humidity})",
"</foreach>",
"</script>"
})
int batchInsert(@Param("list") List<TestSuperDeviceEntity> entities);
/**
* 查询过去一段时间范围的平均温度,每小时1个数据点
* @param deviceSn
* @param startTime inclusive
* @param endTime exclusive
* @return
*/
@Select("select avg(temperature) temperature from s_device where device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime} interval(1h)")
List<TempSevenDaysTemperature> selectSevenDaysTemperature(
@Param(value = "deviceSn") String deviceSn,
@Param(value = "startTime") long startTime,
@Param(value = "endTime") long endTime);
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
class TempSevenDaysTemperature {
private Timestamp ts;
private float temperature;
}
}
TDengine有一个很巧妙的设计,就是不用预先创建子表,所以我们可以很方便地利用’tag’标签作为子表名称的一部分,即时插入数据同时创建子表。
注意:考虑到跨时区的国际化特性,我们所有的时间存储查询交互,都是使用的时间戳,而非”yyyy-mm-dd hh:MM:ss”格式,因为数据存储涉及到应用程序时区,连接字符串时区,TDengine服务时区,使用”yyyy-mm-dd hh:MM:ss”格式容易导致时间存储的不准确性,而使用时间戳,长整型的数据格式则可以完美地避免此类问题。
Java使用TDengine JDBC-driver目前有两种方式:JDBC-JNI和JDBC-RESTful,前者在写入性能上更有优势。但是需要在应用程序运行的服务器上安装TDengine客户端驱动。
我们的应用程序用到了Kubernetes集群,程序是运行在Docker里面,为此我们制作了一个适合我们应用程序运行的镜像,例如基础镜像的Dockerfile如下所示:
FROM openjdk:8-jdk-oraclelinux7
COPY TDengine-client-2.0.16.0-Linux-x64.tar.gz /
RUN tar -xzvf /TDengine-client-2.0.16.0-Linux-x64.tar.gz && cd /TDengine-client-2.0.16.0 && pwd && ls && ./install_client.sh
build:
docker build -t tdengine-openjdk-8-runtime:2.0.16.0 -f Dockerfile .
引用程序镜像Dockerfile所示:
FROM tdengine-openjdk-8-runtime:2.0.16.0
ENV JAVA_OPTS="-Duser.timezone=Asia/Shanghai -Djava.security.egd=file:/dev/./urandom"
COPY app.jar /app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
这样我们的应用程序就可以调度在任意的K8s节点上了。
另外,我们的程序涉及到任务自动化调度,需要频繁地和设备下位机进行MQTT数据交互,比如,云端发送指令1000-“开始任务A”,下位机回复指令2000-“收到任务A”,把指令理解成设备,把指令序列号以及内容理解成它的属性,自然这种数据也是非常适合存储在TDengine时序数据库中的:
*************************** 1.row ***************************
ts: 2021-06-23 16:10:30.000
msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"start"}
device_sn: deviceA
kind: 1000
*************************** 2.row ***************************
ts: 2021-06-23 16:10:31.000
msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}
device_sn: deviceA
kind: 2000
我们云端在和设备对接的过程中,频繁需要考究消息是否发送的问题,所以迫切需要对指令进行保存,从而在应用程序中新辟线程,专门订阅指令集消息,批量写入到TDengine数据库。
最后,TDengine Database还有一个超级表log.dn,里面保留了内存、CPU等使用信息,所以我们可以利用Grafana展示这些数据,为监控提供可靠的运营数据参照!
作者介绍:大罗,黑格智造架构师,主要从事云原生,大数据系统开发,曾参与国家示范级工业互联网系统建设等。