18)Redis 的发布订阅模型

简介: 18)Redis 的发布订阅模型

楔子



Redis 虽然是一个缓存,但它也可以用作消息队列。比如我们前面介绍 List 类型的时候说过,基于 LPUSH 和 BRPOP 可以实现一个简易版的消息队列,但它有两个缺点:

  • 不支持多个消费者:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据;
  • 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了;

而 Redis 提供的发布订阅模型,正好可以解决第一个问题:重复消费,即多个消费者消费同一批数据的场景。下面来看一下。


普通订阅与发布



消息队列有两个重要的角色,一个是发布者(或者说生产者),另一个就是订阅者(或者说消费者),对应的命令如下:

  • 发布消息:publish channel "message";
  • 订阅消息:subscribe channel;

除了生产者和消费者,还有一个重要的概念:channel(频道),可以理解为消息队列的名称。首先消费者先要订阅某个 channel,然后当生产者把消息发送到 channel 当中时,消费者就可以正常接收到消息了,如下图所示:

下面我们来看具体的命令实现。

订阅消息

# 可以同时订阅多个频道
127.0.0.1:6379> subscribe channel1 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2

注意:当我们订阅某个频道的时候,就阻塞在这里了。

就类似于微信公众号一样,你关注了某个公众号,那么当公众号上面发表文章的时候,你就可以收到。此时操作公众号的人就是消息发布者,你就是消息订阅者,公众号就是消息队列,往公众号上面发表的文章就是消息。

此外一个消费者可以同时订阅多个 channel,正如一个微信用户可以关注多个公众号;一个 channel 也可以被多个消费者订阅,正如一个公众号可以被多个微信用户关注。

发送消息

我们上面的订阅者在订阅之后,就处于阻塞状态,因此我们需要再开一个终端。

127.0.0.1:6379> publish channel1 "satori"
(integer) 1
127.0.0.1:6379> publish channel2 "koishi"
(integer) 1

返回值表示成功发给了几个订阅者,所以这里的 1 就表示成功发给了一个订阅者,如果有两个订阅者,那么返回值就是 2。因此返回值可以是 0~n,由订阅者的数量决定。

然后我们来看看订阅者:

127.0.0.1:6379> subscribe channel1 channel2
...
1) "message"
2) "channel1" # 从 channel1 收到消息
3) "satori"
1) "message"
2) "channel2" # 从 channel2 收到消息
3) "koishi"

成功收到消息,所以每个消费者可以同时订阅多个 channel,并且每个 channel 也可以被多个消费者订阅。

因此使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者消费同一批数据的业务需求。除此之外,Pub/Sub 还提供了「主题订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。


主题订阅



主题订阅说白了,和模糊匹配是类似的。假设我们需要订阅好几个队列,但它们都是以 log 开头的,那么我们就可以通过 psubscribe log* 来自动订阅所有以 log 开头的队列。

比如上面的 channel1、channel2,我们就可以通过 psubscribe channel* 实现,至于消息发布者则不需要变。

当然主题订阅也可以是多个,比如:psubscribe log* db*,订阅所有以 log 开头、db 开头的消息队列。


取消订阅



既然有订阅,那么就有取消订阅,就类似于取关(o(╥﹏╥)o)。

使用 unsubscribe channel1 channel2 可以取消订阅多个channel,如果是主题订阅,那么也可以通过 punsubscribe ch* 取消订阅指定的主题。比较简单,不再赘述。


Python 操作 Redis 的发布订阅



订阅者(生产者):

# 订阅者
import redis
client = redis.Redis(host="...",
                     decode_responses="utf-8")
# 调用 pubsub 方法返回一个订阅者
sub = client.pubsub()
# 订阅两个队列
sub.subscribe("ch1", "ch2")
# 监听,此时处于阻塞状态
for item in sub.listen():
    # 一旦发布者发布消息,这里就可以接收到
    # item["channel"]是队列,item["data"]是内容
    print(item["channel"], item["data"])

发布者(消费者):

# 订阅者
import redis
client = redis.Redis(host="...",
                     decode_responses="utf-8")
# 发布者很简单,直接发布消息接口
client.publish("ch1", "7ki7ki棒棒1")
client.publish("ch1", "7ki7ki棒棒2")
client.publish("ch2", "7ki7ki棒棒3")

当执行发布者的时候,会发现订阅者多了几条输出,至于内容显然是发布者发布的内容。

另外 Python 操作订阅者还有几种方式。

# 订阅者
import redis
client = redis.Redis(host="...",
                     decode_responses="utf-8")
sub = client.pubsub()
sub.subscribe("ch1", "ch2")
while True:
    # 这种方式会瞬间返回,如果有消息得到消息
    # 没有消息会返回None
    item = sub.get_message()
    if item:
        print(item["channel"], item["data"])

显然该方法有一个缺陷,就是会造成 CPU 空转,因此我们不推荐使用该方法。

也可以开启一个新的线程去监听。

# 订阅者
import redis
client = redis.Redis(host="...",
                     decode_responses="utf-8")
sub = client.pubsub()
sub.subscribe("ch1", "ch2")
def handler(item):
    print(item["channel"], item["data"])
# 给每一个频道注册一个处理函数
# 当频道有消息时,会自动将消息传递给处理函数
sub.channels.update({"ch1": handler, "ch2": handler})
# 开启一个线程运行,会返回新开启的线程对象
# 注意:因为是单独开了一个线程,所以这里不会阻塞
th = sub.run_in_thread()
print("xxx")
print("yyy")
print("zzz")
# 先启动订阅者,再启动发布者,程序输出如下
"""
xxx
yyy
zzz
ch1 7ki7ki棒棒1
ch1 7ki7ki棒棒2
ch2 7ki7ki棒棒3
"""
# 注意:这里程序依旧会卡住,因为开启的线程是非守护线程
# 所以即便主线程执行完毕,也依旧会等待子线程
# 如果不想主线程等待,解决的办法有两种:
# 一种是在run_in_thread中加上一个参数daemon=True
# 将其设置为守护线程,这样主线程就不会等待了
# 另一种是手动停止,因为sub.run_in_thread会返回新开启的线程
# 调用其stop方法即可让它停止,通过这种方式,可以在任意时刻停止监听
# th.stop()

还有主题订阅,发布者代码依旧不用变,只需要将订阅者的 sub.subscribe 换成 sub.psubscribe 即可。

# 订阅者
sub = client.pubsub()
# 订阅以 ch 开头、log 开头的队列
sub.psubscribe("ch*", "log*")

如果是开启新的线程的话:

# 订阅者
import redis
client = redis.Redis(host="...",
                     decode_responses="utf-8")
sub = client.pubsub()
sub.psubscribe("ch*", "log*")
def handler(item):
    print(item["channel"], item["data"])
# 对于开启新的线程去监听
# 要将之前的 sub.channels 换成 sub.patterns
sub.patterns.update({"ch*": handler, 
                     "log*": handler})
sub.run_in_thread()

以上就是 Python 操作 Redis 发布订阅相关的内容。


发布订阅作为消息队列有什么缺点?



Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。但除了这一个优点之外,剩下的都是缺点了。

1)发布订阅模式是"发后既忘"的工作模式,如果订阅者离线,那么重连之后不能消费之前的历史消息。

因为 Pub/Sub 的实现原理非常简单,它没有基于任何数据类型,也没有做任何的数据存储,只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

在 Redis 中,一个完整的发布、订阅消息处理流程是这样的:

  • 1)消费者订阅指定队列,Redis 就会记录一个映射关系:队列 -> 消费者
  • 2)生产者向这个队列发布消息,然后 Redis 就会从映射关系中找出对应的消费者,把消息转发给它;

整个过程没有任何的数据存储,一切都是实时转发的。我们上面在发完消息后返回的 1 就是订阅当前队列的消费者个数,或者说 Redis 将消息转发给了多少个消费者。

因此,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息是接收不到的。如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会直接「丢弃」。

所以在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

2)无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失

因为 Pub/Sub 没有基于任何数据类型实现,所以它不具备「数据持久化」的能力,也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

3)积压的消息不能太多,否则也会丢数据

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。如果采用 List 作为队列,消息积压时会导致底层的链表很长,最直接的影响就是 Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失。至于原因,还是和 Pub/Sub 的实现原理有关。

每个消费者订阅一个队列时,Redis 都会在 Server 端为该消费者分配一个「缓冲区」,这个缓冲区其实就是一块内存。当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中,之后消费者不断地从缓冲区读取消息,处理消息。

但问题就出在这个缓冲区上,因为这个缓冲区的大小是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。如果超过了缓冲区配置的上限,Redis 就会「强制」把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。

Redis 的配置文件里有这么这一个配置:client-output-buffer-limit pubsub 32mb 8mb 60,它的参数含义如下:

  • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线;
  • 8mb 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线;

所以 Pub/Sub 作为消息队列和 List 之间的差异比较大的,并且从推拉模型不难看出,List 是属于「拉」模型,而 Pub/Sub 属于「推」模型。List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

但当消费者跟不上生产者的速度时,就会导致缓冲区的内存开始膨胀,而 Redis 为了控制缓冲区的上限,就会把消费者踢下线。

总结一下 Pub/Sub 模型的优缺点:

除了第一个是优点之外,剩下的都是缺点,所以很多人都觉得 Pub/Sub 很鸡肋。也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。

最后还有一点没有说,就是 Pub/Sub 和 List 一样,数据一旦取走,就会从 Redis 当中删除,无法重复消费。而一个成熟的消息队列,应该具备如下功能:

  • 支持阻塞等待拉取消息;
  • 支持发布 / 订阅模式;
  • 消费者下线之后重新上线,仍能消费下线期间生产者发送的消息;
  • 消费者消费失败,可重新消费,也就是支持消息被同一个消费者消费多次;
  • 实例宕机,消息不丢失,数据可持久化;
  • 即使消息大量堆积,也不会丢数据;

显然 Pub/Sub 只能满足前两个要求。

于是 Redis 在 5.0 的时候引入 Stream 类型,将 Pub/Sub 的问题全部解决,而 Stream 相关的内容后续详细介绍。


本文参考自:

  • 水滴与银弹:《把Redis当作队列来用,真的合适吗?》
相关文章
|
2月前
|
NoSQL Redis
Redis 发布订阅
10月更文挑战第18天
40 1
Redis 发布订阅
|
5月前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
3月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
103 20
剖析 Redis List 消息队列的三种消费线程模型
|
6月前
|
NoSQL Java Redis
Redis系列学习文章分享---第十八篇(Redis原理篇--网络模型,通讯协议,内存回收)
Redis系列学习文章分享---第十八篇(Redis原理篇--网络模型,通讯协议,内存回收)
89 0
|
6月前
|
存储 消息中间件 缓存
Redis系列学习文章分享---第十七篇(Redis原理篇--数据结构,网络模型)
Redis系列学习文章分享---第十七篇(Redis原理篇--数据结构,网络模型)
110 0
|
2月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
60 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
4月前
|
存储 NoSQL Redis
Redis存储原理与数据模型
Redis存储原理与数据模型
|
7月前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
129 0
|
4月前
|
存储 缓存 NoSQL
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
|
5月前
|
存储 消息中间件 NoSQL
中间件键值存储模型Redis
【7月更文挑战第11天】
57 3