package world.xuewei;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class InfluxConnTest {
private InfluxDB influxDB;
@Before
public void getConnection() {
influxDB = InfluxDBFactory.connect("http://ip:port", "username", "password");
}
@After
public void closeConnection() {
influxDB.close();
}
/**
* 库的基本操作
*/
@Test
public void testDataBase() {
// 创建一个库 create database 库名
influxDB.query(new Query("create database logs"));
// 使用一个库
influxDB.setDatabase("history");
// 查询存在那些库
QueryResult queryResult = influxDB.query(new Query("show databases"));
System.out.println(queryResult);
// 删除一个库
influxDB.query(new Query("drop database logs"));
// 判断库是否存在 需要使用 query 查询,查询之后再结果中进行判断处理
boolean logs = influxDB.databaseExists("logs"); // 已过时
QueryResult result = influxDB.query(new Query("show databases"));
List<QueryResult.Result> results = result.getResults();
results.forEach(res -> {
List<QueryResult.Series> series = res.getSeries();
for (QueryResult.Series value : series) {
List<List<Object>> values = value.getValues();
values.forEach(System.out::println);
}
});
}
/**
* 表的基本操作
*/
@Test
public void testMeasurement() {
// 查看所有表
QueryResult queryResult = influxDB.query(new Query("show measurements"));
System.out.println(queryResult);
// 删除一个表
influxDB.query(new Query("drop measurement test"));
}
/**
* 插入单条数据
*/
@Test
public void testInsert() {
influxDB.setDatabase("history");
Point point = Point.measurement("actual")
.tag("id", "10")
.tag("origin", "slave")
.addField("cpu", 25.0)
.addField("wind", 19.0)
.addField("power", 34.2)
.time(new Date().getTime(), TimeUnit.MILLISECONDS) // 不指定时间的话默认生成
.build();
influxDB.write(point);
}
/**
* 插入批量数据
*/
@Test
public void testBatchInsert() {
influxDB.setDatabase("history");
Point point = Point.measurement("actual")
.tag("id", "10")
.tag("origin", "slave")
.addField("cpu", 25.0)
.addField("wind", 19.0)
.addField("power", 34.2)
.time(new Date().getTime(), TimeUnit.MILLISECONDS)
.build();
influxDB.write(BatchPoints.builder().points(point).build());
}
/**
* 查询数据
*/
@Test
public void testQuery() {
influxDB.setDatabase("history");
QueryResult queryResult = influxDB.query(new Query(" select * from actual"));
System.out.println(queryResult);
List<QueryResult.Result> results = queryResult.getResults();
for (QueryResult.Result result : results) {
List<QueryResult.Series> series = result.getSeries();
for (QueryResult.Series serie : series) {
List<String> columns = serie.getColumns();
List<List<Object>> values = serie.getValues();
for (int i = 0; i < values.size(); i++) {
List<Object> points = values.get(i);
for (int j = 0; j < points.size(); j++) {
System.out.println("column: " + columns.get(j) + " value: " + points.get(j));
}
System.out.println("=======================================");
}
}
}
}
/**
* 保留策略
*/
@Test
public void testPolicy() {
// 选择一个库
influxDB.setDatabase("history");
// 查询当前库策略
QueryResult queryResult = influxDB.query(new Query("show retention policies"));
System.out.println(queryResult);
// 创建一个保留策略
influxDB.query(new Query("create retention policy history_policy on history duration 168h replication 1 shard duration 168h default"));
// 查询当前库策略
QueryResult queryResult1 = influxDB.query(new Query("show retention policies"));
System.out.println(queryResult1);
// 修改默认策略
influxDB.query(new Query("alter retention policy autogen on history default "));
// 删除策略
influxDB.query(new Query("drop retention policy history_policy on history"));
// 查询当前库策略
QueryResult queryResult2 = influxDB.query(new Query("show retention policies"));
System.out.println(queryResult2);
}
}