背景
物联网时序场景是目前最火热的方向之一。海量的时序数据如汽车轨迹数据、汽车状态监控数据、传感器实时监控数据需要存放进入数据库。一般这类场景下存在如下需求
- 数据高写入,低读取
- 需要对写入数据进行基础的图表展示
- 对写入数据进行聚合分析
传统的关系型数据库并不适合此类场景,时序数据库脱颖而出。表格存储时序实例支持时序数据的存储,其具有如下特点:
- Serverless,分布式,低成本
- 高写入支持
- 优秀的索引能力对数据读取数据分析提供了保障。
本文将以车联网为例介绍如何将 Kafka 中的时序数据写入表格存储并进行读取查询。
常见架构
常见的时序 IOT 场景架构如下。
设备如车辆、各类传感器等在物联网平台上进行注册、登录、消息发布等操作。这些平台会基于对应事件、主题对消息进行 ETL 处理并转发。一般情况下消息会被写入如Kafak的消息队列等待消费。如果有流式数据处理计算的需求,那么会由Flink来消费 Kafka 中的数据。而如果需要直接将这些数据储存,则可以直接利用 Kafka 时序的 sink connector 将数据写入 Tablestore 中的时序表中。而我们可以利用 Tablestore 自身提供的 SQL 能力、索引能力,对这部分时序数据进行展示、分析。
Kafka-connect-tablestore说明
Kafka-connect-tablestore 是阿里云 Tablestore 团队开源出的 Kafka sink connector 组件。它包含 Kafka 官方包中 SinkConnector 接口的的一个实现,支持将 Kafka 数据导入表格存储。该组件既支持写入 Tablestore 普通表也支持通过配置,可以将 Json 数据形式的时序数据导入表格存储中的时序表中。Json 中的字段可以通过配置映射成为表格存储时序表中的字段。
可以根据配置将其各字段映射到时序表中的字段中,给出一个示例 Json,映射如下。
{
"m": "vehicle", // 通过配置映射为measurement,表示一个度量类别,一个时序表可以存储多种度量类别的数据。
"d": "vehicle01", // 通过配置映射为data source,表示一个数据源的id,这里使用车辆id
"region": "shanghai", // 通过配置映射为tags中的一个标签,标签的key为region
"timestamp": 1638868699090, // 映射为_time字段,记录的时间
"speed": 55, // 通过配置映射为一个field
"temperature": "20" // 通过配置映射为一个field
}
开源项目地址见github地址。
下面将以车联网场景为例,演示下如果使用 Kafka + Tablestore 完成时序数据的入库和分析展示工作。
车联网场景测试
场景说明
车联网是 IOT 领域下的一个典型场景,车辆在行程中实时上报位置、里程等信息。下面我们将模拟车辆向 Kafka 实时上传时序数据,然后由 Kakfa 的 Tablestore connector 将数据写入时序表,最后在 Tablestore 中对数据进行查看和分析。
数据结构
车联网可以关注的参数有很多,位置、温度、油量、速度等等。这里测试我们选取如下几个参数温度、地理位置、总里程数、实时速度进行上传。_data_source用来记录设备标识,在车联网场景中,使用车辆 id 填入 _data_source 字段。_tag 字段中记录行程 id,这样可以区分同一辆汽车的不同行程。
参数字段 |
说明 |
Tablestore 时序表中对应数据 |
measurement |
记录类型,本例中使用" vehicle"作为这个参数的值 |
_m_name |
vehicle |
车辆id |
_data_source |
tripId |
行程id |
_tag中的tripId字段 |
timestamp |
当前时间戳 |
_time |
temperature |
车内温度 |
field中的数据 |
location |
地理位置经纬度,格式为"x,y" |
field中的数据 |
miles |
总里程 |
field中的数据 |
speed |
速度 |
field中的数据 |
参数配置
按照表格存储官网上 Kafka 时序 Connector的使用说明进行 Kafka 部署、表格存储开通、配置等工作。
基于以上数据结构,我们对 Kafka Connector 配置如下:
# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector
# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx
table.name.format=<topic>
# 是否自动创建目标表,默认值为false。
auto.create=true
runtime.error.tolerance=all
runtime.error.mode=ignore
# connector工作模式,默认为normal
tablestore.mode=timeseries
# 时序表主键字段映射
tablestore.timeseries.test.measurement=measurement
tablestore.timeseries.test.dataSource=vehicle
tablestore.timeseries.test.tags=tripId
# 时序表时间字段映射
tablestore.timeseries.test.time=timestamp
tablestore.timeseries.test.time.unit=MILLISECONDS
#field字段类型配置
tablestore.timeseries.test.field.name=temperature,location,miles,speed
tablestore.timeseries.test.field.type=double,string,double,double
写入程序
使用 Java 程序模拟向 Kafka 中写入 Json 数据,进而写入 Tablestore。程序启动 100 个任务,每个任务每秒钟生成一条记录进行上报,模拟 100 辆汽车每秒上报一条时序数据。
连接 Kafak 代码如下:
public void init() {
Properties properties = new Properties();
//broker的地址清单,建议至少填写两个,避免宕机
properties.put("bootstrap.servers", "#########:9092");
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16);
properties.put("linger.ms", 0);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
}
每秒上报数据代码:
public void upload() {
int num = 100;
service = Executors.newScheduledThreadPool(num);
final Map<String, AutomobileBean> preBeanMap = new ConcurrentHashMap<>();
final Map<String, NextMove> moveMap = new ConcurrentHashMap<>();
for (int i = 0; i < num; i++) {
final int j = i;
double oilConsumption = r.nextDouble() * 0.07 + 0.05; // 油耗
int speed = r.nextInt(80) + 40;
service.scheduleAtFixedRate(()->{
try {
VehicleRecord msg = getVehicleRecord("vehicle" + j, "" + j, speed, oilConsumption, preBeanMap, moveMap);
String jsonStr = map.writeValueAsString(msg);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", jsonStr));
future.get();
System.out.println("Sent:" + jsonStr);
} catch (Exception e) {
e.printStackTrace();
}
},1,1, TimeUnit.SECONDS);
}
}
控制台查询
表数据查询
可以直接通过控制台中的界面读取写入的时序数据。测试中,模拟了 100 辆汽车上报数据。在控制台选择一条时间线,点击查询数据,进一步点击查询,可以看到写入的数据以列表的形式展示。
SQL 查询
表格存储具备强大的 SQL 能力,可以支持用户使用 SQL 直接筛选、分析数据。
执行以下 SQL,拿到 tripId 为1且记录项为总里程的数据,按时间倒序排列。
select count(*) from test where tag_value_at(_tags, 'tripId')="1"
and _field_name = "miles"
order by _time desc limit 1000
可以在控制台的 SQL 页面看到执行结果如下。
执行以下SQL,统计在行程 id 为1的行驶过程中,共上报多少个位置记录。
select count(*) from test where tag_value_at(_tags, 'tripId')="1"
and _field_name = "location"
可以在控制台看到结果。
执行以下SQL,可以通过元数据表查看上报数据的车辆列表。
select distinct _data_source from `test::meta` limit 1000
可以看到如下结果
Grafana查询
表格存储同样支持通过 Grafana 查询时序表中的数据。
经过配置,我们可以在 Grafana 面板中选择需要查询的车辆,然后就可以看到其各个时序维度上如总里程、油量、温度、时速,数值随时间变化的曲线图。
表格存储的 SQL 支持时序元数据检索,我们利用这一能力在 Grafana 中增加一个车辆 ID 的变量,如下图左上角名称为 _data_source 的变量。切换这一变量,选择不同车辆 ID,可以看到不同车辆的数据。
总结
本文介绍了在物联网场景下使用 Tablestore 时序表存储、分析时序数据的方案。使用 Kafka 将海量时序数据导入 Tablestore 时序表中,利用 Tablestore Serverless、分布式、高写入等特点对数据进行存储,利用其多元索引能力、SQL 能力对数据进行展示、分析。文中以物联网中典型场景车联网为例,模拟了在这种架构下,车辆上报数据并对上报数据进行分析的过程。
希望本次分享对你的时序设计架构有所帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。