springboot接入influxdb

简介: springboot接入influxdb

转载请注明出处:

1.添加maven依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>2.14</version>
        </dependency>

2.spring boot 的application.yaml配置文件中添加influxdb的配置

influxdb:
  url: http://127.0.0.1:8086
  database: influx_db
  username: influx_db_user
  password: influx_db_pwd

  8086 为influxdb默认的端口

3.influxdb配置应用与service方法编写

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Service
public class InfluxDBService {
    @Value("${influxdb.username}")
    public String influxdbUserName;
    @Value("${influxdb.password}")
    public String influxdbPassword;
    @Value("${influxdb.url}")
    public String influxdbUrl;
    //数据库
    @Value("${influxdb.database}")
    public String influxdbDatabase;
    @PostConstruct
    public void initInfluxDb() {
        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
        this.influxDB = influxDbBuild();
    }
    //保留策略
    private String retentionPolicy;
    private InfluxDB influxDB;
    /**
     * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
     * 表示 设为默认的策略
     */
    public void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "defalut", influxdbDatabase, "30d", 1);
        this.query(command);
    }
    /**
     * 连接时序数据库;获得InfluxDB
     **/
    private InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUserName, influxdbPassword);
            influxDB.setDatabase(influxdbDatabase);
        }
        return influxDB;
    }
    /**
     * 插入
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxdbDatabase, "", builder.build());
    }
    /**
     * @desc 插入,带时间time
     * @date 2021/3/27
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insert(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(influxdbDatabase, "", builder.build());
    }
    /**
     * @desc influxDB开启UDP功能,默认端口:8089,默认数据库:udp,没提供代码传数据库功能接口
     * @date 2021/3/13
     *@param measurement
     *@param time
     *@param tags
     *@param fields
     * @return void
     */
    public void insertUDP(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        influxDbBuild();
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time, TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        int udpPort = 8089;
        influxDB.write(udpPort,  builder.build());
    }
    /**
     * 查询
     * @param command 查询语句
     * @return
     */
    public QueryResult query(String command) {
        influxDbBuild();
        return influxDB.query(new Query(command, influxdbDatabase));
    }
    /**
     * @desc 查询结果处理
     * @date 2021/5/12
     *@param queryResult
     */
    public List<Map<String, Object>> queryResultProcess(QueryResult queryResult) {
        List<Map<String, Object>> mapList = new ArrayList<>();
        List<QueryResult.Result> resultList =  queryResult.getResults();
        //把查询出的结果集转换成对应的实体对象,聚合成list
        for(QueryResult.Result query : resultList){
            List<QueryResult.Series> seriesList = query.getSeries();
            if(seriesList != null && seriesList.size() != 0) {
                for(QueryResult.Series series : seriesList){
                    List<String> columns = series.getColumns();
                    String[] keys =  columns.toArray(new String[columns.size()]);
                    List<List<Object>> values = series.getValues();
                    if(values != null && values.size() != 0) {
                        for(List<Object> value : values){
                            Map<String, Object> map = new HashMap(keys.length);
                            for (int i = 0; i < keys.length; i++) {
                                map.put(keys[i], value.get(i));
                            }
                            mapList.add(map);
                        }
                    }
                }
            }
        }
        return mapList;
    }
    /**
     * @desc InfluxDB 查询 count总条数
     * @date 2021/4/8
     */
    public long countResultProcess(QueryResult queryResult) {
        long count = 0;
        List<Map<String, Object>> list = queryResultProcess(queryResult);
        if(list != null && list.size() != 0) {
            Map<String, Object> map = list.get(0);
            double num = (Double)map.get("count");
            count = new Double(num).longValue();
        }
        return count;
    }
    /**
     * 查询
     * @param dbName 创建数据库
     * @return
     */
    public void createDB(String dbName) {
        influxDbBuild();
        influxDB.createDatabase(dbName);
    }
    /**
     * 批量写入测点
     *
     * @param batchPoints
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDbBuild();
        influxDB.write(batchPoints);
    }
    /**
     * 批量写入数据
     *
     * @param database  数据库
     * @param retentionPolicy 保存策略
     * @param consistency   一致性
     * @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
     */
    public void batchInsert(final String database, final String retentionPolicy,
                            final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(database, retentionPolicy, consistency, records);
    }
    /**
     * @desc 批量写入数据
     * @date 2021/3/19
     *@param consistency
     *@param records
     */
    public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
        influxDbBuild();
        influxDB.write(influxdbDatabase, "", consistency, records);
    }
}

4.进行单元测试验证

@SpringBootTest
public class InfluxDbTest {
    @Autowired
    private InfluxDBService influxDBService;
    @Test
    void contextLoads() {
    }
    @Test
    public void testSave(){
        String measurement = "host_cpu_usage_total";
        Map<String,String> tags = new HashMap<>();
        tags.put("host_name","host2");
        tags.put("cpu_core","core0");
        Map<String, Object> fields = new HashMap<>();
        fields.put("cpu_usage",0.22);
        fields.put("cpu_idle",0.56);
        influxDBService.insert(measurement, tags, fields);
    }
}

  influxdb 在执行新增测试用例的时候,如果influxdb的数据库中对应的表不存在,会自动创建数据库表

5.查询示例

 

 

 

 

 


标签: influxdb

目录
相关文章
|
7月前
|
存储 传感器 Java
整合Spring Boot和InfluxDB实现时序数据存储
整合Spring Boot和InfluxDB实现时序数据存储
|
7月前
|
Java API 时序数据库
springboot如何配置influxdb
【6月更文挑战第24天】springboot如何配置influxdb
391 0
|
8月前
|
存储 Java 数据库
SpringBoot整合InfluxDB
SpringBoot整合InfluxDB
234 0
|
8月前
|
Java API 时序数据库
InfluxData【付诸实践 02】SpringBoot 集成时序数据库 InfluxDB 应用分享(InfluxDB实例+Feign接口调用InfluxDB API)源码分享
InfluxData【付诸实践 02】SpringBoot 集成时序数据库 InfluxDB 应用分享(InfluxDB实例+Feign接口调用InfluxDB API)源码分享
186 0
|
存储 SQL Prometheus
彻底搞懂时序数据库InfluxDB,在SpringBoot整合InfluxDB
之前介绍了运维监控系统Prometheus,然后就有同鞋问我关于时序数据库的情况,所以这里总结一下时序数据库,并以InfluxDB为例,介绍时序数据库的功能特性和使用方式,希望能对大家有所帮助。
12988 4
彻底搞懂时序数据库InfluxDB,在SpringBoot整合InfluxDB
|
SQL 存储 数据采集
SpringBoot整合TICK(Telegraf+InfluxDB+Chronograf +Kapacitor)监控系列之一:InfluxDB
TICK各个模块说明如下所示: T(Telegraf):服务监控数据采集,包括服务器CPU、内存、IO、进程状态、服务状态等等; I(InfluxDB):时序型数据库,存储Telegraf采集的监控数据,每条数据都会有time序列; C(Chronograf):时间序列数据可视化展示; K(Kapacitor):可以按照预先编写好的规则,实时地订阅influxDB数据或者批量查询数据,并进行告警。
SpringBoot整合TICK(Telegraf+InfluxDB+Chronograf +Kapacitor)监控系列之一:InfluxDB
|
存储 缓存 监控
Spring Boot 2.x基础教程:使用时序数据库InfluxDB
Spring Boot 2.x基础教程:使用时序数据库InfluxDB
1173 0
|
存储 监控 Java
SpringBoot 2.0 + InfluxDB+ Sentinel 实时监控数据存储
前言 阿里巴巴提供的控制台只是用于演示 Sentinel 的基本能力和工作流程,并没有依赖生产环境中所必需的组件,比如持久化的后端数据库、可靠的配置中心等。目前 Sentinel 采用内存态的方式存储监控和规则数据,监控最长存储时间为 5 分钟,控制台重启后数据丢失。
2187 0
|
监控 Java 数据库
spring boot +RabbitMQ +InfluxDB+Grafara监控实践
本文需要有相关spring boot 或spring cloud 相关微服务框架的基础,如果您具备相关基础可以很容易的实现下述过程!!!!!!!   希望本文的所说对需要的您有所帮助   从这里我们开始进入闲聊阶段。
2864 0
|
3月前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,包括版本兼容性、安全性、性能调优等方面。
208 1
下一篇
开通oss服务