Pipeline 不生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { redisTemplate.opsForValue().get("test"); redisTemplate.opsForValue().get("test2"); return null; } });
然后,我们尝试将其加入事务中,由于我们的目的不是真的测试事务,只是为了演示问题,所以,仅仅是用 SessionCallback
将 GET 命令包装起来:
redisTemplate.execute(new SessionCallback<Object>() { @Override public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException { return operations.opsForValue().get("test"); } });
这里最大的区别就是,外层获取连接的时候,这次是 bind = true 即将连接与当前线程绑定,用于保持会话连接。外层流程如下:
里面的 SessionCallback
其实就是 redisTemplate.opsForValue().get("test")
,使用的是共享的连接,而不是独占的连接,因为我们这里还没开启事务(即执行 multi 命令),如果开启了事务使用的就是独占的连接,流程如下:
由于 SessionCallback
需要保持连接,所以流程有很大变化,首先需要绑定连接,其实就是获取连接放入 ThreadLocal 中。同时,针对 LettuceConnection 进行了封装,我们主要关注这个封装有一个引用计数的变量。每嵌套一次 execute
就会将这个计数 + 1,执行完之后,就会将这个计数 -1, 同时每次 execute
结束的时候都会检查这个引用计数,如果引用计数归零,就会调用LettuceConnection.close()
。
接下来再来看,如果是 executePipelined(SessionCallback)
会怎么样:
List<Object> objects = redisTemplate.executePipelined(new SessionCallback<Object>() { @Override public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException { operations.opsForValue().get("test"); return null; } });
其实与第二个例子在流程上的主要区别在于,使用的连接不是共享连接,而是直接是独占的连接。
最后我们再来看一个例子,如果是在 execute(RedisCallback)
中执行基于 executeWithStickyConnection(RedisCallback callback)
的命令会怎么样,各种 SCAN 就是基于 executeWithStickyConnection(RedisCallback callback)
的,例如:
redisTemplate.execute(new SessionCallback<Object>() { @Override public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException { Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build()); //scan 最后一定要关闭,这里采用 try-with-resource try (scan) { } catch (IOException e) { e.printStackTrace(); } return null; } });
这里 Session callback 的流程,如下图所示,因为处于 SessionCallback,所以 executeWithStickyConnection
会发现当前绑定了连接,于是标记 + 1,但是并不会标记 - 1,因为 executeWithStickyConnection
可以将资源暴露到外部,例如这里的 Cursor,需要外部手动关闭。
在这个例子中,会发生连接泄漏,首先执行:
redisTemplate.execute(new SessionCallback<Object>() { @Override public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException { Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build()); //scan 最后一定要关闭,这里采用 try-with-resource try (scan) { } catch (IOException e) { e.printStackTrace(); } return null; } });
这样呢,LettuceConnection 会和当前线程绑定,并且在结束时,引用计数不为零,而是 1。并且 cursor 关闭时,会调用 LettuceConnection 的 close。但是 LettuceConnection 的 close 的实现,其实只是标记状态,并且把独占的连接asyncDedicatedConn
关闭,由于当前没有使用到独占的连接,所以为空,不需要关闭;如下面源码所示:
@Override public void close() throws DataAccessException { super.close(); if (isClosed) { return; } isClosed = true; if (asyncDedicatedConn != null) { try { if (customizedDatabaseIndex()) { potentiallySelectDatabase(defaultDbIndex); } connectionProvider.release(asyncDedicatedConn); } catch (RuntimeException ex) { throw convertLettuceAccessException(ex); } } if (subscription != null) { if (subscription.isAlive()) { subscription.doClose(); } subscription = null; } this.dbIndex = defaultDbIndex; }
之后我们继续执行一个 Pipeline 命令:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { connection.get("test".getBytes()); redisTemplate.opsForValue().get("test"); return null; } });
这时候由于连接已经绑定到当前线程,同时同上上一节分析我们知道第一步解开释放这个绑定,但是调用了 LettuceConnection
的 close。执行这个代码,会创建一个独占连接,并且,由于计数不能归零,导致连接一直与当前线程绑定,这样,这个独占连接一直不会关闭(如果有连接池的话,就是一直不返回连接池)
即使后面我们手动关闭这个链接,但是根据源码,由于状态 isClosed 已经是 true,还是不能将独占链接关闭。这样,就会造成连接泄漏。
针对这个 Bug,我已经向 spring-data-redis 一个 Issue:Lettuce Connection Leak while using execute(SessionCallback) and executeWithStickyConnection in same thread by random turn
- 尽量避免使用
SessionCallback
,尽量仅在需要使用 Redis 事务的时候,使用SessionCallback
。 - 使用
SessionCallback
的函数单独封装,将事务相关的命令单独放在一起,并且外层尽量避免再继续套RedisTemplate
的execute
相关函数。