InfluxDB在项目中的实践

简介: InfluxDB在实际项目中的使用步骤以及各种踩坑避雷。

1.业务背景

       项目中有业务数据需要实时读取,并且这些业务数据每秒产生上千条,而InfluxDB对海量的时序数据高性能读写有很好的支持,于是项目中使用InfluxDB支持该业务。先通过MQ订阅的方式将数据实时存储到InfluxDB,前端调用接口实时读取范围时间内的业务数据。


2.InfluxDB版本及依赖

 项目中使用的InfluxDB是2.1.1版本,引入的POM依赖为:

<dependency>

   <groupId>com.influxdb<groupId>

   <artifactId>influxdb-client-java<artifactId>

   <version>3.1.0<version>

<dependency>


3.准备工作

  因为InfluxDB2.0以上不支持SQL语法的连接方式,所以需要使用HTTP API的方式实现数据读写。首先将数据库(每个组织创建时都必须关联一个存储桶(bucket),所以每个组织都至少有一个bucket,bucket是存储时序数据的一个命名空间,相当于数据表)以及API Tokens(用户使用API token访问对应组织的数据)创建完成。


4、单例化InfluxDBClient

  项目开发初期并没有对InfluxDBClient进行单例化,而是每次写数据都会new一个新的client去获取getWriteApi,导致到后期出现大量写操作时造成内存溢出的问题,经过排查是RxNewThreadScheduler这个线程过多导致的。如下图:

1666331418539_11C25156-DB12-4ba9-A761-076AF28152A9.png

然后扒了一下getWriteApi()这个方法的源码,如下图所示:NewThreadScheduler 这个调度器,在真正执行工作的时候,会创建一个NewThreadWorker;当InfluxDB的大量写入数据时,InfluxDB所用的IO线程调度器RxJava,创建的线程池是几乎没有上限的,最终导致内存溢出,所以在使用完writeApi后及时close

1666331938867_260C78BC-4A99-466e-BA5B-79336A00C47F.png

image.png

单例InfluxDBClient 的代码如下:

OkHttpClient.Builder builder = new OkHttpClient.Builder();

       InfluxDBClientOptions build = InfluxDBClientOptions.builder()

               .okHttpClient(builder)

               .url("InfluxDB的连接地址")

               .authenticateToken("生成的API Token".toCharArray())

               .org("组织信息")

               .bucket("组织的存储桶")

               .build();

       InfluxDBClient client = InfluxDBClientFactory.create(build);

      //设置日志打印级别

       client.setLogLevel(LogLevel.BASIC);


5.读写API的使用

5.1.写操作

    最开始设计表结构的时候没有考虑太多,在表的Tag属性填充了数据值,刚开始没什么问题,数据也能正常写入,突然有一天报了max-values-per-tag limit exceeded这样的错误,经过查阅资料才知道在设计表结构的时候就应该考虑清楚,Tag所填充的数据应该是有范围的,是个能穷举完的集合,并且最多只能10万个,否则数据可能会丢失。

   上面的问题优化之后,几天之后又出现了新的问题,看打印的日志写操作特别慢,设置了InfluxDB写操作需要重试3次之后依然无法解决这个问题,后面才发现是填充数据的时候Field字段的Key和Tag有同样的要求,必须是个能穷举完的集合,这个修改好之后发现写操作快了一些,但是依然还是很慢,于是又改了Time这个属性,这个属性不设置的话会默认填充当前时间的时间戳,但是我们业务上是有时间戳这个属性的,我们需要把业务上的时间戳字段填充到Time里面,刚开始填充的是毫秒级别的,换成秒级别的之后,写操作就飞快了,这时InfluxDB的高性能的大量写入时序数据的效果才完美的体现出来。


批量写入数据的代码如下:

    List points = new ArrayList<>();

           list.forEach(data -> {

               Point point = Point

                       .measurement("表名,若数据库不存在该表会自动创建")

                       .time("时间戳", WritePrecision.S)

                       .addTag("属性名", "")

                       .addField("对应的属性值", "");

               points.add(point);

       WriteApi writeApi = client.getWriteApi();

       writeApi.writePoints(points);

       writeApi.close();

5.2.读操作

读数据的代码如下:

       String flux = String.format(

               "from(bucket: \"bucket名称\")\n" +

                       "    |> range(start: "+查询开始时间+" , stop : "+查询结束时间+" )\n" +

                       "    |> filter(fn: (r) => r._measurement == \""+表名+"\"   and r.XXX == \""+属性过滤条件+"\")\n"

       );

       List models = client.getQueryApi().query(flux,XXX.class);


6.其他

当然,InfluxDB的web页面也可以查询数据,如果查询数据时报如图所示的错误时,可以将查询语句中的  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)这一行去掉,就可以正常展示查询结果了。

image.png

image.png

相关文章
|
存储 NoSQL 关系型数据库
InfluxDB 通识篇
InfluxDB 通识篇
1803 0
|
存储 数据库 时序数据库
InfluxDB基本操作
InfluxDB基本操作
672 0
|
存储 数据采集 Java
InfluxDB 的学习笔记
在Java项目中实现InfluxDB的落地应用,主要包括添加InfluxDB的Java客户端依赖、创建数据库连接、执行数据的增删改查操作等步骤
746 2
|
5月前
|
存储 监控 关系型数据库
InfluxDB 时序数据的高效解决方案
InfluxDB 是一种专为时间序列数据优化的开源数据库,支持高效存储、检索和分析大量时序数据。它采用 Tag-Key-Value 模型,提供高性能写入与查询能力,适合监控系统、物联网设备数据及实时分析等场景。相比传统关系型数据库(如 MySQL),InfluxDB 针对时序数据进行了架构优化,具备无模式设计、自动数据管理及灵活扩展性等优势。本文通过 Go 语言代码实战展示了如何连接、写入和查询 InfluxDB 数据,并介绍了其核心概念与应用场景,助力开发者快速上手时序数据库开发。
1032 0
InfluxDB 时序数据的高效解决方案
|
存储 监控 关系型数据库
InfluxDB入门:基础概念解析
【4月更文挑战第30天】InfluxDB是开源时序数据库,擅长处理实时数据,常用于监控和分析。本文介绍了其基础概念:数据库(数据容器)、测量值(类似表)、字段(数据值)、标签(元数据)、时间戳和数据点。InfluxDB特性包括高性能写入、灵活查询(InfluxQL和Flux)、可扩展性及活跃社区支持。了解这些概念有助于更好地使用InfluxDB处理时间序列数据。
|
存储 监控 Java
InfluxDB时序数据库安装和使用
InfluxDB时序数据库安装和使用
926 2
|
存储 数据库 时序数据库
InfluxDB的安装与Python调用
InfluxDB是一个高性能的时序数据库(Time-Series Database, TSDB),用于存储和分析时间序列数据的开源数据库,它非常适合于处理大量的时间戳数据,如金融市场数据、IoT 设备数据、监控数据等,尤其适合处理大量的时序数据和高频数据。 主要特性有: • 内置HTTP接口,使用方便 • 数据可以打标记,查询可以很灵活 • 类SQL的查询语句 • 安装管理很简单,并且读写数据很高效 • 能够实时查询,数据在写入时被索引后就能够被立即查出
InfluxDB的安装与Python调用
|
存储 数据采集 监控
InfluxDB:开启你的高性能读写数据之旅!
InfluxDB:开启你的高性能读写数据之旅!
960 0
|
存储 缓存 数据库
InfluxDB性能优化:写入与查询调优
【4月更文挑战第30天】本文探讨了InfluxDB的性能优化,主要分为写入和查询调优。写入优化包括批量写入、调整写入缓冲区、数据压缩、shard配置优化和使用HTTP/2协议。查询优化涉及索引优化、查询语句调整、缓存管理、分区与分片策略及并发控制。根据实际需求应用这些策略,可有效提升InfluxDB的性能。
3203 1
|
存储 传感器 SQL
influxdb 中得 fields 与 tag 区别总结
influxdb 中得 fields 与 tag 区别总结
1181 1