摄影:产品经理厨师:kingname
我们在使用Redis做消息队列的时候,常常使用列表这个数据结构,并写出如下的代码:
import redis import time client = redis.Redis() while True: data = client.lpop('key') if not data: time.sleep(5 * 60) continue parse(data)
这个意思是说,如果Redis的名为 key
的列表里面有数据,那么就不停取出来,并把取出的数据传入 parse
函数进行处理。如果列表里面没有数据,那么就等待5分钟再次检查。
这种方式,如果列表始终为空,那么将会每5分钟检查一次。
但使用这种方式有一个问题,例如刚刚检查完列表发现是空的,程序开始等待5分钟。但是检查完成的1秒以后,新信息抵达列表。此时信息无法被立刻消费,必需要等满5分钟才行。
为了解决实时性的问题,使用了列表的阻塞式弹出命令 blpop
,于是代码变为:
import redis client = redis.Redis() while True: data = client.blpop('key') parse(data[1])
如果列表里面没有数据,程序就会卡在 data=client.blpop('key')
这一行,直到列表里面有了新的数据,再立刻弹出送进后面的逻辑。
这种方法在测试的时候完美运行,于是就放到了生成环境使用。
一个周末过去了,周一上班时,同事反馈写到Redis里面的信息不能被正常消费了。
但是我发现程序正常运行,并没有出现任何异常。
于是我找 redis-py
的作者 AndyMcCurdy
询问原因,他给我的回复如下图所示。
意思是说,在使用 blpop
的时候,如果中途因为网络波动或者某些其他原因导致连接池失效,那么就永远接收不到信息了,虽然 redis-py
有连接状态检查的功能,但是由于程序是阻塞的, redis-py
的连接状态检查功能不能正常使用。
为了解决这个问题,就需要 blpop
的超时功能。让 blpop
每几分钟就断开,检查一下网络,再重新连上。
于是代码变为:
import redis client = redis.Redis() while True: data = client.blpop('key', timeout=5 * 60) if not data: continue parse(data[1])
设置超时时间为5分钟,如果5分钟内列表没有收到信息(无论是真的列表一直没有数据,还是连接池断开了),都会返回 None
,此时只需要重新执行 blpop
,在执行的瞬间会检查连接的状态,如果连接池有问题,那么它会重新连接。
如果5分钟内有数据,那么 blpop
就会立刻把数据弹出来。
通过添加超时时间,解决了数据实时性和网络连接丢失的问题。