---------------全网首发--------------- 不看后悔!!!---------------------
摘要:
最近用到IoTDB数据库,经过对一些相关文档的搜集,大概了解到了该数据库的主要应用场景和使用方法,本篇就讲一下如何利用IoTDB并结合SpringBoott和Mybatis进行项目整合。经过一番查找全网都没有一篇完整的关于该数据库采用Mybatis做持久化框架的文章,那么就由我来开辟一下先河。
概述:
IoTDB数据库官网: http://iotdb.apache.org/
Apache IoTDB(物联网数据库)是一个物联网原生数据库,具有高性能的数据管理和分析能力,可部署在边缘和云端。Apache IoTDB以其轻量级的架构、高性能和丰富的特性集以及与Apache Hadoop、Spark和Flink的深度集成,可以满足物联网海量数据存储、高速数据摄取和复杂数据分析的需求工业领域。
特点:
- 高吞吐量读写
Apache IoTDB 可以支持数百万个低功耗智能联网设备的高速写入访问。它还提供用于检索数据的闪电读取访问。 - 高效的目录结构 Apache IoTDB
可以通过对时间序列数据复杂目录的模糊搜索策略,有效地组织来自物联网设备的复杂数据结构和大尺寸时间序列数据。 - 丰富的查询语义 Apache
IoTDB 可以支持跨设备和传感器的时间序列数据的时间对齐、时间序列领域的计算和丰富的时间维度聚合功能。 - 硬件成本低 Apache
IoTDB 可以达到很高的磁盘存储压缩率(在硬盘上存储 1GB 数据的成本不到 0.23 美元) - 灵活部署 Apache IoTDB
可以为用户提供云端一键安装、桌面终端工具以及云平台与本地机器之间的桥梁工具(数据同步工具)。 - 与开源生态系统的紧密集成
Apache IoTDB 可以支持分析生态系统,例如 Hadoop、Spark、Flink 和 Grafana(可视化工具)。
同时该数据库只支持以下几种数据类型
BOOLEAN、INT32 (int)、INT64 (long)、FLOAT、DOUBLE 、TEXT (String)
具体的使用说明可以参照官网上地址,写的很全:http://iotdb.apache.org/UserGuide/Master/QuickStart/QuickStart.html
实战:
SpringBoot项目相关配置
首先创建一个SpringBoot项目 这里就不过多赘述了,然后pom.xml引入iotdb依赖
<dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-jdbc</artifactId> <version>0.12.1</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.9</version> </dependency>
application.yml文件配置
server: port: 8080 #iotdb spring: datasource: username: root password: root driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver url: jdbc:iotdb://192.168.80.200:6667/ initial-size: 5 min-idle: 10 max-active: 50 max-wait: 60000 remove-abandoned: true remove-abandoned-timeout: 30 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 test-while-idle: false test-on-borrow: false test-on-return: false #mybatis mybatis: mapper-locations: classpath*:/mappers/*.xml
IotDbConfig 工具类
这是iotdb数据库的jdbc操作,很多文章中都是采用手写工具类去操作,都没有结合Mybatis,这可能和官网提出的不建议使用该方式有关。我参考了一下别人的JDBC工具类并重构了一下:
package com.iotdb.zjc.demo.util; import com.alibaba.druid.pool.DruidDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.sql.*; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Author: zjc * @ClassName IotDBConfig * @Description TODO IoTDB-JDBC操作工具 * @date 2021/11/24 14:31 * @Version 1.0 */ @Component @Configuration public class IotDbConfig { private static final Logger log = LoggerFactory.getLogger(IotDbConfig.class); @Value("${spring.datasource.username}") private String username; @Value("${spring.datasource.password}") private String password; @Value("${spring.datasource.driver-class-name}") private String driverName; @Value("${spring.datasource.url}") private String url; @Value("${spring.datasource.initial-size}") private int initialSize; @Value("${spring.datasource.min-idle}") private int minIdle; @Value("${spring.datasource.max-active}") private int maxActive; @Value("${spring.datasource.max-wait}") private int maxWait; @Value("${spring.datasource.remove-abandoned}") private boolean removeAbandoned; @Value("${spring.datasource.remove-abandoned-timeout}") private int removeAbandonedTimeout; @Value("${spring.datasource.time-between-eviction-runs-millis}") private int timeBetweenEvictionRunsMillis; @Value("${spring.datasource.min-evictable-idle-time-millis}") private int minEvictableIdleTimeMillis; @Value("${spring.datasource.test-while-idle}") private boolean testWhileIdle; @Value("${spring.datasource.test-on-borrow}") private boolean testOnBorrow; @Value("${spring.datasource.test-on-return}") private boolean testOnReturn; private static DruidDataSource iotDbDataSource; /** * 使用阿里的druid连接池 * * @return */ public Connection getConnection() { if (iotDbDataSource == null) { iotDbDataSource = new DruidDataSource(); //设置连接参数 iotDbDataSource.setUrl(url); iotDbDataSource.setDriverClassName(driverName); iotDbDataSource.setUsername(username); iotDbDataSource.setPassword(password); //配置初始化大小、最小、最大 iotDbDataSource.setInitialSize(initialSize); iotDbDataSource.setMinIdle(minIdle); iotDbDataSource.setMaxActive(maxActive); //配置获取连接等待超时的时间 iotDbDataSource.setMaxWait(maxWait); //连接泄漏监测 iotDbDataSource.setRemoveAbandoned(removeAbandoned); iotDbDataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout); //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 iotDbDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); iotDbDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); //防止过期 iotDbDataSource.setTestWhileIdle(testWhileIdle); iotDbDataSource.setTestOnBorrow(testOnBorrow); iotDbDataSource.setTestOnReturn(testOnReturn); } Connection connection = null; try { connection = iotDbDataSource.getConnection(); } catch (SQLException e) { e.printStackTrace(); log.error("iotDB getConnection失败: error={}", e.getMessage()); } return connection; } /** * 执行单条数据操作 * * @param sql */ public boolean execute(String sql) { log.info("iotDB execute sql={}", sql); Connection connection = getConnection(); PreparedStatement preparedStatement = null; boolean flag = false; try { if (connection != null) { preparedStatement = connection.prepareStatement(sql); long systemTime = System.currentTimeMillis(); flag = preparedStatement.execute(); log.info("执行IotDb的sql----[{}],时间:[{}]ms", sql, System.currentTimeMillis() - systemTime); } } catch (SQLException e) { log.error("iotDB insert失败: error={}", e.getMessage()); } finally { close(preparedStatement, connection); } return flag; } /** * 执行批量数据操作 * * @param sqls * @return */ public Integer executeBatch(List<String> sqls) { Connection connection = getConnection(); PreparedStatement preparedStatement = null; int[] flag = null; try { if (connection != null) { for (String sql : sqls) { log.info("iotDB executeBatch sql={}", sql); preparedStatement = connection.prepareStatement(sql); long systemTime = System.currentTimeMillis(); preparedStatement.addBatch(sql); log.info("执行IotDb的sql----[{}],时间:[{}]ms", sql, System.currentTimeMillis() - systemTime); } flag = preparedStatement.executeBatch(); } } catch (SQLException e) { log.error("iotDB 执行失败: error={}", e.getMessage()); } finally { close(preparedStatement, connection); } return flag.length; } /** * 查询操作 * * @param sql * @return */ public List<Map<String, Object>> executeQuery(String sql) { log.info("iotDB executeQuery sql={}", sql); Connection connection = getConnection(); PreparedStatement preparedStatement = null; List<Map<String, Object>> resultList = null; ResultSet resultSet = null; try { if (connection != null) { preparedStatement = connection.prepareStatement(sql); long systemTime = System.currentTimeMillis(); resultSet = preparedStatement.executeQuery(); log.info("查询IotDb的sql----[{}],时间:[{}]ms", sql, System.currentTimeMillis() - systemTime); resultList = outputResult(resultSet); } } catch (SQLException e) { e.printStackTrace(); log.error("iotDB query失败: error={}", e.getMessage()); } finally { try { if (resultSet != null) { resultSet.close(); } } catch (SQLException e) { log.error("iotDB resultSet关闭异常: error={}", e.getMessage()); } close(preparedStatement, connection); } return resultList; } /** * 输出结果集 * * @param resultSet * @return * @throws SQLException */ private List<Map<String, Object>> outputResult(ResultSet resultSet) throws SQLException { List<Map<String, Object>> resultList = new ArrayList<>(); if (resultSet != null) { ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); while (resultSet.next()) { Map resultMap = new HashMap<>(16); for (int i = 1; i <= columnCount; i++) { String colunmName = metaData.getColumnLabel(i); if (colunmName.indexOf('.') >= 0) { colunmName = colunmName.substring(colunmName.lastIndexOf('.') + 1); } //过滤 函数()括号 if (colunmName.indexOf(')') >= 0) { colunmName = colunmName.substring(0, colunmName.lastIndexOf(')')); } //时序库自带的时间格式转换 if (colunmName.equals("Time")) { Long timeStamp = Long.parseLong(resultSet.getString(i)); if (timeStamp > 0) { Date d = new Date(timeStamp); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); resultMap.put("Time", sf.format(d)); } } else { resultMap.put(colunmName, resultSet.getString(i)); } } resultList.add(resultMap); } } return resultList; } /** * 关闭连接 * * @param statement * @param connection */ private void close(Statement statement, Connection connection) { try { if (statement != null) { statement.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { log.error("iotDB close失败: error={}", e.getMessage()); e.printStackTrace(); } } }
整合mybatis
创建controller、service、mapper文件
controller:
package com.iotdb.zjc.demo.controller; import com.iotdb.zjc.demo.service.OrderService; import com.iotdb.zjc.demo.constant.MyResponse; import com.iotdb.zjc.demo.data.OrderInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.Date; import java.util.List; /** * @Author: zjc * @ClassName OrderController * @Description TODO * @date 2021/11/23 17:16 * @Version 1.0 */ @RestController public class OrderController { private static final Logger log = LoggerFactory.getLogger(OrderController.class); @Autowired OrderService orderService; @RequestMapping(value = "/createOrder", method = RequestMethod.POST) // @Transactional(rollbackFor = Exception.class) public MyResponse createOrder(@RequestBody OrderInfo orderInfo) { try { orderInfo.setTimestamp(System.currentTimeMillis()); orderInfo.setCreateTime(new Date()); Integer v = orderService.createOrder(orderInfo); if (v == -1) { return MyResponse.ok(v); } else { return MyResponse.checkForbidden(false); } } catch (Exception e) { log.info("创建订单失败!" + e); return new MyResponse(false); } } /** * 更新操作 其实也是插入操作 时间戳相同 只和时间戳相关 * @param orderInfo * @return */ @RequestMapping(value = "/updateOrder", method = RequestMethod.POST) public MyResponse updateOrder(@RequestBody OrderInfo orderInfo) { try { orderInfo.setTimestamp(System.currentTimeMillis()); orderInfo.setCreateTime(new Date()); Integer v = orderService.updateOrder(orderInfo); if (v == -1) { return MyResponse.ok(v); } else { return MyResponse.checkForbidden(false); } } catch (Exception e) { log.info("更新订单失败!" + e); return new MyResponse(false); } } /** * 删除操作 要将时间戳的加号变成%2B * @param timestamp * @return */ @RequestMapping(value = "/deleteOrder", method = RequestMethod.GET) public MyResponse deleteOrder(String timestamp) { try { Integer v = orderService.deleteOrder(timestamp); if (v == -1) { return MyResponse.ok(v); } else { return MyResponse.checkForbidden(false); } } catch (Exception e) { log.info("删除订单失败!" + e); return new MyResponse(false); } } /** * 创建组 也就是相当于数据库 * @return */ @RequestMapping(value = "/createOrderGroup", method = RequestMethod.POST) public MyResponse createOrderGroup() { try { Integer v = orderService.createOrderGroup(); if (v > 0) { return new MyResponse(v); } else { return new MyResponse(false); } } catch (Exception e) { log.info("创建订单组失败!" + e); return new MyResponse(false); } } /** * 查询所有订单数据 * @return */ @RequestMapping(value = "/queryOrder", method = RequestMethod.GET) public MyResponse queryOrder() { try { List<OrderInfo> v = orderService.queryOrder(); if (v.size() > 0) { v.forEach(x -> { System.out.println(x.toString()); }); return MyResponse.ok(v); } else { return new MyResponse(false); } } catch (Exception e) { log.info("查询订单组失败!" + e); return new MyResponse(false); } } /** * 查看数据库有多少组 * * @return */ @RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET) public MyResponse selectStorageGroup() { try { List<String> v = orderService.selectStorageGroup(); if (v.size() > 0) { v.forEach(x -> { System.out.println("group------------------" + x.toString()); }); return MyResponse.ok(v); } else { return new MyResponse(false); } } catch (Exception e) { log.info("查询组失败!" + e); return new MyResponse(false); } } }
serviceImpl文件
package com.iotdb.zjc.demo.serviceimpl; import com.iotdb.zjc.demo.service.OrderService; import com.iotdb.zjc.demo.data.OrderInfo; import com.iotdb.zjc.demo.mapper.OrderMapper; import com.iotdb.zjc.demo.util.IotDbConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; /** * @Author: zjc * @ClassName OrderServiceImpl * @Description TODO * @date 2021/11/23 17:32 * @Version 1.0 */ @Service public class OrderServiceImpl implements OrderService { @Autowired IotDbConfig iotDbConfig; @Autowired OrderMapper orderMapper; @Override public Integer createOrder(OrderInfo orderInfo) { return orderMapper.createOrder(orderInfo); } @Override public Integer updateOrder(OrderInfo orderInfo) { return orderMapper.updateOrder(orderInfo); } @Override public Integer createOrderGroup() { try { // List<String> statements = new ArrayList<>(); // statements.add("set storage group to root.order"); // statements.add("CREATE TIMESERIES root.order.orderdetail.order_id WITH DATATYPE=INT64, ENCODING=PLAIN, COMPRESSOR=SNAPPY"); // statements.add("CREATE TIMESERIES root.order.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY"); // statements.add("CREATE TIMESERIES root.order.orderdetail.order_name WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY"); // statements.add("CREATE TIMESERIES root.order.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY"); // Integer flag = iotDbConfig.executeBatch(statements); Integer flag = orderMapper.createOrderGroup(); Integer flagEle = orderMapper.createOrderGroupElement(); System.out.println("\n\t执行sql数量为{}" + flagEle); return flagEle; } catch (Exception e) { e.printStackTrace(); } return 0; } @Override public List<OrderInfo> queryOrder() { return orderMapper.queryOrder(); } @Override public List<String> selectStorageGroup() { return orderMapper.selectStorageGroup(); } @Override public Integer deleteOrder(String timestamp) { return orderMapper.deleteOrder(timestamp); } }
mapper文件
package com.iotdb.zjc.demo.mapper; import com.iotdb.zjc.demo.data.OrderInfo; import org.apache.ibatis.annotations.Param; import java.util.List; /** * @Author: zjc * @ClassName OrderMapper * @Description TODO * @date 2021/11/24 10:09 * @Version 1.0 */ public interface OrderMapper { Integer createOrder(OrderInfo orderInfo); List<OrderInfo> queryOrder(); Integer createOrderGroup(); Integer createOrderGroupElement(); List<String> selectStorageGroup(); Integer updateOrder(OrderInfo orderInfo); Integer deleteOrder(@Param("timestamp") String timestamp); }
mapper.xml文件
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.iotdb.zjc.demo.mapper.OrderMapper"> <resultMap id="BaseResultMap" type="com.iotdb.zjc.demo.data.OrderInfo"> <result column="root.order2.orderdetail.order_id" property="orderId"/> <result column="root.order2.orderdetail.order_num" property="orderNum"/> <result column="root.order2.orderdetail.order_name" property="orderName"/> <result column="root.order2.orderdetail.create_time" property="createTime"/> </resultMap> <insert id="createOrder" parameterType="com.iotdb.zjc.demo.data.OrderInfo"> insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(#{timestamp},#{orderId},#{orderNum},#{orderName},#{createTime}); </insert> <select id="queryOrder" resultMap="BaseResultMap"> select * from root.order2.orderdetail limit 3 offset 0 </select> <select id="selectStorageGroup" resultType="java.lang.String"> show storage group limit 3 offset 0 </select> <delete id="deleteOrder" parameterType="java.lang.String"> delete from root.order2.orderdetail where timestamp = ${timestamp}; </delete> <insert id="updateOrder"> insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(2021-11-24T18:28:20.689+08:00,#{orderId},#{orderNum},#{orderName},#{createTime}); </insert> <update id="createOrderGroup"> SET STORAGE GROUP TO root.order2 </update> <update id="createOrderGroupElement"> -- CREATE TIMESERIES root.order2.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY; CREATE TIMESERIES root.order2.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY; </update> </mapper>
注意几点:
1、一定要注意映射关系要写成这样root.order2.orderdetail.order_id 组名的形式,不然映射不到对应实体的字段。
2、添加时要增加时间戳,时间戳就是主键,删除也是删除对应的时间戳。 3、修改操作就是新增操作,只是对应的时间戳不变,其他的都可以变。
4、创建组和创建时间序列,当我们创建一个时间序列时,我们应该定义它的数据类型和编码方案
常用的sql操作地址可以参考一下这个链接:https://blog.csdn.net/zjy660358/article/details/111315567
IoTDB安装操作请查看如下链接: