最通俗易懂的Redis发布订阅及代码实战-阿里云开发者社区

开发者社区> 数据库> 正文

最通俗易懂的Redis发布订阅及代码实战

简介:

最通俗易懂的Redis发布订阅及代码实战

发布订阅简介
除了使用List实现简单的消息队列功能以外,Redis还提供了发布订阅的消息机制。在这种机制下,消息发布者向指定频道(channel)发布消息,消息订阅者可以收到指定频道的消息,同一个频道可以有多个消息订阅者,如下图:

Redis也提供了一些命令支持这个机制,接下来我们详细介绍一下这些命令。

发布订阅相关命令
在Redis中,发布订阅相关命令有:

发布消息
订阅频道
取消订阅
按照模式订阅
按照模式取消订阅
查询订阅信息
发布消息
发布消息的命令是publish,语法是:

publish 频道名称 消息
比如,要向channel:one-more-study:demo频道发布一条消息“I am One More Study.”,命令如下:

publish channel:one-more-study:demo "I am One More Study."
(integer) 0

返回的结果是订阅者的个数,上例中没有订阅者,所以返回结果为0。

订阅消息
订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:

subscribe 频道名称 [频道名称 ...]
比如,订阅一个channel:one-more-study:demo频道,命令如下:

subscribe channel:one-more-study:demo

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(订阅成功)、订阅的频道名称、目前已订阅的频道数量。当订阅者接受到消息时,就会显示:

1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."
同样也是3条结果,分别表示:返回值的类型(信息)、消息来源的频道名称、消息内容。

新开启的订阅者,是无法收到该频道之前的历史消息的,因为Redis没有对发布的消息做持久化。

取消订阅
取消订阅的命令是unsubscribe,可以取消一个或者多个频道的订阅,语法是:

unsubscribe [频道名称 [频道名称 ...]]
比如,取消订阅channel:one-more-study:demo频道,命令如下:

unsubscribe channel:one-more-study:demo

1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(取消订阅成功)、取消订阅的频道名称、目前已订阅的频道数量。

按模式订阅消息
按模式订阅消息的命令是psubscribe,订阅一个或多个符合给定模式的频道,语法是:

psubscribe 模式 [模式 ...]
每个模式以 作为匹配符,比如 channel 匹配所有以 channel 开头的频道,命令如下:

psubscribe channel:*

Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
返回结果中有3条,分别表示:返回值的类型(按模式订阅成功)、订阅的模式、目前已订阅的模式数量。当订阅者接受到消息时,就会显示:

1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."
返回结果中有4条,分别表示:返回值的类型(信息)、消息匹配的模式、消息来源的频道名称、消息内容。

按模式取消订阅
按模式取消订阅的命令是punsubscribe,可以取消一个或者多个模式的订阅,语法是:

punsubscribe [模式 [模式 ...]]
每个模式以 作为匹配符,比如 channel: 匹配所有以 channel 开头的频道,命令如下:

1> punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0
返回结果中有3条,分别表示:返回值的类型(按模式取消订阅成功)、取消订阅的模式、目前已订阅的模式数量。

查询订阅信息
查看活跃频道
活跃频道指的是至少有一个订阅者的频道,语法是:

pubsub channels [模式]
比如:

pubsub channels

1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"

pubsub channels *demo

1) "channel:one-more-study:demo"
2) "channel:demo"

pubsub channels one-more-study

1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看频道订阅数
pubsub numsub [频道名称 ...]
比如:

pubsub numsub channel:one-more-study:demo

1) "channel:one-more-study:demo"
2) (integer) 1
查看模式订阅数

pubsub numpat

(integer) 1
代码实战
光说不练假把式,我们使用Java语言写一个简单的发布订阅示例。

Jedis集群示例
Jedis是Redis官方推荐的Java连接开发工具,我们使用Jedis写一个简单的集群示例。

package onemore.study;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

/**

  • Jedis集群
    *
  • @author 万猫学社
    */

public enum Cluster {

INSTANCE;

//为了简单,把IP和端口直接写在这里,实际开发中写在配置文件会更好。
private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
private JedisCluster jedisCluster;

Cluster() {
    JedisPoolConfig poolConfig = new JedisPoolConfig();

    //最大连接数
    poolConfig.setMaxTotal(20);
    //最大空闲数
    poolConfig.setMaxIdle(10);
    //最小空闲数
    poolConfig.setMinIdle(2);

    //从jedis连接池获取连接时,校验并返回可用的连接
    poolConfig.setTestOnBorrow(true);
    //把连接放回jedis连接池时,校验并返回可用的连接
    poolConfig.setTestOnReturn(true);

    Set<HostAndPort> nodes = new HashSet<>();
    String[] hosts = hostAndPorts.split(";");
    for (String hostport : hosts) {
        String[] ipport = hostport.split(":");
        String ip = ipport[0];
        int port = Integer.parseInt(ipport[1]);
        nodes.add(new HostAndPort(ip, port));
    }
    jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
}

public JedisCluster getJedisCluster() {
    return jedisCluster;
}

}
发布者示例
package onemore.study;

import redis.clients.jedis.JedisCluster;

/**

  • 发布者
    *
  • @author 万猫学社
    */

public class Publisher implements Runnable {

private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";

@Override
public void run() {
    JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
    for (int i = 1; i <= 3; i++) {
        String message = "第" + i + "消息";
        System.out.println(Thread.currentThread().getName() + " 发布:" + message);
        jedisCluster.publish(CHANNEL_NAME, message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------------------");
    }
    jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
}

}
订阅者示例
package onemore.study;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

/**

  • 订阅者
    *
  • @author 万猫学社
    */

public class Subscriber implements Runnable {

private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";

private final JedisPubSub jedisPubSub = new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(Thread.currentThread().getName() + " 接收:" + message);
        if (QUIT_COMMAND.equals(message)) {
            unsubscribe(CHANNEL_NAME);
        }
    }
};

@Override
public void run() {
    JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
    jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
}

}
综合示例
package onemore.study;

public class App {

public static void main(String[] args) throws InterruptedException {
    //创建3个订阅者
    new Thread(new Subscriber()).start();
    new Thread(new Subscriber()).start();
    new Thread(new Subscriber()).start();
    Thread.sleep(1000);

    //创建发布者
    new Thread(new Publisher()).start();
}

}
运行结果如下:

Thread-6 发布:第1消息
Thread-0 接收:第1消息
Thread-1 接收:第1消息

Thread-2 接收:第1消息

Thread-6 发布:第2消息
Thread-0 接收:第2消息
Thread-1 接收:第2消息

Thread-2 接收:第2消息

Thread-6 发布:第3消息
Thread-0 接收:第3消息
Thread-2 接收:第3消息

Thread-1 接收:第3消息

Thread-0 接收:quit
Thread-1 接收:quit
Thread-2 接收:quit
作者:万猫学社
出处:http://www.cnblogs.com/heihaozi/

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章