场景介绍
云数据库 Redis 版也提供了与 Redis 相同的消息发布(pub)与订阅(sub)功能。即一个 client 发布消息,其他多个 client 订阅消息。
需要注意的是,云数据库 Redis 版发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外,消息发布者(即 publish 客户端)无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如 List 操作等)。但是,消息订阅者(即 subscribe 客户端)需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
代码示例
消息发布者 (即 publish client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
private Jedis jedis;//
public KVStorePubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//KVStore的实例密码
String authString = jedis.auth(password);//password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel,String message){
System.out.println(" >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);
jedis.publish(channel, message);
}
public void close(String channel){
System.out.println(" >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");
//消息发布者结束发送,即发送一个“quit”消息;
jedis.publish(channel, "quit");
}
}
消息订阅者 (即 subscribe client)
package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public KVStoreSubClient(String host,int port, String password){
jedis = new Jedis(host,port);
//ApsaraDB for Redis的实例密码
String authString = jedis.auth(password);//password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener,String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
System.err.println("Error:SubClient> listener or channel is null");
}
System.out.println(" >>> 订阅(SUBSCRIBE) > Channel:"+channel);
System.out.println();
//接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
System.out.println(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
System.out.println();
listener.unsubscribe(channel);
}
@Override
public void run() {
try{
System.out.println();
System.out.println("----------订阅消息SUBSCRIBE 开始-------");
subscribe();
System.out.println("----------订阅消息SUBSCRIBE 结束-------");
System.out.println();
}catch(Exception e){
e.printStackTrace();
}
}
}
消息监听者
package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
System.out.println();
//当接收到的message为quit时,取消订阅(被动方式)
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
}
示例主程序
package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
//ApsaraDB for Redis的连接信息,从控制台可以获得
static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password="password";//password
public static void main(String[] args) throws Exception{
KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
final String channel = "KVStore频道-A";
//消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收
pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)");
//消息接收者
KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
JedisPubSub listener = new KVStoreMessageListener();
subClient.setChannelAndListener(listener, channel);
//消息接收者开始订阅
subClient.start();
//消息发送者继续发消息
for (int i = 0; i < 5; i++) {
String message=UUID.randomUUID().toString();
pubClient.pub(channel, message);
Thread.sleep(1000);
}
//消息接收者主动取消订阅
subClient.unsubscribe(channel);
Thread.sleep(1000);
pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)");
//消息发布者结束发送,即发送一个“quit”消息;
//此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
pubClient.close(channel);
}
}
运行结果
在输入了正确的云数据库 Redis 版实例访问地址和密码之后,运行以上 Java 程序,输出结果如下。
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
>>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A
----------订阅消息SUBSCRIBE 结束-------
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)
>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
以上示例中仅演示了一个发布者与一个订阅者的情况,实际上发布者与订阅者都可以为多个,发送消息的频道(channel)也可以是多个,对以上代码稍作修改即可。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。