flea-cache使用之Redis分片模式接入

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【1月更文挑战第1天】本篇介绍 Huazie 的Flea框架下的 flea-cache模块中 Redis分片模式接入和使用

cacheplus.jpeg

1. 参考

flea-cache使用之Redis分片模式接入 源代码
image.png

2. 依赖

jedis-3.0.1.jar

<!-- Java redis -->
<dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
     <version>3.0.1</version>
</dependency>

spring-context-4.3.18.RELEASE.jar

<!-- Spring相关 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>

spring-context-support-4.3.18.RELEASE.jar

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>

3. 基础接入

3.1 定义Flea缓存接口

IFleaCache 可参考笔者的这篇博文 Memcached接入,不再赘述。

3.2 定义抽象Flea缓存类

AbstractFleaCache 可参考笔者的这篇博文 Memcached接入,不再赘述。

3.3 定义Redis客户端接口类

RedisClient , 注意该版,相比《flea-frame-cache使用之Redis接入》博文中,废弃如下与 ShardedJedis 有关的方法:

    ShardedJedisPool getJedisPool();

    void setShardedJedis(ShardedJedis shardedJedis);

    ShardedJedis getShardedJedis();

flea-frame-cache使用之Redis接入》博文中 提到了使用 Redis客户端代理方式 访问 RedisClient, 在这版为了实现Redis访问异常后的重试机制,废弃了代理模式,采用了命令行模式,可参考下面的 RedisClientCommand

3.4 定义Redis客户端命令行

RedisClientCommand 封装了使用ShardedJedis操作Redis缓存的公共逻辑

/**
 * Redis客户端命令行,封装了使用ShardedJedis操作Redis缓存的公共逻辑,
 * 如果出现异常可以重试{@code maxAttempts} 次。
 *
 * <p> 抽象方法 {@code execute},由子类或匿名类实现。在实际调用前,
 * 需要从分布式Jedis连接池中获取分布式Jedis对象;调用结束后,
 * 关闭分布式Jedis对象,归还给分布式Jedis连接池。
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.1.0
 */
public abstract class RedisClientCommand<T> {
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisClientCommand.class);

    private final ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private final int maxAttempts; // Redis客户端操作最大尝试次数【包含第一次操作】

    public RedisClientCommand(ShardedJedisPool shardedJedisPool, int maxAttempts) {
   
        this.shardedJedisPool = shardedJedisPool;
        this.maxAttempts = maxAttempts;
    }

    public abstract T execute(ShardedJedis connection);

    /**
     * 执行分布式Jedis操作
     *
     * @return 分布式Jedis对象操作的结果
     * @since 1.0.0
     */
    public T run() {
   
        return runWithRetries(this.maxAttempts);
    }

    /**
     * 执行分布式Jedis操作,如果出现异常,包含第一次操作,可最多尝试maxAttempts次。
     *
     * @param attempts 重试次数
     * @return 分布式Jedis对象操作的结果
     * @since 1.0.0
     */
    private T runWithRetries(int attempts) {
   
        if (attempts <= 0) {
   
            throw new FleaCacheMaxAttemptsException("No more attempts left.");
        }
        ShardedJedis connection = null;
        try {
   
            connection = shardedJedisPool.getResource();
            Object obj = null;
            if (LOGGER.isDebugEnabled()) {
   
                obj = new Object() {
   };
                LOGGER.debug1(obj, "Get ShardedJedis = {}", connection);
            }
            T result = execute(connection);
            if (LOGGER.isDebugEnabled()) {
   
                LOGGER.debug1(obj, "Result = {}", result);
            }
            return result;
        } catch (JedisConnectionException e) {
   
            // 在开始下一次尝试前,释放当前分布式Jedis的连接,将分布式Jedis对象归还给分布式Jedis连接池
            releaseConnection(connection);
            connection = null; // 这里置空是为了最后finally不重复操作
            if (LOGGER.isErrorEnabled()) {
   
                Object obj = new Object() {
   };
                LOGGER.error1(obj, "Redis连接异常:", e);
                int currAttempts = this.maxAttempts - attempts + 1;
                LOGGER.error1(obj, "第 {} 次尝试失败,开始第 {} 次尝试...", currAttempts, currAttempts + 1);
            }
            return runWithRetries(attempts - 1);
        } finally {
   
            releaseConnection(connection);
        }
    }

    /**
     * 释放指定分布式Jedis的连接,将分布式Jedis对象归还给分布式Jedis连接池
     *
     * @param connection 分布式Jedis实例
     * @since 1.0.0
     */
    private void releaseConnection(ShardedJedis connection) {
   
        if (ObjectUtils.isNotEmpty(connection)) {
   
            if (LOGGER.isDebugEnabled()) {
   
                LOGGER.debug1(new Object() {
   }, "Close ShardedJedis");
            }
            connection.close();
        }
    }
}

3.5 定义分片模式Redis客户端实现类

FleaRedisShardedClient 主要使用 ShardedJedis 来操作 Redis 数据。

/**
 * Flea分片模式Redis客户端实现类,封装了Flea框架操作Redis缓存的基本操作。
 *
 * <p> 它内部具体操作Redis缓存的功能,由分布式Jedis对象完成,
 * 包含读、写、删除Redis缓存的基本操作方法。
 * 
 * 详见笔者 https://github.com/Huazie/flea-frame,欢迎 Star
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class FleaRedisShardedClient extends FleaRedisClient {
   

    private ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private int maxAttempts; // Redis客户端操作最大尝试次数【包含第一次操作】

    /**
     * <p> Redis客户端构造方法 (默认连接池名) </p>
     *
     * @since 1.0.0
     */
    private FleaRedisShardedClient() {
   
        this(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
    }

    /**
     * <p> Redis客户端构造方法(指定连接池名) </p>
     *
     * @param poolName 连接池名
     * @since 1.0.0
     */
    private FleaRedisShardedClient(String poolName) {
   
        super(poolName);
        init();
    }

    /**
     * <p> 初始化分布式Jedis连接池 </p>
     *
     * @since 1.0.0
     */
    private void init() {
   
        if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(getPoolName())) {
   
            shardedJedisPool = RedisShardedPool.getInstance().getJedisPool();
            maxAttempts = RedisShardedConfig.getConfig().getMaxAttempts();
        } else {
   
            shardedJedisPool = RedisShardedPool.getInstance(getPoolName()).getJedisPool();
            maxAttempts = CacheConfigUtils.getMaxAttempts();
        }
    }

    @Override
    public String set(final String key, final Object value) {
   
        return new RedisClientCommand<String>(this.shardedJedisPool, this.maxAttempts) {
   
            @Override
            public String execute(ShardedJedis connection) {
   
                if (value instanceof String)
                    return connection.set(key, (String) value);
                else
                    return connection.set(SafeEncoder.encode(key), ObjectUtils.serialize(value));
            }
        }.run();
    }

    @Override
    public String set(final byte[] key, final byte[] value) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final String key, final Object value, final int expiry) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final byte[] key, final byte[] value, final int expiry) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final String key, final Object value, final long expiry) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final byte[] key, final byte[] value, final long expiry) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final String key, final Object value, final SetParams params) {
   
        // 省略。。。。。。
    }

    @Override
    public String set(final byte[] key, final byte[] value, final SetParams params) {
   
        // 省略。。。。。。
    }

    @Override
    public byte[] get(final byte[] key) {
   
        // 省略。。。。。。
    }

    @Override
    public Long del(final String key) {
   
        // 省略。。。。。。
    }

    /**
     * <p> 获取客户端类 </p>
     *
     * @param key 数据键
     * @return 客户端类
     * @since 1.0.0
     */
    @Override
    protected Client getClientByKey(final Object key) {
   
        // 省略。。。。。。
    }

    /**
     * <p> 内部建造者类 </p>
     */
    public static class Builder {
   
        // 省略。。。。。。
    }
}

该类的构造函数初始化逻辑,可以看出我们使用了 RedisShardedPool, 下面来介绍一下。

3.6 定义Redis分片连接池

RedisShardedPool ,上个版本我们使用 RedisPool 初始化Redis相关配置信息,为了体现Redis分片模式,这个版本里面,我们使用 RedisShardedPool 用于Redis相关配置信息的初始化,其中重点是获取分布式Jedis连接池 ShardedJedisPool ,该类其中一个构造方法如下:

/**
 * @param poolConfig 连接池配置信息
 * @param shards Jedis分布式服务器列表
 * @param algo 分布式算法
 */
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards,
      Hashing algo)
/**
 * Redis分片连接池,用于初始化分布式 Jedis 连接池。
 *
 * <p> 针对单独缓存接入场景,采用默认连接池初始化的方式;<br/>
 * 可参考如下:
 * <pre>
 *   // 初始化默认连接池
 *   RedisShardedPool.getInstance().initialize(); </pre>
 *
 * <p> 针对整合缓存接入场景,采用指定连接池初始化的方式;<br/>
 * 可参考如下:
 * <pre>
 *   // 初始化指定连接池
 *   RedisShardedPool.getInstance(group).initialize(cacheServerList); </pre>
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class RedisShardedPool {
   

    private static final ConcurrentMap<String, RedisShardedPool> redisPools = new ConcurrentHashMap<>();

    private String poolName; // 连接池名

    private ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private RedisShardedPool(String poolName) {
   
        this.poolName = poolName;
    }

    /**
     * <p> 获取Redis连接池实例 (默认连接池) </p>
     *
     * @return Redis连接池实例对象
     * @since 1.0.0
     */
    public static RedisShardedPool getInstance() {
   
        return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
    }

    /**
     * <p> 获取Redis连接池实例 (指定连接池名) </p>
     *
     * @param poolName 连接池名
     * @return Redis连接池实例对象
     * @since 1.0.0
     */
    public static RedisShardedPool getInstance(String poolName) {
   
        if (!redisPools.containsKey(poolName)) {
   
            synchronized (redisPools) {
   
                if (!redisPools.containsKey(poolName)) {
   
                    RedisShardedPool redisShardedPool = new RedisShardedPool(poolName);
                    redisPools.putIfAbsent(poolName, redisShardedPool);
                }
            }
        }
        return redisPools.get(poolName);
    }

    /**
     * <p> 默认初始化 </p>
     *
     * @since 1.0.0
     */
    public void initialize() {
   
        // 省略。。。。。。
    }

    /**
     * <p> 初始化 (非默认连接池) </p>
     *
     * @param cacheServerList 缓存服务器集
     * @since 1.0.0
     */
    public void initialize(List<CacheServer> cacheServerList) {
   
        // 省略。。。。。。
    }

    /**
     * <p> 获取当前连接池名 </p>
     *
     * @return 连接池名
     * @since 1.0.0
     */
    public String getPoolName() {
   
        return poolName;
    }

    /**
     * <p> 分布式Jedis连接池 </p>
     *
     * @return 分布式Jedis连接池
     * @since 1.0.0
     */
    public ShardedJedisPool getJedisPool() {
   
        if (ObjectUtils.isEmpty(shardedJedisPool)) {
   
            throw new FleaCacheConfigException("获取分布式Jedis连接池失败:请先调用initialize初始化");
        }
        return shardedJedisPool;
    }
}

3.7 Redis配置文件

flea-cache读取 redis.properties(Redis配置文件),用作初始化 RedisShardedPool

# Redis配置
# Redis缓存所属系统名
redis.systemName=FleaFrame

# Redis服务器地址
redis.server=127.0.0.1:10001,127.0.0.1:10002,127.0.0.1:10003

# Redis服务登录密码
redis.password=huazie123,huazie123,huazie123

# Redis服务器权重分配
redis.weight=1,1,1

# Redis客户端socket连接超时时间(单位:ms)
redis.connectionTimeout=2000

# Redis客户端socket读写超时时间(单位:ms)
redis.soTimeout=2000

# Redis分布式hash算法
# 1 : MURMUR_HASH
# 2 : MD5
redis.hashingAlg=1

# Redis客户端连接池配置
# Jedis连接池最大连接数
redis.pool.maxTotal=100

# Jedis连接池最大空闲连接数
redis.pool.maxIdle=10

# Jedis连接池最小空闲连接数
redis.pool.minIdle=0

# Jedis连接池获取连接时的最大等待时间(单位:ms)
redis.pool.maxWaitMillis=2000

# Redis客户端操作最大尝试次数【包含第一次操作】
redis.maxAttempts=5

# 空缓存数据有效期(单位:s)
redis.nullCacheExpiry=10

3.8 定义Redis Flea缓存类

RedisFleaCache 继承抽象Flea缓存类 AbstractFleaCache ,其构造方法可见如需要传入Redis客户端 RedisClient ,相关使用下面介绍:

/**
 * Redis Flea缓存类,实现了以Flea框架操作Redis缓存的基本操作方法。
 *
 * <p> 在上述基本操作方法中,实际使用Redis客户端【{@code redisClient}】
 * 读、写和删除Redis缓存。其中写缓存方法【{@code putNativeValue}】在
 * 添加的数据值为【{@code null}】时,默认添加空缓存数据【{@code NullCache}】
 * 到Redis中,有效期取初始化参数【{@code nullCacheExpiry}】。
 *
 * <p> 单个缓存接入场景,有效期配置可查看【redis.properties】中的配置参数
 * 【redis.nullCacheExpiry】
 *
 * <p> 整合缓存接入场景,有效期配置可查看【flea-cache-config.xml】中的缓存参数
 * 【{@code <cache-param key="fleacore.nullCacheExpiry"
 * desc="空缓存数据有效期(单位:s)">300</cache-param>}】
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class RedisFleaCache extends AbstractFleaCache {
   

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisFleaCache.class);

    private RedisClient redisClient; // Redis客户端

    private CacheModeEnum cacheMode; // 缓存模式【分片模式和集群模式】

    /**
     * <p> 带参数的构造方法,初始化Redis Flea缓存类 </p>
     *
     * @param name            缓存数据主关键字
     * @param expiry          缓存数据有效期(单位:s)
     * @param nullCacheExpiry 空缓存数据有效期(单位:s)
     * @param cacheMode       缓存模式【分分片模式和集群模式】
     * @param redisClient     Redis客户端
     * @since 1.0.0
     */
    public RedisFleaCache(String name, int expiry, int nullCacheExpiry, CacheModeEnum cacheMode, RedisClient redisClient) {
   
        super(name, expiry, nullCacheExpiry);
        this.cacheMode = cacheMode;
        this.redisClient = redisClient;
        if (CacheUtils.isClusterMode(cacheMode))
            cache = CacheEnum.RedisCluster; // 缓存实现之Redis集群模式
        else
            cache = CacheEnum.RedisSharded; // 缓存实现之Redis分片模式
    }

    @Override
    public Object getNativeValue(String key) {
   
        if (LOGGER.isDebugEnabled()) {
   
            LOGGER.debug1(new Object() {
   }, "KEY = {}", key);
        }
        return redisClient.get(key);
    }

    @Override
    public Object putNativeValue(String key, Object value, int expiry) {
   
        if (LOGGER.isDebugEnabled()) {
   
            Object obj = new Object() {
   };
            LOGGER.debug1(obj, "REDIS FLEA CACHE, KEY = {}", key);
            LOGGER.debug1(obj, "REDIS FLEA CACHE, VALUE = {}", value);
            LOGGER.debug1(obj, "REDIS FLEA CACHE, EXPIRY = {}s", expiry);
            LOGGER.debug1(obj, "REDIS FLEA CACHE, NULL CACHE EXPIRY = {}s", getNullCacheExpiry());
        }
        if (ObjectUtils.isEmpty(value)) {
   
            return redisClient.set(key, new NullCache(key), getNullCacheExpiry());
        } else {
   
            if (expiry == CommonConstants.NumeralConstants.INT_ZERO) {
   
                return redisClient.set(key, value);
            } else {
   
                return redisClient.set(key, value, expiry);
            }
        }
    }

    @Override
    public Object deleteNativeValue(String key) {
   
        if (LOGGER.isDebugEnabled()) {
   
            LOGGER.debug1(new Object() {
   }, "KEY = {}", key);
        }
        return redisClient.del(key);
    }

    @Override
    public String getSystemName() {
   
        if (CacheUtils.isClusterMode(cacheMode))
            // 集群模式下获取缓存归属系统名
            return RedisClusterConfig.getConfig().getSystemName();
        else
            // 分片模式下获取缓存归属系统名
            return RedisShardedConfig.getConfig().getSystemName();
    }
}

3.9 定义抽象Flea缓存管理类

AbstractFleaCacheManager 可参考笔者的这篇博文 Memcached接入,不再赘述。

3.10 定义Redis分片模式Flea缓存管理类

RedisShardedFleaCacheManager 继承抽象Flea缓存管理类 AbstractFleaCacheManager,构造方法使用了 RedisClientFactory 获取分片模式下默认连接池的Redis客户端 RedisClient,可在 3.11 查看。newCache 方法返回的是 RedisFleaCache 的实例对象,每一类 Redis 缓存数据都对应了一个 RedisFleaCache 的实例对象。

/**
 * Redis分片模式Flea缓存管理类,用于接入Flea框架管理Redis缓存。
 *
 * <p> 它的默认构造方法,用于初始化分片模式下默认连接池的Redis客户端,
 * 这里需要先初始化Redis连接池,默认连接池名为【default】;
 * 然后通过Redis客户端工厂类来获取Redis客户端。
 *
 * <p> 方法 {@code newCache} 用于创建一个Redis Flea缓存,
 * 它里面包含了 读、写、删除 和 清空 缓存的基本操作。
 *
 * @author huazie
 * @version 1.1.0
 * @see RedisFleaCache
 * @since 1.0.0
 */
public class RedisShardedFleaCacheManager extends AbstractFleaCacheManager {
   

    private RedisClient redisClient; // Redis客户端

    /**
     * <p> 默认构造方法,初始化分片模式下默认连接池的Redis客户端 </p>
     *
     * @since 1.0.0
     */
    public RedisShardedFleaCacheManager() {
   
        // 初始化默认连接池
        RedisShardedPool.getInstance().initialize();
        // 获取分片模式下默认连接池的Redis客户端
        redisClient = RedisClientFactory.getInstance();
    }

    @Override
    protected AbstractFleaCache newCache(String name, int expiry) {
   
        int nullCacheExpiry = RedisShardedConfig.getConfig().getNullCacheExpiry();
        return new RedisFleaCache(name, expiry, nullCacheExpiry, CacheModeEnum.SHARDED, redisClient);
    }
}

3.11 定义Redis客户端工厂类

RedisClientFactory ,有四种方式获取 Redis 客户端:

  • 一是获取分片模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景【3.10 采用】;
  • 二是获取指定模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景;
  • 三是获取分片模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景;
  • 四是获取指定模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景。

/**
 * Redis客户端工厂,用于获取Redis客户端。
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class RedisClientFactory {
   

    private static final ConcurrentMap<String, RedisClient> redisClients = new ConcurrentHashMap<>();

    private RedisClientFactory() {
   
    }

    /**
     * 获取分片模式下默认连接池的Redis客户端
     *
     * @return 分片模式的Redis客户端
     * @since 1.0.0
     */
    public static RedisClient getInstance() {
   
        return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
    }

    /**
     * 获取指定模式下默认连接池的Redis客户端
     *
     * @param mode 缓存模式
     * @return 指定模式的Redis客户端
     * @since 1.1.0
     */
    public static RedisClient getInstance(CacheModeEnum mode) {
   
        return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME, mode);
    }

    /**
     * 获取分片模式下指定连接池的Redis客户端
     *
     * @param poolName 连接池名
     * @return 分片模式的Redis客户端
     * @since 1.0.0
     */
    public static RedisClient getInstance(String poolName) {
   
        return getInstance(poolName, CacheModeEnum.SHARDED);
    }

    /**
     * 获取指定模式下指定连接池的Redis客户端
     *
     * @param poolName 连接池名
     * @param mode     缓存模式
     * @return 指定模式的Redis客户端
     * @since 1.1.0
     */
    public static RedisClient getInstance(String poolName, CacheModeEnum mode) {
   
        String key = StringUtils.strCat(poolName, CommonConstants.SymbolConstants.UNDERLINE, StringUtils.valueOf(mode.getMode()));
        if (!redisClients.containsKey(key)) {
   
            synchronized (redisClients) {
   
                if (!redisClients.containsKey(key)) {
   
                    RedisClientStrategyContext context = new RedisClientStrategyContext(poolName);
                    redisClients.putIfAbsent(key, FleaStrategyFacade.invoke(mode.name(), context));
                }
            }
        }
        return redisClients.get(key);
    }
}

在上面 的 getInstance(String poolName, CacheModeEnum mode) 方法中,使用了 RedisClientStrategyContext ,用于定义 Redis 客户端策略上下文。根据不同的缓存模式,就可以找到对应的 Redis 客户端策略。

3.12 定义 Redis 客户端策略上下文

RedisClientStrategyContext 包含了 Redis 分片 和 Redis 集群 相关的客户端策略。

/**
 * Redis客户端策略上下文
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.1.0
 */
public class RedisClientStrategyContext extends FleaStrategyContext<RedisClient, String> {
   

    public RedisClientStrategyContext() {
   
        super();
    }

    public RedisClientStrategyContext(String contextParam) {
   
        super(contextParam);
    }

    @Override
    protected Map<String, IFleaStrategy<RedisClient, String>> init() {
   
        Map<String, IFleaStrategy<RedisClient, String>> fleaStrategyMap = new HashMap<>();
        fleaStrategyMap.put(CacheModeEnum.SHARDED.name(), new RedisShardedClientStrategy());
        fleaStrategyMap.put(CacheModeEnum.CLUSTER.name(), new RedisClusterClientStrategy());
        return Collections.unmodifiableMap(fleaStrategyMap);
    }
}

3.13 定义分片模式 Redis 客户端策略

RedisShardedClientStrategy 用于新建分片模式Redis客户端

/**
 * 分片模式Redis客户端 策略
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.1.0
 */
public class RedisShardedClientStrategy implements IFleaStrategy<RedisClient, String> {
   

    @Override
    public RedisClient execute(String poolName) throws FleaStrategyException {
   
        RedisClient originRedisClient;
        // 新建一个Flea Redis客户端类实例
        if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(poolName)) {
   
            originRedisClient = new FleaRedisShardedClient.Builder().build();
        } else {
   
            originRedisClient = new FleaRedisShardedClient.Builder(poolName).build();
        }
        return originRedisClient;
    }
}

好了,到这里我们可以来测试 Redis 分片模式。

3.14 Redis接入自测

单元测试类详见 FleaCacheTest

首先,这里需要按照 Redis 配置文件中的地址部署相应的 Redis 服务,可参考笔者的 这篇博文

    @Test
    public void testRedisShardedFleaCache() {
   
        try {
   
            // 分片模式下Flea缓存管理类
            AbstractFleaCacheManager manager = FleaCacheManagerFactory.getFleaCacheManager(CacheEnum.RedisSharded.getName());
            AbstractFleaCache cache = manager.getCache("fleaparadetail");
            LOGGER.debug("Cache={}", cache);
            //## 1.  简单字符串
            cache.put("menu1", "huazie");
            cache.put("menu2", null);
//            cache.get("menu1");
//            cache.get("menu2");
//            cache.delete("menu1");
//            cache.delete("menu2");
//            cache.clear();
            cache.getCacheKey();
            LOGGER.debug(cache.getCacheName() + ">>>" + cache.getCacheDesc());
        } catch (Exception e) {
   
            LOGGER.error("Exception:", e);
        }
    }

4. 进阶接入

4.1 定义抽象Spring缓存

AbstractSpringCache 可参考笔者的这篇博文 Memcached接入,不再赘述。

4.2 定义Redis Spring缓存类

RedisSpringCache 继承抽象 Spring 缓存 AbstractSpringCache,用于对接 Spring; 从构造方法可见,该类初始化还是使用 RedisFleaCache

/**
 * Redis Spring缓存类,继承了抽象Spring缓存父类的读、写、删除 和 清空
 * 缓存的基本操作方法,由Redis Spring缓存管理类初始化。
 *
 * <p> 它的构造方法中,必须传入一个具体Flea缓存实现类,这里我们使用
 * Redis Flea缓存【{@code RedisFleaCache}】。
 *
 * @author huazie
 * @version 1.1.0
 * @see RedisFleaCache
 * @since 1.0.0
 */
public class RedisSpringCache extends AbstractSpringCache {
   

    /**
     * <p> 带参数的构造方法,初始化Redis Spring缓存类 </p>
     *
     * @param name      缓存数据主关键字
     * @param fleaCache 具体Flea缓存实现
     * @since 1.0.0
     */
    public RedisSpringCache(String name, IFleaCache fleaCache) {
   
        super(name, fleaCache);
    }

    /**
     * <p> 带参数的构造方法,初始化Redis Spring缓存类 </p>
     *
     * @param name            缓存数据主关键字
     * @param expiry          缓存数据有效期(单位:s)
     * @param nullCacheExpiry 空缓存数据有效期(单位:s)
     * @param cacheMode       缓存模式【分片模式和集群模式】
     * @param redisClient     Redis客户端
     * @since 1.0.0
     */
    public RedisSpringCache(String name, int expiry, int nullCacheExpiry, CacheModeEnum cacheMode, RedisClient redisClient) {
   
        this(name, new RedisFleaCache(name, expiry, nullCacheExpiry, cacheMode, redisClient));
    }

}

4.3 定义抽象Spring缓存管理类

AbstractSpringCacheManager 可参考笔者的这篇博文 Memcached接入,不再赘述。

4.4 定义Redis分片模式Spring缓存管理类

RedisShardedSpringCacheManager 继承抽象 Spring 缓存管理类 AbstractSpringCacheManager,用于对接Spring; 基本实现同 RedisShardedFleaCacheManager,唯一不同在于 newCache 的实现。

/**
 * Redis分片模式Spring缓存管理类,用于接入Spring框架管理Redis缓存。
 *
 * <p> 它的默认构造方法,用于初始化分片模式下默认连接池的Redis客户端,
 * 这里需要先初始化Redis连接池,默认连接池名为【default】;
 * 然后通过Redis客户端工厂类来获取Redis客户端。
 *
 * <p> 方法【{@code newCache}】用于创建一个Redis Spring缓存,
 * 而它内部是由Redis Flea缓存实现具体的 读、写、删除 和 清空
 * 缓存的基本操作。
 *
 * @author huazie
 * @version 1.1.0
 * @see RedisSpringCache
 * @since 1.0.0
 */
public class RedisShardedSpringCacheManager extends AbstractSpringCacheManager {
   

    private RedisClient redisClient; // Redis客户端

    /**
     * <p> 默认构造方法,初始化分片模式下默认连接池的Redis客户端 </p>
     *
     * @since 1.0.0
     */
    public RedisShardedSpringCacheManager() {
   
        // 初始化默认连接池
        RedisShardedPool.getInstance().initialize();
        // 获取分片模式下默认连接池的Redis客户端
        redisClient = RedisClientFactory.getInstance();
    }

    @Override
    protected AbstractSpringCache newCache(String name, int expiry) {
   
        int nullCacheExpiry = RedisShardedConfig.getConfig().getNullCacheExpiry();
        return new RedisSpringCache(name, expiry, nullCacheExpiry, CacheModeEnum.SHARDED, redisClient);
    }
}

4.5 spring 配置

    <!--
        配置缓存管理 redisShardedSpringCacheManager
        配置缓存时间 configMap (key缓存对象名称 value缓存过期时间)
    -->
    <bean id="redisShardedSpringCacheManager" class="com.huazie.frame.cache.redis.manager.RedisShardedSpringCacheManager">
        <property name="configMap">
            <map>
                <entry key="fleaconfigdata" value="86400"/>
            </map>
        </property>
    </bean>

    <!-- 开启缓存 -->
    <cache:annotation-driven cache-manager="redisShardedSpringCacheManager" proxy-target-class="true"/>

4.6 缓存自测

    private ApplicationContext applicationContext;

    @Before
    public void init() {
   
        applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        LOGGER.debug("ApplicationContext={}", applicationContext);
    }

    @Test
    public void testRedisShardedSpringCache() {
   
        try {
   
            // 分片模式下Spring缓存管理类
            AbstractSpringCacheManager manager = (RedisShardedSpringCacheManager) applicationContext.getBean("redisShardedSpringCacheManager");
            LOGGER.debug("RedisCacheManager={}", manager);

            AbstractSpringCache cache = manager.getCache("fleaconfigdata");
            LOGGER.debug("Cache={}", cache);

            Set<String> cacheKey = cache.getCacheKey();
            LOGGER.debug("CacheKey = {}", cacheKey);

            //## 1.  简单字符串
//            cache.put("menu1", "huazie");
//            cache.get("menu1");
//            cache.get("menu1", String.class);

            //## 2.  简单对象(要是可以序列化的对象)
//            String user = new String("huazie");
//            cache.put("user", user);
//            LOGGER.debug(cache.get("user", String.class));
//            cache.get("FLEA_RES_STATE");
//            cache.clear();

            //## 3.  List塞对象
//            List<String> userList = new ArrayList<>();
//            userList.add("huazie");
//            userList.add("lgh");
//            cache.put("user_list", userList);

//            LOGGER.debug(cache.get("user_list",userList.getClass()).toString());

        } catch (Exception e) {
   
            LOGGER.error("Exception:", e);
        }
    }

结语

Redis 接入重构工作已经全部结束,当前版本为 Redis 分片模式。下一篇博文,我将要介绍 《Redis 集群模式的接入》 工作,敬请期待!!!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis消息的生产者消费者模式
消息队列在Redis中可通过List数据结构实现,支持发布者订阅者和生产者消费者两种模式。生产者通过`lpush`向List添加消息,消费者通过`rpop`或`brpop`消费消息,后者支持阻塞等待。示例代码展示了如何使用Redis的生产者消费者模式。
|
2月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
74 4
|
2月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
30 2
|
2月前
|
存储 缓存 NoSQL
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
72 1
|
4月前
|
存储 NoSQL 算法
深入理解Redis分片Cluster原理
本文深入探讨了Redis Cluster的分片原理,作为Redis官方提供的高可用性和高性能解决方案,Redis Cluster通过数据分片和横向扩展能力,有效降低单个主节点的压力。
深入理解Redis分片Cluster原理
|
5月前
|
存储 NoSQL 算法
Redis 集群模式搭建
Redis 集群模式搭建
104 5
|
4月前
|
存储 缓存 NoSQL
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
|
4月前
|
存储 NoSQL Redis
Tair的发展问题之在Redis集群模式下,Lua脚本操作key面临什么问题,如何解决
Tair的发展问题之在Redis集群模式下,Lua脚本操作key面临什么问题,如何解决
|
4月前
|
NoSQL Java 调度
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
|
4月前
|
NoSQL 网络协议 安全
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决