项目业务使用【发布订阅模式】
1.maven
org.springframework.amqp
spring-rabbit
————————————————
2.mq连接信息
MQ连接
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#配置RabbitMq虚拟主机的路径(默认为“/" 可以省略)
virtual-host: /
3.RabbitMQConfig
————————————————
package com.test.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- MQ消息队列配置
- 1.创建交换机
- 2.创建队列
- 3.将交换机与队列进行绑定
* - @author wangwei
@date 2023-08-03 13:53:00
/
@Configuration
public class RabbitMQConfig {
/*定制json 格式的消息转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}/*消息队列 对象
- 1.创建名称为 fanout_queue_notify 的消息队列
- 2.目前集成mq做 任务下发通知的消息队列
@return
*/
@Bean
public Queue fanoutQueryNotify() {
return new Queue("fanout_queue_notify1");
}/*
- 交换机对象
1.fanout创建一个交换机Channels
*/
@Bean
public FanoutExchange fanoutExchange() {
//创建一个fanout模式的交换机(发布订阅模式)
return new FanoutExchange("fanout_exchange1");
}/*
- 将创建的队列绑定到对应的交换机上
*/
@Bean
public Binding bindingNotify() {
return BindingBuilder.bind(fanoutQueryNotify()).to(fanoutExchange());
}
}
4.消息发送者
package com.ruoyi.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- MQ消息队列配置
- 1.创建交换机
- 2.创建队列
- 3.将交换机与队列进行绑定
* - @author wangwei
@date 2023-08-03 13:53:00
/
@Configuration
public class RabbitMQConfig {
/*定制json 格式的消息转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}/*消息队列 对象
- 1.创建名称为 fanout_queue_notify 的消息队列
- 2.目前集成mq做 任务下发通知的消息队列
@return
*/
@Bean
public Queue fanoutQueryNotify() {
return new Queue("fanout_queue_notify1");
}/*
- 交换机对象
1.fanout创建一个交换机Channels
*/
@Bean
public FanoutExchange fanoutExchange() {
//创建一个fanout模式的交换机(发布订阅模式)
return new FanoutExchange("fanout_exchange1");
}/*
- 将创建的队列绑定到对应的交换机上
*/
@Bean
public Binding bindingNotify() {
return BindingBuilder.bind(fanoutQueryNotify()).to(fanoutExchange());
}
}
5.消息消费者
package com.ruoyi.consumption;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
- 消息消费者
- @author wangwei
@date 2023-08-03 13:53:00
*/
@Service
public class MQConsumptionService {//发布订阅模式 @RabbitListener 可以指定当前方法监听哪一个队列
@RabbitListener(queues = "fanout_queue_notify")
public void subConsumptionNotify(Message message){//消息内容在消息队列里面是以 字节形式存放的 byte[] body = message.getBody(); String stringMsg = new String(body); //todo 待详细集成松江反诈 的具体业务,这里只是测试接收消息的案例 System.out.printf("我是消息接受者/消费者,接收到的队列消息内容为: =======》"+stringMsg);
}
}