引言
了解Jedis的童鞋可能清楚,Jedis中JedisCluster
是不支持pipeline操作的,如果使用了redis集群,在spring-boot-starter-data-redis
中又正好用到的pipeline,那么会接收到Pipeline is currently not supported for JedisClusterConnection.
这样的报错。错误来自于org.springframework.data.redis.connection.jedis.JedisClusterConnection
:
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisConnection#openPipeline()
*/
@Override
public void openPipeline() {
throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection.");
}
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
会帮我们自动配置,无论你redis使用的是standalone、sentinel、cluster配置。这个源码很容易理解,读者可自行阅读,不理解的可以一起讨论。
Lettuce中的pipeline
spring boot 2.0开始,配置spring-boot-starter-data-redis
将不依赖Jedis,而是依赖Lettuce,在Lettuce中,redis cluster使用pipeline不会有问题。
知识储备
再往下看可能需要读者具备如下的能力:
- redis cluster hash slot
- JedisCluster & Jedis的关系
- pipeline和*mset等命令的区别
哈希槽(hash slot)
redis cluster一共有16384个桶(hash slot),用来装数据,建立集群的时候每个集群节点会负责一些slot的数据存储,比如我负责0-1000,你负责1001-2000,他负责2001-3000……
数据存储时,每个key在存入redis cluster前,会利用CRC16计算出一个值,这个值就是对应redis cluster的hash slot,就知道这个key会被放到哪个服务器上了。
参考文档:
JedisCluster & Jedis的关系
JedisCluster本质上是使用Jedis来和redis集群进行打交道的,具体过程是:
- 获取该key的slot值:
JedisClusterCRC16.getSlot(key)
- 从
JedisClusterConnectionHandler
实例中获取到该slot对应的Jedis
实例:Jedis connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
- 利用connection操作。
pipeline和*mset等命令的区别
redis提供了mset,hmset之类的命令,或者说集合操作可以使用sadd key 1 2 3 4 5 6 ..... 10000000000这种一口气传一堆数据的命令。
有时候你甚至会发现*mset这种一口气操作一堆数据的速度更快。那么这种使用场景会有什么弊端呢?答案是:阻塞。
操作这一堆数据需要多久,就会阻塞多久。
Redis Cluster下pipeline使用的思考
由于JedisCluster中的所有操作本质上是使用Jedis,而Jedis是支持pipeline操作的,所有,要在redis cluster中使用pipeline是有可能的,只要你操作同一个键即可,准确的说,应该是你操作的键位于同一台服务器,更直白的,你操作的键是同一个Jedis实例。ok,如果你已经晕了,那你需要回看一下“知识储备”。
说说笔者的使用场景吧,我们是把csv文件的一批数据读到内存中,同一批数据是存储到同一个key中的,最后的操作会类似于:
set key member1
set key member2
set key member3
...
set key member100000
操作的是同一个key,可以利用JedisCluster获取到该key的Jedis实例,然后利用pipeline操作。
让spring-data-redis也支持pipeline的思路
提供一下代码思路。
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection redisConnection = factory.getConnection();
JedisClusterConnection jedisClusterConnection = (JedisClusterConnection) redisConnection;
// 获取到原始到JedisCluster连接
JedisCluster jedisCluster = jedisClusterConnection.getNativeConnection();
// 通过key获取到具体的Jedis实例
// 计算hash slot,根据特定的slot可以获取到特定的Jedis实例
int slot = JedisClusterCRC16.getSlot(key);
/**
* 不建议这么使用,官方在2.10版本已经修复<a href="https://github.com/xetorthio/jedis/pull/1532">此问题</a><br>
* 2.10版本中,官方会直接提供JedisCluster#getConnectionFromSlot
*/
Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);
field.setAccessible(true);
JedisSlotBasedConnectionHandler jedisClusterConnectionHandler = (JedisSlotBasedConnectionHandler) field.get(jedisCluster);
Jedis jedis = jedisClusterConnectionHandler.getConnectionFromSlot(slot);
// 接下来就是pipeline操作了
Pipeline pipeline = jedis.pipelined();
...
pipeline.syncAndReturnAll();
以上代码完全可以模仿spring-data-redis中RedisTemplate#executePipelined
方法写成一个通用的方法,供使用者调用。