1.InfluxDB实例
1.1 依赖及配置
<dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.15</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
spring:
  influx:
    url: http://tcloud:18088
    user: admin
    password: admin
    database: mydb
    retentionPolicy: default
    retentionPolicyTime: 30d
1.2 代码实现
配置类:
/**
 * Holds the InfluxDB connection settings bound from the {@code spring.influx.*} properties.
 *
 * <p>The fields are public and mutable because they are read directly by other beans;
 * keep them that way unless every reader is migrated to accessors.
 *
 * <p>{@code url}, {@code user}, {@code password} and {@code database} are mandatory: a missing
 * property makes placeholder resolution fail at start-up. The two retention settings are
 * optional and resolve to an empty string when absent, so that readers which test for an
 * empty value can substitute their own defaults.
 */
@Configuration
public class InfluxDatabaseConfig {

    /** Base URL of the InfluxDB HTTP endpoint ({@code spring.influx.url}). */
    @Value("${spring.influx.url}")
    public String url;

    /** User name used to authenticate ({@code spring.influx.user}). */
    @Value("${spring.influx.user}")
    public String userName;

    /** Password used to authenticate ({@code spring.influx.password}). */
    @Value("${spring.influx.password}")
    public String password;

    /** Name of the target database ({@code spring.influx.database}). */
    @Value("${spring.influx.database}")
    public String database;

    /** Retention policy name ({@code spring.influx.retentionPolicy}); empty when not configured. */
    @Value("${spring.influx.retentionPolicy:}")
    public String retentionPolicy;

    /** Retention duration such as {@code 30d} ({@code spring.influx.retentionPolicyTime}); empty when not configured. */
    @Value("${spring.influx.retentionPolicyTime:}")
    public String retentionPolicyTime;
}
工具类:
@Slf4j @Component public class InfluxDatabaseUtil { private String database; private String retentionPolicy; private String retentionPolicyTime; private InfluxDB influxdb; @Autowired private InfluxDatabaseConfig influxDatabaseConfig; @PostConstruct private void init() { // 保存策略 if (StringUtils.isEmpty(influxDatabaseConfig.retentionPolicy)) { this.retentionPolicy = "autogen"; } else { this.retentionPolicy = influxDatabaseConfig.retentionPolicy; } // 数据保存策略中数据保存时间 if (StringUtils.isEmpty(influxDatabaseConfig.retentionPolicyTime)) { this.retentionPolicyTime = "30d"; } else { this.retentionPolicyTime = influxDatabaseConfig.retentionPolicyTime; } if (StringUtils.isEmpty(influxDatabaseConfig.database)) { // 创建数据库并赋值 System.out.println("创建数据库!"); } this.database = influxDatabaseConfig.database; // 初始化 InfluxDB 实例 initInfluxDatabase(); // 数据库设置保存策略 createRetentionPolicy(); } /** * 初始化数据库连接 */ private void initInfluxDatabase() { influxdb = InfluxDBFactory.connect(influxDatabaseConfig.url, influxDatabaseConfig.userName, influxDatabaseConfig.password); influxdb.setDatabase(database); } /** * 设置数据保存策略 * default 策略名 * database 数据库名 * 30d 数据保存时限30天 * 1 副本个数为1 * 结尾 DEFAULT 表示 设为默认的策略 */ private void createRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", retentionPolicy, database, retentionPolicyTime, 1); this.query(command); } /** * 查询 * * @param command 查询语句 * @return 查询结果 */ public QueryResult query(String command) { return influxdb.query(new Query(command, database)); } /** * 插入 * * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) { Point.Builder builder = Point.measurement(measurement); builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS); builder.tag(tags); builder.fields(fields); log.info("influxDB insert data:[{}]", builder.build().toString()); influxdb.write(database, "", 
builder.build()); } }
对象封装:
/**
 * Plain data holder for one usage record.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both constructors are
 * generated by Lombok. The declaration order of the fields defines the parameter order of
 * the all-arguments constructor, so do not reorder them.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Usage {

    /** Timestamp of the record, kept as text. */
    private String time;

    /** Name of the service method the record refers to. */
    private String serviceMethod;

    /** Identifier of the user the record refers to. */
    private String userId;

    /** Counter value of the record. */
    private Integer count;

    /** URL the record refers to. */
    private String url;
}
测试类:
@SpringBootTest class InfluxDatabaseApplicationTests { @Autowired private InfluxDatabaseUtil influxDatabaseUtil; @Test void testInsert() throws InterruptedException { Map<String, String> tagsMap = new HashMap<>(); Map<String, Object> fieldsMap = new HashMap<>(); System.out.println("influxDB start time :" + System.currentTimeMillis()); int i = 0; do { Thread.sleep(100); tagsMap.put("user_id", String.valueOf(i % 10)); tagsMap.put("url", "http://www.baidu.com"); tagsMap.put("service_method", "testInsert" + (i % 5)); fieldsMap.put("count", i % 5); influxDatabaseUtil.insert("usage", tagsMap, fieldsMap); i++; } while (i < 50); } @Test void testQuery() { QueryResult query = influxDatabaseUtil.query("select * from usage limit 10"); ArrayList<Object> lists = new ArrayList<>(); query.getResults().forEach(result -> { result.getSeries().forEach(serie -> { List<List<Object>> values = serie.getValues(); List<String> columns = serie.getColumns(); lists.addAll(getQueryData(columns, values)); }); }); System.out.println(lists.toString()); } private List<Usage> getQueryData(List<String> columns, List<List<Object>> values) { List<Usage> lists = new ArrayList<>(); for (List<Object> list : values) { Usage info = new Usage(); BeanWrapperImpl bean = new BeanWrapperImpl(info); for (int i = 0; i < list.size(); i++) { String propertyName = setColumns(columns.get(i)); // 字段名 Object value = list.get(i); // 相应字段值 bean.setPropertyValue(propertyName, value); } lists.add(info); } return lists; } /*** 转义字段 ***/ private String setColumns(String column) { String[] cols = column.split("_"); StringBuffer sb = new StringBuffer(); for (int i = 0; i < cols.length; i++) { String col = cols[i].toLowerCase(); if (i != 0) { String start = col.substring(0, 1).toUpperCase(); String end = col.substring(1).toLowerCase(); col = start + end; } sb.append(col); } return sb.toString(); } }
2.Feign接口调用InfluxDB API
2.1 依赖及配置
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <version>2.2.6.RELEASE</version> </dependency>
ribbon:
  ReadTimeout: 5000
  ConnectTimeout: 5000
2.2 代码实现
《Use the InfluxDB API》文档中有详细说明，以下是对应的 Feign 接口：
/**
 * Feign client for the InfluxDB HTTP API at the URL given by {@code spring.influx.url}.
 */
@FeignClient(name = "influxDbApi", url = "${spring.influx.url}")
public interface InfluxDbApi {

    /**
     * Sends a statement to the {@code /query} endpoint via POST.
     *
     * <p>Each argument is sent as a request parameter under the explicit name given in its
     * annotation, so the mapping does not depend on parameter names being kept in the
     * compiled class.
     *
     * <p>NOTE(review): the user name and password travel as request parameters; consider
     * whether that is acceptable for the target deployment.
     *
     * @param db the database name
     * @param u  the user name
     * @param p  the password
     * @param q  the query statement
     * @return the raw response body
     * @throws Exception if the call fails
     */
    @PostMapping("/query")
    String queryInfo(@RequestParam("db") String db,
                     @RequestParam("u") String u,
                     @RequestParam("p") String p,
                     @RequestParam("q") String q) throws Exception;
}
调用测试类:
/**
 * Small bean that runs a fixed sample query through {@link InfluxDbApi}.
 *
 * <p>The database name and credentials come from the {@code spring.influx.*} properties.
 * Each one falls back to the value that used to be hard-coded here, so behaviour is
 * unchanged when the property is absent.
 */
@Component
public class ApiTesting {

    @Autowired
    private InfluxDbApi influxDbApi;

    /** Target database; defaults to {@code mydb}. */
    @Value("${spring.influx.database:mydb}")
    private String database;

    /** User name; defaults to {@code admin}. */
    @Value("${spring.influx.user:admin}")
    private String userName;

    /** Password; defaults to {@code admin}. */
    @Value("${spring.influx.password:admin}")
    private String password;

    /**
     * Selects every row of the {@code usage} measurement.
     *
     * @return the raw response body returned by the API
     * @throws Exception if the remote call fails
     */
    String test() throws Exception {
        return influxDbApi.queryInfo(database, userName, password, "select * from usage");
    }
}
测试接口:
/**
 * REST endpoint that triggers the sample InfluxDB query.
 */
@Slf4j
@RestController
public class ApiController {

    /** Injected by bean name {@code apiTesting}. */
    @Resource(name = "apiTesting")
    private ApiTesting apiTesting;

    /**
     * Handles {@code POST /test}: logs a marker line, then returns whatever the
     * delegate returns.
     *
     * @return the response body produced by the delegate
     * @throws Exception if the delegate call fails
     */
    @PostMapping("/test")
    public String get() throws Exception {
        log.info("------test------");
        String responseBody = apiTesting.test();
        return responseBody;
    }
}