108.【RabbitsMQ】(八)

简介: 108.【RabbitsMQ】
(3).消费者

email

1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:45
 * @PackageName:com.jsxs.service.faout
 * @ClassName: EmailConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"email.direct.queue"})  // 这个客户端的队列是哪个?
public class EmailConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("email接收到的信息是:->"+message);
    }
}

message

1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: MessageConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"message.direct.queue"})  // 这个客户端的队列是哪个?
public class MessageConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("Message接收到的信息是:->"+message);
    }
}

sms

1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: SmsConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"sms.direct.queue"})  // 这个客户端的队列是哪个?
public class SmsConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("sms接收到的信息是:->"+message);
    }
}

wechat

1. 设置监听的那个队列?
2. 监听到的数据,输出在哪?
package com.jsxs.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: WechatConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"wechat.direct.queue"})  // 这个客户端的队列是哪个?
public class WechatConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("wechat接收到的信息是:->"+message);
    }
}

服务端先提供消息

客户端接收消息: 接受各自的消息,通过路由key进行区分的

4.主题模式 (Topic)

所有的模式都可以使用注解配置和配置类配置,这里我们用注解进行配置

(1).生产者
package com.jsxs.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * @Author Jsxs
 * @Date 2023/4/2 11:24
 * @PackageName:com.jsxs.service
 * @ClassName: OrderService
 * @Description: TODO :  分别给对应的路由Key发送消息.
 * @Version 1.0
 */
@Service
public class OrderService {
    @Resource   //  获取rabbitMQ的服务
    private RabbitTemplate rabbitTemplate;
    /**
     *
     * @param userId
     * @param productID
     * @param num
     */
   public void makeOrder(String userId,String productID,int num){
       //1. 生成订单
       String orderID = UUID.randomUUID().toString().replace("-","");
       System.out.println("订单号已经生产成功-"+orderID);
       //2. 设置交换机名字和路由
       String exchangeName="topic_order_producer";
       //3. 发送消息
       // 参数: (交换机、路由key或队列名、消息内容)
       rabbitTemplate.convertAndSend(exchangeName,"sms","1");
       rabbitTemplate.convertAndSend(exchangeName,"sms","2");
       rabbitTemplate.convertAndSend(exchangeName,"sms","3");
       rabbitTemplate.convertAndSend(exchangeName,"sms","4");
   }
}
(2).消费者

email

1. 绑定
2. 生命队列
3. 声明交换机
4. 路由key
package com.jsxs.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:45
 * @PackageName:com.jsxs.service.faout
 * @ClassName: EmailConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(bindings = @QueueBinding(
        //  利用注解声明队列
        value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
        //  利用注解声明交换机
        exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),
        //  路由key是  "#.sms.#"
        key = "#.sms.#"
))
public class EmailConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("email接收到的信息是:->"+message);
    }
}

message

package com.jsxs.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: MessageConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(bindings = @QueueBinding(
        //  利用注解声明队列
        value = @Queue(value = "message.topic.queue",durable = "true",autoDelete = "false"),
        //  利用注解声明交换机
        exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),
        //  路由key是  "#.sms.#"
        key = "#.sms.#"
))
public class MessageConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("Message接收到的信息是:->"+message);
    }
}

sms

package com.jsxs.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: SmsConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(bindings = @QueueBinding(
        //  利用注解声明队列
        value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
        //  利用注解声明交换机
        exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),
        //  路由key是  "#.sms.#"
        key = "#.sms.#"
))
public class SmsConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("sms接收到的信息是:->"+message);
    }
}

wechat

package com.jsxs.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: WechatConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
//  监听的同时 我们对其进行绑定
@RabbitListener(bindings = @QueueBinding(
        //  利用注解声明队列
        value = @Queue(value = "wechat.topic.queue",durable = "true",autoDelete = "false"),
        //  利用注解声明交换机
        exchange = @Exchange(value = "topic_order_producer",type = ExchangeTypes.TOPIC),
        //  路由key是  "#.sms.#"
        key = "#.sms.#"
))
public class WechatConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("wechat接收到的信息是:->"+message);
    }
}

我们模糊查询的是只要包含的有 sms ,就发送信息,

(五)、RabbitMQ高级

1.过期时间TTL (队列)

概述

过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置 x-message-ttl

  1. 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
  2. 第二种方法是对消息进行单独设置,每条消息 TTL可以不同

如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息.

1. 设置队列TTL

(1).生产者
package com.jsxs.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * @Author Jsxs
 * @Date 2023/4/2 11:24
 * @PackageName:com.jsxs.service
 * @ClassName: OrderService
 * @Description: TODO :  分别给对应的路由Key发送消息.
 * @Version 1.0
 */
@Service
public class OrderService {
    @Resource   //  获取rabbitMQ的服务
    private RabbitTemplate rabbitTemplate;
    /**
     *
     * @param userId
     * @param productID
     * @param num
     */
   public void makeOrder(String userId,String productID,int num){
       //1. 生成订单
       String orderID = UUID.randomUUID().toString().replace("-","");
       System.out.println("订单号已经生产成功-"+orderID);
       //2. 设置交换机名字和路由
       String exchangeName="ttl_order_producer";
       //3. 发送消息
       // 参数: (交换机、路由key或队列名、消息内容)
       rabbitTemplate.convertAndSend(exchangeName,"ttl","1");
       rabbitTemplate.convertAndSend(exchangeName,"ttl","2");
       rabbitTemplate.convertAndSend(exchangeName,"ttl","3");
       rabbitTemplate.convertAndSend(exchangeName,"ttl","4");
   }
}
(2).配置文件
1. 声明交换机: (假如交换机已经被定义了,我们通过代码对其进行修改属性,那么我们的代码一定会报错的。)
2. 我们声明队列的同时: 要通过HashMap定义他的过期时间和传参 (队列名、是否持久化、是否自动删除、是否、参数)
3. 将队列绑定我们的交换机
4. "x-message-ttl 这是key值
package com.jsxs.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author Jsxs
 * @Date 2023/4/3 16:16
 * @PackageName:com.jsxs.config
 * @ClassName: TTLRabbitMQConfig
 * @Description: TODO
 * @Version 1.0
 */
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import java.util.HashMap;
@Configuration
public class TTLRabbitMQConfig {
        // 1. 声明注册direct模式的交换机
    @Bean
    public DirectExchange directExchange(){
        //  (交换机的名字、是否持久化。是否自动删除)
        return new DirectExchange("ttl_order_producer",true,false);
    }
        // 2. 声明队列: 以及过期时间
    @Bean
    public Queue SmsQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("sms.ttl.queue",true,false,false,args);
    }
        @Bean
    public Queue MessageQueue(){
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-message-ttl",5000);
        return new Queue("message.ttl.queue",true,false,false,args);
    }
    @Bean
    public Queue EmailQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("email.ttl.queue",true,false,false,args);
    }
    @Bean
    public Queue WeChatQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("wechat.ttl.queue",true,false,false,args);
    }
        // 3. 将队列与交换机进行绑定的操作
    @Bean
    public Binding SmsBind(){
        return BindingBuilder.bind(SmsQueue()).to(directExchange()).with("ttl");
    }
    @Bean
    public Binding MessageBind(){
        return BindingBuilder.bind(MessageQueue()).to(directExchange()).with("ttl");
    }
    @Bean
    public Binding EmailBind(){
        return BindingBuilder.bind(EmailQueue()).to(directExchange()).with("ttl");
    }
    @Bean
    public Binding WechatBind(){
        return BindingBuilder.bind(WeChatQueue()).to(directExchange()).with("ttl");
    }
}
(3).消费者

1 .email

package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:45
 * @PackageName:com.jsxs.service.faout
 * @ClassName: EmailConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"email.ttl.queue"})  // 这个客户端的队列是哪个?
public class EmailConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("email接收到的信息是:->"+message);
    }
}
  1. message
package com.jsxs.service.ttl;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * @Author Jsxs
 * @Date 2023/4/2 13:44
 * @PackageName:com.jsxs.service.faout
 * @ClassName: MessageConsumer
 * @Description: TODO
 * @Version 1.0
 */
@Service
@RabbitListener(queues = {"message.ttl.queue"})  // 这个客户端的队列是哪个?
public class MessageConsumer {
    @RabbitHandler // 接收到的消息放在这
    public void receiveMessage(String message){
        System.out.println("Message接收到的信息是:->"+message);
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
缓存 网络协议 算法
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
152 0
|
机器学习/深度学习 人工智能 自然语言处理
|
机器学习/深度学习 人工智能 算法
【保姆级教程】用PAI-DSW修复亚运历史老照片
本教程整合了来自开源社区的高质量图像修复、去噪、上色等算法,并使用 Stable Diffusion WebUI 进行交互式图像修复。参与者可以根据需要进行参数调整,组合不同的处理方式以获得最佳修复效果。参与者还可以在活动页面上传修复后的成果图片,参与比赛,获胜者将有机会获得丰厚的奖品。
44345 189
【保姆级教程】用PAI-DSW修复亚运历史老照片
|
5月前
|
运维 Shell Linux
第十四章 Python发送邮件(常见四种邮件内容)
第十四章 Python发送邮件(常见四种邮件内容)
|
存储 运维 Dubbo
SAE急速部署
Serverless应用引擎 2.0的推出,您是否用过?更快速的部署,更低的成本,SAE 2.0 极简体验、极致弹性,助力企业降本增效!
10027 14
SAE急速部署
|
消息中间件 Serverless Kafka
基于 EventBridge 轻松搭建消息集成应用
基于 EventBridge 轻松搭建消息集成应用
22173 8
|
小程序
微信小程序-单个插槽
什么是插槽 • 插槽就是一个开放的接口,和现实生活中的 USB 插槽一样 • 通过插槽开放接口之后,接口要连接什么内容由使用者决定 • 所以小程序中,插槽就是一块待替换占位区域,等待使用者使用的时候替换
92 0
|
5月前
Servlet使用适配器模式进行增删改查案例(DeptServiceImpl.java)
Servlet使用适配器模式进行增删改查案例(DeptServiceImpl.java)
|
人工智能 人机交互 语音技术
INTERSPEECH2023论文解读|BAT一种低延迟低内存消耗的RNN-T模型
INTERSPEECH2023论文解读|BAT一种低延迟低内存消耗的RNN-T模型
225 0
|
IDE 编译器 开发工具
善用 vs 中的错误列表和输出窗口,高效查找 C++ 多工程编译错误
善用 vs 中的错误列表和输出窗口,高效查找 C++ 多工程编译错误