Redis进阶-Redis集群原理剖析及gossip协议初探 集群原理部分 简单的提了下Jest是如何实现Redis Cluster 的 ,这里我们再来梳理一下
import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import; import java.util.HashSet; import java.util.Set; public class JedisClusterDemo { public static void main(String[] args) throws IOException { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(20); config.setMaxIdle(10); config.setMinIdle(5); Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("", 8001)); jedisClusterNode.add(new HostAndPort("", 8004)); jedisClusterNode.add(new HostAndPort("", 8002)); jedisClusterNode.add(new HostAndPort("", 8005)); jedisClusterNode.add(new HostAndPort("", 8003)); jedisClusterNode.add(new HostAndPort("", 8006)); JedisCluster jedisCluster = null; try { //connectionTimeout:指的是连接一个url的连接等待时间 //soTimeout:指的是连接上一个url,获取response的返回等待时间 jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config); System.out.println(jedisCluster.set("clusterArtisan", "artisanValue")); System.out.println(jedisCluster.get("clusterArtisan")); } catch (Exception e) { e.printStackTrace(); } finally { if (jedisCluster != null) jedisCluster.close(); } } }
这里是个简单的demo, 生产中用的话,需要确保jedisCluster是单例的,并且无需手工调用close,不然的话 这个连接池就关闭了,你就无法获取到连接了。
当 Redis Cluster 的客户端来连接集群时,它也会得到一份集群的槽位配置信息并将其缓存在客户端本地。这样当客户端要查找某个 key 时,可以直接定位到目标节点。
jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config);
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); initializeSlotsCache(nodes, poolConfig, password); }
重点看下 initializeSlotsCache
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } }
继续 cache.discoverClusterNodesAndSlots(jedis);
cache为 JedisClusterInfoCache 对象。
public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { reset(); List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } HostAndPort targetNode = generateHostAndPort(hostInfos); setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock(); } }
set --------> run ----> runWithRetries ----> connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))
CRC16算法,计算key对应的slot connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))
public static int getSlot(byte[] key) { int s = -1; int e = -1; boolean sFound = false; for (int i = 0; i < key.length; i++) { if (key[i] == '{' && !sFound) { s = i; sFound = true; } if (key[i] == '}' && sFound) { e = i; break; } } if (s > -1 && e > -1 && e != s + 1) { return getCRC16(key, s + 1, e) & (16384 - 1); } return getCRC16(key) & (16384 - 1); }
jedisCluster.set("clusterArtisan", "artisanValue")
@Override public String set(final String key, final String value) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.set(key, value); } }.run(key); }
命令模式, 关注 run方法
public T run(String key) { if (key == null) { throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); } return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false); }
继续 runWithRetries , 截取核心逻辑
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { Jedis connection = null; try { if (asking) { ...... } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } finally { releaseConnection(connection); } }
- CRC16算法,计算key对应的slot
public static int getSlot(byte[] key) { int s = -1; int e = -1; boolean sFound = false; for (int i = 0; i < key.length; i++) { if (key[i] == '{' && !sFound) { s = i; sFound = true; } if (key[i] == '}' && sFound) { e = i; break; } } if (s > -1 && e > -1 && e != s + 1) { return getCRC16(key, s + 1, e) & (16384 - 1); } return getCRC16(key) & (16384 - 1); }
- getConnectionFromSlot 通过 JedisPool 获取连接
关注下 JedisCluster是如何获取连接的 getConnectionFromSlot 方法
@Override public Jedis getConnectionFromSlot(int slot) { JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment return connectionPool.getResource(); } else { renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool.getResource(); } else { //no choice, fallback to new connection to random node return getConnection(); } } }
本质上还是通过 JedisPool 来获取一个getResource ,跟我们使用Sentinel 啊 单节点获取方法是一样的
- finally 语句中的
private void releaseConnection(Jedis connection) { if (connection != null) { connection.close(); } }
说白了,JedisCluster set后会自动释放连接,调用的是jedis 的close方法,所以我们无需手工关闭,否则你这个jedis的连接池就挂逼了…