小T导读:天润融通是一家云呼叫中心服务商,其中CTI-Cloud为大量头部客户提供高效、稳定的呼叫中心服务。现在,天润通过T-Phone SDK将CTI-Cloud的功能延伸到移动端,为客户提供移动端的呼叫服务。
应用场景
在天润的T-Phone SDK中,我们需要采集WebRTC信息来进行数据的分析并作出优化的建议,所以需要将SDK中采集到的相关日志进行上报;为了精简日志上报的数据,我们只针对其中的传输数据每隔5秒在双方接通后上传,针对传输中网络的抖动和链接状态可以数据化展示,提供对每一通话的数据分析,以便在后续SDK演进中提供数据支撑;另外我们对每一通电话做了操作日志,记录了接口被调用的操作和时间,为用户在某一通电话的操作记录做还原,分析可能的误操作等,为客户提供更好的交互体验。
因为现在仍处于项目初期,我们更关心用户在某一个时间段内的使用情况,在大量使用的场景中是否仍然能保证较高的通话质量,同时我们应该尽可能做到对每个座席都可以进行分析,做到每一个座席都应该有自己的数据表。
举个例子:如果我们要查询企业7000001的座席9001 2020年2月14日12:00-12:10分的一通通话的WebRTC日志,如果没有按照座席进行分表,SQL语句应该是这样:
select log_time, audio_bytes_sent,... from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;
如果要提升查询的速度,我们首先要对device_id
和log_time
字段建立索引,但是当数据量比较大的时候,索引的存储也会是问题,所以要考虑分表(我们之前使用的数据库是aws的rds,所以没有分库的概念)
分表的选择有两种,按照时间分表或者按照座席分表。为什么我们要按照座席分表?如果按照时间分表,这样就会出现不同表的数据量差异过大,甚至存在某个表里没有数据的情况,因为很少有人半夜做外呼。但是我们也不能这样武断的不为半夜的时间段建立表,万一人打的是国际长途呢?但是一个座席不可能存在不外呼的情况,而且对于移动端的应用,我们在排查问题时更多是通过某个座席向我们反馈发生的问题,我们再针对这个座席进行排查,所以在查询的时候device_id
这个字段是必须要体现的,如果按照device_id
进行分表,我们在查询的时候就不再需要对这个字段建立索引了。因而选择按照座席进行分表。
如果要使用传统的数据库做分表,我们在插入数据之前一定要先判断这张表是否存在,同时我们还需要提前创建好这些表。这种步骤在我看来就显得很鸡肋。如果能有数据库可以做到在插入数据时指定表名,如果存在则插入,如果不存在则自动创建表,这样就方便多了。
日志上报的整体处理流程
整个流程需要T-Phone SDK,CTI-Cloud的Interface模块(CTI-Cloud对客户开放的接口)和日志上报模块相互协作
设计
考虑到日志上报的频率较高,对IO吞吐的要求比较高。我们可以通过全异步的方式进行数据的采集。这次使用了Vert.x作为全异步项目开发的工具。
在数据存储上基于以下几点考虑我们选择了TDengine Database:
1. 不管是WebRTC日志还是操作日志,都是按照时间产生的数据流。而TDengine正好是一个专门为物联网结构化数据流设计的时序数据库。
2. WebRTC日志和操作日志存储的数据格式都是一致的,但是如果要做到都每个使用的座席都可以进行分析,最好的方式是每个座席都能有一张自己的数据表。TDengine提供了超级表,在超级表中定义数据结构,并按照tag区分,只要在插入数据时指定表名即可做到分表。显然解决了上述的鸡肋问题。按照TDengine官网上的介绍:
为充分利用其数据的时序性和其他数据特点,TDengine要求对每个数据采集点单独建表。
其实我们的座席就相当于是一个独立的数据采集点,TDengine在我们的场景中是很贴合业务的。
3. 时间。时间也是我们在查询中重点关注的部分,在传统的数据库中,我们需要通过对字段建立索引来提升查询速度,可是我们仍然不想建立索引,因为索引仍需要占用存储空间,我们是否可以通过类似分表的方式来取代索引呢?答案是肯定的:
TDengine中写入的数据在硬盘上是按时间维度进行分片的。同一个vnode中的表在同一时间范围内的数据都存放在同一文件组中。这一数据分片方式可以大大简化数据在时间维度的查询,提高查询速度。在默认配置下,硬盘上的每个数据文件存放10天数据。用户可根据需要修改系统配置参数daysPerFile进行个性化配置。
4. 插入和查询的速度要快,稳定。
在我们的开发服务器上尝试了一下TDengine Database。和官网上介绍的出入不大,查询和存储速度确实很快,而且也不依赖其他文件系统,所以就使用TDengine作为这个模块的存储引擎。由于TDengine中对列有长度限制,最长4096,而且我们上报的字段比较多,所以尽量分配好每个字段的长度。
在数据的采集过程中,TPhone SDK不会直接和我们进行数据交互,而是会先将数据存储到SQS中,我们再从SQS中拉取数据,然后对数据处理后进行存储。
先来创建一个超级表,tdengine提供的超级表在我看来还是很方便的,我们可以直接利用超级表来做到自动的对数据进行分表存储。
create database aladdin;
use aladdin;
create table webrtc_log(
createTime timestamp,
deviceId binary(100),
audioBytesSent bigint,
audioBytesReceived bigint,
...
ssrcSendGoogCurrentDelayMs int,
ssrcSendGoogJitterBufferMs int
) tags (
deviceIdTag binary(100)
);
TDengine提供了非常多的连接方式,为了更好的配合Vertx进行异步存储,我们在这里使用了Rest方式进行数据库操作。
开始
在有了整体思路之后我们开始上手开发:
1. 应用配置:
{
"aws.region": "<your aws region>",
"aws.accessKey": "<your aws ak>",
"aws.secretAccessKey": "<your aws sk>",
"aladdin.maxPool": 100,
"aladdin.maxWaitQueue": 1500,
"aladdin.queue.name": ["queuename1","queuename2"],
"aladdin.cache.expireAfterWrite": 30,
"aladdin.cache.expireAfterAccess": 30,
"tdengine.host": "<your tdengine host>",
"tdengine.port": 6020,
"tdengine.user": "root",
"tdengine.password": "<your tdengine password>"
}
2. 重写Launcher
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianwj
* @since v1.0
*/
public class AladdinLauncher extends Launcher {
private static Configurer configurer = new Configurer();
private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);
public static void main(String[] args) {
System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
new AladdinLauncher().dispatch(args);
}
@Override
public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
logger.info("Loading config starting...");
JsonObject config = configurer.load();
JsonObject local = deploymentOptions.getConfig();
if (!config.isEmpty()) { // 将consul配置注入到context中
local.mergeIn(config);
deploymentOptions.setConfig(local);
}
super.beforeDeployingVerticle(deploymentOptions);
logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
}
@Override
public void afterConfigParsed(JsonObject config) {
logger.info("Loading local config complete, local config: {}", config.encodePrettily());
}
@Override
public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause);
vertx.close();
}
}
其实写完第二步就可以知道这个配置文件存在不是必要的,我们使用了Consul作为配置中心来进行集中配置,这一步主要是为了注入consul的配置以及加载日志。
3. 拉取SQS中的数据
import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class DataCollectVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);
private volatile boolean shutdown = false;
@Override
public void start() throws Exception {
logger.info("DataCollectVerticle starting...");
AwsSQSService sqsService = Configurer.sqsService();
EventBus bus = vertx.eventBus();
vertx.setPeriodic(1000, id -> {
try {
if (shutdown) {
vertx.cancelTimer(id);
}
JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0));
List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
} catch (AmazonServiceException e) {
logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
}
});
}
@Override
public void stop() throws Exception {
shutdown = true;
logger.info("DataCollectVerticle closing...");
}
}
4. 将数据存储到TDengine中
import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SaveVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);
@Override
public void start() throws Exception {
logger.info("SaveVerticle starting....");
// 从event bus接收数据
EventBus bus = vertx.eventBus();
bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
JsonArray coming = new JsonArray(msg.body());
if (coming != null)
save(coming);
});
}
private void save(JsonArray array) {
WebClient client = Configurer.tdClient();
List<WebRTCLog> data = new ArrayList<>();
Cache<String, WebRTCLog> cache = Configurer.cache();
if (array.size() > 0) {
final WebRTCLog empty = new WebRTCLog();
for (int i = 0; i < array.size(); i++) {
String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
try {
JsonObject json = DataOperator.toJsonObject(message);
WebRTCLog log = json.mapTo(WebRTCLog.class);
String cacheKey = log.getDeviceId();
WebRTCLog org = cache.get(cacheKey, k -> empty);
if (!Objects.equals(org, empty)) { // 如果不是第一次插入
DataOperator.merge(log, org);
}
cache.put(cacheKey, log);
data.add(log);
} catch (Exception e) {
logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
}
}
client.post("/rest/sql")
.basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
.sendBuffer(insert(data), ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
if (response != null) {
JsonObject res = response.bodyAsJsonObject();
if (!"succ".equals(res.getString("status"))) {
logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc"));
}
}
} else {
logger.error("data insert failed! {}", Json.encode(data), ar.cause());
}
});
}
}
private Buffer insert(WebRTCLog log) throws Exception {
String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
" USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
"VALUES(%s)";
String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
return Buffer.buffer(sql);
}
private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
for (WebRTCLog log : data) {
sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
}
return Buffer.buffer(sqlBuilder.toString());
}
@Override
public void stop() throws Exception {
logger.info("SaveVerticle closing....");
}
}
5. 部署Verticle
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;
public class MainVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(MainVerticle.class);
@Override
public void start(Promise<Void> startPromise) throws Exception {
logger.info("MainVerticle starting...");
// 初始化sqs
String region = config().getString("aws.region");
String accessKey = config().getString("aws.accessKey");
String secretKey = config().getString("aws.secretAccessKey");
Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));
DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
dataCollectDeploymentOptions.setInstances(1);
dataCollectDeploymentOptions.setConfig(config());
dataCollectDeploymentOptions.setWorker(true);
Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess"));
vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("DataCollectVerticle started!");
} else {
logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
}
});
// 初始化webclient
WebClientOptions options = new WebClientOptions();
options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
options.setDefaultHost(config().getString("tdengine.host"));
options.setDefaultPort(config().getInteger("tdengine.port"));
Configurer.initTDClient(vertx, options);
DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
saveDeploymentOptions.setInstances(1);
saveDeploymentOptions.setConfig(config());
vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("SaveVerticle started!");
} else {
logger.warn("SaveVerticle deploy failed!");
}
});
}
}
这样就快速实现了一个日志上报的模块,且多个实例部署时相互之间不会产生影响,当然在实际的生产环境中,我们需要考虑的会更多。
当然,日志上报只是开始。在之后的项目开发中,我还会继续向大家介绍TDengine Database在数据分析中的应用实践,感谢观看。
作者简介:钱文锦 ,天润融通基础研发部研发工程师,开源社区爱好者,目前主要负责天润融通T-Phone SDK/CTI-Cloud相关功能开发和应用。
本文首发于:http://blog.ti-net.com.cn/tdenginezwebrtcrzsbzdsj/?from=groupmessage&isappinstalled=0