基于 Tablestore 时序存储的物联网数据存储方案

简介: 背景物联网时序场景是目前最火热的方向之一。海量的时序数据如汽车轨迹数据、汽车状态监控数据、传感器实时监控数据需要存放进入数据库。一般这类场景下存在如下需求数据高写入,低读取需要对写入数据进行基础的图表展示对写入数据进行聚合分析传统的关系型数据库并不适合此类场景,时序数据库脱颖而出。表格存储时序实例支持时序数据的存储,其具有如下特点:Serverless,分布式,低成本高写入支持优秀的索引能力对数据

背景

物联网时序场景是目前最火热的方向之一。海量的时序数据如汽车轨迹数据、汽车状态监控数据、传感器实时监控数据需要存放进入数据库。一般这类场景下存在如下需求

  • 数据高写入,低读取
  • 需要对写入数据进行基础的图表展示
  • 对写入数据进行聚合分析

传统的关系型数据库并不适合此类场景,时序数据库脱颖而出。表格存储时序实例支持时序数据的存储,其具有如下特点:

  • Serverless,分布式,低成本
  • 高写入支持
  • 优秀的索引能力对数据读取数据分析提供了保障。

本文将以车联网为例介绍如何将 Kafka 中的时序数据写入表格存储并进行读取查询。

常见架构

常见的时序 IOT 场景架构如下。

设备如车辆、各类传感器等在物联网平台上进行注册、登录、消息发布等操作。这些平台会基于对应事件、主题对消息进行 ETL 处理并转发。一般情况下消息会被写入如Kafak的消息队列等待消费。如果有流式数据处理计算的需求,那么会由Flink来消费 Kafka 中的数据。而如果需要直接将这些数据储存,则可以直接利用 Kafka 时序的 sink connector 将数据写入 Tablestore 中的时序表中。而我们可以利用 Tablestore 自身提供的 SQL 能力、索引能力,对这部分时序数据进行展示、分析。

Kafka-connect-tablestore说明

Kafka-connect-tablestore 是阿里云 Tablestore 团队开源出的 Kafka sink connector 组件。它包含 Kafka 官方包中 SinkConnector 接口的的一个实现,支持将 Kafka 数据导入表格存储。该组件既支持写入 Tablestore 普通表也支持通过配置,可以将 Json 数据形式的时序数据导入表格存储中的时序表中。Json 中的字段可以通过配置映射成为表格存储时序表中的字段。

可以根据配置将其各字段映射到时序表中的字段中,给出一个示例 Json,映射如下。

{
    "m": "vehicle",               // 通过配置映射为measurement,表示一个度量类别,一个时序表可以存储多种度量类别的数据。
    "d": "vehicle01",             // 通过配置映射为data source,表示一个数据源的id,这里使用车辆id
    "region": "shanghai",         // 通过配置映射为tags中的一个标签,标签的key为region
    "timestamp": 1638868699090,   // 映射为_time字段,记录的时间
    "speed": 55,                  // 通过配置映射为一个field
    "temperature": "20"           // 通过配置映射为一个field
}

开源项目地址见github地址

下面将以车联网场景为例,演示下如果使用 Kafka + Tablestore 完成时序数据的入库和分析展示工作。

车联网场景测试

场景说明

车联网是 IOT 领域下的一个典型场景,车辆在行程中实时上报位置、里程等信息。下面我们将模拟车辆向 Kafka 实时上传时序数据,然后由 Kakfa 的 Tablestore connector 将数据写入时序表,最后在 Tablestore 中对数据进行查看和分析。

数据结构

车联网可以关注的参数有很多,位置、温度、油量、速度等等。这里测试我们选取如下几个参数温度、地理位置、总里程数、实时速度进行上传。_data_source用来记录设备标识,在车联网场景中,使用车辆 id 填入 _data_source 字段。_tag 字段中记录行程 id,这样可以区分同一辆汽车的不同行程。

参数字段

说明

Tablestore 时序表中对应数据

measurement

记录类型,本例中使用"

vehicle"作为这个参数的值

_m_name

vehicle 

车辆id

_data_source

tripId

行程id

_tag中的tripId字段

timestamp

当前时间戳

_time

temperature

车内温度

field中的数据

location

地理位置经纬度,格式为"x,y"

field中的数据

miles

总里程

field中的数据

speed

速度

field中的数据

参数配置

按照表格存储官网上 Kafka 时序 Connector的使用说明进行 Kafka 部署、表格存储开通、配置等工作。

基于以上数据结构,我们对 Kafka Connector 配置如下:

# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector

# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx

table.name.format=<topic>

# 是否自动创建目标表,默认值为false。
auto.create=true

runtime.error.tolerance=all
runtime.error.mode=ignore

# connector工作模式,默认为normal
tablestore.mode=timeseries
# 时序表主键字段映射
tablestore.timeseries.test.measurement=measurement
tablestore.timeseries.test.dataSource=vehicle
tablestore.timeseries.test.tags=tripId
# 时序表时间字段映射
tablestore.timeseries.test.time=timestamp
tablestore.timeseries.test.time.unit=MILLISECONDS
#field字段类型配置
tablestore.timeseries.test.field.name=temperature,location,miles,speed
tablestore.timeseries.test.field.type=double,string,double,double

写入程序

使用 Java 程序模拟向 Kafka 中写入 Json 数据,进而写入 Tablestore。程序启动 100 个任务,每个任务每秒钟生成一条记录进行上报,模拟 100 辆汽车每秒上报一条时序数据。

连接 Kafak 代码如下:

public void init() {

        Properties properties = new Properties();
        //broker的地址清单,建议至少填写两个,避免宕机
        properties.put("bootstrap.servers", "#########:9092");
        properties.put("acks", "all");
        properties.put("retries", 3);
        properties.put("batch.size", 16);
        properties.put("linger.ms", 0);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

每秒上报数据代码:

   public void upload() {

       int num = 100;
       service = Executors.newScheduledThreadPool(num);

       final Map<String, AutomobileBean> preBeanMap = new ConcurrentHashMap<>();
       final Map<String, NextMove> moveMap = new ConcurrentHashMap<>();

       for (int i = 0; i < num; i++) {
           final int j = i;
           double oilConsumption =  r.nextDouble() * 0.07 + 0.05; // 油耗
           int speed = r.nextInt(80) + 40;
           service.scheduleAtFixedRate(()->{
               try {
                   VehicleRecord msg = getVehicleRecord("vehicle" + j, "" + j, speed, oilConsumption, preBeanMap, moveMap);
                   String jsonStr = map.writeValueAsString(msg);

                   Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", jsonStr));
                   future.get();
                   System.out.println("Sent:" + jsonStr);
               } catch (Exception e) {
                   e.printStackTrace();
               }
           },1,1, TimeUnit.SECONDS);
       }
   }

控制台查询

表数据查询

可以直接通过控制台中的界面读取写入的时序数据。测试中,模拟了 100 辆汽车上报数据。在控制台选择一条时间线,点击查询数据,进一步点击查询,可以看到写入的数据以列表的形式展示。

SQL 查询

表格存储具备强大的 SQL 能力,可以支持用户使用 SQL 直接筛选、分析数据。

执行以下 SQL,拿到 tripId 为1且记录项为总里程的数据,按时间倒序排列。

select count(*) from test where  tag_value_at(_tags, 'tripId')="1"  
and _field_name = "miles"
order by _time desc limit 1000

可以在控制台的 SQL 页面看到执行结果如下。

执行以下SQL,统计在行程 id 为1的行驶过程中,共上报多少个位置记录。

select count(*) from test where  tag_value_at(_tags, 'tripId')="1"  
and _field_name = "location"

可以在控制台看到结果。

执行以下SQL,可以通过元数据表查看上报数据的车辆列表。

select distinct _data_source from `test::meta` limit 1000

可以看到如下结果

Grafana查询

表格存储同样支持通过 Grafana 查询时序表中的数据。

经过配置,我们可以在 Grafana 面板中选择需要查询的车辆,然后就可以看到其各个时序维度上如总里程、油量、温度、时速,数值随时间变化的曲线图。

表格存储的 SQL 支持时序元数据检索,我们利用这一能力在 Grafana 中增加一个车辆 ID 的变量,如下图左上角名称为 _data_source 的变量。切换这一变量,选择不同车辆 ID,可以看到不同车辆的数据。

总结

本文介绍了在物联网场景下使用 Tablestore 时序表存储、分析时序数据的方案。使用 Kafka 将海量时序数据导入 Tablestore 时序表中,利用 Tablestore Serverless、分布式、高写入等特点对数据进行存储,利用其多元索引能力、SQL 能力对数据进行展示、分析。文中以物联网中典型场景车联网为例,模拟了在这种架构下,车辆上报数据并对上报数据进行分析的过程。

希望本次分享对你的时序设计架构有所帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。

目录
相关文章
|
4月前
|
监控 物联网 应用服务中间件
流媒体方案之Nginx——实现物联网视频监控项目
流媒体方案之Nginx——实现物联网视频监控项目
流媒体方案之Nginx——实现物联网视频监控项目
|
4月前
|
监控 物联网 编解码
流媒体方案之FFmpeg——实现物联网视频监控项目
流媒体方案之FFmpeg——实现物联网视频监控项目
流媒体方案之FFmpeg——实现物联网视频监控项目
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
718 1
|
6天前
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
【9月更文挑战第6天】随着物联网技术的发展,海量设备数据对实时存储和处理提出了更高要求。传统数据库在扩展性、性能及实时性方面面临挑战。阿里云推出的PolarDB具备高性能、高可靠及高扩展性特点,能有效应对这些挑战。它采用分布式存储架构,支持多副本写入优化、并行查询等技术,确保数据实时写入与查询;多副本存储架构和数据持久化存储机制保证了数据安全;支持动态调整数据库规模,适应设备和数据增长。通过API或SDK接入IoT设备,实现数据实时写入、分布式存储与高效查询,展现出在IoT数据存储领域的巨大潜力。
17 1
|
1月前
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
随着物联网技术的发展,海量设备数据对数据库提出实时高效存储处理的新要求。PolarDB作为阿里云的高性能云数据库,展现了其在IoT数据存储领域的潜力。面对IoT数据的规模、实时性和多样性挑战,PolarDB凭借分布式架构,实现了高性能、高可靠性和高扩展性,支持动态扩展和冷热数据分层存储,满足IoT数据实时写入、查询及管理需求,展现出广阔的应用前景。
46 1
|
4月前
|
编解码 监控 Ubuntu
MJPG-streamer方案实现物联网视频监控
MJPG-streamer方案实现物联网视频监控
MJPG-streamer方案实现物联网视频监控
|
4月前
|
存储 关系型数据库 物联网
【PolarDB开源】PolarDB在物联网(IoT)数据存储中的应用探索
【5月更文挑战第27天】PolarDB,阿里云的高性能云数据库,针对物联网(IoT)数据存储的挑战,如大规模数据、实时性及多样性,展现出高扩展性、高性能和高可靠性。它采用分布式架构,支持动态扩展,保证99.95%的高可用性,并能处理结构化、半结构化和非结构化数据。通过SDK实现数据实时写入,支持SQL查询和冷热数据分层,有效降低成本。随着IoT发展,PolarDB在该领域的应用将更加广泛。
170 1
|
存储 消息中间件 监控
Tablestore 物联网存储全面升级 -- 分析存储公测
物联网存储功能介绍随着物联网技术的快速发展,物联网已广泛应用于制造业、能源、建筑、医疗、交通、物流仓储等多个领域,物联网的应用能够有效节约资源、提高效率、保障安全以及降低成本,帮助各行业实现可持续发展目标。在物联网场景中根据数据特点进行分类,数据主要包括设备元数据、设备消息数据和设备时序数据三种类型,不同类型数据的存储需求不同。物联网场景中不同类型数据的存储核心需求如下:设备元数据:主要数据为设备
257 0
Tablestore 物联网存储全面升级 -- 分析存储公测
|
存储 人工智能 达摩院
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
带你读《云存储应用白皮书》之29:2. 物联网大数据存储解决方案
300 1
|
存储 Cloud Native 安全
物联网课程论文:《基于云原生的物联网端管云系统方案综述与演进设想》
这篇论文八千多字,主题是 云原生+物联网平台。花了几天心思,查了很多篇论文,因为自己对物联网通信的硬件方面不太会,所以还是选择写综述类的论文了,这篇论文感觉技术深度和广度比我上一篇计算机网络论文要更加深刻一点。
物联网课程论文:《基于云原生的物联网端管云系统方案综述与演进设想》

相关产品

  • 物联网平台