redis灵魂拷问:如何使用stream实现消息队列

简介: redis灵魂拷问:如何使用stream实现消息队列

redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的。PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了。


redis5.0开始引入了stream这个数据结构,stream可以很好地用于消息队列,它支持消息持久化,同时可以记录消费者的位置,即使客户端断开重连,也不会丢失消息。


本篇文章我们就来聊一聊基于stream的消息队列使用。


stream队列简介


基于stream实现的消息队列有4个角色,我们来看一下:

stream:消息队列

last delivered ID:消费者组在消息队列中的offset

consumer group:消费者组,可以包含多个消费者,同时有一个last delivered ID

pending entries list (PEL):消费者已经读取但是没有ACK的消息


根据上面的描述,stream的消息队列结构如下图:

微信图片_20221212172145.png

注意:消费者组内的消费者是不会重复消费消息的,比如一个stream包括1、2、3、4这4条消息,消费者组内有2个消费者,如果其中一个消费者消费了1、2,则第二个消费者就只能消费3、4了。


命令介绍


本文使用测试环境如下

redis版本:6.0.7

springboot-redis版本:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>

里面使用到的spring-data-redis版本:2.1.9.RELEASE

里面使用到的lettuce连接池版本:5.1.7.RELEASE


本文使用的redis客户端是lettuce,lettuce提供了RedisStreamCommands和RedisStreamAsyncCommands支持stream操作,本文只介绍RedisStreamCommands。


  • 创建Commands


我们创建lettuce中的RedisStreamCommands,代码如下:

RedisURI redisURI = RedisURI.builder()
                .withSentinelMasterId("master")
                .withPassword("foobared")
                .withSentinelMasterId("master")
                .withSentinel("192.168.59.146",26379)
                .withSentinel("192.168.59.141",26379)
                .withSentinel("192.168.59.141",26389).build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands streamCommands = connection.sync();
  • 创建stream


当我们使用XADD命令往stream里面写数据时,如果stream不存在,就会创建一个,命令如下:

192.168.59.146:6379> XADD mystream * name Sara surname OConnor
"1607996267360-0"

我们看下Java示例代码:

/**
 * 命令:XADD
 *
 * 时间复杂度:O(1)
 * @param streamKey 队列名称
 * @param data 数据
 */
public void xadd(String streamKey, Map<String, String> data){
    streamCommands.xadd(streamKey, data);
}
  • 创建消费组


要消费stream队列中的数据,首先我们需要创建一个消费组,命令如下:


XGROUP CREATE mystream mygroup $ MKSTREAM

上面的$表示group的offset是队列中的最后一个元素,MKSTREAM这个参数会判断stream是否存在,如果不存在会创建一个我们指定名称的stream,不加这个参数,stream不存在会报错。


java代码如下:

/**
 * XGROUP
 *
 * 时间复杂度:O(1)
 * @param streamKey 队列名称
 * @param groupName 消费组名称
 */
public void createGroup(String streamKey, String groupName){
    streamCommands.xgroupCreate(XReadArgs.StreamOffset.latest(streamKey), groupName);
}
  • 删除消费组


删除消费组我们使用下面的命令:


XGROUP DESTROY mystream consumer-group-name

java示例代码如下:

/**
 * XGROUP
 *
 * 时间复杂度:O(M),M是pending entries list长度
 * @param streamKey 队列名称
 * @param groupName 消费组名称
 */
public void deleteGroup(String streamKey, String groupName){
    streamCommands.xgroupDestroy(streamKey, groupName);
}
  • 消费消息


消息的消费有2种方式,XREAD和XREADGROUP:

XREAD是消费组读取消息,我们看下面这个命令:
XREAD COUNT 2 STREAMS mystream writers 0-0 0-0

注意:上面这个示例是从mystream和writers这2个stream中读取消息,offset都是0,COUNT参数指定了每个队列中读取的消息数量不多余2个。


下面的java代码是从testStream这个stream中读取消息,offset是0:

//复杂度 O(N),N是要返回的消息个数
List<StreamMessage<String, String>> list4 = streamCommands.xread(XReadArgs.StreamOffset.from("testStream", "0-0"))

XREADGROUP使用消费者来消费消息,我们看下面这个命令:


XREADGROUP GROUP mygroup Alice BLOCK 2000 COUNT 1 STREAMS mystream >

这个命令是使用消费组mygroup的Alice这个消费者从mystream这个stream中读取1条消息。


注意:

  • 上面使用了BLOCK,表示是阻塞读取,如果读不到数据,会阻塞等待2s,不加这个条件默认是不阻塞的
  • ">"表示只接受其他消费者没有消费过的消息
  • 如果没有">",消费者会消费比指定id偏移量大并且没有被自己确认过的消息,这样就不用关系是否ACK过或者是否BLOCK了。

java示例代码如下:

//复杂度 O(N),N是要返回的消息个数
List<StreamMessage<String, String>> list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
                XReadArgs.StreamOffset.lastConsumed("testStream"));

这段代码是使用group1消费组中的consumer1这个消费者从testStream这个stream中使用最后一个元素作为offset来消费消息。


  • 确认消息


使用XACK命令可以对消息进行确认,命令如下:


XACK mystream mygroup 1526569495631-0

这里表示消费组mygroup确认mystream这个stream中1526569495631-0这条消息


下面的java代码是对收到的消息依次打印后进行确认,如下:

for (StreamMessage<String, String> message : list1){
    System.out.println(message.getId() + ":" + message.getBody());
    streamCommands.xack("streamTest", "group1", message.getId());
}

使用案例


这里我提供一个案例,生产者每隔1s向队列中写入1条消息,代码如下:

public void xgroupCreate(){
    //调用上面的createGroup方法
    createGroup("testStream", "group2");
    System.out.println("----------------------------");
    int i = 0;
    for (;;){
        Map<String, String> body =  Collections.singletonMap("message" + i, "value" + i);
        String key = streamCommands.xadd("testStream", body);
        System.out.println(key);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

启动程序后打印如下:

----------------------------
1608014871896-0
1608014872899-0
1608014873900-0
1608014874903-0
1608014875907-0
1608014876910-0
1608014877920-0
1608014878923-0
1608014879925-0
1608014880930-0

消费者每隔2s从队列中拉取一次消息,打印后执行XACK,代码如下:

public void xgroupRead(){
    List<StreamMessage<String, String>> list1;
    while (true){
        list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
                XReadArgs.StreamOffset.lastConsumed("testStream"));
        if (list1.isEmpty()) {
            System.out.println("==============================");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
        for (StreamMessage<String, String> message : list1){
            System.out.println(message.getId() + ":" + message.getBody());
            streamCommands.xack("streamTest", "group2", message.getId());
        }
    }
}

启动任务后打印如下:

==============================
1608014871896-0:{message0=value0}
1608014872899-0:{message0=value0}
==============================
1608014873900-0:{message0=value0}
1608014874903-0:{message0=value0}
==============================
1608014875907-0:{message0=value0}
1608014876910-0:{message0=value0}
==============================
1608014877920-0:{message0=value0}
1608014878923-0:{message0=value0}
==============================
1608014879925-0:{message0=value0}
1608014880930-0:{message0=value0}
==============================

使用场景


redis的特点是读写速度快,所以对于实时性高要求效率的场景是一个不错的选择。


但是AOF+RDB的数据持久化方案可能会丢失1s的数据(AOF持久化策略使用everysec),所以对于数据一致性要求高的场景要跳过。


在物联网场景中,有大规模的传感器数据需要采集,这些数据对实时性的要求高过了一致性,使用redis是一个很好的选择


总结


使用redis的stream可以实现简单的队列,跟rabbitmq等非常成熟的消息队列相比,功能还是比较薄弱的,比如不支持exchange。


redis读写速度快的特点对实时性要求高的场景还是一个不错的选择,但是如果对数据一致性要求很高,需要绕过。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
114
分享
相关文章
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
92 6
Redis Stream:实时数据流的处理与存储
通过上述分析和具体操作示例,您可以更好地理解和应用 Redis Stream,满足各种实时数据处理需求。
77 14
Redis Stream
10月更文挑战第20天
50 2
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
40 2
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
60 0
19)消息队列的终极解决方案 Stream
19)消息队列的终极解决方案 Stream
50 0
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
172 85
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
云端问道21期实操教学-应对高并发,利用云数据库 Tair(兼容 Redis®)缓存实现极速响应
本文介绍了如何通过云端问道21期实操教学,利用云数据库 Tair(兼容 Redis®)缓存实现高并发场景下的极速响应。主要内容分为四部分:方案概览、部署准备、一键部署和完成及清理。方案概览中,展示了如何使用 Redis 提升业务性能,降低响应时间;部署准备介绍了账号注册与充值步骤;一键部署详细讲解了创建 ECS、RDS 和 Redis 实例的过程;最后,通过对比测试验证了 Redis 缓存的有效性,并指导用户清理资源以避免额外费用。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等