使用shardingjdbc执行MySQL游标操作时报错

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用shardingjdbc执行MySQL游标操作时报错

项目中为避免一次查询过多造成oom采用MySQL的游标分批查询,单独使用时正常,但是在集成sharding-jdbc分表中间件后报错:

Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion

shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion

错误日志:

org.apache.ibatis.exceptions.PersistenceException: 
### Error querying database.  Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
### The error may involve cn.com.agree.foundation.offlinedata.server.mapper.OfflineDataMapper.executeOfflineSql
### The error occurred while handling cursor results
### SQL: select * from account_1
### Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
  at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:127)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:116)
  at cn.com.agree.foundation.offlinedata.server.service.impl.DbOfflineDataReader.open(DbOfflineDataReader.java:53)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.execute(OfflineDataProcessorDelegate.java:110)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.offlineDataDelegate(OfflineDataProcessorDelegate.java:85)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessor.process(OfflineDataProcessor.java:23)
  at tech.powerjob.worker.core.tracker.task.light.LightTaskTracker.processTask(LightTaskTracker.java:211)
  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
  at java.util.concurrent.FutureTask.run(FutureTask.java)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
  at org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion(AbstractUnsupportedOperationStatement.java:60)
  at com.baomidou.mybatisplus.core.executor.MybatisSimpleExecutor.doQueryCursor(MybatisSimpleExecutor.java:82)
  at org.apache.ibatis.executor.BaseExecutor.queryCursor(BaseExecutor.java:178)
  at com.baomidou.mybatisplus.core.executor.MybatisCachingExecutor.queryCursor(MybatisCachingExecutor.java:98)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
  at com.sun.proxy.$Proxy223.queryCursor(Unknown Source)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
  at com.sun.proxy.$Proxy223.queryCursor(Unknown Source)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:123)
  ... 11 more

原因:查看官网shardingsphere不支持游标操作

解决方案一 自定义处理器

不直接使用Cursor接口实现流式查询,可以通过手动控制结果集的遍历和数据库连接的生命周期来模拟流式查询的效果。

基于自定义结果处理器(ResultHandler)的方式并配合Mapper接口方法实现流式查询读取大量数据的功能:

分批次从数据库中读取数据,每次只加载少量记录到内存中处理,避免一次性加载所有数据造成内存溢出(OOM)。

  • 自定义的 ResultHandler
public class BatchResultHandler<T> implements ResultHandler<T> {
    private final int batchSize;
    private List<T> batch;
    private Consumer<List<T>> batchConsumer;
    public BatchResultHandler(int batchSize, Consumer<List<T>> batchConsumer) {
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
        this.batchConsumer = batchConsumer;
    }
    @Override
    public void handleResult(ResultContext<? extends T> context) {
        batch.add(context.getResultObject());
        if (batch.size() == batchSize) {
            flushBatch();
        }
    }
    private void flushBatch() {
        if (!batch.isEmpty()) {
            batchConsumer.accept(batch);
            batch.clear();
        }
    }
    @Override
    public void close() {
        // 在处理完所有结果后,检查是否有剩余未处理的小批量数据并处理它们
        flushBatch();
    }
}
  • 在Mapper接口中定义查询方法,使用resultHandler参数
public interface YourMapper {
    @Select("SELECT * FROM your_large_table")
    void streamLargeData(@Param("handler") ResultHandler<YourEntity> handler);
}
  • 在业务逻辑中调用并处理数据
@Service
public class YourService {
    private final YourMapper mapper;
    public YourService(YourMapper mapper) {
        this.mapper = mapper;
    }
    public void processLargeDataInBatches() {
        BatchResultHandler<YourEntity> handler = new BatchResultHandler<>(1000, this::processBatch);
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE); // 使用SIMPLE执行器类型
        try {
            mapper.streamLargeData(handler);
        } finally {
            sqlSession.close(); // 在处理完所有数据后关闭SqlSession
        }
    }
    private void processBatch(List<YourEntity> batch) {
        // 在这里处理每个批次的数据,如计算、持久化到其他地方等
    }
}

BatchResultHandler会在接收到指定数量的结果后触发回调函数batchConsumer,从而实现了逐批处理数据的效果。同时,确保在完成所有数据处理后关闭SqlSession,以释放数据库资源。

这种方式虽然没有直接利用Cursor接口,但通过自定义ResultHandler也能达到类似流式查询的目的,即逐次处理结果而不一次性加载所有结果到内存中。然而,使用Cursor接口的优势在于它原生支持流式处理,并且对于数据库连接的管理更为便捷。若能使用Cursor,则推荐优先采用。

解决方案二 分页查询

借助 MyBatis 的分页查询(Pagination)功能来实现类似分批查询的效果。下面是一个使用 RowBounds 实现分批查询的例子:

int batchSize = 100; // 每批查询的数量
int offset = 0; // 当前批次的起始位置
boolean moreDataAvailable = true;
while (moreDataAvailable) {
    try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
        // 使用 RowBounds 实现分页查询
        RowBounds rowBounds = new RowBounds(offset, batchSize);
        
        List<User> batchOfUsers = sqlSession.selectList("yourMapper.selectLargeData", null, rowBounds);
        if (batchOfUsers.isEmpty()) {
            moreDataAvailable = false; // 没有更多数据了
        } else {
            // 处理这一批用户数据
            for (User user : batchOfUsers) {
                processUser(user);
            }
            offset += batchSize; // 移动到下一批次的起始位置
        }
    }
}

RowBounds 对象用于指定查询结果的范围,每次查询从 offset 位置开始,获取 batchSize 个数据。在每次查询后,根据查询结果判断是否还有更多数据,并根据实际情况调整下次查询的 offset 值。通过这种方式,可以避免一次性加载大量数据到内存中。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
9天前
|
关系型数据库 MySQL
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
10 1
|
23天前
|
SQL 关系型数据库 MySQL
go 通过sql操作mysql
go 通过sql操作mysql
18 1
|
5天前
|
SQL 存储 关系型数据库
|
8天前
|
关系型数据库 MySQL 数据库
『Django』模型入门教程-操作MySQL
一个后台如果没有数据库可以说废了一半。日常开发中大多数时候都在与数据库打交道。Django 为我们提供了一种更简单的操作数据库的方式。 在 Django 中,模型(Model)是用来定义数据库结构的类。每个模型类通常对应数据库中的一个表,类的属性对应表中的列。通过定义模型,Django 的 ORM(Object-Relational Mapping)可以将 Python 对象映射到数据库表,并提供一套 API 来进行数据库操作。 本文介绍模型的用法。
|
20天前
|
关系型数据库 MySQL Go
Go语言介绍以及如何在Go语言中操作MySQL数据库
Go语言介绍以及如何在Go语言中操作MySQL数据库
28 3
|
20天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步MySQL数据并EP(复杂事件处理)时,编译报错,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
20天前
|
存储 自然语言处理 关系型数据库
✅生产问题之Emoji表情如何操作存储,MySQL是否支持
MySQL支持存储Emoji表情,需使用UTF8MB4编码。UTF8MB3,MySQL早期的UTF-8实现,不支持部分Unicode字符包括Emoji,已被弃用。推荐使用UTF8MB4,它支持全部Unicode字符。转换时,现有UTF8MB3表需转换为UTF8MB4,列和表都需设置相应字符集。
|
23天前
|
SQL 关系型数据库 MySQL
MySQL数据库——事务操作-begin-commit-rollback
MySQL数据库——事务操作-begin-commit-rollback
14 1
|
24天前
|
存储 关系型数据库 MySQL
MYSQL--存储过程操作
MYSQL--存储过程操作
|
9天前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之无法创建mysql的连接池什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。