创建项目引入依赖
1. 创建maven项目
2. 引入相应的依赖以及配置文件
3. rabbitmq依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.0.1.RELEASE</version> </dependency>
4. 配置文件配置
server: port: 1111#端口号 spring: rabbitmq: host: 127.0.0.1 #rabbitmq主机 port: 5672 #端口 username: guest #用户名 password: guest #密码 virtual-host: / #权限 listener: type: simple #容器类型.simple或direct simple: default-requeue-rejected: false #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) #手动接收消息 acknowledge-mode: manual #ack模式
声明交换机和队列
1. 声明常量作为交换机以及队列的名称
声明这里在类添加configuration注解 把声明的Bean添加到spring容器当中
Ctrl+shift+u 转换大小写
这里队列声明的后面带个数字就是随便写的,本人队列有点多所有后面添了一个标识。名称自己换一换就行
//延迟交换机fanout public static final String DELAY_EXCHANGE_NAME_30="delay_exchange_name_30"; //延时队列A public static final String DELAY_QUEUEA_NAME_30="delay_queuea_name_30"; //延时队列B public static final String DELAY_QUEUEB_NAME_30="delay_queueb_name_30"; //死信交换机 public static final String DEAD_LETTER_EXCHANGE_NAME_30="dead_letter_exchange_name_30"; //死信队列A public static final String DEAD_LETTER_QUEUEA_NAME_30="dead_letter_queuea_name_30"; //死信队列B public static final String DEAD_LETTER_QUEUEB_NAME_30="dead_letter_queueb_name_30";
2. 声明延时交换机以及死信交换机(其实就是普通的交换机)
/** * 声明延时交换机 fanout * @return */ @Bean(DELAY_EXCHANGE_NAME_30) public Exchange DELAY_EXCHANGE_NAME_30(){ return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME_30).durable(true).build(); } /** * 声明死信交换机 direct * @return */ @Bean(DEAD_LETTER_EXCHANGE_NAME_30) public Exchange DEAD_LETTER_EXCHANGE_NAME_30(){ return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME_30).durable(true).build(); }
延时交换机我们设置为fanout,当然也可以是别的
而死信交换机不能设置为fanout,否则一个私信队列能接收到所有延时队列的消息
还不明白的话可以自己测试一下
3. 声明延时队列并指定对应的死信队列以及交换机(等设置延时时间过期之后发送到对应绑定的死信队列)
/** * 声明延时队列A * 并绑定死信交换机 和 死信队列A 根据路由key * @return */ @Bean(DELAY_QUEUEA_NAME_30) public Queue DELAY_QUEUEA_NAME_30(){ Map map=new HashMap(); map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30); map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_NAME_30); //设置此队列延时时间 6秒 map.put("x-message-ttl",6000); return QueueBuilder.durable(DELAY_QUEUEA_NAME_30).withArguments(map).build(); } /** * 声明延时队列B * 并绑定死信交换机 和 死信队列B 根据路由key * @return */ @Bean(DELAY_QUEUEB_NAME_30) public Queue DELAY_QUEUEB_NAME_30(){ Map map=new HashMap(); map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME_30); map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_NAME_30); //设置此队列延时时间 12秒 map.put("x-message-ttl",12000); return QueueBuilder.durable(DELAY_QUEUEB_NAME_30).withArguments(map).build(); }
map参数解释
x-dead-letter-exchange 绑定的死信交换机
x-dead-letter-routing-key 死信队列的路由key
x-message-ttl 延时时间,也就是过期时间 毫秒单位
声明队列进行绑定,druable参数是声明队列的名称
4. 声明死信队列 (就是正常队列)
/** * 声明死信队列A * @return */ @Bean(DEAD_LETTER_QUEUEA_NAME_30) public Queue DEAD_LETTER_QUEUEA_NAME_30(){ return new Queue(DEAD_LETTER_QUEUEA_NAME_30); } /** * 声明死信队列B * @return */ @Bean(DEAD_LETTER_QUEUEB_NAME_30) public Queue DEAD_LETTER_QUEUEB_NAME_30(){ return new Queue(DEAD_LETTER_QUEUEB_NAME_30); }
5. 延时队列绑定交换机
/** * 延时队列A绑定交换机 * @param queue * @param exchange * @return */ @Bean public Binding delayQueueABinding( @Qualifier(DELAY_QUEUEA_NAME_30)Queue queue, @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange ){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); } /** * 延时队列B绑定交换机 * @param queue * @param exchange * @return */ @Bean public Binding delayQueueBBinding( @Qualifier(DELAY_QUEUEB_NAME_30)Queue queue, @Qualifier(DELAY_EXCHANGE_NAME_30)Exchange exchange ){ return BindingBuilder.bind(queue).to(exchange).with("").noargs(); }
. 因为延时交换机是fanout类型的所以路由key可以不用设置,设置了也不会用到
6. 死信队列绑定交换机
/** * 死信队列A绑定交换机 * @param queue * @param exchange * @return */ @Bean public Binding deadLetterQueueABinding( @Qualifier(DEAD_LETTER_QUEUEA_NAME_30)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange ){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_NAME_30).noargs(); } /** * 死信队列B绑定交换机 * @param queue * @param exchange * @return */ @Bean public Binding deadLetterQueueBBinding( @Qualifier(DEAD_LETTER_QUEUEB_NAME_30)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME_30)Exchange exchange ){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_NAME_30).noargs(); }
.死信交换机类型是direct,我们要指定路由key,在我们声明延时队列已经指定相应的路由key了,所以这里要和声明延时队列指定的路由key要一致哦
7. 绑定参数详解
方法两个参数,队列以及交换机
@Qualifier(name)里就是我们声明@Bean(name)的队列,实现注入,交换机也一样
返回值里的 bind参数就是我们要绑定的队列
to就是绑定到哪个交换机
with就是路由key
生产者发送消息
1. 生产者发送消息(我们在测试类写)
具体看自己业务的需求写在哪里
@Resource private RabbitTemplate rabbitTemplate; @Test public void test1(){ Map map=new HashMap(); map.put("name","张三"); map.put("age","18"); //第一个参数发送给哪个交换机 第二个路由key 我们延时交换机是fanout所以路由key为空 第三个发送对象 rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME_30,"",map); }
注入对应模板对象,依赖里面的
conVerAndSend发送消息
消费者监听死信队列
1. 监听死信队列
@Component public class RabbitmqDeadLetterListener { /** * 监听死信队列A * @param map * @param message * @param channel */ @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEA_NAME_30}) public void receiverA(Map map, Message message, Channel channel) throws IOException { System.out.println("死信队列A接收到了消息--------"); System.out.println(map); //接收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } /** * 监听死信队列B * @param map * @param message * @param channel */ @RabbitListener(queues = {RabbitmqConfig.DEAD_LETTER_QUEUEB_NAME_30}) public void receiverB(Map map, Message message, Channel channel) throws IOException { System.out.println("死信队列B接收到了消息--------"); System.out.println(map); //接收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
@RabbitListener注解里面的参数就是监听的队列,这里我们是直接拿配置类里面的使用的
receiverA里面的第一个参数就是我们发送消息的类型,可以直接输出打印
第二个参数也包含发送的对象,获得的是一个字节数组,需要转型,也包含了发送消息的id标识等
第三个就是通道
-----------------↓↓↓↓↓↓↓-----------------
写完是个延时队列的功能,当然也可以变成死信队列…
使用场景
1. 死信队列使用场景
一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息(没错,以前就是这么干的= =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。
2. 延时队列使用场景
1.订单在十分钟之内未支付则自动取消。
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.账单在一周内未支付,则自动结算。
4.用户注册成功后,如果三天内没有登陆则进行短信提醒。
5.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
6.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
结束OVER
欢迎各路神人看完的感受或者有差的地方多多评论