1.业务背景
项目中有业务数据需要实时读取,并且这些业务数据每秒产生上千条,而InfluxDB对海量的时序数据高性能读写有很好的支持,于是项目中使用InfluxDB支持该业务。先通过MQ订阅的方式将数据实时存储到InfluxDB,前端调用接口实时读取范围时间内的业务数据。
2.InfluxDB版本及依赖
项目中使用的InfluxDB是2.1.1版本,引入的POM依赖为:
<dependency>
<groupId>com.influxdb<groupId>
<artifactId>influxdb-client-java<artifactId>
<version>3.1.0<version>
<dependency>
3.准备工作
因为InfluxDB2.0以上不支持SQL语法的连接方式,所以需要使用HTTP API的方式实现数据读写。首先将数据库(每个组织创建时都必须关联一个存储桶(bucket),所以每个组织都至少有一个bucket,bucket是存储时序数据的一个命名空间,相当于数据表)以及API Tokens(用户使用API token访问对应组织的数据)创建完成。
4、单例化InfluxDBClient
项目开发初期并没有对InfluxDBClient进行单例化,而是每次写数据都会new一个新的client去获取getWriteApi,导致到后期出现大量写操作时造成内存溢出的问题,经过排查是RxNewThreadScheduler这个线程过多导致的。如下图:
然后扒了一下getWriteApi()这个方法的源码,如下图所示:NewThreadScheduler 这个调度器,在真正执行工作的时候,会创建一个NewThreadWorker;当InfluxDB的大量写入数据时,InfluxDB所用的IO线程调度器RxJava,创建的线程池是几乎没有上限的,最终导致内存溢出,所以在使用完writeApi后及时close。
单例InfluxDBClient 的代码如下:
OkHttpClient.Builder builder = new OkHttpClient.Builder();
InfluxDBClientOptions build = InfluxDBClientOptions.builder()
.okHttpClient(builder)
.url("InfluxDB的连接地址")
.authenticateToken("生成的API Token".toCharArray())
.org("组织信息")
.bucket("组织的存储桶")
.build();
InfluxDBClient client = InfluxDBClientFactory.create(build);
//设置日志打印级别
client.setLogLevel(LogLevel.BASIC);
5.读写API的使用
5.1.写操作
最开始设计表结构的时候没有考虑太多,在表的Tag属性填充了数据值,刚开始没什么问题,数据也能正常写入,突然有一天报了max-values-per-tag limit exceeded这样的错误,经过查阅资料才知道在设计表结构的时候就应该考虑清楚,Tag所填充的数据应该是有范围的,是个能穷举完的集合,并且最多只能10万个,否则数据可能会丢失。
上面的问题优化之后,几天之后又出现了新的问题,看打印的日志写操作特别慢,设置了InfluxDB写操作需要重试3次之后依然无法解决这个问题,后面才发现是填充数据的时候Field字段的Key和Tag有同样的要求,必须是个能穷举完的集合,这个修改好之后发现写操作快了一些,但是依然还是很慢,于是又改了Time这个属性,这个属性不设置的话会默认填充当前时间的时间戳,但是我们业务上是有时间戳这个属性的,我们需要把业务上的时间戳字段填充到Time里面,刚开始填充的是毫秒级别的,换成秒级别的之后,写操作就飞快了,这时InfluxDB的高性能的大量写入时序数据的效果才完美的体现出来。
批量写入数据的代码如下:
List points = new ArrayList<>();
list.forEach(data -> {
Point point = Point
.measurement("表名,若数据库不存在该表会自动创建")
.time("时间戳", WritePrecision.S)
.addTag("属性名", "")
.addField("对应的属性值", "");
points.add(point);
WriteApi writeApi = client.getWriteApi();
writeApi.writePoints(points);
writeApi.close();
5.2.读操作
读数据的代码如下:
String flux = String.format(
"from(bucket: \"bucket名称\")\n" +
" |> range(start: "+查询开始时间+" , stop : "+查询结束时间+" )\n" +
" |> filter(fn: (r) => r._measurement == \""+表名+"\" and r.XXX == \""+属性过滤条件+"\")\n"
);
List models = client.getQueryApi().query(flux,XXX.class);
6.其他
当然,InfluxDB的web页面也可以查询数据,如果查询数据时报如图所示的错误时,可以将查询语句中的 |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)这一行去掉,就可以正常展示查询结果了。