Redis Stream
Redis最新的大版本5.0已经RC1了,其中最重要的Feature莫过于Redis Stream
了,关于Redis Stream的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。Redis Stream
本质上是在Redis内核上(非Redis Module)实现的一个消息发布订阅功能组件。相比于现有的PUB/SUB
、BLOCKED LIST
,其虽然也可以在简单的场景下作为消息队列来使用,但是Redis Stream
无疑要完善很多。Redis Stream
提供了消息的持久化和主备复制功能、新的RadixTree数据结构来支持更高效的内存使用和消息读取、甚至是类似于Kafka
的Consumer Group
功能。今天我们重点关注怎么在实际业务场景下去使用Redis Stream
。
Redis Stream实战——IRC系统
相信大家对IRC都比较了解了(还记得被和谐掉的xx聊天室吗:-)),很多知名的开源项目(包括Redis)都有自己的IRC频道,方便开发者和使用者实时的进行思想火花的碰撞,我们今天介绍的主角——Redis Stream,本身就是起源于IRC中一个用户的idea。IRC的模型如下,
在某个IRC频道中的用户,既可以向所有的其他用户自由的发送消息,也可以接收其他所有用户发送的消息。如果要基于Redis来构建一个IRC系统,那我们不由自主的会想到使用Redis的PUB/SUB
功能,
可以看到,基于PUB/SUB
,只需要所有的用户(client)都订阅(subscribe
)同一个IRC频道(channel1),就可以接收所有用户发出的消息了。发出消息时,只需使用发布命令(publish
)命令即可。整个业务逻辑非常的清晰简单,这也是Redis强大和流行的重要原因——提供的功能和数据结构能尽可能提升开发者的开发效率。
但是基于PUB/SUB
构建的IRC,有一个问题是PUB/SUB
的消息模型是Fire and Forgot
。也就是说Redis本身并不保存任何历史消息,如果IRC中某个用户的网络连接出现异常,重新加入IRC后,他是看不到断链期间的聊天记录的,新加入的用户同样也看不到最近一段时间的历史记录,这个对用户迅速的理解当前讨论的问题非常不便。此外,如果Redis发生了重启,所有的用户也需要重新订阅频道。
那如果基于Redis Stream来构建IRC呢?
- 创建频道
# 目前Redis还不支持创建空的stream,所以我们可以添加一个特殊消息,
# 来创建一个新的stream(频道)
ip:7000> xadd channel1 * create-channel null
1528702126345-0
- 发送消息
# 发送一条消息,只需要使用xadd命令即可,我们可以给每条消息命名,顺便带上消息来源,方便业务逻辑处理。
# 我们也可以一次发送多条消息,可以作为优化网络开销的一种手段。
ip:7000> xadd channel1 * msg1-tony "Hello everyone."
1528702503377-0
ip:7000> xadd channel1 * msg2-tony "I am a big Redis fan." msg3-tony "Hope we can learn from each other.:-)"
1528702573546-0
- 接收消息
# 新用户初次加入频道时,指定'$'作为一个特殊起始ID读取消息,表示只接收最新的频道消息
# 之后如果新消息,只需从上一次的返回结果ID继续读取即可
# 当没有新消息时,xread命令返回空集
ip:7000> xread BLOCK 100 STREAMS channel1 $
1) 1) "channel1"
2) 1) 1) 1528703048021-0
2) 1) "msg1-tony"
2) "Hello everyone."
ip:7000> xread BLOCK 100 STREAMS channel1 1528703048021-0
1) 1) "channel1"
2) 1) 1) 1528703061087-0
2) 1) "msg2-tony"
2) "I am a big Redis fan."
3) "msg3-tony"
4) "Hope we can learn from each other.:-)"
ip:7000> xread BLOCK 100 STREAMS channel1 1528703061087-0
(nil)
- 获取历史消息
前面我们提到了,Redis Stream
和PUB/SUB
相比,一个重要的区别是,Redis Stream
可以获取历史发送的消息,所以当一个用户断开连接重新加入IRC时,可以通过如下方式获取历史消息:
# 1528703061087-0 为用户记录的最后接收的消息的ID
ip:7000> xrange channel1 1528703061087-0 +
1) 1) 1528706457462-0
2) 1) "msg1-andy"
2) "Nice to meet you guys."
2) 1) 1528706497200-0
2) 1) "msg4-tony"
2) "When will Redis 5.0 GA comes out?"
3) 1) 1528706601973-0
2) 1) "msg1-antirez"
2) "I think it will arrive in the second half of 2018."
Redis Stream实战——IoT数据采集
Redis除了强大而且丰富的数据结构支持,还有一个很重要的能力是跨平台,甚至是作为一个嵌入式的存储系统跑在基于ARM的平台上,比如作者之前就宣称,Redis成功的跑在了“树莓派”上。
试想一下,在IoT时代,会有无数随时随地可以接入互联网的智能设备,你家里的冰箱会实时的汇报,冰箱里面有哪些食物,数量多少,新鲜程度如何,空调会汇报现在温度多少,空气质量如何,你的车会不断的汇报发动机的各项数据,变速箱的各项数据,车内空气的各项数据。这么多的IoT设备会形成巨大的数据洪流,采集完成后在云端进行分析,产生巨大的用户价值。
这些数据虽然内容各个不同,但是都有一个共同的特点,都是一种时序数据。看到这里,你可能会突然发现,Redis Stream从设计初就是为了支持时间序列数据而生(见第一部分Redis Stream介绍),Redis又成功的跑在了ARM平台,而未来物联网会有万亿级的设备基于ARM平台。所以,我们不由自主的可以猜想,除了现在在各种互联网服务中作为Cache和KV存储广泛应用,Redis下一个大放异彩的领域也许就在物联网。
上面这个图,就是一个典型的物联网设备信息采集,分析,展示的架构。Redis作为一个嵌入式的存储系统跑在各个IoT设备上,各个设备使用Redis Stream
暂存产生的时序数据,然后再异步的推送到云端。云上部署的各个业务程序,会读取推送的原始数据,基于一定的规则进行分析,然后将结果写入可靠的数据存储系统。用户读取结果,在APP或者web页面上进行展示,从而整个系统形成一个闭环。
作者简介
夏德军,花名夏周,阿里云Redis技术专家,负责阿里云Redis内核开发和维护。活跃于开源社区,Redis Contributor,设计并实现了阿里云Redis开源项目ApsaraCache的部分核心feature,如时间点恢复,binlog同步等。