实战干货:基于Redis6.2 部署迷你版本消息队列(下)

简介: 实战干货:基于Redis6.2 部署迷你版本消息队列(下)

其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。


封装相关的消息发布功能


消息的发送部分比较简单,直接通过redis往stream里面写入数据即可


package org.idea.mq.redis.framework.producer;
/**
 * @Author linhao
 * @Date created in 12:23 下午 2022/2/10
 */
public interface IStreamProducer {
    /**
     * 指定streamName发布消息
     * @param streamName
     * @param json
     */
    void sendMsg(String streamName, String json);
}


消息的传输格式采用json字符串的方式写入到redis内部的stream当中。


package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author linhao
 * @Date created in 12:19 下午 2022/2/10
 */
public class StreamProducer implements IStreamProducer{
    @Resource
    private IRedisService iRedisService;
    @Override
    public void sendMsg(String streamName,String json){
        Map<String,String> map = new HashMap<>();
        map.put("json",json);
        iRedisService.xAdd(streamName,map);
    }
}


注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:



@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}


为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:


image.png


组件的测试


点对点发送测试


建立两套微服务工程,user-serviceorder-service,其中user-service部署两个服务节点,同属user-service-grouporder-service也要部署两个服务节点,同属order-service-group


最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。


image.png


广播发送测试


使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。


image.png


具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。


问题思考


为何同一个StreamName需要采用双线程消费?


一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。


是否支持延迟重试


目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。


本文完整代码案例地址


https://gitee.com/IdeaHome_admin/mq-framework


推荐


主流Java进阶技术(学习资料分享)

Java面试题宝典

加入Spring技术开发社区

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
30天前
|
消息中间件 NoSQL Java
springboot redis 实现消息队列
springboot redis 实现消息队列
34 1
|
2月前
|
消息中间件 NoSQL Java
Redis List:打造高效消息队列的秘密武器【redis实战 一】
Redis List:打造高效消息队列的秘密武器【redis实战 一】
110 0
|
2月前
|
消息中间件 存储 NoSQL
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
62 0
|
2月前
|
消息中间件 NoSQL Java
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
187 1
|
4天前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
1月前
|
消息中间件 存储 缓存
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
45 0