使用List这个数据类型,底层就是一个列表,头部/尾部操作元素的时间复杂度都是O(1)
,
生产者使用LPUSH发布消息,消费者使用RPOP获取消息
当然,使用RPUSH和LPOP也是可以的,只要保证生产者和消费者不会同时操作一个方向的元素即可
这里的问题是:如果队列中已经没有消息了,消费者在执行RPOP操作的时候,会返回NULL
而一般消费者的逻辑是不断地从队列中获取消息进行处理,如果此时队列为空,消费者依旧会频繁获取消息,造成CPU空转,不仅仅浪费CPU资源,还给Redis造成压力
for {
result, err := redisCli.RPop(context.Background(), key).Result()
if err == redis.Nil {
continue
}
//处理业务逻辑
}
解决方案:
- 当队列为空的时候,休眠一会,再尝试拉取消息
for {
result, err := redisCli.RPop(context.Background(), key).Result()
if err == redis.Nil {
time.Sleep(time.Minute)
continue
}
//处理业务逻辑
}
优点是:简单的解决了CPU空转问题,提高了CPU的使用效率
缺点是:如果休眠的时候有消息需要处理,该消息必须要等待,会造成消息的延迟处理。对于实时性要求不强的场景,可以和业务方沟通获取到消息延迟和CPU空转的平衡点
2. 能否在阻塞等待的同时,还能立刻接收到新消息呢?可以使用Redis的BRPOP命令
for {
result, err := redisCli.BRPop(context.Background(), time.Minute, key).Result()
//处理业务逻辑
}
BRPOP表示阻塞式拉取新消息,可以传入一个超时时间,如果设置为0,表示不设置超时,直到有新消息才返回;否则,超时时间后返回NULL
还有一个要注意的点就是:如果设置的超时时间太长,这个链接太久没有活跃,会被Redis服务器判定为无效链接,强制把这个客户端踢下线。所以,客户端需要准备重连机制
总结一下,这种队列模型的缺点如下:
不支持重复消费:当某个消费者获取到消息以后,这条消息就被删除了,无法被其他消费者消费
消息丢失:消费者获取到消息以后,如果宕机或是执行业务逻辑出错,这条消息就丢失了
发布/订阅模型:Pub/Sub
可以解决刚刚的第一个问题:不支持重复消费
发布/订阅模型是多组生产者/消费者的模式,使用Publish/Subscribe
命令
假如想开启多个消费者,同时消费同一批消息,可以使用该模型。
具体如下:
使用
Subscribe
命令,启动多个消费者,订阅同一个队列此时,这些消费者都会被阻塞住,等待新消息的到来
启动一个生产者,发布一条消息
这些消费者会解除阻塞,收到生产者的新消息
而且,消费者还可以根据一定的规则去订阅生产者,即支持通配符匹配 ,对应命令PSubscribe
该模型最大的缺点就是会丢数据,如果发生消费者下线/Redis宕机/消息堆积,都会导致数据丢失。
这和实现方式有关系,Pub/Sub在实现的时候很简单,没有基于任何的数据类型,也没有做任何的数据存储,它只是为生产者和消费者建立了数据转发通道。当消费者订阅了指定的队列以后,Redis会记录一个从队列到消费者的映射关系,当生产者往这个队列里发布消息的时候,Redis就从映射关系里找到对应的消费者把消息转发过去。
如果消费者下线,在这期间生产者发布的消息,因为找不到消费者而被丢弃掉,消费者上线以后也不会重新发送
因为Pub/Sub没有对数据进行存储,也就不具备数据持久化的能力,不会被写入到AOF或RDB里,当Redis宕机以后,也就无法恢复数据
每个消费者订阅了一个队列以后,都会被分配一个缓冲区,生产者会把数据写入到该缓冲区上,消费者不断从缓冲区里读取消息。这里的缓冲区其实也就类似一块内存,内存的大小是有限的,如果消费者读取消息的速度过慢导致消息有积压,内存持续增长,如果超过了缓冲区配置的上限,Redis就会强制把这个消费者下线。具体的配置信息为
client-output-buffer-limit pubsub 32mb 8mb 60
,含义为 缓冲区一旦超过32MB,或是 超过8MB且持续60秒,直接强制把消费者下线
对比List而言,Pub/Sub相当于推模型,而List相当于拉模型
哨兵集群和Redis实例通信的时候,采用了Pub/Sub的方案,也就是即时通讯的业务场景