InfluxData【付诸实践 02】SpringBoot 集成时序数据库 InfluxDB 应用分享(InfluxDB实例+Feign接口调用InfluxDB API)源码分享

简介: InfluxData【付诸实践 02】SpringBoot 集成时序数据库 InfluxDB 应用分享(InfluxDB实例+Feign接口调用InfluxDB API)源码分享

1.InfluxDB实例

1.1 依赖及配置

<dependency>
  <groupId>org.influxdb</groupId>
  <artifactId>influxdb-java</artifactId>
  <version>2.15</version>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
spring:
  influx:
    url: http://tcloud:18088
    user: admin
    password: admin
    database: mydb
    retentionPolicy: default
    retentionPolicyTime: 30d

1.2 代码实现

配置类:

@Configuration
public class InfluxDatabaseConfig {
    @Value("${spring.influx.url}")
    public String url;
    @Value("${spring.influx.user}")
    public String userName;
    @Value("${spring.influx.password}")
    public String password;
    @Value("${spring.influx.database}")
    public String database;
    @Value("${spring.influx.retentionPolicy}")
    public String retentionPolicy;
    @Value("${spring.influx.retentionPolicyTime}")
    public String retentionPolicyTime;
}

工具类:

@Slf4j
@Component
public class InfluxDatabaseUtil {
    private String database;
    private String retentionPolicy;
    private String retentionPolicyTime;
    private InfluxDB influxdb;
    @Autowired
    private InfluxDatabaseConfig influxDatabaseConfig;
    @PostConstruct
    private void init() {
        // 保存策略
        if (StringUtils.isEmpty(influxDatabaseConfig.retentionPolicy)) {
            this.retentionPolicy = "autogen";
        } else {
            this.retentionPolicy = influxDatabaseConfig.retentionPolicy;
        }
        // 数据保存策略中数据保存时间
        if (StringUtils.isEmpty(influxDatabaseConfig.retentionPolicyTime)) {
            this.retentionPolicyTime = "30d";
        } else {
            this.retentionPolicyTime = influxDatabaseConfig.retentionPolicyTime;
        }
        if (StringUtils.isEmpty(influxDatabaseConfig.database)) {
            // 创建数据库并赋值
            System.out.println("创建数据库!");
        }
        this.database = influxDatabaseConfig.database;
        // 初始化 InfluxDB 实例
        initInfluxDatabase();
        // 数据库设置保存策略
        createRetentionPolicy();
    }
    /**
     * 初始化数据库连接
     */
    private void initInfluxDatabase() {
        influxdb = InfluxDBFactory.connect(influxDatabaseConfig.url, influxDatabaseConfig.userName, influxDatabaseConfig.password);
        influxdb.setDatabase(database);
    }
    /**
     * 设置数据保存策略
     * default 策略名
     * database 数据库名
     * 30d 数据保存时限30天
     * 1 副本个数为1
     * 结尾 DEFAULT 表示 设为默认的策略
     */
    private void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                retentionPolicy, database, retentionPolicyTime, 1);
        this.query(command);
    }
    /**
     * 查询
     *
     * @param command 查询语句
     * @return 查询结果
     */
    public QueryResult query(String command) {
        return influxdb.query(new Query(command, database));
    }
    /**
     * 插入
     *
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Point.Builder builder = Point.measurement(measurement);
        builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        log.info("influxDB insert data:[{}]", builder.build().toString());
        influxdb.write(database, "", builder.build());
    }
}

对象封装:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Usage {
    private String time;
    private String serviceMethod;
    private String userId;
    private Integer count;
    private String url;
}

测试类:

@SpringBootTest
class InfluxDatabaseApplicationTests {
    @Autowired
    private InfluxDatabaseUtil influxDatabaseUtil;
    @Test
    void testInsert() throws InterruptedException {
        Map<String, String> tagsMap = new HashMap<>();
        Map<String, Object> fieldsMap = new HashMap<>();
        System.out.println("influxDB start time :" + System.currentTimeMillis());
        int i = 0;
        do {
            Thread.sleep(100);
            tagsMap.put("user_id", String.valueOf(i % 10));
            tagsMap.put("url", "http://www.baidu.com");
            tagsMap.put("service_method", "testInsert" + (i % 5));
            fieldsMap.put("count", i % 5);
            influxDatabaseUtil.insert("usage", tagsMap, fieldsMap);
            i++;
        } while (i < 50);
    }
    @Test
    void testQuery() {
        QueryResult query = influxDatabaseUtil.query("select * from usage limit 10");
        ArrayList<Object> lists = new ArrayList<>();
        query.getResults().forEach(result -> {
            result.getSeries().forEach(serie -> {
                List<List<Object>> values = serie.getValues();
                List<String> columns = serie.getColumns();
                lists.addAll(getQueryData(columns, values));
            });
        });
        System.out.println(lists.toString());
    }
    private List<Usage> getQueryData(List<String> columns, List<List<Object>> values) {
        List<Usage> lists = new ArrayList<>();
        for (List<Object> list : values) {
            Usage info = new Usage();
            BeanWrapperImpl bean = new BeanWrapperImpl(info);
            for (int i = 0; i < list.size(); i++) {
                String propertyName = setColumns(columns.get(i));
                // 字段名
                Object value = list.get(i);
                // 相应字段值
                bean.setPropertyValue(propertyName, value);
            }
            lists.add(info);
        }
        return lists;
    }
    /*** 转义字段 ***/
    private String setColumns(String column) {
        String[] cols = column.split("_");
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < cols.length; i++) {
            String col = cols[i].toLowerCase();
            if (i != 0) {
                String start = col.substring(0, 1).toUpperCase();
                String end = col.substring(1).toLowerCase();
                col = start + end;
            }
            sb.append(col);
        }
        return sb.toString();
    }
}

2.Feign接口调用InfluxDB API

2.1 依赖及配置

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-openfeign</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>
ribbon:
    ReadTimeout: 5000
    ConnectTimeout: 5000

2.2 代码实现

Use the InfluxDB API 文档有详细说明,以下是feign接口:

@FeignClient(name = "influxDbApi", url = "${spring.influx.url}")
public interface InfluxDbApi {
    /**
     * 查询数据库
     *
     * @param q  查询语句
     * @param db 数据库
     * @param u  用户名
     * @param p  密码
     * @return 查询结果
     * @throws Exception 可能出现的异常
     */
    @PostMapping("/query")
    String queryInfo(@RequestParam String db, @RequestParam String u, @RequestParam String p, @RequestParam String q) throws Exception;
}

调用测试类:

@Component
public class ApiTesting {
    @Autowired
    private InfluxDbApi influxDbApi;
    String test() throws Exception {
       /* Map<String, Object> param = new HashMap<>(8);
        param.put("q","CREATE DATABASE testDB");*/
        return influxDbApi.queryInfo("mydb", "admin", "admin", "select * from usage");
    }
}

测试接口:

@RestController
@Slf4j
public class ApiController {
    @Resource(name = "apiTesting")
    private ApiTesting apiTesting;
    @PostMapping("/test")
    public String get() throws Exception {
        log.info("------test------");
        return apiTesting.test();
    }
}
目录
相关文章
|
3天前
|
JSON 搜索推荐 API
Lazada Item_review API接口的开发应用与收益
Lazada作为东南亚领先的电商平台,通过其丰富的API接口为第三方开发者提供了强大的工具。其中,Lazada商品评论列表API(item_review API)尤为重要,能够实时获取商品评论数据,帮助开发者了解用户反馈、优化商品策略、提高购物体验和建立品牌形象,从而在电商行业中实现显著的收益。本文将深入探讨该API的开发应用及其多方面的价值。
35 14
|
2天前
|
搜索推荐 数据挖掘 API
Suning商品详情API接口的开发应用与收益
在电商迅猛发展的时代,API接口技术成为连接不同系统的桥梁,为电商平台提供高效的数据交换能力。苏宁易购的商品详情API接口,为商家和开发者带来诸多便利和收益,包括商品信息获取、选品上架、竞品分析、个性化推荐、自动化管理和运营效率提升等方面,助力商家优化销售策略,提高用户体验,降低运营成本,增强市场竞争力,促进业务创新。
24 2
|
1天前
|
供应链 搜索推荐 API
Walgreens商品详情API接口的开发应用与收益
在数字化时代,API成为连接不同系统和服务的桥梁。Walgreens通过开放其商品详情API接口,为开发者提供了丰富的数据源,支持商品搜索、个性化推荐、价格比较、库存管理等应用,创造了新的商业机会和收益模式。本文将深入探讨Walgreens商品详情API接口的开发应用及其多重收益。
19 5
|
4天前
|
搜索推荐 数据挖掘 API
拼多多根据ID取商品详情原数据API接口的开发应用与收益
拼多多作为中国知名电商平台,为开发者和企业提供丰富的API接口,助力快速接入平台,实现商品推广、订单管理等功能。其中,根据ID取商品详情原数据的API接口尤为重要,具备高效性、稳定性和安全性,广泛应用于电商数据分析、价格监测、竞品分析、商品推荐系统、移动应用开发及精准营销等领域,为企业带来显著收益。
16 0
|
4天前
|
人工智能 自然语言处理 API
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
谷歌推出的Multimodal Live API是一个支持多模态交互、低延迟实时互动的AI接口,能够处理文本、音频和视频输入,提供自然流畅的对话体验,适用于多种应用场景。
34 3
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
|
6天前
|
XML JSON 缓存
阿里巴巴商品详情数据接口(alibaba.item_get) 丨阿里巴巴 API 实时接口指南
阿里巴巴商品详情数据接口(alibaba.item_get)允许商家通过API获取商品的详细信息,包括标题、描述、价格、销量、评价等。主要参数为商品ID(num_iid),支持多种返回数据格式,如json、xml等,便于开发者根据需求选择。使用前需注册并获得App Key与App Secret,注意遵守使用规范。
|
5天前
|
JSON API 开发者
淘宝买家秀数据接口(taobao.item_review_show)丨淘宝 API 实时接口指南
淘宝买家秀数据接口(taobao.item_review_show)可获取买家上传的图片、视频、评论等“买家秀”内容,为潜在买家提供真实参考,帮助商家优化产品和营销策略。使用前需注册开发者账号,构建请求URL并发送GET请求,解析响应数据。调用时需遵守平台规定,保护用户隐私,确保内容真实性。
|
5天前
|
搜索推荐 数据挖掘 API
淘宝天猫商品评论数据接口丨淘宝 API 实时接口指南
淘宝天猫商品评论数据接口(Taobao.item_review)提供全面的评论信息,包括文字、图片、视频评论、评分、追评等,支持实时更新和高效筛选。用户可基于此接口进行数据分析,支持情感分析、用户画像构建等,同时确保数据使用的合规性和安全性。使用步骤包括注册开发者账号、创建应用获取 API 密钥、发送 API 请求并解析返回数据。适用于电商商家、市场分析人员和消费者。
|
15天前
|
JSON API 开发工具
淘宝实时 API 接口丨淘宝商品详情接口(Taobao.item_get)
淘宝商品详情接口(Taobao.item_get)允许开发者获取商品的详细信息,包括基本信息、描述、卖家资料、图片、属性及销售情况等。开发者需注册账号、创建应用并获取API密钥,通过构建请求获取JSON格式数据,注意遵守平台规则,合理使用接口,确保数据准确性和时效性。
|
16天前
|
JSON 安全 API
Python调用API接口的方法
Python调用API接口的方法
74 5