redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的。PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了。
redis5.0开始引入了stream这个数据结构,stream可以很好地用于消息队列,它支持消息持久化,同时可以记录消费者的位置,即使客户端断开重连,也不会丢失消息。
本篇文章我们就来聊一聊基于stream的消息队列使用。
stream队列简介
基于stream实现的消息队列有4个角色,我们来看一下:
stream:消息队列
last delivered ID:消费者组在消息队列中的offset
consumer group:消费者组,可以包含多个消费者,同时有一个last delivered ID
pending entries list (PEL):消费者已经读取但是没有ACK的消息
根据上面的描述,stream的消息队列结构如下图:
注意:消费者组内的消费者是不会重复消费消息的,比如一个stream包括1、2、3、4这4条消息,消费者组内有2个消费者,如果其中一个消费者消费了1、2,则第二个消费者就只能消费3、4了。
命令介绍
本文使用测试环境如下:
redis版本:6.0.7
springboot-redis版本:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.6.RELEASE</version> </dependency>
里面使用到的spring-data-redis版本:2.1.9.RELEASE
里面使用到的lettuce连接池版本:5.1.7.RELEASE
本文使用的redis客户端是lettuce,lettuce提供了RedisStreamCommands和RedisStreamAsyncCommands支持stream操作,本文只介绍RedisStreamCommands。
- 创建Commands
我们创建lettuce中的RedisStreamCommands,代码如下:
RedisURI redisURI = RedisURI.builder() .withSentinelMasterId("master") .withPassword("foobared") .withSentinelMasterId("master") .withSentinel("192.168.59.146",26379) .withSentinel("192.168.59.141",26379) .withSentinel("192.168.59.141",26389).build(); RedisClient client = RedisClient.create(redisURI); StatefulRedisConnection<String, String> connection = client.connect(); RedisStreamCommands streamCommands = connection.sync();
- 创建stream
当我们使用XADD命令往stream里面写数据时,如果stream不存在,就会创建一个,命令如下:
192.168.59.146:6379> XADD mystream * name Sara surname OConnor "1607996267360-0"
我们看下Java示例代码:
/** * 命令:XADD * * 时间复杂度:O(1) * @param streamKey 队列名称 * @param data 数据 */ public void xadd(String streamKey, Map<String, String> data){ streamCommands.xadd(streamKey, data); }
- 创建消费组
要消费stream队列中的数据,首先我们需要创建一个消费组,命令如下:
XGROUP CREATE mystream mygroup $ MKSTREAM
上面的$表示group的offset是队列中的最后一个元素,MKSTREAM这个参数会判断stream是否存在,如果不存在会创建一个我们指定名称的stream,不加这个参数,stream不存在会报错。
java代码如下:
/** * XGROUP * * 时间复杂度:O(1) * @param streamKey 队列名称 * @param groupName 消费组名称 */ public void createGroup(String streamKey, String groupName){ streamCommands.xgroupCreate(XReadArgs.StreamOffset.latest(streamKey), groupName); }
- 删除消费组
删除消费组我们使用下面的命令:
XGROUP DESTROY mystream consumer-group-name
java示例代码如下:
/** * XGROUP * * 时间复杂度:O(M),M是pending entries list长度 * @param streamKey 队列名称 * @param groupName 消费组名称 */ public void deleteGroup(String streamKey, String groupName){ streamCommands.xgroupDestroy(streamKey, groupName); }
- 消费消息
消息的消费有2种方式,XREAD和XREADGROUP:
XREAD是消费组读取消息,我们看下面这个命令: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
注意:上面这个示例是从mystream和writers这2个stream中读取消息,offset都是0,COUNT参数指定了每个队列中读取的消息数量不多余2个。
下面的java代码是从testStream这个stream中读取消息,offset是0:
//复杂度 O(N),N是要返回的消息个数 List<StreamMessage<String, String>> list4 = streamCommands.xread(XReadArgs.StreamOffset.from("testStream", "0-0"))
XREADGROUP使用消费者来消费消息,我们看下面这个命令:
XREADGROUP GROUP mygroup Alice BLOCK 2000 COUNT 1 STREAMS mystream >
这个命令是使用消费组mygroup的Alice这个消费者从mystream这个stream中读取1条消息。
注意:
- 上面使用了BLOCK,表示是阻塞读取,如果读不到数据,会阻塞等待2s,不加这个条件默认是不阻塞的
- ">"表示只接受其他消费者没有消费过的消息
- 如果没有">",消费者会消费比指定id偏移量大并且没有被自己确认过的消息,这样就不用关系是否ACK过或者是否BLOCK了。
java示例代码如下:
//复杂度 O(N),N是要返回的消息个数 List<StreamMessage<String, String>> list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"), XReadArgs.StreamOffset.lastConsumed("testStream"));
这段代码是使用group1消费组中的consumer1这个消费者从testStream这个stream中使用最后一个元素作为offset来消费消息。
- 确认消息
使用XACK命令可以对消息进行确认,命令如下:
XACK mystream mygroup 1526569495631-0
这里表示消费组mygroup确认mystream这个stream中1526569495631-0这条消息
下面的java代码是对收到的消息依次打印后进行确认,如下:
for (StreamMessage<String, String> message : list1){ System.out.println(message.getId() + ":" + message.getBody()); streamCommands.xack("streamTest", "group1", message.getId()); }
使用案例
这里我提供一个案例,生产者每隔1s向队列中写入1条消息,代码如下:
public void xgroupCreate(){ //调用上面的createGroup方法 createGroup("testStream", "group2"); System.out.println("----------------------------"); int i = 0; for (;;){ Map<String, String> body = Collections.singletonMap("message" + i, "value" + i); String key = streamCommands.xadd("testStream", body); System.out.println(key); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
启动程序后打印如下:
---------------------------- 1608014871896-0 1608014872899-0 1608014873900-0 1608014874903-0 1608014875907-0 1608014876910-0 1608014877920-0 1608014878923-0 1608014879925-0 1608014880930-0
消费者每隔2s从队列中拉取一次消息,打印后执行XACK,代码如下:
public void xgroupRead(){ List<StreamMessage<String, String>> list1; while (true){ list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"), XReadArgs.StreamOffset.lastConsumed("testStream")); if (list1.isEmpty()) { System.out.println("=============================="); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } for (StreamMessage<String, String> message : list1){ System.out.println(message.getId() + ":" + message.getBody()); streamCommands.xack("streamTest", "group2", message.getId()); } } }
启动任务后打印如下:
============================== 1608014871896-0:{message0=value0} 1608014872899-0:{message0=value0} ============================== 1608014873900-0:{message0=value0} 1608014874903-0:{message0=value0} ============================== 1608014875907-0:{message0=value0} 1608014876910-0:{message0=value0} ============================== 1608014877920-0:{message0=value0} 1608014878923-0:{message0=value0} ============================== 1608014879925-0:{message0=value0} 1608014880930-0:{message0=value0} ==============================
使用场景
redis的特点是读写速度快,所以对于实时性高要求效率的场景是一个不错的选择。
但是AOF+RDB的数据持久化方案可能会丢失1s的数据(AOF持久化策略使用everysec),所以对于数据一致性要求高的场景要跳过。
在物联网场景中,有大规模的传感器数据需要采集,这些数据对实时性的要求高过了一致性,使用redis是一个很好的选择
总结
使用redis的stream可以实现简单的队列,跟rabbitmq等非常成熟的消息队列相比,功能还是比较薄弱的,比如不支持exchange。
redis读写速度快的特点对实时性要求高的场景还是一个不错的选择,但是如果对数据一致性要求很高,需要绕过。