1.概念
Redis的发布订阅模式是一种消息通信机制,其中发布者(Publisher)可以向一个或多个订阅者(Subscriber)发送消息。订阅者可以订阅一个或多个主题,以便只接收与这些主题相关的消息。
2.组成
在发布订阅模式中,通常有以下几个组件:
- 发布者(Publisher):负责向一个或多个订阅者发送消息。
- 订阅者(Subscriber):订阅一个或多个主题,并接收与这些主题相关的消息。
- 消息主题(Topic):用于区分不同类型的消息。
当发布者发送消息时,该消息会被发送到相应的主题。然后,订阅了该主题的订阅者会接收到该消息。
这就像是一个发布/订阅关系,发布者发布消息,订阅者订阅主题并接收消息。
3.集成
spring: profiles: active: dev redis: host: 127.0.0.1 port: 6379 password:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
4.单一主题
package com.lp.redis; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * @author liu pei * @date 2023年12月13日 下午7:05 * @Description: */ @Configuration public class RedisConfig { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); //订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener container.addMessageListener(listener, new ChannelTopic("redis.life")); container.addMessageListener(listener, new ChannelTopic("redis.news")); return container; } }
package com.lp.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; /** * @author liu pei * @date 2023年12月13日 下午7:01 * @Description: 这个在其他项目实现就可以 */ @Component public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { // 获取消息 byte[] messageBody = message.getBody(); // 使用值序列化器转换 Object msg = redisTemplate.getValueSerializer().deserialize(messageBody); // 获取监听的频道 byte[] channelByte = message.getChannel(); // 使用字符串序列化器转换 Object channel = redisTemplate.getStringSerializer().deserialize(channelByte); // 渠道名称转换 String patternStr = new String(pattern); System.out.println(patternStr); System.out.println("---频道---: " + channel); System.out.println("---消息内容---: " + msg); } }
package com.lp.redis; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.junit4.SpringRunner; /** * @author liu pei * @date 2023年12月13日 下午7:07 * @Description: */ @ExtendWith(SpringExtension.class) @SpringBootTest public class TestDemo { @Autowired private RedisTemplate redisTemplate; @Test public void publish() { // 使用convertAndSend方法向频道redisChat发布消息 redisTemplate.convertAndSend("redis.life", "aaa"); redisTemplate.convertAndSend("redis.news", "bbb"); redisTemplate.convertAndSend("test.news", "ccc"); } }
5.模糊匹配多个主题
package com.lp.redis.redis; import org.springframework.stereotype.Component; /** * @author liu pei * @date 2023年12月13日 下午7:29 * @Description: */ @Component public class MessageReceiver { public void receiveMessage(String message,String channel){ System.out.println("MessageReceiver ---频道---: " + channel); System.out.println("MessageReceiver ---消息内容---: " + message); } }
package com.lp.redis; import com.lp.redis.redis.MessageReceiver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * @author liu pei * @date 2023年12月13日 下午7:05 * @Description: */ @Configuration public class RedisConfig { /** * 订阅所有 redis.开头多个频道 */ @Bean public PatternTopic patternTopic() { return new PatternTopic("redis.*"); } @Bean public MessageListenerAdapter listenerAdapter(MessageReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean public RedisMessageListenerContainer container(RedisConnectionFactory factory, MessageListenerAdapter adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); // 一次订阅多个匹配的频道 container.addMessageListener(adapter, patternTopic()); return container; } }
package com.lp.redis; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.junit4.SpringRunner; /** * @author liu pei * @date 2023年12月13日 下午7:07 * @Description: */ @ExtendWith(SpringExtension.class) @SpringBootTest public class TestDemo { @Autowired private RedisTemplate redisTemplate; @Test public void publish() { // 使用convertAndSend方法向频道redisChat发布消息 redisTemplate.convertAndSend("redis.life", "aaa"); redisTemplate.convertAndSend("redis.news", "bbb"); redisTemplate.convertAndSend("test.news", "ccc"); } }