使用shardingjdbc执行MySQL游标操作时报错

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用shardingjdbc执行MySQL游标操作时报错

项目中为避免一次查询过多造成oom采用MySQL的游标分批查询,单独使用时正常,但是在集成sharding-jdbc分表中间件后报错:

Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion

shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion

错误日志:

org.apache.ibatis.exceptions.PersistenceException: 
### Error querying database.  Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
### The error may involve cn.com.agree.foundation.offlinedata.server.mapper.OfflineDataMapper.executeOfflineSql
### The error occurred while handling cursor results
### SQL: select * from account_1
### Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
  at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:127)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:116)
  at cn.com.agree.foundation.offlinedata.server.service.impl.DbOfflineDataReader.open(DbOfflineDataReader.java:53)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.execute(OfflineDataProcessorDelegate.java:110)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.offlineDataDelegate(OfflineDataProcessorDelegate.java:85)
  at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessor.process(OfflineDataProcessor.java:23)
  at tech.powerjob.worker.core.tracker.task.light.LightTaskTracker.processTask(LightTaskTracker.java:211)
  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
  at java.util.concurrent.FutureTask.run(FutureTask.java)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
  at org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion(AbstractUnsupportedOperationStatement.java:60)
  at com.baomidou.mybatisplus.core.executor.MybatisSimpleExecutor.doQueryCursor(MybatisSimpleExecutor.java:82)
  at org.apache.ibatis.executor.BaseExecutor.queryCursor(BaseExecutor.java:178)
  at com.baomidou.mybatisplus.core.executor.MybatisCachingExecutor.queryCursor(MybatisCachingExecutor.java:98)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
  at com.sun.proxy.$Proxy223.queryCursor(Unknown Source)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63)
  at com.sun.proxy.$Proxy223.queryCursor(Unknown Source)
  at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:123)
  ... 11 more

原因:查看官网shardingsphere不支持游标操作

解决方案一 自定义处理器

不直接使用Cursor接口实现流式查询,可以通过手动控制结果集的遍历和数据库连接的生命周期来模拟流式查询的效果。

基于自定义结果处理器(ResultHandler)的方式并配合Mapper接口方法实现流式查询读取大量数据的功能:

分批次从数据库中读取数据,每次只加载少量记录到内存中处理,避免一次性加载所有数据造成内存溢出(OOM)。

  • 自定义的 ResultHandler
public class BatchResultHandler<T> implements ResultHandler<T> {
    private final int batchSize;
    private List<T> batch;
    private Consumer<List<T>> batchConsumer;
    public BatchResultHandler(int batchSize, Consumer<List<T>> batchConsumer) {
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
        this.batchConsumer = batchConsumer;
    }
    @Override
    public void handleResult(ResultContext<? extends T> context) {
        batch.add(context.getResultObject());
        if (batch.size() == batchSize) {
            flushBatch();
        }
    }
    private void flushBatch() {
        if (!batch.isEmpty()) {
            batchConsumer.accept(batch);
            batch.clear();
        }
    }
    @Override
    public void close() {
        // 在处理完所有结果后,检查是否有剩余未处理的小批量数据并处理它们
        flushBatch();
    }
}
  • 在Mapper接口中定义查询方法,使用resultHandler参数
public interface YourMapper {
    @Select("SELECT * FROM your_large_table")
    void streamLargeData(@Param("handler") ResultHandler<YourEntity> handler);
}
  • 在业务逻辑中调用并处理数据
@Service
public class YourService {
    private final YourMapper mapper;
    public YourService(YourMapper mapper) {
        this.mapper = mapper;
    }
    public void processLargeDataInBatches() {
        BatchResultHandler<YourEntity> handler = new BatchResultHandler<>(1000, this::processBatch);
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE); // 使用SIMPLE执行器类型
        try {
            mapper.streamLargeData(handler);
        } finally {
            sqlSession.close(); // 在处理完所有数据后关闭SqlSession
        }
    }
    private void processBatch(List<YourEntity> batch) {
        // 在这里处理每个批次的数据,如计算、持久化到其他地方等
    }
}

BatchResultHandler会在接收到指定数量的结果后触发回调函数batchConsumer,从而实现了逐批处理数据的效果。同时,确保在完成所有数据处理后关闭SqlSession,以释放数据库资源。

这种方式虽然没有直接利用Cursor接口,但通过自定义ResultHandler也能达到类似流式查询的目的,即逐次处理结果而不一次性加载所有结果到内存中。然而,使用Cursor接口的优势在于它原生支持流式处理,并且对于数据库连接的管理更为便捷。若能使用Cursor,则推荐优先采用。

解决方案二 分页查询

借助 MyBatis 的分页查询(Pagination)功能来实现类似分批查询的效果。下面是一个使用 RowBounds 实现分批查询的例子:

int batchSize = 100; // 每批查询的数量
int offset = 0; // 当前批次的起始位置
boolean moreDataAvailable = true;
while (moreDataAvailable) {
    try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
        // 使用 RowBounds 实现分页查询
        RowBounds rowBounds = new RowBounds(offset, batchSize);
        
        List<User> batchOfUsers = sqlSession.selectList("yourMapper.selectLargeData", null, rowBounds);
        if (batchOfUsers.isEmpty()) {
            moreDataAvailable = false; // 没有更多数据了
        } else {
            // 处理这一批用户数据
            for (User user : batchOfUsers) {
                processUser(user);
            }
            offset += batchSize; // 移动到下一批次的起始位置
        }
    }
}

RowBounds 对象用于指定查询结果的范围,每次查询从 offset 位置开始,获取 batchSize 个数据。在每次查询后,根据查询结果判断是否还有更多数据,并根据实际情况调整下次查询的 offset 值。通过这种方式,可以避免一次性加载大量数据到内存中。


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
5月前
|
SQL 运维 关系型数据库
MySQL 中 GRANT 操作会引起复制中断吗?
GRANT 操作并不是一个原子性操作,不管执行成功与否,都会触发一个隐式重载授权表的行为。 在生产环境中需要规范用户创建及授权的操作,不推荐使用 DML 语句去直接变更 mysql.user 表,可能会引发其他的问题,若使用了 DML 语句进行变更, 需要手工执行 flush privileges。
76 4
|
5月前
|
JavaScript 关系型数据库 MySQL
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
73 0
|
6月前
|
关系型数据库 MySQL
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
【mysql技巧】如何在这个mysql语句执行前加个前提,也就是只有表里没有相同数据才进行添加插入操作
45 1
|
5月前
|
存储 关系型数据库 文件存储
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
53 2
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 关系型数据库 MySQL
「Python入门」python操作MySQL和SqlServer
**摘要:** 了解如何使用Python的pymysql模块与MySQL数据库交互。首先,通过`pip install pymysql`安装模块。pymysql提供与MySQL的连接功能,例如创建数据库连接、执行SQL查询。在设置好MySQL环境后,使用`pymysql.connect()`建立连接,并通过游标执行SQL(如用户登录验证)。注意防止SQL注入,使用参数化查询。增删改操作需调用`conn.commit()`来保存更改。pymssql模块类似,但导入和连接对象创建略有不同。
72 0
「Python入门」python操作MySQL和SqlServer
|
5月前
|
SQL 存储 关系型数据库
|
6月前
|
关系型数据库 MySQL 数据库
『Django』模型入门教程-操作MySQL
一个后台如果没有数据库可以说废了一半。日常开发中大多数时候都在与数据库打交道。Django 为我们提供了一种更简单的操作数据库的方式。 在 Django 中,模型(Model)是用来定义数据库结构的类。每个模型类通常对应数据库中的一个表,类的属性对应表中的列。通过定义模型,Django 的 ORM(Object-Relational Mapping)可以将 Python 对象映射到数据库表,并提供一套 API 来进行数据库操作。 本文介绍模型的用法。