一、背景介绍
公司最近有业务需求如下:
- 1.小美老师给五年级三班上数学课的时候,需要向所在班级实时推送数学课程的活动(广播通信)
- 2.小明在上课的时候给小红的评论进行了点赞,此时小红会收到"小明给你点了赞"这样的通知(点对点通信)
二、思路&方案
- 基于背景中的需求,我想到了redis的发布/订阅模式;后端向前端发消息使用websocket建立长连接就好
- 这里只介绍redis的发布/订阅模式实现
- 通过命令行、普通工程集成、springboot集成三种方式分别实现,以加深理解
三、过程
redis发布/订阅官方讲解
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
命令行实现
1.以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:
redis 127.0.0.1:6379> SUBSCRIBE redisChat Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "redisChat" 3) (integer) 1
2.现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。
redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique" (integer) 1 redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by w3cschool.cc" (integer) 1 # 订阅者的客户端会显示如下消息 1) "message" 2) "redisChat" 3) "Redis is a great caching technique" 1) "message" 2) "redisChat" 3) "Learn redis by w3cschool.cc"
普通工程实现
1.pom文件引入redis包
<!-- redis的发布订阅--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
2.发布者类Publisher
package com.b0022redis发布订阅;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Publisher thread.
 *
 * <p>Reads lines from standard input and publishes each one to the {@code mychannel}
 * channel. The loop ends when the user types {@code quit}, when standard input reaches
 * end-of-stream, or when reading from standard input fails.
 */
public class Publisher extends Thread {

    private final JedisPool jedisPool;

    /**
     * @param jedisPool pool from which the publishing connection is borrowed
     */
    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        // System.in is deliberately not closed here: it belongs to the whole process.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // try-with-resources guarantees the borrowed connection is closed on every exit path;
        // previously it was never closed.
        try (Jedis jedis = jedisPool.getResource()) {
            String line;
            // readLine() returns null at end-of-stream; stop instead of publishing null forever.
            while ((line = reader.readLine()) != null && !"quit".equals(line)) {
                jedis.publish("mychannel", line); // push the message on channel "mychannel"
            }
        } catch (IOException e) {
            // A failing stdin will not recover by retrying, so report once and end the thread
            // rather than spinning in an endless error loop.
            e.printStackTrace();
        }
    }
}
3.订阅者类SubThread
package com.b0022redis发布订阅; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * 订阅者类 */ public class SubThread extends Thread { private final JedisPool jedisPool; private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) { super("SubThread"); this.jedisPool = jedisPool; } @Override public void run() { System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel)); Jedis jedis = null; try { jedis = jedisPool.getResource(); //取出一个连接 jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名 } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { jedis.close(); } } } }
4.消息监听回调类Subscriber
package com.b0022redis发布订阅; import redis.clients.jedis.JedisPubSub; /** * redis消息监听回调类 */ public class Subscriber extends JedisPubSub { public Subscriber(){} @Override public void onMessage(String channel, String message) { //收到消息会调用 System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用 System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用 System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } }
5.客户端类Client
package com.b0022redis发布订阅; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * redis 发布订阅java版本,目前是一个订阅,一个发布 * 参考文章:https://blog.csdn.net/fengyuyeguirenenen/article/details/123424105 * */ public class Client { public static void main( String[] args ) { // 连接redis服务端 JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379); System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379)); SubThread subThread = new SubThread(jedisPool); //订阅者 subThread.start(); Publisher publisher = new Publisher(jedisPool); //发布者 publisher.start(); } }
springboot工程集成实现
1.pom文件引入的包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- <scope>compile</scope>--> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>
2.监听实现类CatListener
package com.mark; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * 监听发送的消息 */ @Component public class CatListener extends MessageListenerAdapter implements MessageListener { @Autowired RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] bytes) { System.out.println("我是Cat监听" + message.toString()); } }
3.redis消息配置,添加监听类RedisMessageConfig
package com.mark; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; @Configuration public class RedisMessageConfig { /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, CatListener catAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat 的通道 container.addMessageListener(catAdapter, new PatternTopic("cat")); //这个container 可以添加多个 messageListener return container; } /** * redis 读取内容的template */ @Bean StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); //定义value的序列化方式 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); 
template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashKeySerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
4.发送者类TestController
package com.mark;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST endpoint acting as the message sender for the {@code cat} channel.
 */
@RestController
public class TestController {

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    RedisMessageListenerContainer container;

    /** Publishes a fixed message to the {@code cat} channel on each GET request. */
    @GetMapping("cat")
    public void sendCatMessage() {
        final String channel = "cat";
        final String payload = "猫";
        stringRedisTemplate.convertAndSend(channel, payload);
    }
}
5.配置文件application.yml
server: port: 8080 spring: redis: host: 127.0.0.1 database: 12 password: port: 6379
6.启动类Client
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Spring Boot entry point; component scanning is pointed at the {@code com.mark} package.
 */
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {

    public static void main(String[] args) {
        final Class<?> primarySource = Client.class;
        SpringApplication.run(primarySource, args);
    }
}
springboot工程集成改造实现动态点对点、广播
1.pom文件引入的包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>
2.监听实现类StudentListener
package com.mark; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * 监听发送的消息 */ @Component public class StudentListener extends MessageListenerAdapter implements MessageListener { private String id; private String name; StudentListener(String id,String name){ this.id = id; this.name = name; } StudentListener(){ } @Override public void onMessage(Message message, byte[] bytes) { System.out.println("我是监听者"+this.name+",我的id是:"+this.id+";我收到的消息是:" + message.toString()); } }
3.redis消息配置,添加监听类RedisMessageConfig
package com.mark; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; @Configuration public class RedisMessageConfig { /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //这个container 可以添加多个 messageListener return container; } /** * redis 读取内容的template */ @Bean StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); //定义value的序列化方式 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashKeySerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
4.添加监听、移除监听、发送消息类TestController
package com.mark;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;

/**
 * REST endpoints for adding listeners, removing listeners, and sending broadcast or
 * point-to-point messages over Redis pub/sub.
 */
@RestController
public class TestController {

    /** Registered listeners, keyed by student id. */
    final ConcurrentHashMap<String, StudentListener> map = new ConcurrentHashMap<>();

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    RedisMessageListenerContainer container;

    /**
     * Broadcast: publishes {@code message} to the channel {@code courseId/classId}.
     *
     * @param courseId course part of the channel name
     * @param classId  class part of the channel name
     * @param message  payload to publish
     * @return a fixed confirmation text
     */
    @GetMapping("pushMany")
    public String pushMany(@RequestParam(value = "courseId") String courseId,
                           @RequestParam(value = "classId") String classId,
                           @RequestParam(value = "message") String message) {
        stringRedisTemplate.convertAndSend(courseId + "/" + classId, message);
        return "广播发送成功";
    }

    /**
     * Point-to-point: publishes {@code message} to the channel named after {@code id}.
     *
     * @param id      receiver id, used directly as the channel name
     * @param message payload to publish
     * @return a fixed confirmation text
     */
    @GetMapping("pushOne")
    public String pushOne(@RequestParam(value = "id") String id,
                          @RequestParam(value = "message") String message) {
        stringRedisTemplate.convertAndSend(id, message);
        return "点对点发送成功";
    }

    /**
     * Registers a listener for the class channel {@code courseId/classId} and for the
     * personal channel {@code id}, unless a listener is already registered for this id.
     *
     * @param courseId course part of the class channel name
     * @param classId  class part of the class channel name
     * @param id       student id; also the personal channel name and the registry key
     * @param name     student name, used in the listener output and the response text
     * @return a text saying whether the listener was added or already existed
     */
    @GetMapping("addListener")
    public String addListener(@RequestParam(value = "courseId") String courseId,
                              @RequestParam(value = "classId") String classId,
                              @RequestParam(value = "id") String id,
                              @RequestParam(value = "name") String name) {
        StudentListener studentListener = new StudentListener(id, name);
        // putIfAbsent makes "check and reserve" one atomic step. The previous containsKey + put
        // pair let two concurrent requests for the same id both register a listener.
        if (map.putIfAbsent(id, studentListener) != null) {
            return name + "已经添加过监听";
        }
        try {
            // NOTE(review): PatternTopic treats the string as a pattern, so request values containing
            // wildcard characters would match more than one channel — confirm whether exact channel
            // matching is intended.
            container.addMessageListener(studentListener, new PatternTopic(courseId + "/" + classId));
            container.addMessageListener(studentListener, new PatternTopic(id));
        } catch (RuntimeException e) {
            // Roll back the reservation so a failed registration does not block later attempts.
            map.remove(id, studentListener);
            container.removeMessageListener(studentListener);
            throw e;
        }
        return name + "监听添加成功";
    }

    /**
     * Removes the listener registered for {@code id}, if there is one.
     *
     * @param courseId not used by this method; kept as a required request parameter
     * @param classId  not used by this method; kept as a required request parameter
     * @param id       student id whose listener should be removed
     * @param name     student name, used in the response text
     * @return a text saying whether a listener was removed or none existed
     */
    @GetMapping("removeListener")
    public String removeListener(@RequestParam(value = "courseId") String courseId,
                                 @RequestParam(value = "classId") String classId,
                                 @RequestParam(value = "id") String id,
                                 @RequestParam(value = "name") String name) {
        // A single atomic remove replaces containsKey + get + remove, so only one of several
        // concurrent requests obtains the listener and unregisters it.
        StudentListener removed = map.remove(id);
        if (removed == null) {
            return name + "没有进行监听,无须移除";
        }
        container.removeMessageListener(removed);
        return name + "移除监听成功";
    }
}
5.配置文件application.yml
server: port: 8080 spring: redis: host: 127.0.0.1 database: 12 password: port: 6379
6.启动类Client
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Spring Boot entry point; component scanning is pointed at the {@code com.mark} package.
 */
@SpringBootApplication
@ComponentScan("com.mark")
public class Client {

    public static void main(String[] args) {
        final Class<?> primarySource = Client.class;
        SpringApplication.run(primarySource, args);
    }
}
7.实现的效果
四、总结
- 1.通过对redis的发布/订阅的多场景分析,不同代码的实现,对于如何运用有了更加明确的理解
- 2.以后再有类似的需求和框架明确了着力点;先找最本质的逻辑,再一点点去包装
- 3.之前就陷入到和框架死磕的结果上,其中涉及到的封装层比较多,看起来就比较乱
- 4.后续还需要再去查阅redis发布/订阅的内部实现是如何进行的?
- 5.后续还需要结合websocket进行针对性的研究,从netty的角度来查看长连接通信
五、升华
- 1.通过这个例子的整理,对于道和术的层面有了更加深刻的理解,在这里道就是要先了解它本质,然后再通过术一步步包装进行实现
- 2.举一反三这种自信心的增加,一件事真正从道的角度去理解了,所谓术的层面就相当容易了
注:引用文章
https://www.redis.net.cn/tutorial/3514.html