Spring boot整合Redis实现发布订阅(超详细)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统Redis客户端可以订阅任意数量的频道

基础知识

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统
Redis客户端可以订阅任意数量的频道

订阅/发布消息图:
在这里插入图片描述
剖析:
1.消息发送者,2.频道,3.消息订阅者

下图展示频道channel1,以及订阅这个频道的三个客户端--client2,client5和client1之间的关系
在这里插入图片描述
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端
在这里插入图片描述


相关命令

订阅者/等待接收消息

首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1

使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。

发布者/发送消息

下面再启动一个 Redis 客户端,输入如下命令:

127.0.0.1:6379> PUBLISH bbx hello
(integer) 1
127.0.0.1:6379> PUBLISH bbx world
(integer) 1
127.0.0.1:6379> 

订阅者/成功接收消息

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
#等待读取推送消息
1) "message"    #消息
2) "bbx"        #频道
3) "hello"        #消息具体内容
1) "message"
2) "bbx"
3) "world"

常用命令汇总

命令 说明
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合指定模式的频道
PUBSUB subcommand [argument [argument ...]] 查看发布/订阅系统状态,可选参数
1) channel 返回在线状态的频道
2) numpat 返回指定模式的订阅者数量
3) numsub 返回指定频道的订阅者数量
PUBSUB subcommand [argument [argument ...]] 将信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有指定模式的频道
SUBSCRIBE channel [channel ...] 订阅一个或者多个频道的消息
UNSUBSCRIBE [channel [channel ...]] 退订指定的频道

原理

Redis是使用C实现的,可以通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现

Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能

通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者

Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。


Spring boot整合redis

导入依赖

 <!--操作redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

Redis配置

#SpringBoot 所有的配置类,都有一个自动配置类 RedisAutoConfiguration
#自动配置类都每绑定一个properties配置文件  RedisProperties

#配置redis
spring.redis.host=localhost
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=*****
#默认是数据库0
spring.redis.database= 0

# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

消息封装类(MessageDto)

@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {
    private String data;
    private String title;
    private String content;
}

Redis配置类

@Configuration
public class RedisConfig {
    //编写配置类,可模仿RedisAutoConfiguration配置类,该类在开发中可直接使用
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        //由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(factory);

        //Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用string的序列化
        template.setKeySerializer(stringRedisSerializer);
        //hash的key采用string的序列化
       template.setHashKeySerializer(stringRedisSerializer);
        //value序列化采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式采用jackson
       template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Redis消息监听器容器
     * 这个容器加载了RedisConnectionFactory和消息监听器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     *
     * @param redisConnectionFactory 连接工厂
     * @param adapter                适配器
     * @return redis消息监听容器
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                   RedisMessageListener listener,
                                                   MessageListenerAdapter adapter) {


        final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
        final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 监听所有库的key过期事件
        container.setConnectionFactory(redisConnectionFactory);
        // 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息
        // 可以添加多个 messageListener,配置不同的通道
        container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
        container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

        /**
         * 设置序列化对象
         * 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
         *         2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);

        return container;
    }

    /**
     * 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
     * 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage
     *
     * @param printMessageReceiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(PrintMessageReceiver printMessageReceiver) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "receiveMessage");

        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
}

该类中,可以通过调用消息接收容器(container)的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;在这里我们分别使用两种实现方式去订阅两个不通的频道(channel)。

  1. RedisMessageListener 通过实现MessageListener接口,从而实现该接口中的onMessage(Message message, byte[] pattern)方法。
  2. MessageListenerAdapter 通过适配器的方式,自定义一个消息接收类PrintMessageReceiver和接收消息的方法
  container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
  container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

分别使用listener去订阅主题TOPIC_NAME1,adapter去订阅TOPIC_NAME2。
接下来分别探讨测试这两种方式。

测试类

@Slf4j
@SpringBootTest
public class RedisMessageTest {

    @Autowired
    private RedisUtils redisUtils;

    @Test
    public void test(){
        final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
        final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
        // 发布消息
        MessageDto dto = new MessageDto();
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        dto.setData(timeFormatter.format(now));
        dto.setTitle("日常信息");
        dto.setContent("hello world!");

        redisUtils.convertAndSend(TOPIC_NAME1, dto);

    }
}

该类中的RedisUtils是之前自己封装的一个工具类,在该类中新增convertAndSend()方法。
RedisUtils中其他方法可跳转此连接查看

    /**
     * 向通道发布消息
     */
    public boolean convertAndSend(String channel, Object message) {
        if (!StringUtils.hasText(channel)) {
            return false;
        }
        try {
            redisTemplate.convertAndSend(channel, message);
            log.info("发送消息成功,channel:{},message:{}", channel, message);
            return true;
        } catch (Exception e) {
            log.info("发送消息失败,channel:{},message:{}", channel, message);
            e.printStackTrace();
        }
        return false;
    }

订阅方实现一:RedisMessageListener

@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        // 接收的topic
        log.info("channel:" + new String(pattern));

        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        MessageDto messageDto = (MessageDto) redisTemplate.getValueSerializer().deserialize(message.getBody());
        log.info(messageDto.getData()+","+messageDto.getContent());
    }
}

对使用RedisMessageListener 进行接收消息测试。
测试结果:
在这里插入图片描述

订阅方实现二:PrintMessageReceiver

@Slf4j
@Component
public class PrintMessageReceiver {
    @Autowired
    private RedisTemplate redisTemplate;

    public void receiveMessage(MessageDto messageDto , String channel) {

        // 接收的topic
        log.info("channel:" + channel);

        log.info("message:" + messageDto.getTitle());
    }
}

注意:该方法的接收参数类型以及顺序,查阅源码得知,该方法的参数可以是一个(只有消息message),也可是两个(message,channel)并且顺序不能变。
在测试类中将 redisUtils.convertAndSend(TOPIC_NAME1, dto);中的TOPIC_NAME1改为TOPIC_NAME2,
测试结果:
在这里插入图片描述

MessageListenerAdapter源码分析

  1. 构造函数
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
        this(delegate);
        setDefaultListenerMethod(defaultListenerMethod);
    }

其中this()方法中,初始化了序列化方式,该适配器默认的序列化方式是UTF-8的字符串序列化。
在这里插入图片描述
在这里插入图片描述
2.onMessage()

@Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
        try {
            // Check whether the delegate is a MessageListener impl itself.
            // In that case, the adapter will simply act as a pass-through.
            if (delegate != this) {
                if (delegate instanceof MessageListener) {
                    ((MessageListener) delegate).onMessage(message, pattern);
                    return;
                }
            }

            // Regular case: find a handler method reflectively.
            Object convertedMessage = extractMessage(message);
            String convertedChannel = stringSerializer.deserialize(pattern);
            // Invoke the handler method with appropriate arguments.
            Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

            invokeListenerMethod(invoker.getMethodName(), listenerArguments);
        } catch (Throwable th) {
            handleListenerException(th);
        }
    }

该方法当订阅频道有消息时默认执行,首先,if(delegate instanceof MessageListener)判断该对象的类是不是实现了MessageListener接口,如果是,就会执行它实现的onMessage()。很显然,我们是自定义的
PrintMessageReceiver 对象,,所以接着往下看。
Object convertedMessage = extractMessage(message);会将message反序列化,如未自定义序列化方式,就会用使用默认的字符串序列化,这就是为什么我们在RedisConfig类中注入listenerAdapter对象时,自定义了Jackson2JsonRedisSerializer 。
在这里插入图片描述

  1. invokeListenerMethod(invoker.getMethodName(), listenerArguments);
    通过反射查找定义对象中处理消息的方法。我们会看到如下的方法实现。
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {

            Object[] message = new Object[] { arguments[0] };

            for (Method m : methods) {

                Class<?>[] types = m.getParameterTypes();
                Object[] args = //
                        types.length == 2 //
                                && types[0].isInstance(arguments[0]) //
                                && types[1].isInstance(arguments[1]) ? arguments : message;

                if (!types[0].(args[0])) {
                    continue;
                }

                m.invoke(delegate, args);

                return;
            }
        }

在这里插入图片描述
从而得知我们自定义方法中参数个数可以是一个也可以是两个,如两个参数时,第一个参数接收消息(message),第二个参数接收频道(channel),也可得知为什么自定义方法中,接收消息参数类型我们可以直接写MessageDto。


以上内容如有不对之处,还望不吝赐教。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8天前
|
NoSQL Redis
Redis 发布订阅
10月更文挑战第18天
18 1
Redis 发布订阅
|
26天前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
30 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
11天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
31 2
|
27天前
|
NoSQL Java Redis
在 Spring 中操作 Redis
本文详细介绍了在Spring框架中如何通过引入依赖、配置文件、使用StringRedisTemplate类以及执行原生命令等方式来操作Redis数据库,并提供了对String、List、Set、Hash和ZSet数据类型的操作示例。
46 0
在 Spring 中操作 Redis
|
2月前
|
NoSQL 网络协议 Java
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
34 7
|
2月前
|
NoSQL Java 网络安全
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
|
1月前
|
存储 NoSQL Java
Spring Boot项目中使用Redis实现接口幂等性的方案
通过上述方法,可以有效地在Spring Boot项目中利用Redis实现接口幂等性,既保证了接口操作的安全性,又提高了系统的可靠性。
29 0
|
2月前
|
消息中间件 存储 NoSQL
18)Redis 的发布订阅模型
18)Redis 的发布订阅模型
28 0
|
NoSQL Java 数据库
|
Java Spring 数据格式
spring 整合redis
用的是最新的jedis-2.6.2.jar这个包,这个和以前的有点不同。还需要添加spring-data-redis-1.2.1.RELEASE.jar和commons-pool2-2.3.jar。 在类路径下创建spring-redis-config.
934 0