redis的发布/订阅(命令、普通工程、springboot实现)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)

一、背景介绍


公司最近有业务需求如下:


  • 1.小美老师给五年级三班上数学课的时候,实现给所在班级进行实时推送数学课程的活动(广播通信)


  • 2.小明在上课的时候给小红的评论进行了点赞,此时小红会收到小明给你点赞这样的通知(点对点通信)


二、思路&方案


  • 基于背景中的需求,我想到了redis的发布/订阅模式;后端向前端发消息使用websocket建立长链接就好


  • 这里只介绍redis的发布/订阅模式实现


  • 通过命令、普通工程集成、springboot实现三种方式进行实现理解


三、过程


redis发布/订阅官方讲解


Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。


Redis 客户端可以订阅任意数量的频道。


下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:


efaa02dfee4648418287931ba309e5d7.png


当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:


e3f9ab35427341a6a9d0549d8406dd1f.png


命令行实现


1.以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:


redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1


2.现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。


redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by w3cschool.cc"
(integer) 1
# 订阅者的客户端会显示如下消息
1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by w3cschool.cc"


普通工程实现


1.pom文件引入redis包


<!--        redis的发布订阅-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>


2.发布者类Publisher


package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * 发布者类
 */
public class Publisher extends Thread{
    private final JedisPool jedisPool;
    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);   //从 mychannel 的频道上推送消息
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


3.订阅者类SubThread


package com.b0022redis发布订阅;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * 订阅者类
 */
public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "mychannel";
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            jedis.subscribe(subscriber, channel);    //通过subscribe 的api去订阅,入参是订阅者和频道名
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}


4.消息监听回调类Subscriber


package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPubSub;
/**
 * redis消息监听回调类
 */
public class Subscriber extends JedisPubSub {
    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
}


5.客户端类Client


package com.b0022redis发布订阅;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * redis 发布订阅java版本,目前是一个订阅,一个发布
 * 参考文章:https://blog.csdn.net/fengyuyeguirenenen/article/details/123424105
 *
 */
public class Client {
    public static void main( String[] args )
    {
        // 连接redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));
        SubThread subThread = new SubThread(jedisPool);  //订阅者
        subThread.start();
        Publisher publisher = new Publisher(jedisPool);    //发布者
        publisher.start();
    }
}


springboot工程集成实现


1.pom文件引入的包


   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
<!--            <scope>compile</scope>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>


2.监听实现类CatListener


package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * 监听发送的消息
 */
@Component
public class CatListener extends MessageListenerAdapter implements MessageListener  {
    @Autowired
    RedisTemplate redisTemplate;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是Cat监听" + message.toString());
    }
}


3.redis消息配置,添加监听类RedisMessageConfig


package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            CatListener catAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫chat 的通道
        container.addMessageListener(catAdapter, new PatternTopic("cat"));
        //这个container 可以添加多个 messageListener
        return container;
    }
    /**
     * redis 读取内容的template
     */
    @Bean
    StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //定义value的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}


4.发送者类TestController


package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    RedisMessageListenerContainer container;
    @GetMapping("cat")
    public void sendCatMessage() {
        stringRedisTemplate.convertAndSend("cat", "猫");
    }
}


5.配置文件application.yml


server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    database: 12
    password:
    port: 6379


6.启动类Client


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
    public static void main(String[] args) {
        SpringApplication.run(Client.class, args);
    }
}


springboot工程集成改造实现动态点对点、广播


1.pom文件引入的包


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>


2.监听实现类StudentListener


package com.mark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * 监听发送的消息
 */
@Component
public class StudentListener extends MessageListenerAdapter implements MessageListener  {
    private String id;
    private String name;
    StudentListener(String id,String name){
        this.id = id;
        this.name = name;
    }
    StudentListener(){
    }
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是监听者"+this.name+",我的id是:"+this.id+";我收到的消息是:" + message.toString());
    }
}


3.redis消息配置,添加监听类RedisMessageConfig


package com.mark;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisMessageConfig {
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //这个container 可以添加多个 messageListener
        return container;
    }
    /**
     * redis 读取内容的template
     */
    @Bean
    StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //定义value的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}


4.添加监听、移除监听、发送消息类TestController


package com.mark;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class TestController {
    ConcurrentHashMap<String,StudentListener> map = new ConcurrentHashMap<>();
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    RedisMessageListenerContainer container;
    @GetMapping("pushMany")
    public String pushMany(@RequestParam(value="courseId") String courseId,
                               @RequestParam(value="classId") String classId,
                               @RequestParam(value="message") String message) {
        stringRedisTemplate.convertAndSend(courseId+"/"+classId, message);
        return "广播发送成功";
    }
    @GetMapping("pushOne")
    public String pushOne(@RequestParam(value="id") String id,
                               @RequestParam(value="message") String message) {
        stringRedisTemplate.convertAndSend(id, message);
        return "点对点发送成功";
    }
    @GetMapping("addListener")
    public String addListener(@RequestParam(value="courseId") String courseId,
                            @RequestParam(value="classId") String classId,
                            @RequestParam(value="id") String id,
                            @RequestParam(value="name") String name){
        if(map.containsKey(id)){
            return name+"已经添加过监听";
        }else {
            StudentListener studentListener = new StudentListener(id,name);
            container.addMessageListener(studentListener,new PatternTopic(courseId+"/"+classId));
            container.addMessageListener(studentListener,new PatternTopic(id));
            map.put(id,studentListener);
        }
        return name + "监听添加成功";
    }
    @GetMapping("removeListener")
    public String removeListener(@RequestParam(value="courseId") String courseId,
                            @RequestParam(value="classId") String classId,
                            @RequestParam(value="id") String id,
                            @RequestParam(value="name") String name){
        if(map.containsKey(id)){
            container.removeMessageListener(map.get(id));
            map.remove(id);
        }else {
            return name + "没有进行监听,无须移除";
        }
        return name + "移除监听成功";
    }
}


5.配置文件application.yml


server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    database: 12
    password:
    port: 6379


6.启动类Client


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {
    public static void main(String[] args) {
        SpringApplication.run(Client.class, args);
    }
}


7.实现的效果


b8d02005d8fc40e99f3882bd24dda9f8.png


四、总结


  • 1.通过对redis的发布/订阅的多场景分析,不同代码的实现,对于如何运用有了更加明确的理解
  • 2.以后再有类似的需求和框架明确了着力点;先找最本质的逻辑,再一点点去包装
  • 3.之前就陷入到和框架死磕的结果上,其中涉及到的封装层比较多,看起来就比较乱
  • 4.后续还需要再去查阅redis发布/订阅的内部实现是如何进行的?
  • 5.后续还需要结合websocket进行针对性的研究,从netty的角度来查看长链接通信


五、升华


  • 1.通过这个例子的整理,对于道和术的层面有了更加深刻的理解,在这里道就是要先了解它本质,然后再通过术一步步包装进行实现


  • 2.举一反三这种自信心的增加,一件事真正从道的角度去理解了,所谓术的层面就相当容易了


注:引用文章

https://www.redis.net.cn/tutorial/3514.html

https://blog.csdn.net/qq_32867467/article/details/82944209

https://blog.csdn.net/tttttt521/article/details/118612526

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
19天前
|
NoSQL Java API
springboot项目Redis统计在线用户
通过本文的介绍,您可以在Spring Boot项目中使用Redis实现在线用户统计。通过合理配置Redis和实现用户登录、注销及统计逻辑,您可以高效地管理在线用户。希望本文的详细解释和代码示例能帮助您在实际项目中成功应用这一技术。
27 3
|
21天前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
39 1
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
58 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
92 2
|
20天前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis的消息发布与订阅
本文介绍了Redis实现消息队列的两种场景:发布者订阅者模式和生产者消费者模式。其中,发布者订阅者模式通过channel频道进行解耦,订阅者监听特定channel的消息,当发布者向该channel发送消息时,所有订阅者都能接收到消息。文章还提供了相关操作命令及示例代码,展示了如何使用Redis实现消息的发布与订阅。
|
23天前
|
存储 NoSQL Java
Redis命令:列表模糊删除详解
通过本文的介绍,我们详细探讨了如何在Redis中实现列表的模糊删除。虽然Redis没有直接提供模糊删除命令,但可以通过组合使用 `LRANGE`和 `LREM`命令,并在客户端代码中进行模糊匹配,来实现这一功能。希望本文能帮助你在实际应用中更有效地操作Redis列表。
35 0
|
2月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
2月前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
31 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
|
2月前
|
缓存 NoSQL 测试技术
Redis如何解决频繁的命令往返造成的性能瓶颈!
Redis如何解决频繁的命令往返造成的性能瓶颈!
|
27天前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
36 0