彻底搞懂时序数据库InfluxDB,在SpringBoot整合InfluxDB

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
可观测监控 Prometheus 版,每月50GB免费额度
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 之前介绍了运维监控系统Prometheus,然后就有同鞋问我关于时序数据库的情况,所以这里总结一下时序数据库,并以InfluxDB为例,介绍时序数据库的功能特性和使用方式,希望能对大家有所帮助。

之前介绍了运维监控系统Prometheus,然后就有同鞋问我关于时序数据库的情况,所以这里总结一下时序数据库,并以InfluxDB为例,介绍时序数据库的功能特性和使用方式,希望能对大家有所帮助。


一、时序数据库概述

1.1 什么是时序数据库

时序数据是一组按照时间维度索引的数据。时序数据在日常生活中随处可见,比如每个整点的温度、湿度等天气数据,每分钟的股票价格数据等。我们常用曲线图、柱状图等形式去展现时序数据,也就是我们常常听到的“数据可视化”。


时序数据库是一种非关系型数据库,以时间作为数据主键,专门用来存储时序数据。


1.2 时序数据库的特点

  • 高压缩比:由于数据每分每秒都在变化,海量的时序数据往往体量巨大,占用大量硬件资源,所以需要优化数据压缩算法提高数据压缩比。
  • 高并发写入:时序数据库采用持续高并发写入数据,无更新的方式,对于时间相同的重复的数据,只保留一份数据。
  • 低延时、高并发查询:通过索引降低查询延时,通过缓存等技术提高数据并发能力。


1.3 时序数据库的使用场景

  • IOT行业:电力、化工等工业物联网数据监测
  • 金融行业:各类金融产品及其衍生品、数字货币数据存储与量化研究
  • IT行业:服务器、虚拟机、容器等的状态数据实时监测
  • 互联网行业:用户行为轨迹,日志等数据。


目前比较流行的时序数据库有:InfluxDB、Prometheus、OpenTSDB、TDengine等,其中使用最广泛的当属InfluxDB,行业内应用最广泛。还有就是刚进入业内视野的国产时序数据库TDengine。而Prometheus则是Prometheus监控系统自带的数据库。


二、InfluxDB简介

2.1 什么是InfluxDB

InfluxDB 是一个用于存储和分析时间序列数据的开源数据库。由 Golang 语言编写,也是由 Golang 编写的软件中比较著名的一个,在很多 Golang 的沙龙或者文章中可能都会把 InfluxDB 当标杆来介绍,这也间接帮助 InfluxDB 提高了知名度。


2.2 InfluxDB的特性

  • 内置 HTTP 接口,使用方便
  • 数据可以打标记,这样查询可以很灵活
  • 类 SQL 的查询语句
  • 安装管理很简单,并且读写数据很高效
  • 能够实时查询,数据在写入时被索引后就能够被立即查出

在最新的 DB-ENGINES 给出的时间序列数据库的排名中,InfluxDB 高居第一位,可以预见,InfluxDB 会越来越得到广泛的使用。


2.3 InfluxDB几个基本概念

时序数据库由于其存储海量时序数据的特性,因此与传统数据库有些许不同,下面先对influxdb中涉及的基本概念作出解释。


influxdb数据库由database、measurement、point等三部分构成。分别对应关系数据库中的数据库、表、数据行。

  • database:数据库,同Mysql等关系型数据库中的“数据库Database”
  • measurement:数据表,相当于关系型数据库中的“表Table”
  • point:数据点,表示单条数据记录,相当于关系型数据库中的“一行数据”


概念 MySQL InfluxDB

数据库(同)

database database

表(不同)

table

measurement(测量; 度量)

列(不同) column Point,包括:tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)


2.4 Point数据构成

由于database和measurement与传统数据库基本相同,这里不做过多解释,以下针对influxdb中特有的Point进行讲解。

Point是InfluxDB中独有的概念,由时间(time)、数据(field)、标签(tags)三类字段组成。

(1)time:代表每条数据的时间字段,是measurement中的数据主键,因此time字段具有索引属性。一条point只能有一个time。

(2)field:代表各种数据的字段,例如气温、压力、股价等,field字段没有索引属性。一条point可以包括多个field。

(3)tag:代表各类非数据字段,例如设备编码、地区、姓名等,tag字段有索引属性。一条point可以包括多个tag。


例如:监控系统系统中,保存某个服务器的cpu和内存等资源使用情况,使用cpu_usage_total 的表名(measurement)保存数据。以下表示某一个point的样例数据:

image.png

其中time为time字段,记录数据产生的时间;cpu_usage和memory_usage分别代表CPU使用率和内存使用率,因此他们是field字段,真正的监控数据;cpu 和host代表CPU的名字和服务器IP,所以,他们是tag字段,用于查询和检索。



在使用和设计InfluxDB数据结构时,需要注意以下几点:

  • 1. tag 只能为字符串类型,可以加索引;
  • 2. field 类型无限制,不能加索引;
  • 3. InfluxDB不支持 join;
  • 4. InfluxDB支持连续查询操作(汇总统计数据):CONTINUOUS QUERY;



三、InfluxDB安装

InfluxDB安装非常简单,根据操作系统执行对应的安装命令即可。这里以window为例,演示如何安装InfluxDB。

 

3.1 下载

InfluxDB:https://dl.influxdata.com/influxdb/releases/influxdb-1.7.4_windows_amd64.zip  

 

chronograf :https://dl.influxdata.com/chronograf/releases/chronograf-1.7.8_windows_amd64.zip

chronograf为InfluxDB的Web后台管理端,InfluxDB提供了控制台命令端,如果使用不习惯,可以使用chronograf。


3.2 解压安装包

软件下载成功后,解压。

image.png

3.3 修改配置文件

InfluxDB 的数据存储主要有三个目录。默认情况下是 meta, wal 以及 data 三个目录,程序启动后会自动生成。

  • meta 用于存储数据库的一些元数据,meta 目录下有一个 meta.db 文件。
  • wal 目录存放预写日志文件,以 .wal 结尾。
  • data 目录存放实际存储的数据文件,以 .tsm 结尾。

接下来修改influxdb.conf 配置文件,修改以下部分的路径。

image.png

另外,InfluxDB服务默认端口为8086,如果需要更改端口号,则增加以下配置。

image.png


3.4 启动InfluxDB服务

配置文件修改完成后,接下来启动InfluxDB服务。直接运行Influxd.exe使用默认配置运行即可。如果需要使用自定义的配置文件,则指定conf文件进行启动,启动命令如下:

#先cmd 进入influxDB目录
influxd.exe -config influxdb.conf

看到如下输出,说明InfluxDB启动成功。

image.png


四、InfluxDB使用

InfluxQL是一种类似于SQL的查询语言,用于与InfluxDB进行交互。如果你使用过关系数据库及SQL,那么你可以很快速的掌握InfluxQL。但是,InfluxQL又不完全是SQL,缺乏SQL中的一些高级的语法,例如UNION,JOIN,HAVING等。


那么InfluxDB的到底如何操作呢?接下来介绍InfluxQL语言的使用方法。


4.1 连接InfluxDB服务

进入到InfluxDB目录后,在cmd中输入influx命令即可,命令如下:

# 使用Command命令行进入influxdb
influx -port 8086

如果使用的是默认配置,可以不需要加端口,直接influx即可。

image.png


4.2 操作InfluxDB

InfluxQL与SQL命令语法类似。接下来我们看一看InfluxQL 是怎么使用的?

4.2.1创建数据库

# 创建数据库
CREATE DATABASE weiz_tes
# 显示所有数据库
SHOW DATABASES
# 删除数据库
DROP DATABASE weiz_test
# 使用数据库
USE weiz_test


4.2.2 表操作

1.创建表

InfluxDB没有专门的创建表的命令,当插入一条数据point至某A表时,此A表会自动创建,并且表的格式、字段名、字段类型也由此条插入命令决定。




2.修改表

InfluxDB没有修改表的命令,但当插入一条新数据point至表A时,如果此point中的字段多于原A表的字段,会自动修改A表与此条插入数据的格式字段等一致。

注意:此种情况仅限于新插入的数据字段与表A字段的交集即表A的情况,如果新插入数据字段与表A完全不同则会插入失败。


3.查询表

# 显示该数据库中的表
SHOW MEASUREMENTS


4.删除表:

DROP MEASUREMENT "measurementName"


5.插入数据

insert host_cpu_usage_total,host_name=host1,cpu_core=core1 cpu_usage=0.26,cpu_idle=0.76

上面,我们新增一条数据,measurement为host_cpu_usage_total, tag为host_name,cpu_core, field为cpu_usage,cpu_idle


我们简单小结一下插入的语句写法:

  1. 基本格式:.insert + measurement + "," + tag=value,tag=value +空格+ field=value,field=value ;
  2. tag与tag之间用逗号分隔;field与field之间用逗号分隔;
  3. tag与field之间用空格分隔;
  4. tag都是string类型,不需要引号将value包裹;
  5. field如果是string类型,需要加引号;


6.查询数据

select * from host_cpu_usage_total

查询语句使用select 关键字,格式与mysql 基本一致。

image.png


4.2.3 用户管理

InfluxDB 默认管理员账号:admin,密码为空。我们可以新增用户和权限。命令如下:

#显示用户
show users
#创建用户
create user "username" with password 'password'
#创建管理员权限用户
create user "username" with password 'password' with all privileges
#删除用户
drop user "username"

以上是对InfluxDB数据库操作的基本总结,其他复杂的用法可以参考官网教程。官网教程地址:https://docs.influxdata.com/influxdb/v1.7/


五、SpringBoot整合InfluxDB

前面介绍了InfluxDB的基本安装和使用。接下来我们介绍SpringBoot项目如何整合InfluxDB,实现数据的增删改查。这里使用的Spring Boot版本为2.4.1。接下来看看如何实现的。

5.1 添加依赖

首先创建springboot项目spring-boot-starter-influxdb,并添加相关依赖,具体依赖如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>2.14</version>
        </dependency>


5.2 修改application.properties 配置

接下来修改application.properties 配置文件,增加InfluxDB的相关配置,具体如下:

#influxdb 配置
spring.influx.url=http://localhost:8086
spring.influx.user=admin
spring.influx.password=
spring.influx.database=weiz_test

上面配置的是InfluxDB数据库连接配置,默认url为:http://localhost:8086 ,数据库为之前创建的weiz_test数据库。用户名为admin,密码默认为空。


5.3 读取配置文件

创建InfluxDBConfig类,负责读取Influx的数据库连接配置。具体代码如下:

@Configuration
public class InfluxDBConfig {
    @Value("${spring.influx.user}")
    public String userName;
    @Value("${spring.influx.password}")
    public String password;
    @Value("${spring.influx.url}")
    public String url;
    //数据库
    @Value("${spring.influx.database}")
    public String database;
}


5.4 数据库操作类

创建数据库操作类InfluxDBService,负责数据库的初始化,增删改查等操作的具体实现,示例代码如下:

@Service
public class InfluxDBService {
    @Autowired
    private InfluxDBConfig influxDBConfig;
    @PostConstruct
    public void initInfluxDb() {
        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
        this.influxDB = influxDbBuild();
    }
    //保留策略
    private String retentionPolicy;
    private InfluxDB influxDB;
    /**
     * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
     * 表示 设为默认的策略
     */
    public void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "defalut", influxDBConfig.database, "30d", 1);
        this.query(command);
    }
    /**
     * 连接时序数据库;获得InfluxDB
     **/
    private InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(influxDBConfig.url, influxDBConfig.userName, influxDBConfig.password);
            influxDB.setDatabase(influxDBConfig.database);
        }
        return influxDB;
    }
    /**
     * 插入
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxDBConfig.database, "", builder.build());
    }
    /**
     * @desc 插入,带时间time
     * @date 2021/3/27
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insert(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxDBConfig.database, "", builder.build());
    }
    /**
     * @desc influxDB开启UDP功能,默认端口:8089,默认数据库:udp,没提供代码传数据库功能接口
     * @date 2021/3/13
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insertUDP(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        int udpPort = 8089;
        influxDB.write(udpPort,  builder.build());
    }
    /**
     * 查询
     * @param command 查询语句
     * @return
     */
    public QueryResult query(String command) {
        influxDbBuild();
        return influxDB.query(new Query(command, influxDBConfig.database));
    }
    /**
     * @desc 查询结果处理
     * @date 2021/5/12
     *@param queryResult
     */
    public List<Map<String, Object>> queryResultProcess(QueryResult queryResult) {
        List<Map<String, Object>> mapList = new ArrayList<>();
        List<QueryResult.Result> resultList =  queryResult.getResults();
        //把查询出的结果集转换成对应的实体对象,聚合成list
        for(QueryResult.Result query : resultList){
            List<QueryResult.Series> seriesList = query.getSeries();
            if(seriesList != null && seriesList.size() != 0) {
                for(QueryResult.Series series : seriesList){
                    List<String> columns = series.getColumns();
                    String[] keys =  columns.toArray(new String[columns.size()]);
                    List<List<Object>> values = series.getValues();
                    if(values != null && values.size() != 0) {
                        for(List<Object> value : values){
                            Map<String, Object> map = new HashMap(keys.length);
                            for (int i = 0; i < keys.length; i++) {
                                map.put(keys[i], value.get(i));
                            }
                            mapList.add(map);
                        }
                    }
                }
            }
        }
        return mapList;
    }
    /**
     * @desc InfluxDB 查询 count总条数
     * @date 2021/4/8
     */
    public long countResultProcess(QueryResult queryResult) {
        long count = 0;
        List<Map<String, Object>> list = queryResultProcess(queryResult);
        if(list != null && list.size() != 0) {
            Map<String, Object> map = list.get(0);
            double num = (Double)map.get("count");
            count = new Double(num).longValue();
        }
        return count;
    }
    /**
     * 查询
     * @param dbName 创建数据库
     * @return
     */
    public void createDB(String dbName) {
        influxDbBuild();
        influxDB.createDatabase(dbName);
    }
    /**
     * 批量写入测点
     *
     * @param batchPoints
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDbBuild();
        influxDB.write(batchPoints);
    }
    /**
     * 批量写入数据
     *
     * @param database  数据库
     * @param retentionPolicy 保存策略
     * @param consistency   一致性
     * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
     */
    public void batchInsert(final String database, final String retentionPolicy,
                            final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(database, retentionPolicy, consistency, records);
    }
    /**
     * @desc 批量写入数据
     * @date 2021/3/19
     *@param consistency
     *@param records
     */
    public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(influxDBConfig.database, "", consistency, records);
    }
}


5.5 测试验证

接下来,我们写几个单元测试,验证数据的增删改查等操作是否成功。单元测试代码如下:

@SpringBootTest
class Example01ApplicationTests {
    @Autowired
    private InfluxDBService influxDBService;
    @Test
    void contextLoads() {
    }
    @Test
    public void testSave(){
        String measurement = "host_cpu_usage_total";
        Map<String,String> tags = new HashMap<>();
        tags.put("host_name","host2");
        tags.put("cpu_core","core0");
        Map<String, Object> fields = new HashMap<>();
        fields.put("cpu_usage",0.22);
        fields.put("cpu_idle",0.56);
        influxDBService.insert(measurement, tags, fields);
    }
    @Test
    public void testGetdata(){
        String command = "select * from host_cpu_usage_total";
        QueryResult queryResult = influxDBService.query(command);
        List<Map<String, Object>> result =  influxDBService.queryResultProcess(queryResult);
        for (Map map: result) {
            System.out.println("time:"+ map.get("time")
                                +" host_name:" + map.get("host_name")
                                +" cpu_core:" + map.get("cpu_core")
                                +" cpu_usage:" + map.get("host_name")
                                +" cpu_idle:" + map.get("host_name"));
        }
    }
}


运行上面的新增和查询等单元测试,单击Run Test或在方法上右击,选择Run 'testSave' ,查看单元测试结果,运行结果如下图所示。image.png


image.png

单元测试运行成功,说明InfluxDB的增加和查询操作执行成功。



最后

以上,我们就把时序数据库InfluxDB介绍完了,并通过示例介绍了如何在SpringBoot项目中整合InfluxDB。示例代码也会同步上传:https://gitee.com/weizhong1988/spring-boot-starter 。如有疑问,请在下方留言!

InfluxDB在系统监控、物联网等方面的应用越来越多,希望大家能够熟练掌握。






相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
19天前
|
Java 数据库连接 测试技术
SpringBoot入门(4) - 添加内存数据库H2
SpringBoot入门(4) - 添加内存数据库H2
38 4
SpringBoot入门(4) - 添加内存数据库H2
|
21天前
|
Java 数据库连接 测试技术
SpringBoot入门(4) - 添加内存数据库H2
SpringBoot入门(4) - 添加内存数据库H2
29 2
SpringBoot入门(4) - 添加内存数据库H2
|
14天前
|
Java 数据库连接 测试技术
SpringBoot入门(4) - 添加内存数据库H2
SpringBoot入门(4) - 添加内存数据库H2
54 13
|
8天前
|
Java 数据库连接 测试技术
SpringBoot入门(4) - 添加内存数据库H2
SpringBoot入门(4) - 添加内存数据库H2
24 4
|
23天前
|
SQL Java 数据库
Spring Boot与Flyway:数据库版本控制的自动化实践
【10月更文挑战第19天】 在软件开发中,数据库的版本控制是一个至关重要的环节,它确保了数据库结构的一致性和项目的顺利迭代。Spring Boot结合Flyway提供了一种自动化的数据库版本控制解决方案,极大地简化了数据库迁移管理。本文将详细介绍如何使用Spring Boot和Flyway实现数据库版本的自动化控制。
23 2
|
10天前
|
存储 安全 Java
springboot当中ConfigurationProperties注解作用跟数据库存入有啥区别
`@ConfigurationProperties`注解和数据库存储配置信息各有优劣,适用于不同的应用场景。`@ConfigurationProperties`提供了类型安全和模块化的配置管理方式,适合静态和简单配置。而数据库存储配置信息提供了动态更新和集中管理的能力,适合需要频繁变化和集中管理的配置需求。在实际项目中,可以根据具体需求选择合适的配置管理方式,或者结合使用这两种方式,实现灵活高效的配置管理。
10 0
|
1月前
|
Java 关系型数据库 MySQL
springboot学习五:springboot整合Mybatis 连接 mysql数据库
这篇文章是关于如何使用Spring Boot整合MyBatis来连接MySQL数据库,并进行基本的增删改查操作的教程。
64 0
springboot学习五:springboot整合Mybatis 连接 mysql数据库
|
1月前
|
Java 关系型数据库 MySQL
springboot学习四:springboot链接mysql数据库,使用JdbcTemplate 操作mysql
这篇文章是关于如何使用Spring Boot框架通过JdbcTemplate操作MySQL数据库的教程。
24 0
springboot学习四:springboot链接mysql数据库,使用JdbcTemplate 操作mysql
|
1月前
|
SQL Java 关系型数据库
Springboot引入jpa来管理数据库
Springboot引入jpa来管理数据库
33 0
Springboot引入jpa来管理数据库
|
1月前
|
SQL Java 数据库连接
springBoot+Jpa(hibernate)数据库基本操作
springBoot+Jpa(hibernate)数据库基本操作
39 0

热门文章

最新文章