spring-data-redis 动态切换数据源

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: spring-data-redis 动态切换数据源

最近遇到了一个麻烦的需求,我们需要一个微服务应用同时访问两个不同的 Redis 集群。一般我们不会这么使用 Redis,但是这两个 Redis 本来是不同业务集群,现在需要一个微服务同时访问。

其实我们在实际业务开发的时候,可能还会遇到类似的场景。例如 Redis 读写分离,这个也是 spring-data-redis 没有提供的功能,底层连接池例如 Lettuce 或者 Jedis 都提供了获取只读连接的 API,但是缺陷有两个:

  • 上层 spring-data-redis 并没有封装这种接口
  • 基于 redis 的架构实现的,哨兵模式需要配置 sentinel 的地址,集群模式需要感知集群拓扑,在云原生环境中,这些都默认被云提供商隐藏了,暴露到外面的只有一个个动态 VIP 域名。

因此,我们需要在 spring-data-redis 的基础上实现一个动态切换 Redis 连接的机制



image.png


spring-data-redis 的配置类为:org.springframework.boot.autoconfigure.data.redis.RedisProperties,可以配置单个 Redis 实例或者 Redis 集群的连接配置。根据这些配置,会生成统一的 Redis 连接工厂 RedisConnectionFactory

spring-data-redis 核心接口与背后的连接相关抽象关系为:


微信图片_20220625165905.jpg


通过这个图,我们可以知道,我们实现一个可以动态返回不同 Redis 连接的 RedisConnectionFactory 即可,并且根据 spring-data-redis 的自动装载源码可以知道,框架内的所有 RedisConnectionFactory@ConditionalOnMissingBean 的,即我们可以使用我们自己实现的 RedisConnectionFactory 进行替换。


image.png


项目地址: https://github.com/JoJoTec/spring-boot-starter-redis-related

我们可以给 RedisProperties 配置外层封装一个多 Redis 连接的配置,即MultiRedisProperties

@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
    /**
     * 默认连接必须配置,配置 key 为 default
     */
    public static final String DEFAULT = "default";
    private boolean enableMulti = false;
    private Map<String, RedisProperties> multi;
}

这个配置是在原有配置基础上的,也就是用户可以使用原有配置,也可以使用这种多 Redis 配置,就是需要配置 spring.redis.enable-multi=truemulti 这个 Map 中放入的 key 是数据源名称,用户可以在使用 RedisTemplate 或者 ReactiveRedisTemplate 之前,通过这个数据源名称指定用哪个 Redis

接下来我们来实现 MultiRedisLettuceConnectionFactory,即可以动态切换 Redis 连接的 RedisConnectionFactory,我们的项目采用的 Redis 客户端是 Lettuce:

public class MultiRedisLettuceConnectionFactory
        implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
    private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
    private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
    public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
        this.connectionFactoryMap = connectionFactoryMap;
    }
    public void setCurrentRedis(String currentRedis) {
        if (!connectionFactoryMap.containsKey(currentRedis)) {
            throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration");
        }
        MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
    }
    @Override
    public void destroy() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
    }
    private LettuceConnectionFactory currentLettuceConnectionFactory() {
        String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
        if (StringUtils.isNotBlank(currentRedis)) {
            MultiRedisLettuceConnectionFactory.currentRedis.remove();
            return connectionFactoryMap.get(currentRedis);
        }
        return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
    }
    @Override
    public ReactiveRedisConnection getReactiveConnection() {
        return currentLettuceConnectionFactory().getReactiveConnection();
    }
    @Override
    public ReactiveRedisClusterConnection getReactiveClusterConnection() {
        return currentLettuceConnectionFactory().getReactiveClusterConnection();
    }
    @Override
    public RedisConnection getConnection() {
        return currentLettuceConnectionFactory().getConnection();
    }
    @Override
    public RedisClusterConnection getClusterConnection() {
        return currentLettuceConnectionFactory().getClusterConnection();
    }
    @Override
    public boolean getConvertPipelineAndTxResults() {
        return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
    }
    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return currentLettuceConnectionFactory().getSentinelConnection();
    }
    @Override
    public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
        return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
    }
}

逻辑非常简单,就是提供了设置 Redis 数据源的接口,并且放入了 ThreadLocal 中,并且仅对当前一次有效,读取后就清空。

然后,将 MultiRedisLettuceConnectionFactory 作为 Bean 注册到我们的 ApplicationContext 中:

@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {
    /**
     * @param builderCustomizers
     * @param clientResources
     * @param multiRedisProperties
     * @return
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
     */
    @Bean
    public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources,
            MultiRedisProperties multiRedisProperties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
    ) {
        //读取配置
        Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
        Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
        multi.forEach((k, v) -> {
            //这个其实就是框架中原有的源码使用 RedisProperties 的方式,我们其实就是在 RedisProperties 外面包装了一层而已
            LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                    v,
                    sentinelConfigurationProvider,
                    clusterConfigurationProvider
            );
            LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
            connectionFactoryMap.put(k, lettuceConnectionFactory);
        });
        return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
    }
}



image.png


我们来测试下,使用 embedded-redis 来启动本地 redis,从而实现单元测试。我们启动两个 Redis,在两个 Redis 中放入不同的 Key,验证是否存在,并且测试同步接口,多线程调用同步接口,和多次异步接口无等待订阅从而测试有效性。:

import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        "spring.redis.enable-multi=true",
        "spring.redis.multi.default.host=127.0.0.1",
        "spring.redis.multi.default.port=6379",
        "spring.redis.multi.test.host=127.0.0.1",
        "spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
    //启动两个 redis
    private static RedisServer redisServer;
    private static RedisServer redisServer2;
    @BeforeAll
    public static void setUp() throws Exception {
        System.out.println("start redis");
        redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
        redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
        redisServer.start();
        redisServer2.start();
        System.out.println("redis started");
    }
    @AfterAll
    public static void tearDown() throws Exception {
        System.out.println("stop redis");
        redisServer.stop();
        redisServer2.stop();
        System.out.println("redis stopped");
    }
    @EnableAutoConfiguration
    @Configuration
    public static class App {
    }
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ReactiveStringRedisTemplate reactiveRedisTemplate;
    @Autowired
    private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;
    private void testMulti(String suffix) {
        //使用默认连接,设置 "testDefault" + suffix, "testDefault" 键值对
        redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
        //使用 test 连接,设置 "testSecond" + suffix, "testDefault" 键值对
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
        //使用默认连接,验证 "testDefault" + suffix 存在,"testSecond" + suffix 不存在
        Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
        Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
        //使用 test 连接,验证 "testDefault" + suffix 不存在,"testSecond" + suffix 存在
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
    }
    //单次验证
    @Test
    public void testMultiBlock() {
        testMulti("");
    }
    //多线程验证
    @Test
    public void testMultiBlockMultiThread() throws InterruptedException {
        Thread thread[] = new Thread[50];
        AtomicBoolean result = new AtomicBoolean(true);
        for (int i = 0; i < thread.length; i++) {
            int finalI = i;
            thread[i] = new Thread(() -> {
                try {
                    testMulti("" + finalI);
                } catch (Exception e) {
                    e.printStackTrace();
                    result.set(false);
                }
            });
        }
        for (int i = 0; i < thread.length; i++) {
            thread[i].start();
        }
        for (int i = 0; i < thread.length; i++) {
            thread[i].join();
        }
        Assertions.assertTrue(result.get());
    }
    //reactive 接口验证
    private Mono<Boolean> reactiveMulti(String suffix) {
        return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                .flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                }).flatMap(b -> {
                    return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                }).map(b -> {
                    Assertions.assertTrue(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                }).map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                }).map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                }).flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                }).map(b -> {
                    Assertions.assertTrue(b);
                    return b;
                });
    }
    //多次调用 reactive 验证,并且 subscribe,这本身就是多线程的
    @Test
    public void testMultiReactive() throws InterruptedException {
        for (int i = 0; i < 10000; i++) {
            reactiveMulti("" + i).subscribe(System.out::println);
        }
        TimeUnit.SECONDS.sleep(10);
    }
}

运行测试,通过。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
前端开发 Java 数据库连接
基于Spring boot轻松实现一个多数据源框架
基于Spring boot轻松实现一个多数据源框架
322 0
|
XML Java 数据库连接
spring管理数据源和引入外部属性文件~
spring管理数据源和引入外部属性文件~
|
8月前
|
前端开发 Java 数据库连接
Spring Boot 3 整合 Mybatis-Plus 动态数据源实现多数据源切换
Spring Boot 3 整合 Mybatis-Plus 动态数据源实现多数据源切换
|
Java 数据库连接 Go
如何在Spring Boot应用中使用Nacos实现动态更新数据源
如何在Spring Boot应用中使用Nacos实现动态更新数据源
658 0
|
7月前
|
DataWorks NoSQL Java
DataWorks操作报错合集之数据集成使用公共数据集成资源组写入到redis数据源(使用的是VPC连接),提示以下错误:request action:[InnerVpcGrantVpcInstanceAccessToApp], message:[InvalidInstanceId.怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
存储 Java 数据库
Spring Boot中的动态数据源切换
Spring Boot中的动态数据源切换
|
7月前
|
Java Spring
spring基于注解配置数据源
spring基于注解配置数据源
|
8月前
|
设计模式 Java 数据库连接
【Spring源码】JDBC数据源访问实现
我们再来看看阅读线索三,这方面我们从设计模式进行入手。阅读线索三:从这个模块可以学到什么我们看下以下代码,PreparedStatement实例的是由PreparedStatementCreator实现的。再来看看PreparedStatementCreator接口,一共有三个子类实现。也就是说PreparedStatement的三种不同实现被封装到三个子类中,而具体需要哪种实现,只需要传入不同。
【Spring源码】JDBC数据源访问实现
|
Java Spring
java202304java学习笔记第六十天-ssm-spring配置文件-spring产生数据源对象2
java202304java学习笔记第六十天-ssm-spring配置文件-spring产生数据源对象2
49 0
java202304java学习笔记第六十天-ssm-spring配置文件-数据源开发步骤
java202304java学习笔记第六十天-ssm-spring配置文件-数据源开发步骤
46 0