楔子
Redis 虽然是一个缓存,但它也可以用作消息队列。比如我们前面介绍 List 类型的时候说过,基于 LPUSH 和 BRPOP 可以实现一个简易版的消息队列,但它有两个缺点:
- 不支持多个消费者:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据;
- 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了;
而 Redis 提供的发布订阅模型,正好可以解决第一个问题:重复消费,即多个消费者消费同一批数据的场景。下面来看一下。
普通订阅与发布
消息队列有两个重要的角色,一个是发布者(或者说生产者),另一个就是订阅者(或者说消费者),对应的命令如下:
- 发布消息:publish channel "message";
- 订阅消息:subscribe channel;
除了生产者和消费者,还有一个重要的概念:channel(频道),可以理解为消息队列的名称。首先消费者先要订阅某个 channel,然后当生产者把消息发送到 channel 当中时,消费者就可以正常接收到消息了,如下图所示:
下面我们来看具体的命令实现。
订阅消息
# 可以同时订阅多个频道 127.0.0.1:6379> subscribe channel1 channel2 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel1" 3) (integer) 1 1) "subscribe" 2) "channel2" 3) (integer) 2
注意:当我们订阅某个频道的时候,就阻塞在这里了。
就类似于微信公众号一样,你关注了某个公众号,那么当公众号上面发表文章的时候,你就可以收到。此时操作公众号的人就是消息发布者,你就是消息订阅者,公众号就是消息队列,往公众号上面发表的文章就是消息。
此外一个消费者可以同时订阅多个 channel,正如一个微信用户可以关注多个公众号;一个 channel 也可以被多个消费者订阅,正如一个公众号可以被多个微信用户关注。
发送消息
我们上面的订阅者在订阅之后,就处于阻塞状态,因此我们需要再开一个终端。
127.0.0.1:6379> publish channel1 "satori" (integer) 1 127.0.0.1:6379> publish channel2 "koishi" (integer) 1
返回值表示成功发给了几个订阅者,所以这里的 1 就表示成功发给了一个订阅者,如果有两个订阅者,那么返回值就是 2。因此返回值可以是 0~n,由订阅者的数量决定。
然后我们来看看订阅者:
127.0.0.1:6379> subscribe channel1 channel2 ... 1) "message" 2) "channel1" # 从 channel1 收到消息 3) "satori" 1) "message" 2) "channel2" # 从 channel2 收到消息 3) "koishi"
成功收到消息,所以每个消费者可以同时订阅多个 channel,并且每个 channel 也可以被多个消费者订阅。
因此使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者消费同一批数据的业务需求。除此之外,Pub/Sub 还提供了「主题订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。
主题订阅
主题订阅说白了,和模糊匹配是类似的。假设我们需要订阅好几个队列,但它们都是以 log 开头的,那么我们就可以通过 psubscribe log* 来自动订阅所有以 log 开头的队列。
比如上面的 channel1、channel2,我们就可以通过 psubscribe channel* 实现,至于消息发布者则不需要变。
当然主题订阅也可以是多个,比如:psubscribe log* db*,订阅所有以 log 开头、db 开头的消息队列。
取消订阅
既然有订阅,那么就有取消订阅,就类似于取关(o(╥﹏╥)o)。
使用 unsubscribe channel1 channel2 可以取消订阅多个channel,如果是主题订阅,那么也可以通过 punsubscribe ch* 取消订阅指定的主题。比较简单,不再赘述。
Python 操作 Redis 的发布订阅
订阅者(生产者):
# 订阅者 import redis client = redis.Redis(host="...", decode_responses="utf-8") # 调用 pubsub 方法返回一个订阅者 sub = client.pubsub() # 订阅两个队列 sub.subscribe("ch1", "ch2") # 监听,此时处于阻塞状态 for item in sub.listen(): # 一旦发布者发布消息,这里就可以接收到 # item["channel"]是队列,item["data"]是内容 print(item["channel"], item["data"])
发布者(消费者):
# 订阅者 import redis client = redis.Redis(host="...", decode_responses="utf-8") # 发布者很简单,直接发布消息接口 client.publish("ch1", "7ki7ki棒棒1") client.publish("ch1", "7ki7ki棒棒2") client.publish("ch2", "7ki7ki棒棒3")
当执行发布者的时候,会发现订阅者多了几条输出,至于内容显然是发布者发布的内容。
另外 Python 操作订阅者还有几种方式。
# 订阅者 import redis client = redis.Redis(host="...", decode_responses="utf-8") sub = client.pubsub() sub.subscribe("ch1", "ch2") while True: # 这种方式会瞬间返回,如果有消息得到消息 # 没有消息会返回None item = sub.get_message() if item: print(item["channel"], item["data"])
显然该方法有一个缺陷,就是会造成 CPU 空转,因此我们不推荐使用该方法。
也可以开启一个新的线程去监听。
# 订阅者 import redis client = redis.Redis(host="...", decode_responses="utf-8") sub = client.pubsub() sub.subscribe("ch1", "ch2") def handler(item): print(item["channel"], item["data"]) # 给每一个频道注册一个处理函数 # 当频道有消息时,会自动将消息传递给处理函数 sub.channels.update({"ch1": handler, "ch2": handler}) # 开启一个线程运行,会返回新开启的线程对象 # 注意:因为是单独开了一个线程,所以这里不会阻塞 th = sub.run_in_thread() print("xxx") print("yyy") print("zzz") # 先启动订阅者,再启动发布者,程序输出如下 """ xxx yyy zzz ch1 7ki7ki棒棒1 ch1 7ki7ki棒棒2 ch2 7ki7ki棒棒3 """ # 注意:这里程序依旧会卡住,因为开启的线程是非守护线程 # 所以即便主线程执行完毕,也依旧会等待子线程 # 如果不想主线程等待,解决的办法有两种: # 一种是在run_in_thread中加上一个参数daemon=True # 将其设置为守护线程,这样主线程就不会等待了 # 另一种是手动停止,因为sub.run_in_thread会返回新开启的线程 # 调用其stop方法即可让它停止,通过这种方式,可以在任意时刻停止监听 # th.stop()
还有主题订阅,发布者代码依旧不用变,只需要将订阅者的 sub.subscribe 换成 sub.psubscribe 即可。
# 订阅者 sub = client.pubsub() # 订阅以 ch 开头、log 开头的队列 sub.psubscribe("ch*", "log*")
如果是开启新的线程的话:
# 订阅者 import redis client = redis.Redis(host="...", decode_responses="utf-8") sub = client.pubsub() sub.psubscribe("ch*", "log*") def handler(item): print(item["channel"], item["data"]) # 对于开启新的线程去监听 # 要将之前的 sub.channels 换成 sub.patterns sub.patterns.update({"ch*": handler, "log*": handler}) sub.run_in_thread()
以上就是 Python 操作 Redis 发布订阅相关的内容。
发布订阅作为消息队列有什么缺点?
Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。但除了这一个优点之外,剩下的都是缺点了。
1)发布订阅模式是"发后既忘"的工作模式,如果订阅者离线,那么重连之后不能消费之前的历史消息。
因为 Pub/Sub 的实现原理非常简单,它没有基于任何数据类型,也没有做任何的数据存储,只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。
在 Redis 中,一个完整的发布、订阅消息处理流程是这样的:
- 1)消费者订阅指定队列,Redis 就会记录一个映射关系:队列 -> 消费者;
- 2)生产者向这个队列发布消息,然后 Redis 就会从映射关系中找出对应的消费者,把消息转发给它;
整个过程没有任何的数据存储,一切都是实时转发的。我们上面在发完消息后返回的 1 就是订阅当前队列的消费者个数,或者说 Redis 将消息转发给了多少个消费者。
因此,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息是接收不到的。如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会直接「丢弃」。
所以在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
2)无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失
因为 Pub/Sub 没有基于任何数据类型实现,所以它不具备「数据持久化」的能力,也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。
3)积压的消息不能太多,否则也会丢数据
当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。如果采用 List 作为队列,消息积压时会导致底层的链表很长,最直接的影响就是 Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失。至于原因,还是和 Pub/Sub 的实现原理有关。
每个消费者订阅一个队列时,Redis 都会在 Server 端为该消费者分配一个「缓冲区」,这个缓冲区其实就是一块内存。当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中,之后消费者不断地从缓冲区读取消息,处理消息。
但问题就出在这个缓冲区上,因为这个缓冲区的大小是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。如果超过了缓冲区配置的上限,Redis 就会「强制」把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。
Redis 的配置文件里有这么这一个配置:client-output-buffer-limit pubsub 32mb 8mb 60,它的参数含义如下:
- 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线;
- 8mb 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线;
所以 Pub/Sub 作为消息队列和 List 之间的差异比较大的,并且从推拉模型不难看出,List 是属于「拉」模型,而 Pub/Sub 属于「推」模型。List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。
但当消费者跟不上生产者的速度时,就会导致缓冲区的内存开始膨胀,而 Redis 为了控制缓冲区的上限,就会把消费者踢下线。
总结一下 Pub/Sub 模型的优缺点:
除了第一个是优点之外,剩下的都是缺点,所以很多人都觉得 Pub/Sub 很鸡肋。也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。
目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。
最后还有一点没有说,就是 Pub/Sub 和 List 一样,数据一旦取走,就会从 Redis 当中删除,无法重复消费。而一个成熟的消息队列,应该具备如下功能:
- 支持阻塞等待拉取消息;
- 支持发布 / 订阅模式;
- 消费者下线之后重新上线,仍能消费下线期间生产者发送的消息;
- 消费者消费失败,可重新消费,也就是支持消息被同一个消费者消费多次;
- 实例宕机,消息不丢失,数据可持久化;
- 即使消息大量堆积,也不会丢数据;
显然 Pub/Sub 只能满足前两个要求。
于是 Redis 在 5.0 的时候引入 Stream 类型,将 Pub/Sub 的问题全部解决,而 Stream 相关的内容后续详细介绍。
本文参考自:
- 水滴与银弹:《把Redis当作队列来用,真的合适吗?》