Spring boot整合Redis实现发布订阅(超详细)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统Redis客户端可以订阅任意数量的频道

基础知识

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统
Redis客户端可以订阅任意数量的频道

订阅/发布消息图:
在这里插入图片描述
剖析:
1.消息发送者,2.频道,3.消息订阅者

下图展示频道channel1,以及订阅这个频道的三个客户端--client2,client5和client1之间的关系
在这里插入图片描述
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端
在这里插入图片描述


相关命令

订阅者/等待接收消息

首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1

使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。

发布者/发送消息

下面再启动一个 Redis 客户端,输入如下命令:

127.0.0.1:6379> PUBLISH bbx hello
(integer) 1
127.0.0.1:6379> PUBLISH bbx world
(integer) 1
127.0.0.1:6379> 

订阅者/成功接收消息

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
#等待读取推送消息
1) "message"    #消息
2) "bbx"        #频道
3) "hello"        #消息具体内容
1) "message"
2) "bbx"
3) "world"

常用命令汇总

命令 说明
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合指定模式的频道
PUBSUB subcommand [argument [argument ...]] 查看发布/订阅系统状态,可选参数
1) channel 返回在线状态的频道
2) numpat 返回指定模式的订阅者数量
3) numsub 返回指定频道的订阅者数量
PUBSUB subcommand [argument [argument ...]] 将信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有指定模式的频道
SUBSCRIBE channel [channel ...] 订阅一个或者多个频道的消息
UNSUBSCRIBE [channel [channel ...]] 退订指定的频道

原理

Redis是使用C实现的,可以通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现

Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能

通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者

Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。


Spring boot整合redis

导入依赖

 <!--操作redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

Redis配置

#SpringBoot 所有的配置类,都有一个自动配置类 RedisAutoConfiguration
#自动配置类都每绑定一个properties配置文件  RedisProperties

#配置redis
spring.redis.host=localhost
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=*****
#默认是数据库0
spring.redis.database= 0

# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

消息封装类(MessageDto)

@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {
    private String data;
    private String title;
    private String content;
}

Redis配置类

@Configuration
public class RedisConfig {
    //编写配置类,可模仿RedisAutoConfiguration配置类,该类在开发中可直接使用
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        //由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(factory);

        //Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用string的序列化
        template.setKeySerializer(stringRedisSerializer);
        //hash的key采用string的序列化
       template.setHashKeySerializer(stringRedisSerializer);
        //value序列化采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式采用jackson
       template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Redis消息监听器容器
     * 这个容器加载了RedisConnectionFactory和消息监听器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     *
     * @param redisConnectionFactory 连接工厂
     * @param adapter                适配器
     * @return redis消息监听容器
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                   RedisMessageListener listener,
                                                   MessageListenerAdapter adapter) {


        final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
        final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 监听所有库的key过期事件
        container.setConnectionFactory(redisConnectionFactory);
        // 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息
        // 可以添加多个 messageListener,配置不同的通道
        container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
        container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

        /**
         * 设置序列化对象
         * 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
         *         2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);

        return container;
    }

    /**
     * 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
     * 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage
     *
     * @param printMessageReceiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(PrintMessageReceiver printMessageReceiver) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "receiveMessage");

        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
}

该类中,可以通过调用消息接收容器(container)的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;在这里我们分别使用两种实现方式去订阅两个不通的频道(channel)。

  1. RedisMessageListener 通过实现MessageListener接口,从而实现该接口中的onMessage(Message message, byte[] pattern)方法。
  2. MessageListenerAdapter 通过适配器的方式,自定义一个消息接收类PrintMessageReceiver和接收消息的方法
  container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));
  container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

分别使用listener去订阅主题TOPIC_NAME1,adapter去订阅TOPIC_NAME2。
接下来分别探讨测试这两种方式。

测试类

@Slf4j
@SpringBootTest
public class RedisMessageTest {

    @Autowired
    private RedisUtils redisUtils;

    @Test
    public void test(){
        final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题
        final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
        // 发布消息
        MessageDto dto = new MessageDto();
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        dto.setData(timeFormatter.format(now));
        dto.setTitle("日常信息");
        dto.setContent("hello world!");

        redisUtils.convertAndSend(TOPIC_NAME1, dto);

    }
}

该类中的RedisUtils是之前自己封装的一个工具类,在该类中新增convertAndSend()方法。
RedisUtils中其他方法可跳转此连接查看

    /**
     * 向通道发布消息
     */
    public boolean convertAndSend(String channel, Object message) {
        if (!StringUtils.hasText(channel)) {
            return false;
        }
        try {
            redisTemplate.convertAndSend(channel, message);
            log.info("发送消息成功,channel:{},message:{}", channel, message);
            return true;
        } catch (Exception e) {
            log.info("发送消息失败,channel:{},message:{}", channel, message);
            e.printStackTrace();
        }
        return false;
    }

订阅方实现一:RedisMessageListener

@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        // 接收的topic
        log.info("channel:" + new String(pattern));

        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        MessageDto messageDto = (MessageDto) redisTemplate.getValueSerializer().deserialize(message.getBody());
        log.info(messageDto.getData()+","+messageDto.getContent());
    }
}

对使用RedisMessageListener 进行接收消息测试。
测试结果:
在这里插入图片描述

订阅方实现二:PrintMessageReceiver

@Slf4j
@Component
public class PrintMessageReceiver {
    @Autowired
    private RedisTemplate redisTemplate;

    public void receiveMessage(MessageDto messageDto , String channel) {

        // 接收的topic
        log.info("channel:" + channel);

        log.info("message:" + messageDto.getTitle());
    }
}

注意:该方法的接收参数类型以及顺序,查阅源码得知,该方法的参数可以是一个(只有消息message),也可是两个(message,channel)并且顺序不能变。
在测试类中将 redisUtils.convertAndSend(TOPIC_NAME1, dto);中的TOPIC_NAME1改为TOPIC_NAME2,
测试结果:
在这里插入图片描述

MessageListenerAdapter源码分析

  1. 构造函数
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
        this(delegate);
        setDefaultListenerMethod(defaultListenerMethod);
    }

其中this()方法中,初始化了序列化方式,该适配器默认的序列化方式是UTF-8的字符串序列化。
在这里插入图片描述
在这里插入图片描述
2.onMessage()

@Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
        try {
            // Check whether the delegate is a MessageListener impl itself.
            // In that case, the adapter will simply act as a pass-through.
            if (delegate != this) {
                if (delegate instanceof MessageListener) {
                    ((MessageListener) delegate).onMessage(message, pattern);
                    return;
                }
            }

            // Regular case: find a handler method reflectively.
            Object convertedMessage = extractMessage(message);
            String convertedChannel = stringSerializer.deserialize(pattern);
            // Invoke the handler method with appropriate arguments.
            Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

            invokeListenerMethod(invoker.getMethodName(), listenerArguments);
        } catch (Throwable th) {
            handleListenerException(th);
        }
    }

该方法当订阅频道有消息时默认执行,首先,if(delegate instanceof MessageListener)判断该对象的类是不是实现了MessageListener接口,如果是,就会执行它实现的onMessage()。很显然,我们是自定义的
PrintMessageReceiver 对象,,所以接着往下看。
Object convertedMessage = extractMessage(message);会将message反序列化,如未自定义序列化方式,就会用使用默认的字符串序列化,这就是为什么我们在RedisConfig类中注入listenerAdapter对象时,自定义了Jackson2JsonRedisSerializer 。
在这里插入图片描述

  1. invokeListenerMethod(invoker.getMethodName(), listenerArguments);
    通过反射查找定义对象中处理消息的方法。我们会看到如下的方法实现。
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {

            Object[] message = new Object[] { arguments[0] };

            for (Method m : methods) {

                Class<?>[] types = m.getParameterTypes();
                Object[] args = //
                        types.length == 2 //
                                && types[0].isInstance(arguments[0]) //
                                && types[1].isInstance(arguments[1]) ? arguments : message;

                if (!types[0].(args[0])) {
                    continue;
                }

                m.invoke(delegate, args);

                return;
            }
        }

在这里插入图片描述
从而得知我们自定义方法中参数个数可以是一个也可以是两个,如两个参数时,第一个参数接收消息(message),第二个参数接收频道(channel),也可得知为什么自定义方法中,接收消息参数类型我们可以直接写MessageDto。


以上内容如有不对之处,还望不吝赐教。

目录
相关文章
|
15天前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
5月前
|
NoSQL 安全 Java
深入理解 RedisConnectionFactory:Spring Data Redis 的核心组件
在 Spring Data Redis 中,`RedisConnectionFactory` 是核心组件,负责创建和管理与 Redis 的连接。它支持单机、集群及哨兵等多种模式,为上层组件(如 `RedisTemplate`)提供连接抽象。Spring 提供了 Lettuce 和 Jedis 两种主要实现,其中 Lettuce 因其线程安全和高性能特性被广泛推荐。通过手动配置或 Spring Boot 自动化配置,开发者可轻松集成 Redis,提升应用性能与扩展性。本文深入解析其作用、实现方式及常见问题解决方法,助你高效使用 Redis。
558 4
|
6月前
|
NoSQL Java 关系型数据库
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
271 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
|
2月前
|
NoSQL Java Redis
Redis基本数据类型及Spring Data Redis应用
Redis 是开源高性能键值对数据库,支持 String、Hash、List、Set、Sorted Set 等数据结构,适用于缓存、消息队列、排行榜等场景。具备高性能、原子操作及丰富功能,是分布式系统核心组件。
364 2
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
181 32
|
6月前
|
NoSQL Java API
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Spring Boot 集成 Redis
本文介绍了在Spring Boot中集成Redis的方法,包括依赖导入、Redis配置及常用API的使用。通过导入`spring-boot-starter-data-redis`依赖和配置`application.yml`文件,可轻松实现Redis集成。文中详细讲解了StringRedisTemplate的使用,适用于字符串操作,并结合FastJSON将实体类转换为JSON存储。还展示了Redis的string、hash和list类型的操作示例。最后总结了Redis在缓存和高并发场景中的应用价值,并提供课程源代码下载链接。
1642 0
|
6月前
|
NoSQL Java Redis
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 安装
本教程介绍在 VMware 虚拟机(CentOS 7)或阿里云服务器中安装 Redis 的过程,包括安装 gcc 编译环境、下载 Redis(官网或 wget)、解压安装、修改配置文件(如 bind、daemonize、requirepass 等设置)、启动 Redis 服务及测试客户端连接。通过 set 和 get 命令验证安装是否成功。适用于初学者快速上手 Redis 部署。
147 0
|
2月前
|
Java Spring 容器
SpringBoot自动配置的原理是什么?
Spring Boot自动配置核心在于@EnableAutoConfiguration注解,它通过@Import导入配置选择器,加载META-INF/spring.factories中定义的自动配置类。这些类根据@Conditional系列注解判断是否生效。但Spring Boot 3.0后已弃用spring.factories,改用新格式的.imports文件进行配置。
731 0
|
6月前
|
前端开发 Java 数据库
微服务——SpringBoot使用归纳——Spring Boot集成Thymeleaf模板引擎——Thymeleaf 介绍
本课介绍Spring Boot集成Thymeleaf模板引擎。Thymeleaf是一款现代服务器端Java模板引擎,支持Web和独立环境,可实现自然模板开发,便于团队协作。与传统JSP不同,Thymeleaf模板可以直接在浏览器中打开,方便前端人员查看静态原型。通过在HTML标签中添加扩展属性(如`th:text`),Thymeleaf能够在服务运行时动态替换内容,展示数据库中的数据,同时兼容静态页面展示,为开发带来灵活性和便利性。
312 0
|
2月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
386 0
第07课:Spring Boot集成Thymeleaf模板引擎