redis的发布/订阅(命令、普通工程、springboot实现)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)

一、背景介绍


公司最近有业务需求如下:


  • 1.小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)


  • 2.小明在上课的时候给小红的评论进行了点赞,此时小红会收到小明给你点赞这样的通知(点对点通信)


二、思路&方案


  • 基于背景中的需求,我想到了redis的发布/订阅模式;后端向前端发消息使用websocket建立长链接就好


  • 这里只介绍redis的发布/订阅模式实现


  • 通过命令、普通工程集成、springboot实现三种方式进行实现理解


三、过程


redis发布/订阅官方讲解


Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。


Redis 客户端可以订阅任意数量的频道。


下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:


efaa02dfee4648418287931ba309e5d7.png


当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:


e3f9ab35427341a6a9d0549d8406dd1f.png


命令行实现


1.以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:


redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1


2.现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。


redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by w3cschool.cc"
(integer) 1
# 订阅者的客户端会显示如下消息
1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by w3cschool.cc"


普通工程实现


1.pom文件引入redis包


<!--        redis的发布订阅-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>


2.发布者类Publisher


package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * 发布者类
 */
public class Publisher extends Thread{
    private final JedisPool jedisPool;
    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);   //从 mychannel 的频道上推送消息
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


3.订阅者类SubThread


package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * 订阅者类
 */
public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "mychannel";
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            jedis.subscribe(subscriber, channel);    //通过subscribe 的api去订阅,入参是订阅者和频道名
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}


4.消息监听回调类Subscriber


package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPubSub;
/**
 * redis消息监听回调类
 */
public class Subscriber extends JedisPubSub {
    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
}


5.客户端类Client


package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * redis 发布订阅java版本,目前是一个订阅,一个发布
 * 参考文章:https://blog.csdn.net/fengyuyeguirenenen/article/details/123424105
 *
 */
public class Client {
    public static void main( String[] args )
    {
        // 连接redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));
        SubThread subThread = new SubThread(jedisPool);  //订阅者
        subThread.start();
        Publisher publisher = new Publisher(jedisPool);    //发布者
        publisher.start();
    }
}


springboot工程集成实现


1.pom文件引入的包


   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
<!--            <scope>compile</scope>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>


2.监听实现类CatListener


package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * 监听发送的消息
 */
@Component
public class CatListener extends MessageListenerAdapter implements MessageListener  {
    @Autowired
    RedisTemplate redisTemplate;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是Cat监听" + message.toString());
    }
}


3.redis消息配置,添加监听类RedisMessageConfig


package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            CatListener catAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫chat 的通道
        container.addMessageListener(catAdapter, new PatternTopic("cat"));
        //这个container 可以添加多个 messageListener
        return container;
    }
    /**
     * redis 读取内容的template
     */
    @Bean
    StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //定义value的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}


4.发送者类TestController


package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    RedisMessageListenerContainer container;
    @GetMapping("cat")
    public void sendCatMessage() {
        stringRedisTemplate.convertAndSend("cat", "猫");
    }
}


5.配置文件application.yml


server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    database: 12
    password:
    port: 6379


6.启动类Client


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
    public static void main(String[] args) {
        SpringApplication.run(Client.class, args);
    }
}


springboot工程集成改造实现动态点对点、广播


1.pom文件引入的包


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>


2.监听实现类StudentListener


package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * 监听发送的消息
 */
@Component
public class StudentListener extends MessageListenerAdapter implements MessageListener  {
    private String id;
    private String name;
    StudentListener(String id,String name){
        this.id = id;
        this.name = name;
    }
    StudentListener(){
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是监听者"+this.name+",我的id是:"+this.id+";我收到的消息是:" + message.toString());
    }
}


3.redis消息配置,添加监听类RedisMessageConfig


package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //这个container 可以添加多个 messageListener
        return container;
    }
    /**
     * redis 读取内容的template
     */
    @Bean
    StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //定义value的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}


4.添加监听、移除监听、发送消息类TestController


package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class TestController {
    ConcurrentHashMap<String,StudentListener> map = new ConcurrentHashMap<>();
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    RedisMessageListenerContainer container;
    @GetMapping("pushMany")
    public String pushMany(@RequestParam(value="courseId") String courseId,
                               @RequestParam(value="classId") String classId,
                               @RequestParam(value="message") String message) {
        stringRedisTemplate.convertAndSend(courseId+"/"+classId, message);
        return "广播发送成功";
    }
    @GetMapping("pushOne")
    public String pushOne(@RequestParam(value="id") String id,
                               @RequestParam(value="message") String message) {
        stringRedisTemplate.convertAndSend(id, message);
        return "点对点发送成功";
    }
    @GetMapping("addListener")
    public String addListener(@RequestParam(value="courseId") String courseId,
                            @RequestParam(value="classId") String classId,
                            @RequestParam(value="id") String id,
                            @RequestParam(value="name") String name){
        if(map.containsKey(id)){
            return name+"已经添加过监听";
        }else {
            StudentListener studentListener = new StudentListener(id,name);
            container.addMessageListener(studentListener,new PatternTopic(courseId+"/"+classId));
            container.addMessageListener(studentListener,new PatternTopic(id));
            map.put(id,studentListener);
        }
        return name + "监听添加成功";
    }
    @GetMapping("removeListener")
    public String removeListener(@RequestParam(value="courseId") String courseId,
                            @RequestParam(value="classId") String classId,
                            @RequestParam(value="id") String id,
                            @RequestParam(value="name") String name){
        if(map.containsKey(id)){
            container.removeMessageListener(map.get(id));
            map.remove(id);
        }else {
            return name + "没有进行监听,无须移除";
        }
        return name + "移除监听成功";
    }
}


5.配置文件application.yml


server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    database: 12
    password:
    port: 6379


6.启动类Client


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
    public static void main(String[] args) {
        SpringApplication.run(Client.class, args);
    }
}


7.实现的效果


b8d02005d8fc40e99f3882bd24dda9f8.png


四、总结


  • 1.通过对redis的发布/订阅的多场景分析,不同代码的实现,对于如何运用有了更加明确的理解
  • 2.以后再有类似的需求和框架明确了着力点;先找最本质的逻辑,再一点点去包装
  • 3.之前就陷入到和框架死磕的结果上,其中涉及到的封装层比较多,看起来就比较乱
  • 4.后续还需要再去查阅redis发布/订阅的内部实现是如何进行的?
  • 5.后续还需要结合websocket进行针对性的研究,从netty的角度来查看长链接通信


五、升华


  • 1.通过这个例子的整理,对于道和术的层面有了更加深刻的理解,在这里道就是要先了解它本质,然后再通过术一步步包装进行实现


  • 2.举一反三这种自信心的增加,一件事真正从道的角度去理解了,所谓术的层面就相当容易了


注:引用文章

https://www.redis.net.cn/tutorial/3514.html

https://blog.csdn.net/qq_32867467/article/details/82944209

https://blog.csdn.net/tttttt521/article/details/118612526

相关文章
|
1月前
|
NoSQL Java 网络安全
SpringBoot启动时连接Redis报错:ERR This instance has cluster support disabled - 如何解决?
通过以上步骤一般可以解决由于配置不匹配造成的连接错误。在调试问题时,一定要确保服务端和客户端的Redis配置保持同步一致。这能够确保SpringBoot应用顺利连接到正确配置的Redis服务,无论是单机模式还是集群模式。
204 5
|
6月前
|
存储 缓存 监控
Redis设计与实现——Redis命令参考与高级特性
Redis 是一个高性能的键值存储系统,支持丰富的数据类型(字符串、列表、哈希、集合等)和多种高级功能。本文档涵盖 Redis 的核心命令分类,包括数据类型操作、事务与脚本、持久化、集群管理、系统监控等。特别介绍了事务的原子性特性、Lua 脚本的执行方式及优势、排序机制、发布订阅模型以及慢查询日志和监视器工具的使用方法。适用于开发者快速掌握 Redis 常用命令及其应用场景,优化系统性能与可靠性。
|
2月前
|
存储 缓存 NoSQL
Redis基础命令与数据结构概览
Redis是一个功能强大的键值存储系统,提供了丰富的数据结构以及相应的操作命令来满足现代应用程序对于高速读写和灵活数据处理的需求。通过掌握这些基础命令,开发者能够高效地对Redis进行操作,实现数据存储和管理的高性能方案。
116 12
|
2月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
211 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
2月前
|
存储 消息中间件 NoSQL
【Redis】常用数据结构之List篇:从常用命令到典型使用场景
本文将系统探讨 Redis List 的核心特性、完整命令体系、底层存储实现以及典型实践场景,为读者构建从理论到应用的完整认知框架,助力开发者在实际业务中高效运用这一数据结构解决问题。
|
3月前
|
存储 缓存 人工智能
Redis六大常见命令详解:从set/get到过期策略的全方位解析
本文将通过结构化学习路径,帮助读者实现从命令语法掌握到工程化实践落地的能力跃迁,系统性提升 Redis 技术栈的应用水平。
|
4月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
205 8
|
6月前
|
存储 缓存 NoSQL
Redis中的常用命令-get&set&keys&exists&expire&ttl&type的详细解析
总的来说,这些Redis命令提供了处理存储在内存中的键值对的便捷方式。通过理解和运用它们,你可以更有效地在Redis中操作数据,使其更好地服务于你的应用。
457 17
|
6月前
|
消息中间件 NoSQL Linux
Redis的基本介绍和安装方式(包括Linux和Windows版本),以及常用命令的演示
Redis(Remote Dictionary Server)是一个高性能的开源键值存储数据库。它支持字符串、列表、散列、集合等多种数据类型,具有持久化、发布/订阅等高级功能。由于其出色的性能和广泛的使用场景,Redis在应用程序中常作为高速缓存、消息队列等用途。
920 16
|
6月前
|
JSON NoSQL Redis
在Rocky9系统上安装并使用redis-dump和redis-load命令的指南
以上步骤是在Rocky9上使用redis-dump和redis-load命令顺利出行的秘籍。如果在实行的过程中,发现了新的冒险和挑战,那么就像一个勇敢的航海家,本着探索未知的决心,解决问题并前进。
244 14