基于 Tablestore 时序存储的物联网数据存储方案

背景

物联网时序场景是目前最火热的方向之一。海量的时序数据如汽车轨迹数据、汽车状态监控数据、传感器实时监控数据需要存放进入数据库。一般这类场景下存在如下需求

  • 数据高写入,低读取
  • 需要对写入数据进行基础的图表展示
  • 对写入数据进行聚合分析

传统的关系型数据库并不适合此类场景,时序数据库脱颖而出。表格存储时序实例支持时序数据的存储,其具有如下特点:

  • Serverless,分布式,低成本
  • 高写入支持
  • 优秀的索引能力对数据读取数据分析提供了保障。

本文将以车联网为例介绍如何将 Kafka 中的时序数据写入表格存储并进行读取查询。

常见架构

常见的时序 IOT 场景架构如下。

基于 Tablestore 时序存储的物联网数据存储方案

设备如车辆、各类传感器等在物联网平台上进行注册、登录、消息发布等操作。这些平台会基于对应事件、主题对消息进行 ETL 处理并转发。一般情况下消息会被写入如Kafak的消息队列等待消费。如果有流式数据处理计算的需求,那么会由Flink来消费 Kafka 中的数据。而如果需要直接将这些数据储存,则可以直接利用 Kafka 时序的 sink connector 将数据写入 Tablestore 中的时序表中。而我们可以利用 Tablestore 自身提供的 SQL 能力、索引能力,对这部分时序数据进行展示、分析。

Kafka-connect-tablestore说明

Kafka-connect-tablestore 是阿里云 Tablestore 团队开源出的 Kafka sink connector 组件。它包含 Kafka 官方包中 SinkConnector 接口的的一个实现,支持将 Kafka 数据导入表格存储。该组件既支持写入 Tablestore 普通表也支持通过配置,可以将 Json 数据形式的时序数据导入表格存储中的时序表中。Json 中的字段可以通过配置映射成为表格存储时序表中的字段。

可以根据配置将其各字段映射到时序表中的字段中,给出一个示例 Json,映射如下。

{
    "m": "vehicle",               // 通过配置映射为measurement,表示一个度量类别,一个时序表可以存储多种度量类别的数据。
    "d": "vehicle01",             // 通过配置映射为data source,表示一个数据源的id,这里使用车辆id
    "region": "shanghai",         // 通过配置映射为tags中的一个标签,标签的key为region
    "timestamp": 1638868699090,   // 映射为_time字段,记录的时间
    "speed": 55,                  // 通过配置映射为一个field
    "temperature": "20"           // 通过配置映射为一个field
}

开源项目地址见github地址

下面将以车联网场景为例,演示下如果使用 Kafka + Tablestore 完成时序数据的入库和分析展示工作。

车联网场景测试

场景说明

车联网是 IOT 领域下的一个典型场景,车辆在行程中实时上报位置、里程等信息。下面我们将模拟车辆向 Kafka 实时上传时序数据,然后由 Kakfa 的 Tablestore connector 将数据写入时序表,最后在 Tablestore 中对数据进行查看和分析。

数据结构

车联网可以关注的参数有很多,位置、温度、油量、速度等等。这里测试我们选取如下几个参数温度、地理位置、总里程数、实时速度进行上传。_data_source用来记录设备标识,在车联网场景中,使用车辆 id 填入 _data_source 字段。_tag 字段中记录行程 id,这样可以区分同一辆汽车的不同行程。

参数字段

说明

Tablestore 时序表中对应数据

measurement

记录类型,本例中使用"

vehicle"作为这个参数的值

_m_name

vehicle 

车辆id

_data_source

tripId

行程id

_tag中的tripId字段

timestamp

当前时间戳

_time

temperature

车内温度

field中的数据

location

地理位置经纬度,格式为"x,y"

field中的数据

miles

总里程

field中的数据

speed

速度

field中的数据

参数配置

按照表格存储官网上 Kafka 时序 Connector的使用说明进行 Kafka 部署、表格存储开通、配置等工作。

基于以上数据结构,我们对 Kafka Connector 配置如下:

# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector

# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx

table.name.format=<topic>

# 是否自动创建目标表,默认值为false。
auto.create=true

runtime.error.tolerance=all
runtime.error.mode=ignore

# connector工作模式,默认为normal
tablestore.mode=timeseries
# 时序表主键字段映射
tablestore.timeseries.test.measurement=measurement
tablestore.timeseries.test.dataSource=vehicle
tablestore.timeseries.test.tags=tripId
# 时序表时间字段映射
tablestore.timeseries.test.time=timestamp
tablestore.timeseries.test.time.unit=MILLISECONDS
#field字段类型配置
tablestore.timeseries.test.field.name=temperature,location,miles,speed
tablestore.timeseries.test.field.type=double,string,double,double

写入程序

使用 Java 程序模拟向 Kafka 中写入 Json 数据,进而写入 Tablestore。程序启动 100 个任务,每个任务每秒钟生成一条记录进行上报,模拟 100 辆汽车每秒上报一条时序数据。

连接 Kafak 代码如下:

public void init() {

        Properties properties = new Properties();
        //broker的地址清单,建议至少填写两个,避免宕机
        properties.put("bootstrap.servers", "#########:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        properties.put("batch.size", 16);
        properties.put("linger.ms", 0);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

每秒上报数据代码:

   public void upload() {

       int num = 100;
       service = Executors.newScheduledThreadPool(num);

       final Map<String, AutomobileBean> preBeanMap = new ConcurrentHashMap<>();
       final Map<String, NextMove> moveMap = new ConcurrentHashMap<>();

       for (int i = 0; i < num; i++) {
           final int j = i;
           double oilConsumption =  r.nextDouble() * 0.07 + 0.05; // 油耗
           int speed = r.nextInt(80) + 40;
           service.scheduleAtFixedRate(()->{
               try {
                   VehicleRecord msg = getVehicleRecord("vehicle" + j, "" + j, speed, oilConsumption, preBeanMap, moveMap);
                   String jsonStr = map.writeValueAsString(msg);

                   Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", jsonStr));
                   future.get();
                   System.out.println("Sent:" + jsonStr);
               } catch (Exception e) {
                   e.printStackTrace();
               }
           },1,1, TimeUnit.SECONDS);
       }
   }

控制台查询

表数据查询

可以直接通过控制台中的界面读取写入的时序数据。测试中,模拟了 100 辆汽车上报数据。在控制台选择一条时间线,点击查询数据,进一步点击查询,可以看到写入的数据以列表的形式展示。

基于 Tablestore 时序存储的物联网数据存储方案

SQL 查询

表格存储具备强大的 SQL 能力,可以支持用户使用 SQL 直接筛选、分析数据。

执行以下 SQL,拿到 tripId 为1且记录项为总里程的数据,按时间倒序排列。

select count(*) from test where  tag_value_at(_tags, 'tripId')="1"  
and _field_name = "miles"
order by _time desc limit 1000

可以在控制台的 SQL 页面看到执行结果如下。

基于 Tablestore 时序存储的物联网数据存储方案

执行以下SQL,统计在行程 id 为1的行驶过程中,共上报多少个位置记录。

select count(*) from test where  tag_value_at(_tags, 'tripId')="1"  
and _field_name = "location"

可以在控制台看到结果。

基于 Tablestore 时序存储的物联网数据存储方案

执行以下SQL,可以通过元数据表查看上报数据的车辆列表。

select distinct _data_source from `test::meta` limit 1000

可以看到如下结果

基于 Tablestore 时序存储的物联网数据存储方案

Grafana查询

表格存储同样支持通过 Grafana 查询时序表中的数据。

经过配置,我们可以在 Grafana 面板中选择需要查询的车辆,然后就可以看到其各个时序维度上如总里程、油量、温度、时速,数值随时间变化的曲线图。

表格存储的 SQL 支持时序元数据检索,我们利用这一能力在 Grafana 中增加一个车辆 ID 的变量,如下图左上角名称为 _data_source 的变量。切换这一变量,选择不同车辆 ID,可以看到不同车辆的数据。

基于 Tablestore 时序存储的物联网数据存储方案

总结

本文介绍了在物联网场景下使用 Tablestore 时序表存储、分析时序数据的方案。使用 Kafka 将海量时序数据导入 Tablestore 时序表中,利用 Tablestore Serverless、分布式、高写入等特点对数据进行存储,利用其多元索引能力、SQL 能力对数据进行展示、分析。文中以物联网中典型场景车联网为例,模拟了在这种架构下,车辆上报数据并对上报数据进行分析的过程。

希望本次分享对你的时序设计架构有所帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。

基于 Tablestore 时序存储的物联网数据存储方案基于 Tablestore 时序存储的物联网数据存储方案

上一篇:函数计算搭建 Serverless Web 应用(四)- 三分钟搭建 Web 应用


下一篇:为什么我们说云原生时代,企业数字化转型更需要做好 API 全生命周期管理?