最通俗易懂的Redis发布订阅及代码实战

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

最通俗易懂的Redis发布订阅及代码实战

发布订阅简介
除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:

Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。

发布订阅相关命令
在Redis中,发布订阅相关命令有:

发布消息
订阅频道
取消订阅
按照模式订阅
按照模式取消订阅
查询订阅信息
发布消息
发布消息的命令是publish,语法是:

publish 频道名称 消息
比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:

publish channel:one-more-study:demo "I am One More Study."
(integer) 0

返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。

订阅消息
订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:

subscribe 频道名称 [频道名称 ...]
比如,订阅一个channel:one-more-study:demo频道,命令如下:

subscribe channel:one-more-study:demo

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:

1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."
同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。

新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。

取消订阅
取消订阅的命令是unsubscribe,可以取消一个或者多个频道的订阅,语法是:

unsubscribe [频道名称 [频道名称 ...]]
比如,取消订阅channel:one-more-study:demo频道,命令如下:

unsubscribe channel:one-more-study:demo

1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。

按模式订阅消息
按模式订阅消息的命令是psubscribe,订阅一个或多个符合给定模式的频道,语法是:

psubscribe 模式 [模式 ...]
每个模式以 作为匹配符,比如 channel 匹配所有以 channel 开头的频道,命令如下:

psubscribe channel:*

Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:

1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."
返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。

按模式取消订阅
按模式取消订阅的命令是punsubscribe,可以取消一个或者多个模式的订阅,语法是:

punsubscribe [模式 [模式 ...]]
每个模式以 作为匹配符,比如 channel: 匹配所有以 channel 开头的频道,命令如下:

1> punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。

查询订阅信息
查看活跃频道
活跃频道指的是至少有一个订阅者的频道,语法是:

pubsub channels [模式]
比如:

pubsub channels

1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"

pubsub channels *demo

1) "channel:one-more-study:demo"
2) "channel:demo"

pubsub channels one-more-study

1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看频道订阅数
pubsub numsub [频道名称 ...]
比如:

pubsub numsub channel:one-more-study:demo

1) "channel:one-more-study:demo"
2) (integer) 1
查看模式订阅数

pubsub numpat

(integer) 1
代码实战
光说不练假把式,我们使用Java语言写一个简单的发布订阅示例。

Jedis集群示例
Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。

package onemore.study;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

/**

  • Jedis集群
    *
  • @author 万猫学社
    */

public enum Cluster {

INSTANCE;

//为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。
private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
private JedisCluster jedisCluster;

Cluster() {
    JedisPoolConfig poolConfig = new JedisPoolConfig();

    //最大连接数
    poolConfig.setMaxTotal(20);
    //最大空闲数
    poolConfig.setMaxIdle(10);
    //最小空闲数
    poolConfig.setMinIdle(2);

    //从jedis连接池获取连接时,校验并返回可用的连接
    poolConfig.setTestOnBorrow(true);
    //把连接放回jedis连接池时,校验并返回可用的连接
    poolConfig.setTestOnReturn(true);

    Set<HostAndPort> nodes = new HashSet<>();
    String[] hosts = hostAndPorts.split(";");
    for (String hostport : hosts) {
        String[] ipport = hostport.split(":");
        String ip = ipport[0];
        int port = Integer.parseInt(ipport[1]);
        nodes.add(new HostAndPort(ip, port));
    }
    jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
}

public JedisCluster getJedisCluster() {
    return jedisCluster;
}

}
发布者示例
package onemore.study;

import redis.clients.jedis.JedisCluster;

/**

  • 发布者
    *
  • @author 万猫学社
    */

public class Publisher implements Runnable {

private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";

@Override
public void run() {
    JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
    for (int i = 1; i <= 3; i++) {
        String message = "第" + i + "消息";
        System.out.println(Thread.currentThread().getName() + " 发布:" + message);
        jedisCluster.publish(CHANNEL_NAME, message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------------------");
    }
    jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
}

}
订阅者示例
package onemore.study;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

/**

  • 订阅者
    *
  • @author 万猫学社
    */

public class Subscriber implements Runnable {

private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";

private final JedisPubSub jedisPubSub = new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(Thread.currentThread().getName() + " 接收:" + message);
        if (QUIT_COMMAND.equals(message)) {
            unsubscribe(CHANNEL_NAME);
        }
    }
};

@Override
public void run() {
    JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
    jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
}

}
综合示例
package onemore.study;

public class App {

public static void main(String[] args) throws InterruptedException {
    //创建3个订阅者
    new Thread(new Subscriber()).start();
    new Thread(new Subscriber()).start();
    new Thread(new Subscriber()).start();
    Thread.sleep(1000);

    //创建发布者
    new Thread(new Publisher()).start();
}

}
运行结果如下:

Thread-6 发布:第1消息
Thread-0 接收:第1消息
Thread-1 接收:第1消息

Thread-2 接收:第1消息

Thread-6 发布:第2消息
Thread-0 接收:第2消息
Thread-1 接收:第2消息

Thread-2 接收:第2消息

Thread-6 发布:第3消息
Thread-0 接收:第3消息
Thread-2 接收:第3消息

Thread-1 接收:第3消息

Thread-0 接收:quit
Thread-1 接收:quit
Thread-2 接收:quit
作者:万猫学社
出处:http://www.cnblogs.com/heihaozi/

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
29天前
|
NoSQL Redis
Redis 发布订阅
10月更文挑战第18天
29 1
Redis 发布订阅
|
26天前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
56 5
|
1月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
75 2
|
1月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
2月前
|
缓存 NoSQL 应用服务中间件
Redis实战篇
Redis实战篇
|
1月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
37 0
|
2月前
|
缓存 NoSQL PHP
使用PHP-redis实现键空间通知监听key失效事件的技术与代码示例
通过上述方法,你可以有效地在PHP中使用Redis来监听键空间通知,特别是针对键失效事件。这可以帮助你更好地管理缓存策略,及时响应键的变化。
96 3
|
3月前
|
NoSQL 安全 Java
Redis6入门到实战------ 三、常用五大数据类型(字符串 String)
这篇文章深入探讨了Redis中的String数据类型,包括键操作的命令、String类型的命令使用,以及String在Redis中的内部数据结构实现。
Redis6入门到实战------ 三、常用五大数据类型(字符串 String)
|
2月前
|
消息中间件 存储 NoSQL
18)Redis 的发布订阅模型
18)Redis 的发布订阅模型
33 0
|
3月前
|
运维 监控 NoSQL
【Redis】哨兵(Sentinel)原理与实战全解~炒鸡简单啊
Redis 的哨兵模式(Sentinel)是一种用于实现高可用性的机制。它通过监控主节点和从节点,并在主节点故障时自动进行切换,确保集群持续提供服务。哨兵模式包括主节点、从节点和哨兵实例,具备监控、通知、自动故障转移等功能,能显著提高系统的稳定性和可靠性。本文详细介绍了哨兵模式的组成、功能、工作机制以及其优势和局限性,并提供了单实例的安装和配置步骤,包括系统优化、安装、配置、启停管理和性能监控等。此外,还介绍了如何配置主从复制和哨兵,确保在故障时能够自动切换并恢复服务。