文章目录
一般我们使用redis最多的场景还是作为缓存中间件使用,redis也能做为消息队列使用,但这不是Redis的强项,不过如果需要的话还是可以使用的。
redis的发布订阅
集成到springboot中
- 引入redis starter
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
- 新建一个RedisMessageListenerConfig
创建 RedisMessageListenerConfig之间要先自定义定义一个接口RedisPubSub接口,这个接口用于处理收到的信息,如果实现发布订阅只需实现这个接口即可。
public interface RedisPubSub { /** * 接收消息 * @param message */ void receiveMessage(String message); /** * 发布订阅监听的topic key * @return */ CacheKeyEnum getCacheKeyEnum(); }
public enum CacheKeyEnum { /** * 消息订阅 */ PUBSUB_QUEUE("pubsub:queue" , 0L), ; // 缓存键名 private String key; /** * 过期时间,单位秒 * 0 表示不过期 */ private Long expireTime; //省略getter、setter }
RedisMessageListenerConfig.java
@Component public class RedisMessageListenerConfig { // 如果项目中没有RedisPubSub实现类,启动会报错,所以设置required = false @Autowired(required = false) private Set<RedisPubSub> redisPubSubs; /** * 创建连接工厂 * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , Map<? extends MessageListener, Collection<? extends Topic>> listenerAdapters){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageListeners(listenerAdapters); return container; } @Bean public Map<MessageListener, Collection<Topic>> listenerAdapters(){ if (!CollectionUtils.isEmpty(redisPubSubs)) { Map<MessageListener, Collection<Topic>> map = new HashMap<>(redisPubSubs.size()); for (RedisPubSub redisPubSub : redisPubSubs) { final CacheKeyEnum cacheKeyEnum = redisPubSub.getCacheKeyEnum(); // redis会利用反射调用receiveMessage方法 final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisPubSub, "receiveMessage"); messageListenerAdapter.afterPropertiesSet(); map.put(messageListenerAdapter , Collections.singletonList(new PatternTopic(cacheKeyEnum.getKey()))); } return map; } return Collections.emptyMap(); } }
准备redis工具类:
@Slf4j @Component public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 消息发布 * @param cacheKeyEnum * @param message */ public void publish(CacheKeyEnum cacheKeyEnum , Object message){ redisTemplate.convertAndSend(cacheKeyEnum.getKey() , message); } /** * 反序列化redis数据 * @param value * @param <T> * @return */ public <T> T deserialize(String value){ final RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer(); final Object deserialize = valueSerializer.deserialize(value.getBytes()); return deserialize == null ? null : (T) deserialize; } }
集成测试
- 实现RedisPubSub
@Component public class PubsubQueue implements RedisPubSub { @Autowired private RedisUtil redisUtil; /** * 接收消息 * * @param message */ @Override public void receiveMessage(String message) { System.out.println(message); final UserEntity deserialize = redisUtil.deserialize(message); System.out.println("getId="+deserialize.getId()); } /** * 发布订阅监听的topic key * * @return */ @Override public CacheKeyEnum getCacheKeyEnum() { return CacheKeyEnum.PUBSUB_QUEUE; } }
- 单元测试
@Test public void testRedisQueue(){ UserEntity userEntity = new UserEntity(); userEntity.setId(123456L); redisUtil.publish(CacheKeyEnum.PUBSUB_QUEUE , userEntity); }
控制台输出:
使用redis发布订阅的注意点:
RedisPubSub 中receiveMessage接收的参数是String类型,redis在发布订阅中接收到的对象是字节数组,控制台打印是一个json格式的,如果redis用的是默认的JdkSerializationRedisSerializer序列化类,直接想通过将String转成JSON是不行的,会报错的,而且如果用的是jdk的序列化类,要发布的对象必须实现Serializable接口,否则也会报错。
所以可以通过上面的redisUtil中的反序列化方法来进行对象的转化,这样不管是不是用的是什么序列化类都不会报错。
Redis发布订阅的缺点:
消息不持久化,一旦订阅者没收到消息或者重启,消息将丢失。
相对于专业的消息中间件来说,Redis的发布订阅相对简单,慎用即可。
能力一般,水平有限,如有错误,请多指出。
如果对你有用点个关注给个赞呗