实战干货:基于Redis6.2 部署迷你版本消息队列(下)

简介: 实战干货:基于Redis6.2 部署迷你版本消息队列(下)

其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。


封装相关的消息发布功能


消息的发送部分比较简单,直接通过redis往stream里面写入数据即可


package org.idea.mq.redis.framework.producer;
/**
 * @Author linhao
 * @Date created in 12:23 下午 2022/2/10
 */
public interface IStreamProducer {
    /**
     * 指定streamName发布消息
     * @param streamName
     * @param json
     */
    void sendMsg(String streamName, String json);
}


消息的传输格式采用json字符串的方式写入到redis内部的stream当中。


package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author linhao
 * @Date created in 12:19 下午 2022/2/10
 */
public class StreamProducer implements IStreamProducer{
    @Resource
    private IRedisService iRedisService;
    @Override
    public void sendMsg(String streamName,String json){
        Map<String,String> map = new HashMap<>();
        map.put("json",json);
        iRedisService.xAdd(streamName,map);
    }
}


注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:



@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}


为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:


image.png


组件的测试


点对点发送测试


建立两套微服务工程,user-serviceorder-service,其中user-service部署两个服务节点,同属user-service-grouporder-service也要部署两个服务节点,同属order-service-group


最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。


image.png


广播发送测试


使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。


image.png


具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。


问题思考


为何同一个StreamName需要采用双线程消费?


一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。


是否支持延迟重试


目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。


本文完整代码案例地址


https://gitee.com/IdeaHome_admin/mq-framework


推荐


主流Java进阶技术(学习资料分享)

Java面试题宝典

加入Spring技术开发社区

相关文章
|
2月前
|
NoSQL 安全 Linux
如何在phpStudy环境中升级Redis版本
以上流程详尽覆盖从准备工作至实际操作再至事后检查各个阶段, 遵循此方案可大幅度减少因技术操作失误导致业务影响风险发生概率, 同时也为未来进一步扩展提供坚实基础支撑点 。
115 15
|
5月前
|
消息中间件 NoSQL Linux
Redis的基本介绍和安装方式(包括Linux和Windows版本),以及常用命令的演示
Redis(Remote Dictionary Server)是一个高性能的开源键值存储数据库。它支持字符串、列表、散列、集合等多种数据类型,具有持久化、发布/订阅等高级功能。由于其出色的性能和广泛的使用场景,Redis在应用程序中常作为高速缓存、消息队列等用途。
873 16
|
12月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
264 1
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
210 6
|
JavaScript NoSQL Redis
Vue中实现修改邮箱、手机号等流程的大致过程、验证码由后端的redis生成验证(版本1.0)
这篇文章记录了在Vue中实现修改手机号和邮箱的大致流程,包括使用过滤器部分隐藏展示的手机号和邮箱,以及通过点击触发路由跳转的便捷方式。文章还描述了旧号码和新号码验证的界面实现,其中验证码由后端生成并通过弹窗展示给用户,未来可以接入真正的手机验证码接口。此外,还提供了修改邮箱的页面效果截图,并强调了学习是一个永无止境的过程。
Vue中实现修改邮箱、手机号等流程的大致过程、验证码由后端的redis生成验证(版本1.0)
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
179 0
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
剖析 Redis List 消息队列的三种消费线程模型
|
11月前
|
消息中间件 运维 UED
消息队列运维实战:攻克消息丢失、重复与积压难题
消息队列(MQ)作为分布式系统中的核心组件,承担着解耦、异步处理和流量削峰等功能。然而,在实际应用中,消息丢失、重复和积压等问题时有发生,严重影响系统的稳定性和数据的一致性。本文将深入探讨这些问题的成因及其解决方案,帮助您在运维过程中有效应对这些挑战。
247 1
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
125 2
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
295 2
下一篇
oss教程