其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。
封装相关的消息发布功能
消息的发送部分比较简单,直接通过redis往stream里面写入数据即可
package org.idea.mq.redis.framework.producer; /** * @Author linhao * @Date created in 12:23 下午 2022/2/10 */ public interface IStreamProducer { /** * 指定streamName发布消息 * @param streamName * @param json */ void sendMsg(String streamName, String json); }
消息的传输格式采用json字符串的方式写入到redis内部的stream当中。
package org.idea.mq.redis.framework.producer; import org.idea.mq.redis.framework.redis.IRedisService; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * @Author linhao * @Date created in 12:19 下午 2022/2/10 */ public class StreamProducer implements IStreamProducer{ @Resource private IRedisService iRedisService; @Override public void sendMsg(String streamName,String json){ Map<String,String> map = new HashMap<>(); map.put("json",json); iRedisService.xAdd(streamName,map); } }
注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:
@Override public boolean xAdd(String streamName, Map<String, String> stringMap) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap); return true; } catch (Exception e) { throw new RuntimeException(e); } }
为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:
组件的测试
点对点发送测试
建立两套微服务工程,user-service
和 order-service
,其中user-service
部署两个服务节点,同属user-service-group
。order-service
也要部署两个服务节点,同属order-service-group
。
最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。
广播发送测试
使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。
具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。
问题思考
为何同一个StreamName需要采用双线程消费?
一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。
是否支持延迟重试
目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。
本文完整代码案例地址
推荐