项目中为避免一次查询过多造成oom采用MySQL的游标分批查询,单独使用时正常,但是在集成sharding-jdbc分表中间件后报错:
Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion
shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion
错误日志:
org.apache.ibatis.exceptions.PersistenceException: ### Error querying database. Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion ### The error may involve cn.com.agree.foundation.offlinedata.server.mapper.OfflineDataMapper.executeOfflineSql ### The error occurred while handling cursor results ### SQL: select * from account_1 ### Cause: java.sql.SQLFeatureNotSupportedException: closeOnCompletion at org.apache.ibatis.exceptions.ExceptionFactory.wrapException(ExceptionFactory.java:30) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:127) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:116) at cn.com.agree.foundation.offlinedata.server.service.impl.DbOfflineDataReader.open(DbOfflineDataReader.java:53) at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.execute(OfflineDataProcessorDelegate.java:110) at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessorDelegate.offlineDataDelegate(OfflineDataProcessorDelegate.java:85) at cn.com.agree.foundation.offlinedata.server.OfflineDataProcessor.process(OfflineDataProcessor.java:23) at tech.powerjob.worker.core.tracker.task.light.LightTaskTracker.processTask(LightTaskTracker.java:211) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.sql.SQLFeatureNotSupportedException: closeOnCompletion at org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement.closeOnCompletion(AbstractUnsupportedOperationStatement.java:60) at com.baomidou.mybatisplus.core.executor.MybatisSimpleExecutor.doQueryCursor(MybatisSimpleExecutor.java:82) at org.apache.ibatis.executor.BaseExecutor.queryCursor(BaseExecutor.java:178) at com.baomidou.mybatisplus.core.executor.MybatisCachingExecutor.queryCursor(MybatisCachingExecutor.java:98) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63) at com.sun.proxy.$Proxy223.queryCursor(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:63) at com.sun.proxy.$Proxy223.queryCursor(Unknown Source) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectCursor(DefaultSqlSession.java:123) ... 11 more
原因:查看官网shardingsphere不支持游标操作
- ShardingSphere > JDBC不支持项
解决方案一 自定义处理器
不直接使用Cursor接口实现流式查询,可以通过手动控制结果集的遍历和数据库连接的生命周期来模拟流式查询的效果。
基于自定义结果处理器(ResultHandler)的方式并配合Mapper接口方法实现流式查询读取大量数据的功能:
分批次从数据库中读取数据,每次只加载少量记录到内存中处理,避免一次性加载所有数据造成内存溢出(OOM)。
- 自定义的 ResultHandler
public class BatchResultHandler<T> implements ResultHandler<T> { private final int batchSize; private List<T> batch; private Consumer<List<T>> batchConsumer; public BatchResultHandler(int batchSize, Consumer<List<T>> batchConsumer) { this.batchSize = batchSize; this.batch = new ArrayList<>(batchSize); this.batchConsumer = batchConsumer; } @Override public void handleResult(ResultContext<? extends T> context) { batch.add(context.getResultObject()); if (batch.size() == batchSize) { flushBatch(); } } private void flushBatch() { if (!batch.isEmpty()) { batchConsumer.accept(batch); batch.clear(); } } @Override public void close() { // 在处理完所有结果后,检查是否有剩余未处理的小批量数据并处理它们 flushBatch(); } }
- 在Mapper接口中定义查询方法,使用resultHandler参数
public interface YourMapper { @Select("SELECT * FROM your_large_table") void streamLargeData(@Param("handler") ResultHandler<YourEntity> handler); }
- 在业务逻辑中调用并处理数据
@Service public class YourService { private final YourMapper mapper; public YourService(YourMapper mapper) { this.mapper = mapper; } public void processLargeDataInBatches() { BatchResultHandler<YourEntity> handler = new BatchResultHandler<>(1000, this::processBatch); SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE); // 使用SIMPLE执行器类型 try { mapper.streamLargeData(handler); } finally { sqlSession.close(); // 在处理完所有数据后关闭SqlSession } } private void processBatch(List<YourEntity> batch) { // 在这里处理每个批次的数据,如计算、持久化到其他地方等 } }
BatchResultHandler会在接收到指定数量的结果后触发回调函数batchConsumer,从而实现了逐批处理数据的效果。同时,确保在完成所有数据处理后关闭SqlSession,以释放数据库资源。
这种方式虽然没有直接利用Cursor接口,但通过自定义ResultHandler也能达到类似流式查询的目的,即逐次处理结果而不一次性加载所有结果到内存中。然而,使用Cursor接口的优势在于它原生支持流式处理,并且对于数据库连接的管理更为便捷。若能使用Cursor,则推荐优先采用。
解决方案二 分页查询
借助 MyBatis 的分页查询(Pagination)功能来实现类似分批查询的效果。下面是一个使用 RowBounds 实现分批查询的例子:
int batchSize = 100; // 每批查询的数量 int offset = 0; // 当前批次的起始位置 boolean moreDataAvailable = true; while (moreDataAvailable) { try (SqlSession sqlSession = sqlSessionFactory.openSession()) { // 使用 RowBounds 实现分页查询 RowBounds rowBounds = new RowBounds(offset, batchSize); List<User> batchOfUsers = sqlSession.selectList("yourMapper.selectLargeData", null, rowBounds); if (batchOfUsers.isEmpty()) { moreDataAvailable = false; // 没有更多数据了 } else { // 处理这一批用户数据 for (User user : batchOfUsers) { processUser(user); } offset += batchSize; // 移动到下一批次的起始位置 } } }
RowBounds 对象用于指定查询结果的范围,每次查询从 offset 位置开始,获取 batchSize 个数据。在每次查询后,根据查询结果判断是否还有更多数据,并根据实际情况调整下次查询的 offset 值。通过这种方式,可以避免一次性加载大量数据到内存中。