引入依赖
在项目的 pom.xml 文件中添加以下依赖,就像是往项目里添了一些"工具",这些工具可以帮我们更方便地使用 Spring Boot 和 InfluxDB。
<!-- Spring Boot Web Starter,用于构建 Web 应用 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Boot Configuration Processor,用于处理配置文件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency> <!-- InfluxDB Java 客户端,用于连接和操作 InfluxDB 数据库 --> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.23</version> </dependency>
创建连接信息类
下面的代码定义了一个 InfluxdbConfig 类,它用于存储连接 InfluxDB 数据库所需的信息,比如地址、用户名、密码等。
package world.xuewei.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties("spring.influx") public class InfluxdbConfig { /** * 请求路径:http://ip:port */ private String url; /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 数据库 */ private String database; /** * 保留策略 */ private String retention; }
创建操作类
接下来是 InfluxDBTemplate 类,这是一个用于操作 InfluxDB 的工具类。比方说,你可以往数据库里写入数据、查询数据,还能够创建或删除数据库。这个类就像是一个帮你和 InfluxDB 打交道的小助手。
package world.xuewei.component; import world.xuewei.config.InfluxdbConfig; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.springframework.beans.BeanWrapperImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.util.ObjectUtils; import java.sql.Timestamp; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @Configuration public class InfluxDBTemplate { private final InfluxdbConfig influxdbConfig; private InfluxDB influxDB; @Autowired public InfluxDBTemplate(InfluxdbConfig influxdbConfig) { this.influxdbConfig = influxdbConfig; getInfluxDB(); } /** * 获取连接 */ public void getInfluxDB() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(influxdbConfig.getUrl(), influxdbConfig.getUsername(), influxdbConfig.getPassword()); // 设置使用数据库,保证库存在 influxDB.setDatabase(influxdbConfig.getDatabase()); // 设置数据库保留策略,保证策略存在 if (ObjectUtils.isEmpty(influxdbConfig.getRetention())) { influxDB.setRetentionPolicy(influxdbConfig.getRetention()); } } } /** * 关闭连接 */ public void close() { if (influxDB != null) { influxDB.close(); } } /** * 指定时间插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 * @param time 时间 * @param unit 单位 */ public void write(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit unit) { Point point = Point.measurement(measurement).tag(tags).fields(fields).time(time, unit).build(); influxDB.write(point); close(); } /** * 插入数据-自动生成时间 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void write(String measurement, Map<String, String> tags, Map<String, Object> fields) { write(measurement, tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 批量插入 * * @param points 批量记录 推荐 1000 条作为一个批 */ public void writeBatch(BatchPoints points) { influxDB.write(points); close(); } /** * 用来执行相关操作 * * @param command 执行命令 * @return 返回结果 */ public QueryResult query(String command) { return influxDB.query(new Query(command)); } /** * 创建数据库 * * @param name 库名 */ public void createDataBase(String name) { query("create database " + name); } /** * 删除数据库 * * @param name 库名 */ public void dropDataBase(String name) { query("drop database " + name); } /** * 查询返回指定对象 * * @param selectCommand select 语句 * @param clazz 类型 * @param <T> 泛型 * @return 结果 */ public <T> List<T> query(String selectCommand, Class<T> clazz) { return handleQueryResult(query(selectCommand), clazz); } /** * select 查询封装 * * @param queryResult 查询返回结果 * @param clazz 封装对象类型 * @param <T> 泛型 * @return 返回处理回收结果 */ public <T> List<T> handleQueryResult(QueryResult queryResult, Class<T> clazz) { // 定义保存结果集合 List<T> lists = new ArrayList<>(); // 获取结果 List<QueryResult.Result> results = queryResult.getResults(); // 遍历结果 results.forEach(result -> { // 获取 series List<QueryResult.Series> seriesList = result.getSeries(); // 遍历 series seriesList.forEach(series -> { // 获取的所有列 List<String> columns = series.getColumns(); // 获取所有值 List<List<Object>> values = series.getValues(); // 遍历数据 获取结果 for (List<Object> value : values) { try { // 根据 clazz 进行封装 T instance = clazz.newInstance(); // 通过 spring 框架提供反射类进行处理 BeanWrapperImpl beanWrapper = new BeanWrapperImpl(instance); HashMap<String, Object> fields = new HashMap<>(); for (int j = 0; j < columns.size(); j++) { String column = columns.get(j); Object val = value.get(j); if ("time".equals(column)) { beanWrapper.setPropertyValue("time", Timestamp.from(ZonedDateTime.parse(String.valueOf(val)).toInstant()).getTime()); } else { // 保存当前列和值到 field map 中 // 注意: 返回结果无须在知道是 tags 还是 fields 认为就是字段和值 可以将所有字段作为 field 进行返回 fields.put(column, val); } } // 通过反射完成 fields 赋值操作 beanWrapper.setPropertyValue("fields", fields); lists.add(instance); } catch (InstantiationException | IllegalAccessException e) { throw new RuntimeException(e); } } }); }); return lists; } }
使用
最后,在你需要使用 InfluxDB 的地方,通过注入 InfluxDBTemplate 来方便地进行数据库的操作。
@Service public class DemoServiceImpl { @Autowired private final InfluxDBTemplate influxDBTemplate; // ... }
这就好像你有了一个小助手,可以帮你方便地与 InfluxDB 打交道,进行各种操作,比如写入数据、查询数据等。