一.引言
使用 SharedJedisPool 时注意到内部涉及到 hash 函数,其中对应的 hash 接口需要复写两个 hash 函数分别是 hash (String var1) 和 hash (Byte[] var1),默认使用 Hashing.MURMUR_HASH 算法,除此之外也可以使用自带的 MD5,下面针对 SharedJedisPool 以及两个 Hash 函数的使用和含义进行分解。
public interface Hashing { Hashing MURMUR_HASH = new MurmurHash(); ThreadLocal<MessageDigest> md5Holder = new ThreadLocal(); Hashing MD5 = new Hashing() { public long hash(String key) { return this.hash(SafeEncoder.encode(key)); } public long hash(byte[] key) { try { if (md5Holder.get() == null) { md5Holder.set(MessageDigest.getInstance("MD5")); } } catch (NoSuchAlgorithmException var6) { throw new IllegalStateException("++++ no md5 algorythm found"); } MessageDigest md5 = (MessageDigest)md5Holder.get(); md5.reset(); md5.update(key); byte[] bKey = md5.digest(); long res = (long)(bKey[3] & 255) << 24 | (long)(bKey[2] & 255) << 16 | (long)(bKey[1] & 255) << 8 | (long)(bKey[0] & 255); return res; } }; long hash(String var1); long hash(byte[] var1); }
二.SharedJedisPool
1.初始化源代码
编辑
最常见的初始化方法就是传入 poolConfig 以及对应的 List<JedisShardInfo> 即需要绑定的 redis 池的 host 和 port,也可以自定义 Hashing 类即 algo 参数,这里 Hashing 默认使用 MURMUR_HASH,如果需要自定义则需要实现 hash(String) 和 hash(Byte[]) 两个 hash 函数,也就是上面提到的两个不同分工的 hash 函数。
2. JedisPool 初始化示例
SharedJedisPool 的初始化与 Jedis 不同,由于是连接池,所以涉及到资源的连接与释放,连接池的大小等,这些统一配置到 JedisPoolConfig 中,其次就是选择要绑定的 redis 集合,将 host-port 一次添加至 List 中,为了区分读写任务,这里通过 rm 和 rs 对 redis host 进行了区分,最后初始化 SharedJedisPool 即可。
def getSharedJedisPool(hostAndPorts: Array[(String, String)], isRead: Boolean): ShardedJedisPool = { val jedisShardInfoList = new util.ArrayList[JedisShardInfo]() val config = new JedisPoolConfig config.setMaxTotal(30) config.setMaxIdle(20) config.setMinIdle(20) config.setTimeBetweenEvictionRunsMillis(30000) config.setSoftMinEvictableIdleTimeMillis(3600000) config.setTestOnBorrow(true) config.setTestOnReturn(true) config.setTestWhileIdle(true) hostAndPorts.foreach{ case (host, port) => if (isRead && host.contains("rs")) { jedisShardInfoList.add(new JedisShardInfo(host, port.toInt)) } else if (!isRead && host.contains("rm")) { jedisShardInfoList.add(new JedisShardInfo(host, port.toInt)) } else { println("主从与任务不匹配!") } } val shardedJedisPool = new ShardedJedisPool(config, jedisShardInfoList, Hashing.MURMUR_HASH, Sharded.DEFAULT_KEY_TAG_PATTERN) shardedJedisPool }
3.JedisPool 常见使用方法
一般 JedisPool 的使用遵循 Try-Catch-Finaly 的原则。
Try:
相比 Jedis,sharedJedis 有一些操作不支持,例如 mset,mget 等,可以理解为 SharedJedis 只能处理单 key 的情况,因为涉及到 hash 到不同的 redis 上,所以这些操作都不被允许。
Catch:
由于是连接池,就会存在连接失效,连接超时,连接不够的情况,所以为了增加程序的鲁棒性,必要的 catch 一定要有
Finaly:
由于 JedisPool 中 resource 即 SharedJedis 有限,所以一般操作后需要close 或者调用 JedisPool.returnResource() 将连接返回,这样其他 task 可以获取空闲连接。
try { val resource = jedisPool.getResource resource.set("k", "v") } catch { case e: Exception => { e.printStackTrace() } } finally { resource.close() }
三.hash 函数调用示例
1.hash(String var)
第一个 hash 函数的作用是维护一个 treeMap,k1-Redis1, k2-Redis2,初始化的函数位于 redis.clients.util.ShardInfo 类内,针对每一个 ShardInfo 都会调用 initialize 函数将初始化好的各个 redis 的连接放入 treeMap 中,当用户传入 key 时,调用 hash(byte[] var) 将 key 映射到 K1,K2..,然后通过 K1,K2... 映射到对应的 redis 连接,从而实现 key - K - Redis 的映射,保证存储的分布均匀。
编辑
这里调用 hash(String var),this.alog 为默认的 hash 函数或者我们定义的函数,可以看到针对这个 hash 函数,它要 hash 的 key 是固定的,即 SHARD-i-NODE-n,变量就是 n = [0,159] 和 i = JedisPool 绑定的 redis 数量,所以 hash(String var) 的参数 var 基本是固定的,最终要做的就是:
hash(SHARD-i-NODE-n) 得到 k1,k2,k3......,对应 redis r1,r2,......,ri 的 shardInfo,最后将 shardInfo 放到 resoureces 中,resources 本质上是一个 LinkedHashMap。
为了打印日志,我们自定义一个 hash 函数,并使用该 hash 函数初始化 JedisPool:
val hashFunction: Hashing = new Hashing() { println("进入Hash函数!") // 决定 hash 到哪台 redis override def hash(key: Array[Byte]): Long = { println(s"进入Byte Hash! ${new String(key)}") (SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) % RedisNum } override def hash(key: String): Long = { println(s"进入String Hash! key: $key ${key.split("-")(1)}") key.split("-")(1).toLong } } val shardedJedisPool = new ShardedJedisPool(config, jedisShardInfoList, hashFunction, Sharded.DEFAULT_KEY_TAG_PATTERN)
运行函数看一下日志:
我采用 key.split("-")(1) 作为 hash 结果,key 的样式是 SHARD-i-NODE-n,i 代表 redis 顺序,n 代表 0-160,所以 split("-")(1) 得到的结果为 i 即 redis 的顺序,因为我绑定了4台 redis,最终到 treeMap 里就只有 4 个 KeyValue 对,Map { 0 -> Redis1, 1-> Redis2,2-> Redis3,3 -> Redis4 }。
编辑
2.hash(Byte[] var)
上面通过 hash (String var) 生成了基于 redis id 映射的 map,Map { 0 -> Redis1, 1-> Redis2,2-> Redis3,3 -> Redis4 } 。接下来就需要 hash(byte[] var) 函数将对应的 key 映射到 map 的 keySet 中了,先看下来了一个 key 的请求后 JedisPool 的处理顺序:
A.获取 Resource
val jedisPool = getSharedJedisPool(hostsAndPorts, false) val resource = jedisPool.getResource resource.set("test_key", "test_value") resource.close()
最简单的就是上面这样,首先 getResource,上面 initialize 函数将每个 redis 的 shardInfo 放置到了 resoucres 中,这样来了 key 就可以通过 hash 函数获取 hash 值然后选择对应的 redis 执行相关操作了
B.Set 操作入口
编辑
所以 set 方法的第一件事情就是根据 key 找到对应的 Jedis
C.寻找 redis 索引
set 方法通过 getShard(key: String) 获取对应 Jedis,getShard 函数再调用 getShardInfo(key: String) 方法,该方法内部再调用 this.algo.hash(key: Byte[]) 方法获取该 key 的索引,然后用过初始化好的 node map 映射到对应 redis 的 shardInfo,再逐级回调,最后返回对应的 Jedis 执行相关的操作。
编辑
四.hash 函数使用解释
编辑
上面基本解释了两个 Hash 函数的含义,下面再回看一下我们自定义的 hash 函数是如何运作的
1.hash(key: String) 释义
这一步很好理解,key.split("-")(1) 完成 redis 索引到 ShardInfo 的一一映射,不再赘述
2.hash(key: Array[Byte]) 释义
这个写法和 java 不同,java 是 Byte[] ,含义相同。主要看这一行:
(SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) % RedisNum
A.SafeEncoder.encode(key).hashCode
SafeEncoder.encode(key).hashCode 该方法针对指定 key 进行 encode 编码并获取一个 long 值的 hashCode,这个是官方 API 内带的方法
B.& Integer.MAX_VALUE
Interget.MAX_VALUE 的值是 2147483647,其二进制表示为 0111 1111 1111 1111 1111 1111 1111 1111,可以看到第一位是 1,执行 & 操作就是保证 HashCode 最终得到的总是正整数,因为 0 & 0 或者 0 & 1 都是 0,所以保证了 (SafeEncoder.encode(key).hashCode & Integer.MAX_VALUE) 的非负性
C. % RedisNum
这一步保证了这个 hash 函数最终返回的 Long 范围在 redis 索引范围内,配合一一映射的 map,保证每一个 key 都能找到 redis
3.如何快速判断 key 对应的 redis
上面也提到过,判断哪一台 redis 调用 getShard 函数即可,也可以结合同样的 hash 方法获取映射,看索引是否和自己的 redis 绑定顺序符合。
val redisNum = 4 val key = "test_key_1" val host = resource.getShard("test_key_1").getClient.getHost val hashNum = (SafeEncoder.encode("test_key_1".getBytes()).hashCode() & Integer.MAX_VALUE) % redisNum println(s"key: $key HashNum: $hashNum host: $host")
五.总结
编辑
所以 Hash(Byte[]) 决定了 key 走对应哪个索引 K,Hash(String) 决定这个索引 K 对应哪台 redis,这样两个函数配合就实现了 key -> Redis 的映射,除此之外,使用 JedisPool 一定要注意 return Resource 或者 close !!