Redis进阶-JedisCluster初始化 & 自动管理连接池中的连接 _ 源码分析

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis进阶-JedisCluster初始化 & 自动管理连接池中的连接 _ 源码分析

20200307112715522.png


Pre

Redis进阶-Redis集群原理剖析及gossip协议初探 集群原理部分 简单的提了下Jest是如何实现Redis Cluster 的 ,这里我们再来梳理一下


Code

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class JedisClusterDemo {
    public static void main(String[] args) throws IOException {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(10);
        config.setMinIdle(5);
        Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
        jedisClusterNode.add(new HostAndPort("192.168.18.131", 8001));
        jedisClusterNode.add(new HostAndPort("192.168.18.131", 8004));
        jedisClusterNode.add(new HostAndPort("192.168.18.132", 8002));
        jedisClusterNode.add(new HostAndPort("192.168.18.132", 8005));
        jedisClusterNode.add(new HostAndPort("192.168.18.133", 8003));
        jedisClusterNode.add(new HostAndPort("192.168.18.133", 8006));
        JedisCluster jedisCluster = null;
        try {
            //connectionTimeout:指的是连接一个url的连接等待时间
            //soTimeout:指的是连接上一个url,获取response的返回等待时间
            jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config);
            System.out.println(jedisCluster.set("clusterArtisan", "artisanValue"));
            System.out.println(jedisCluster.get("clusterArtisan"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jedisCluster != null)
                jedisCluster.close();
        }
    }
}



这里是个简单的demo, 生产中用的话,需要确保jedisCluster是单例的,并且无需手工调用close,不然的话 这个连接池就关闭了,你就无法获取到连接了。


初始化


当 Redis Cluster 的客户端来连接集群时,它也会得到一份集群的槽位配置信息并将其缓存在客户端本地。这样当客户端要查找某个 key 时,可以直接定位到目标节点。

我们来看下jedis的实现

 jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config);

跟下源码

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                       final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
    initializeSlotsCache(nodes, poolConfig, password);
  }


重点看下 initializeSlotsCache

  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      if (password != null) {
        jedis.auth(password);
      }
      try {
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }


继续 cache.discoverClusterNodesAndSlots(jedis); cache为 JedisClusterInfoCache 对象。

  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();
    try {
      reset();
      List<Object> slots = jedis.clusterSlots();
      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;
        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);
        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.size() <= 0) {
            continue;
          }
          HostAndPort targetNode = generateHostAndPort(hostInfos);
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }


槽计算

set --------> run  ----> runWithRetries ----> connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))


CRC16算法,计算key对应的slot connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))

  public static int getSlot(byte[] key) {
    int s = -1;
    int e = -1;
    boolean sFound = false;
    for (int i = 0; i < key.length; i++) {
      if (key[i] == '{' && !sFound) {
        s = i;
        sFound = true;
      }
      if (key[i] == '}' && sFound) {
        e = i;
        break;
      }
    }
    if (s > -1 && e > -1 && e != s + 1) {
      return getCRC16(key, s + 1, e) & (16384 - 1);
    }
    return getCRC16(key) & (16384 - 1);
  }


无需手工调用close方法

进入到set方法中看下源码

jedisCluster.set("clusterArtisan", "artisanValue")

如下:

  @Override
  public String set(final String key, final String value) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      }
    }.run(key);
  }


命令模式, 关注 run方法

  public T run(String key) {
    if (key == null) {
      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
    }
    return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
  }

继续 runWithRetries , 截取核心逻辑

private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    Jedis connection = null;
    try {
      if (asking) {
       ......
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }
      return execute(connection);
    } finally {
      releaseConnection(connection);
    }
  }



关注点

  • CRC16算法,计算key对应的slot connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key))
  public static int getSlot(byte[] key) {
    int s = -1;
    int e = -1;
    boolean sFound = false;
    for (int i = 0; i < key.length; i++) {
      if (key[i] == '{' && !sFound) {
        s = i;
        sFound = true;
      }
      if (key[i] == '}' && sFound) {
        e = i;
        break;
      }
    }
    if (s > -1 && e > -1 && e != s + 1) {
      return getCRC16(key, s + 1, e) & (16384 - 1);
    }
    return getCRC16(key) & (16384 - 1);
  }
  • getConnectionFromSlot 通过 JedisPool 获取连接

关注下 JedisCluster是如何获取连接的 getConnectionFromSlot 方法

  @Override
  public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // It can't guaranteed to get valid connection because of node
      // assignment
      return connectionPool.getResource();
    } else {
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }


本质上还是通过 JedisPool 来获取一个getResource ,跟我们使用Sentinel 啊 单节点获取方法是一样的


  • finally 语句中的 releaseConnection(connection); ,自动释放连接

看下该方法

  private void releaseConnection(Jedis connection) {
    if (connection != null) {
      connection.close();
    }
  }

说白了,JedisCluster set后会自动释放连接,调用的是jedis 的close方法,所以我们无需手工关闭,否则你这个jedis的连接池就挂逼了…


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
26天前
|
NoSQL Redis 数据库
Redis 连接
10月更文挑战第19天
27 0
|
24天前
|
NoSQL 网络协议 算法
Redis 客户端连接
10月更文挑战第21天
27 1
|
2月前
|
NoSQL Linux Redis
linux安装单机版redis详细步骤,及python连接redis案例
这篇文章提供了在Linux系统中安装单机版Redis的详细步骤,并展示了如何配置Redis为systemctl启动,以及使用Python连接Redis进行数据操作的案例。
69 2
|
3月前
|
NoSQL 网络协议 Redis
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
|
3月前
|
NoSQL 算法 Java
诡异!Redis Proxy RT上升后连接倾斜
本文细致地描述了关于Redis Proxy RT上升后连接倾斜问题的排查过程和根本原因,最后给出了优化方案。
|
3月前
|
Kubernetes NoSQL Redis
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
|
3月前
|
NoSQL 网络协议 Linux
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
|
3月前
|
NoSQL Linux 网络安全
【Azure Redis】Redis-CLI连接Redis 6380端口始终遇见 I/O Error
【Azure Redis】Redis-CLI连接Redis 6380端口始终遇见 I/O Error
|
3月前
|
缓存 NoSQL Linux
【Azure Redis 缓存】应用中出现连接Redis服务错误(production.ERROR: Connection refused)的排查步骤
【Azure Redis 缓存】应用中出现连接Redis服务错误(production.ERROR: Connection refused)的排查步骤
|
3月前
|
缓存 NoSQL 网络协议
【Azure Redis 缓存】Azure Redis 遇见的连接不上问题和数据丢失的情况解答
【Azure Redis 缓存】Azure Redis 遇见的连接不上问题和数据丢失的情况解答