库
spring-boot-starter-amqp
注解
@RabbitListener
@RabbitHandler
代码
配置
RabbitMqConfig
/** * 消息队列配置 * <p> * http://47.94.169.13:15675/#/ * <p> * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Configuration public class RabbitMqConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 所绑定的交换机 * 订单延迟队列队列 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 订单实际消费队列 */ @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 订单延迟队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键 .build(); } /** * 将订单队列绑定到交换机 */ @Bean Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) { return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 将订单延迟队列绑定到交换机 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) { return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } //https://www.jianshu.com/p/2c5eebfd0e95 // @Bean // public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConnectionFactory(connectionFactory); // factory.setMessageConverter(new Jackson2JsonMessageConverter()); // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack // return factory; // } // @Bean // public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { // SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); // container.setConnectionFactory(connectionFactory); // container.setQueueNames("consumer_queue"); // 监听的队列 // container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息 // container.setMessageListener((MessageListener) (message) -> { // System.out.println("====接收到消息====="); // System.out.println(new String(message.getBody())); // //抛出NullPointerException异常则重新入队列 // //throw new NullPointerException("消息消费失败"); // //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false // //throw new AmqpRejectAndDontRequeueException("消息消费失败"); // //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认 // throw new ImmediateAcknowledgeAmqpException("消息消费失败"); // }); // return container; // } }
发送类
TestSendMessage
package com.ityu.studystreamrmq.rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import java.util.UUID; @Component @Slf4j public class TestSendMessage implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private final RabbitTemplate rabbitTemplate; public TestSendMessage(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容 rabbitTemplate.setReturnCallback(this::returnedMessage); rabbitTemplate.setMandatory(true); } public void sendMsg(String exchange, String routkey, Object content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange, routkey, content, correlationId); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info(" 消息确认的id: " + correlationData); if (ack) { log.info("消息发送成功"); //发送成功 删除本地数据库存的消息 } else { log.info("消息发送失败:id " + correlationData + "消息发送失败的原因" + cause); // 根据本地消息的状态为失败,可以用定时任务去处理数据 } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage [消息从交换机到队列失败] message:" + message); } }
监听
package com.ityu.studystreamrmq.rabbitmq; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = "ityu.order.cancel") @Slf4j public class Customer1 { @RabbitHandler public void handle(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println(System.currentTimeMillis()); System.out.println(message); // try { // /** // * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据 // * 1、有数据则不消费,直接应答处理 // * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答 // * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能) // */ // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // } catch (IOException e) { // e.printStackTrace(); // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // } } }
测试发送
MyTest
package com.ityu.studystreamrmq; import com.ityu.studystreamrmq.rabbitmq.QueueEnum; import com.ityu.studystreamrmq.rabbitmq.TestSendMessage; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class MyTest { @Autowired private TestSendMessage amqpTemplate; @Autowired private RabbitTemplate amqpTemplate2; @Test public void sendMsg() { amqpTemplate.sendMsg(QueueEnum.QUEUE_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_CANCEL.getName(), "你好啊不错哦"); } @Test public void sendMsg2() { System.out.println(System.currentTimeMillis()); amqpTemplate2.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), "我是延迟消息你能发现不50000", message -> { message.getMessageProperties().setExpiration("500000"); return message; }); } }