【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  


我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)

直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的  于是我想起了之前学过的技术栈


Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的


Redis作为消息队列的优缺点:


使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点:


  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。
  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。
  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。
  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。
  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。


缺点也很明显:


  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。
  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。


应用场景:


适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑


如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。


Redis实现消息队列系统 实现步骤:


配置Redis:


  1. 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
  redis:
    host: 
    port: 6379
    password: 
    lettuce:
      pool:
        max-active: 1000
        max-idle: 1000
        min-idle: 0
        time-between-eviction-runs: 10s
        max-wait: 10000
  1. 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:
 @Bean
    public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(new StringRedisSerializer());
        return template;
    }


设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

redisTemplate.setDefaultSerializer(new StringRedisSerializer());


实现消息的发布和订阅功能。


  • 发布消息:
redisTemplate.convertAndSend("channel_name", "message_payload");
  • 在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。
  • 订阅消息:
  • 首先,创建一个MessageListener实现类来处理接收到的消息:
public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 处理接收到的消息
        String channel = new String(message.getChannel());
        String payload = new String(message.getBody());
        // 执行自定义的逻辑
    }
}

创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:

LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:

RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();


通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。


以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增


于是就有了第二种写法 :


实战与改良


/***
 * @title MessageManager
 * @author SUZE
 * @Date 2-17
 **/
@Component
public class ReservedMessageManager {
    private String ListenerId;
    private String UserId;
    private String message;
    private final RedisTemplate<String, String> redisTemplate;
 
    @Autowired
    public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        subscribeToChannel("reserved");
    }
    @Resource
    private SmsServer smsServer;
 
    public void publishMessage(String channel, reserveMessage message) {
        String  Message=serialize(message);
        redisTemplate.convertAndSend("channel_name", "message_payload");
        redisTemplate.convertAndSend(channel, Message);
    }
    // 接收到消息时触发的事件
    private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
        if (reserveMessage != null) {
            String userId = reserveMessage.getUserId();
            String ListenerId=reserveMessage.getListenerId();
            String message = reserveMessage.getMessage();
            //TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容
            switch (message){
                //TODO 消息要给两边都发 所以要发两份 发信息的文案
                case "wait":
 
                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "agree":
 
                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "refuse":
 
                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "over":
                    //这里要操作文档系统了
 
                    //拒绝的话 那就要监听一下
                    smsServer.sendSms(userId,ListenerId,message);
                    break;
 
            }
            //smsServer.sendSms(userId,ListenerId,message);
            // 其他处理逻辑...
        }
    }
 
    public void subscribeToChannel(String channel) {
        redisTemplate.execute((RedisCallback<Object>) (connection) -> {
            connection.subscribe((message, pattern) -> {
                String channelName = new String(message.getChannel());
                byte[] body = message.getBody();
                // 解析接收到的消息
                switch (channelName){
                    case "reserved":
                        reserveMessage reserveMessage = deserializeMessage(new String(body));
                        handleReserveMessage(channelName, reserveMessage);
                        break;
                    //还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由
                }
            }, channel.getBytes());
            return null;
        });
    }
    // 反序列化
    private reserveMessage deserializeMessage(String body) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(body, reserveMessage.class);
        } catch (IOException e) {
            // 处理反序列化异常
            e.printStackTrace();
            return null;
        }
    }
 
    // 序列化
    public String serialize(reserveMessage reserveMessage) throws SerializationException {
        if (reserveMessage == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString(reserveMessage);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }
 
}


代码解释


  1. subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
  2. redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
  3. 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
  4. 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
  5. 在消息回调函数中,首先从message对象中获取通道名称和消息体。
  6. 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
  7. 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
  8. 在switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
  9. 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
  10. 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
  11. handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userId和listenerId发送短信。


我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)


但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低


那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  


遇到的问题:


对了 中途遇到了这样一个错误


错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)


原因与分析:


reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。


为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。


在你的reserveMessage类中


这个是改好的封装类:

 

@Data
public class reserveMessage {
    private String UserId;
    private String ListenerId;
    private String message;
 
 
    public reserveMessage() {
        // 默认构造函数
    }
    public reserveMessage(String userId, String ListenerId,String message) {
        this.UserId = userId;
        this.ListenerId = ListenerId;
        this.message=message;
    }
 
 
}


实际业务中的测试


发布服务


订阅服务(监听服务)


测试结果


成功


这里面的打印是代替了原本业务中的短信发送 也算是成了

相关文章
|
25天前
|
存储 NoSQL 前端开发
Redis专题-实战篇一-基于Session和Redis实现登录业务
本项目基于SpringBoot实现黑马点评系统,涵盖Session与Redis两种登录方案。通过验证码登录、用户信息存储、拦截器校验等流程,解决集群环境下Session不共享问题,采用Redis替代Session实现数据共享与自动续期,提升系统可扩展性与安全性。
143 3
Redis专题-实战篇一-基于Session和Redis实现登录业务
|
25天前
|
存储 缓存 NoSQL
Redis专题-实战篇二-商户查询缓存
本文介绍了缓存的基本概念、应用场景及实现方式,涵盖Redis缓存设计、缓存更新策略、缓存穿透问题及其解决方案。重点讲解了缓存空对象与布隆过滤器的使用,并通过代码示例演示了商铺查询的缓存优化实践。
113 1
Redis专题-实战篇二-商户查询缓存
|
25天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
281 1
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
6月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
6月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
4月前
|
缓存 监控 NoSQL
Redis 实操要点:Java 最新技术栈的实战解析
本文介绍了基于Spring Boot 3、Redis 7和Lettuce客户端的Redis高级应用实践。内容包括:1)现代Java项目集成Redis的配置方法;2)使用Redisson实现分布式可重入锁与公平锁;3)缓存模式解决方案,包括布隆过滤器防穿透和随机过期时间防雪崩;4)Redis数据结构的高级应用,如HyperLogLog统计UV和GeoHash处理地理位置。文章提供了详细的代码示例,涵盖Redis在分布式系统中的核心应用场景,特别适合需要处理高并发、分布式锁等问题的开发场景。
303 40
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2532 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
4月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1097 7
|
4月前
|
机器学习/深度学习 存储 NoSQL
基于 Flink + Redis 的实时特征工程实战:电商场景动态分桶计数实现
本文介绍了基于 Flink 与 Redis 构建的电商场景下实时特征工程解决方案,重点实现动态分桶计数等复杂特征计算。通过流处理引擎 Flink 实时加工用户行为数据,结合 Redis 高性能存储,满足推荐系统毫秒级特征更新需求。技术架构涵盖状态管理、窗口计算、Redis 数据模型设计及特征服务集成,有效提升模型预测效果与系统吞吐能力。
380 2
下一篇
日志分析软件