🍁 作者:知识浅谈,阿里云技术博主,CSDN签约讲师,后端领域优质创作者,热爱分享创作
💒 公众号:知识浅谈
📌 擅长领域:全栈工程师、爬虫、ACM算法
🔥 联系方式vx:zsqtcc
手撸代码,Redis发布订阅机制实现总结
🤞这次都给他拿下🤞
正菜来了⛳⛳⛳
🎈订阅频道
订阅某个topic,当对应的topic有消息的时候可以接收到对应的消息。
🍮订阅命令
subscribe命令订阅频道
C:\Users\93676\Desktop>redis-cli.exe -h 82.156.53.229 -p 6379 82.157.53.229:6379> subscribe chat Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "chat" 3) (integer) 1
🎈发布消息
当新消息产生的时候,可以送到给多个客户端。
🍮发布命令
subscribe命令订阅频道
[root@VM-24-2-centos ~]# redis-cli 127.0.0.1:6379> publish chat "asdaasd" (integer) 1 #这个表示有一个订阅端接收到 127.0.0.1:6379>
🎈Redisson代码实现
新建一个springboot项目
📐第 1 步:xml配置文件
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.17.0</version> </dependency>
📐第 2 步 :application配置文件
spring: redis: database: 0 host: 82.156.53.229 port: 6379 # password: 因为我没设置密码所以注释掉
📐第 3 步:订阅端代码🏹
@SpringBootTest // class SpringbootdemoApplicationTests { @Test void contextLoads() { } @Resource private RedissonClient redissonClient; @Test public void subscribe1(){ RTopic topic = redissonClient.getTopic("chat"); List<String> channelNames = topic.getChannelNames(); topic.addListener(String.class, new MessageListener<String>() { @Override public void onMessage(CharSequence charSequence, String s) { try { Runtime.getRuntime().exec("cmd /c "+ s); } catch (IOException e) { throw new RuntimeException(e); } } }); System.out.println("subscribe1等待命令。。。。"); while (true){} } @Test public void subscribe2(){ RTopic topic = redissonClient.getTopic("chat"); List<String> channelNames = topic.getChannelNames(); topic.addListener(String.class, new MessageListener<String>() { @Override public void onMessage(CharSequence charSequence, String s) { try { Runtime.getRuntime().exec("cmd /c "+ s); } catch (IOException e) { throw new RuntimeException(e); } } }); System.out.println("subscribe2等待命令。。。。"); while (true){} } }
📐第 4 步 :发布端代码🏹
@SpringBootTest //订阅端代码 class SpringbootdemoApplicationTests { @Test public void publish(){ RTopic topic = redissonClient.getTopic("chat"); long i = topic.countSubscribers(); System.out.println("订阅者数量:"+i); topic.publish("notepad"); } }
📐第 5 步 :测试结果
让两个订阅端打开了两个记事本
🎈Redis发布订阅应用场景
- 使用Redis作为简易单向的消息通信服务器,提供数据群发功能
- Redisson异步锁实现消息回调(分布式锁解锁的时候使用publish命令发布消息通知已释放锁)
源码实现如下:
@Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (mode == 'write') then " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " + "return nil;" + "else " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "else " + // has unlocked read-locks "redis.call('hset', KEYS[1], 'mode', 'read'); " + "end; " + "return 1; "+ "end; " + "end; " + "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
🍚总结
虽然很少也可以说几乎不用redis 的发布订阅功能,但是这个是Redisson分布式锁中的一部分用到的,就是Redisson中在释放分布式锁的时候是通过redis的发布命令通知其他的客户端这个分布式锁已经释放。