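Recently we ran into a troublesome requirement: a single microservice needs to access two different Redis clusters at the same time. We would not normally use Redis this way, but the two Redis clusters originally belonged to different business clusters, and now one microservice has to access both.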
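Similar scenarios come up in real-world development. One example is Redis read/write splitting, which spring-data-redis does not provide either. The underlying client libraries, such as Lettuce or Jedis, offer APIs for obtaining read-only connections, but this approach has two drawbacks:
- The upper layer, spring-data-redis, does not wrap these APIs.
- They are built on Redis's own deployment architecture: Sentinel mode requires configuring the sentinel addresses, and Cluster mode requires awareness of the cluster topology. In cloud-native environments, cloud providers hide these details by default and expose only dynamic VIP domain names.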
Therefore, we need to implement a mechanism for dynamically switching Redis connections on top of spring-data-redis.
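The configuration class for spring-data-redis is org.springframework.boot.autoconfigure.data.redis.RedisProperties, which can hold the connection settings for a single Redis instance or a Redis cluster. From these settings, a unified Redis connection factory, RedisConnectionFactory, is created.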
The following diagram shows how the core spring-data-redis interfaces relate to the connection abstractions behind them:
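From this diagram we can see that all we need is a RedisConnectionFactory implementation that can dynamically return different Redis connections. Moreover, the spring-data-redis auto-configuration source shows that every RedisConnectionFactory bean in the framework is declared with @ConditionalOnMissingBean, so we can replace it with our own RedisConnectionFactory implementation.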
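The replacement idea can be sketched without any Spring types. The following is a minimal illustration, not the project's code: ConnectionFactory is a hypothetical stand-in for RedisConnectionFactory, and RoutingConnectionFactory, its constructor parameters, and the string "connections" are all assumptions made for this example. It only shows the shape of the technique: one object implements the same interface as its delegates and picks a delegate on every call.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for RedisConnectionFactory; not a Spring type.
interface ConnectionFactory {
    String getConnection();
}

// Implements the same interface as its delegates and chooses one per call.
final class RoutingConnectionFactory implements ConnectionFactory {
    private final Map<String, ConnectionFactory> delegates;
    private final Supplier<String> currentName; // returns the selected name, or null
    private final String defaultName;

    RoutingConnectionFactory(Map<String, ConnectionFactory> delegates,
                             Supplier<String> currentName,
                             String defaultName) {
        if (!delegates.containsKey(defaultName)) {
            throw new IllegalArgumentException("missing default delegate: " + defaultName);
        }
        this.delegates = Map.copyOf(delegates);
        this.currentName = currentName;
        this.defaultName = defaultName;
    }

    @Override
    public String getConnection() {
        String name = currentName.get();
        ConnectionFactory target = (name == null) ? null : delegates.get(name);
        if (target == null) {
            // No selection, or an unknown name: fall back to the default delegate.
            target = delegates.get(defaultName);
        }
        return target.getConnection();
    }
}
```

Because callers only see the ConnectionFactory interface, swapping the routing object in for a single factory requires no change on their side, which is the property the @ConditionalOnMissingBean replacement relies on.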
Project repository: https://github.com/JoJoTec/spring-boot-starter-redis-related
We can wrap RedisProperties in an outer configuration class that holds multiple Redis connections, MultiRedisProperties:
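
    import java.util.Map;

    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;

    @Data
    @NoArgsConstructor
    @ConfigurationProperties(prefix = "spring.redis")
    public class MultiRedisProperties {
        /**
         * The default connection is required; its configuration key is "default".
         */
        public static final String DEFAULT = "default";

        // Whether the multi-Redis configuration is enabled
        private boolean enableMulti = false;
        // Data source name -> connection settings for that Redis
        private Map<String, RedisProperties> multi;
    }
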
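This configuration builds on the existing one: users can keep using the original configuration or switch to the multi-Redis configuration, which requires setting spring.redis.enable-multi=true. The keys of the multi map are data source names. Before using RedisTemplate or ReactiveRedisTemplate, the user can specify which Redis to use by its data source name.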
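As an illustration, a configuration for two data sources might look like the fragment below. The keys mirror the ones used in the test later in this post; the second data source name, test, and the host and port values are example values only.

```properties
# Switch from the single-Redis settings to the multi-Redis settings
spring.redis.enable-multi=true

# The "default" entry is required
spring.redis.multi.default.host=127.0.0.1
spring.redis.multi.default.port=6379

# An additional data source named "test"
spring.redis.multi.test.host=127.0.0.1
spring.redis.multi.test.port=6380
```

Each entry under spring.redis.multi binds to a RedisProperties object, so the properties accepted there should be the same ones the single-Redis configuration accepts.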
Next, we implement MultiRedisLettuceConnectionFactory, a RedisConnectionFactory that can switch Redis connections dynamically. Our project uses Lettuce as its Redis client:

    public class MultiRedisLettuceConnectionFactory
            implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
        // Data source name -> the LettuceConnectionFactory for that Redis
        private final Map<String, LettuceConnectionFactory> connectionFactoryMap;

        // Data source selected for the next call on the current thread
        private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

        public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
            this.connectionFactoryMap = connectionFactoryMap;
        }

        public void setCurrentRedis(String currentRedis) {
            if (!connectionFactoryMap.containsKey(currentRedis)) {
                throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exist in configuration");
            }
            MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
        }

        @Override
        public void destroy() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
        }

        private LettuceConnectionFactory currentLettuceConnectionFactory() {
            String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
            if (StringUtils.isNotBlank(currentRedis)) {
                // The selection is one-shot: clear it as soon as it has been read
                MultiRedisLettuceConnectionFactory.currentRedis.remove();
                return connectionFactoryMap.get(currentRedis);
            }
            // Nothing selected: fall back to the default connection
            return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
        }

        @Override
        public ReactiveRedisConnection getReactiveConnection() {
            return currentLettuceConnectionFactory().getReactiveConnection();
        }

        @Override
        public ReactiveRedisClusterConnection getReactiveClusterConnection() {
            return currentLettuceConnectionFactory().getReactiveClusterConnection();
        }

        @Override
        public RedisConnection getConnection() {
            return currentLettuceConnectionFactory().getConnection();
        }

        @Override
        public RedisClusterConnection getClusterConnection() {
            return currentLettuceConnectionFactory().getClusterConnection();
        }

        @Override
        public boolean getConvertPipelineAndTxResults() {
            return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
        }

        @Override
        public RedisSentinelConnection getSentinelConnection() {
            return currentLettuceConnectionFactory().getSentinelConnection();
        }

        @Override
        public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
            return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
        }
    }

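The logic is simple: the class exposes a method for setting the Redis data source and stores the name in a ThreadLocal. The selection applies only to the next call; it is cleared as soon as it is read.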
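To make the "valid once, cleared on read" behavior easier to see, here is the selection logic isolated into a small JDK-only sketch. OneShotSelector and its method names are hypothetical and exist only for this illustration; the real class applies the same idea inside currentLettuceConnectionFactory().

```java
import java.util.Set;

// Hypothetical helper that isolates the one-shot selection logic.
final class OneShotSelector {
    static final String DEFAULT = "default";

    // Holds the name selected for the next resolve() call on the current thread.
    private final ThreadLocal<String> current = new ThreadLocal<>();
    private final Set<String> knownNames;

    OneShotSelector(Set<String> knownNames) {
        this.knownNames = Set.copyOf(knownNames);
    }

    // Selects a data source for the next resolve() on this thread only.
    void select(String name) {
        if (!knownNames.contains(name)) {
            throw new IllegalArgumentException("unknown Redis name: " + name);
        }
        current.set(name);
    }

    // Returns the selected name and clears it; returns DEFAULT when nothing is selected.
    String resolve() {
        String name = current.get();
        if (name == null) {
            return DEFAULT;
        }
        current.remove();
        return name;
    }
}
```

With this sketch, calling select("test") followed by two resolve() calls on the same thread yields "test" and then "default", and a selection made on one thread is not visible to resolve() on another thread. The practical consequence for callers is that the data source has to be selected again before every operation that should not go to the default Redis, which is why the test below calls setCurrentRedis before each such operation.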
Then we register MultiRedisLettuceConnectionFactory as a bean in our ApplicationContext:

    @ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
    @Configuration(proxyBeanMethods = false)
    public class RedisCustomizedConfiguration {

        /**
         * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
         */
        @Bean
        public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
                ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
                ClientResources clientResources,
                MultiRedisProperties multiRedisProperties,
                ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
                ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
        ) {
            // Read the configuration
            Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
            Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
            multi.forEach((k, v) -> {
                // This is how the framework's own source code uses RedisProperties;
                // we merely add one more layer around RedisProperties
                LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                        v,
                        sentinelConfigurationProvider,
                        clusterConfigurationProvider
                );
                LettuceConnectionFactory lettuceConnectionFactory =
                        lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
                connectionFactoryMap.put(k, lettuceConnectionFactory);
            });
            return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
        }
    }

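The bean method above turns a map of settings into a map of factories, one per data source name. Since the default entry is documented as required, one possible hardening is to check for it while building the map instead of discovering the problem on first use. The sketch below shows that step with plain JDK types; FactoryMapBuilder, the generic parameters, and the error message are assumptions for illustration and are not part of the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: builds one factory per configured data source and
// fails fast when the required "default" entry is missing.
final class FactoryMapBuilder {
    static final String DEFAULT = "default";

    static <P, F> Map<String, F> build(Map<String, P> multi, Function<P, F> factoryCreator) {
        if (multi == null || !multi.containsKey(DEFAULT)) {
            throw new IllegalStateException(
                    "spring.redis.multi must contain a '" + DEFAULT + "' entry");
        }
        Map<String, F> factories = new LinkedHashMap<>();
        // Same shape as the forEach in the bean method: name -> factory built from its settings
        multi.forEach((name, props) -> factories.put(name, factoryCreator.apply(props)));
        return factories;
    }
}
```

In the real configuration class, P would correspond to RedisProperties, F to LettuceConnectionFactory, and factoryCreator to the LettuceConnectionConfiguration call shown above.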
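Let's test it, using embedded-redis to start local Redis servers for the unit test. We start two Redis instances, put different keys into each, and verify which keys exist where. The test covers the synchronous API, multi-threaded calls to the synchronous API, and many subscriptions to the asynchronous API without waiting between them: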

    import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.test.context.junit.jupiter.SpringExtension;
    import reactor.core.publisher.Mono;
    import redis.embedded.RedisServer;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    @ExtendWith(SpringExtension.class)
    @SpringBootTest(properties = {
            "spring.redis.enable-multi=true",
            "spring.redis.multi.default.host=127.0.0.1",
            "spring.redis.multi.default.port=6379",
            "spring.redis.multi.test.host=127.0.0.1",
            "spring.redis.multi.test.port=6380"
    })
    public class MultiRedisTest {
        // Start two Redis servers
        private static RedisServer redisServer;
        private static RedisServer redisServer2;

        @BeforeAll
        public static void setUp() throws Exception {
            System.out.println("start redis");
            redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
            redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
            redisServer.start();
            redisServer2.start();
            System.out.println("redis started");
        }

        @AfterAll
        public static void tearDown() throws Exception {
            System.out.println("stop redis");
            redisServer.stop();
            redisServer2.stop();
            System.out.println("redis stopped");
        }

        @EnableAutoConfiguration
        @Configuration
        public static class App {
        }

        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private ReactiveStringRedisTemplate reactiveRedisTemplate;
        @Autowired
        private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

        private void testMulti(String suffix) {
            // Using the default connection, set the pair ("testDefault" + suffix, "testDefault")
            redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
            // Using the test connection, set the pair ("testSecond" + suffix, "testSecond")
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
            // Using the default connection, verify that "testDefault" + suffix exists
            // and "testSecond" + suffix does not
            Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
            Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
            // Using the test connection, verify that "testDefault" + suffix does not exist
            // and "testSecond" + suffix does
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
        }

        // Single-run verification
        @Test
        public void testMultiBlock() {
            testMulti("");
        }

        // Multi-threaded verification
        @Test
        public void testMultiBlockMultiThread() throws InterruptedException {
            Thread[] thread = new Thread[50];
            AtomicBoolean result = new AtomicBoolean(true);
            for (int i = 0; i < thread.length; i++) {
                int finalI = i;
                thread[i] = new Thread(() -> {
                    try {
                        testMulti("" + finalI);
                    } catch (Exception e) {
                        e.printStackTrace();
                        result.set(false);
                    }
                });
            }
            for (int i = 0; i < thread.length; i++) {
                thread[i].start();
            }
            for (int i = 0; i < thread.length; i++) {
                thread[i].join();
            }
            Assertions.assertTrue(result.get());
        }

        // Verification of the reactive API
        private Mono<Boolean> reactiveMulti(String suffix) {
            return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                    .flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        return b;
                    });
        }

        // Call the reactive check many times and subscribe without waiting;
        // this is inherently multi-threaded
        @Test
        public void testMultiReactive() throws InterruptedException {
            for (int i = 0; i < 10000; i++) {
                reactiveMulti("" + i).subscribe(System.out::println);
            }
            TimeUnit.SECONDS.sleep(10);
        }
    }

Running the tests, they pass.