基于Stream实现消息队列
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
- 消息ID的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:
下边的部分我们直接来进入关于Redis XStream相关的实战环节。
封装消息监听功能
首先是定义一个MQ相关的接口:
public interface RedisStreamListener { /** * 处理正常消息 */ HandlerResult handleMsg(StreamEntry streamEntry); } 复制代码
接着是基于这套接口做消息发送的实现:
package org.idea.mq.redis.framework.listener; import com.alibaba.fastjson.JSON; import org.idea.mq.redis.framework.bean.HandlerResult; import org.idea.mq.redis.framework.config.StreamListener; import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener; import org.idea.mq.redis.framework.redis.IRedisService; import org.idea.mq.redis.framework.utils.PayMsg; import redis.clients.jedis.StreamEntry; import javax.annotation.Resource; import java.util.Map; import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS; /** * @Author linhao * @Date created in 10:07 下午 2022/2/9 */ @StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer") public class OrderPayedListener implements RedisStreamMQListener { @Resource private IRedisService iRedisService; @Override public HandlerResult handleMsg(StreamEntry streamEntry) { Map<String, String> map = streamEntry.getFields(); String json = map.get("json"); PayMsg payMsg = JSON.parseObject(json, PayMsg.class); System.out.println("pending payMsg is : " + payMsg); return SUCCESS; } } 复制代码
自定义消息注解
package org.idea.mq.redis.framework.config; import org.springframework.stereotype.Component; import java.lang.annotation.*; /** * @Author linhao * @Date created in 10:04 下午 2022/2/9 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface StreamListener { String streamName() default ""; String groupName() default ""; String consumerName() default ""; } 复制代码
代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中。
为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:
package org.idea.mq.redis.framework.config; import org.idea.mq.redis.framework.bean.HandlerResult; import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener; import org.idea.mq.redis.framework.redis.IRedisService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.StreamPendingEntry; import javax.annotation.Resource; import java.util.List; import java.util.Map; import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS; /** * @Author linhao * @Date created in 3:25 下午 2022/2/7 */ @Configuration public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> { @Resource private ApplicationContext applicationContext; @Resource private IRedisService iRedisService; private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class); @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class); beanMap.values().forEach(redisStreamMQListener -> { StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class); ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName()); Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService)); Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService)); handleThread.start(); pendingHandleThread.start(); logger.info("{} load successed ", redisStreamMQListener); }); } class PendingMsgHandlerThread implements Runnable { private ListenerInitWrapper listenerInitWrapper; private RedisStreamMQListener redisStreamMQListener; private IRedisService iRedisService; public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) { this.redisStreamMQListener = redisStreamMQListener; this.listenerInitWrapper = listenerInitWrapper; this.iRedisService = iRedisService; } @Override public void run() { String startId = "0-0"; while (true) { List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1); //如果该集合非空,则触发监听行为 if (!CollectionUtils.isEmpty(streamConsumersInfos)) { for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) { StreamEntryID streamEntryID = streamConsumersInfo.getID(); //比当前pending的streamId小1 String streamIdStr = streamEntryID.toString(); String[] items = streamIdStr.split("-"); Long timestamp = Long.valueOf(items[0]) - 1; String beforeId = timestamp + "-" + "0"; List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName()); for (Map.Entry<String, List<StreamEntry>> streamInfo : result) { List<StreamEntry> streamEntries = streamInfo.getValue(); for (StreamEntry streamEntry : streamEntries) { try { //业务处理 HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry); if (SUCCESS.equals(handlerResult)) { startId = streamEntryID.toString(); iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId)); } } catch (Exception e) { logger.error("[PendingMsgHandlerThread] e is ", e); } } } } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class CoreMsgHandlerThread implements Runnable { private ListenerInitWrapper listenerInitWrapper; private RedisStreamMQListener redisStreamMQListener; private IRedisService iRedisService; public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) { this.redisStreamMQListener = redisStreamMQListener; this.listenerInitWrapper = listenerInitWrapper; this.iRedisService = iRedisService; } @Override public void run() { while (true) { List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName()); for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) { List<StreamEntry> streamEntries = streamInfo.getValue(); for (StreamEntry streamEntry : streamEntries) { //业务处理 try { HandlerResult result = redisStreamMQListener.handleMsg(streamEntry); if (SUCCESS.equals(result)) { iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID()); } } catch (Exception e) { logger.error("[CoreMsgHandlerThread] e is ", e); } } } } } } } 复制代码
其原理是在Spring容器启动好了之后,监听Spring容器内部发出的
ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。
封装相关的消息发布功能
消息的发送部分比较简单,直接通过redis往stream里面写入数据即可
package org.idea.mq.redis.framework.producer; /** * @Author linhao * @Date created in 12:23 下午 2022/2/10 */ public interface IStreamProducer { /** * 指定streamName发布消息 * @param streamName * @param json */ void sendMsg(String streamName, String json); } 复制代码
消息的传输格式采用json字符串的方式写入到redis内部的stream当中。
package org.idea.mq.redis.framework.producer; import org.idea.mq.redis.framework.redis.IRedisService; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * @Author linhao * @Date created in 12:19 下午 2022/2/10 */ public class StreamProducer implements IStreamProducer{ @Resource private IRedisService iRedisService; @Override public void sendMsg(String streamName,String json){ Map<String,String> map = new HashMap<>(); map.put("json",json); iRedisService.xAdd(streamName,map); } } 复制代码
注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:
@Override public boolean xAdd(String streamName, Map<String, String> stringMap) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap); return true; } catch (Exception e) { throw new RuntimeException(e); } } 复制代码
为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:
组件的测试
点对点发送测试
建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group。
最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。
广播发送测试
使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。
具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。
问题思考
为何同一个StreamName需要采用双线程消费?
一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。
是否支持延迟重试
目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5…分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。
本文完整代码案例地址