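Overview
This article describes how to connect to the Alibaba Cloud AMQP service from Spring Boot.
Procedure
1. Download the sample program (download link).
2. Configure the parameters. Obtain the values from the AMQP management console.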
- resources -> application.properties
spring.application.name=rabbitmq-demo
spring.rabbitmq.host=18********617278.mq-amqp.cn-hangzhou-a.aliyuncs.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=******
spring.rabbitmq.password=******
spring.rabbitmq.virtual-host=******
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
- RabbitConfig -> RESOURCE_OWNER_ID
private static final long RESOURCE_OWNER_ID = 18********617278L; // account ID of the resource owner
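Before starting the application, it can help to confirm that the connection keys listed above are actually filled in. The following is a minimal sketch using only `java.util.Properties`; `RabbitConfigCheck` is a hypothetical helper and not part of the demo project. It also accepts `spring.rabbitmq.publisher-confirm-type=correlated`, which newer Spring Boot releases (2.2 and later) use in place of `spring.rabbitmq.publisher-confirms=true`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the demo project.
final class RabbitConfigCheck {
    // Connection keys taken from the application.properties listing above.
    private static final String[] REQUIRED = {
        "spring.rabbitmq.host",
        "spring.rabbitmq.port",
        "spring.rabbitmq.username",
        "spring.rabbitmq.password",
        "spring.rabbitmq.virtual-host"
    };

    private RabbitConfigCheck() {}

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Returns the required keys that are absent or blank. */
    static List<String> missingKeys(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = p.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** True if publisher confirms are enabled under the older or the newer key. */
    static boolean confirmsEnabled(Properties p) {
        String legacy = p.getProperty("spring.rabbitmq.publisher-confirms", "").trim();
        String current = p.getProperty("spring.rabbitmq.publisher-confirm-type", "").trim();
        return "true".equalsIgnoreCase(legacy) || "correlated".equalsIgnoreCase(current);
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties p = parse("spring.rabbitmq.host=example-host\nspring.rabbitmq.port=5672\n");
        System.out.println("missing: " + missingKeys(p));
        System.out.println("confirms enabled: " + confirmsEnabled(p));
    }
}
```

In a real project the text would come from the `application.properties` file on the classpath rather than from a string literal.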
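3. Adjust the code. (The default demo only tests sending a message that cannot be routed. To make the test more complete, some of the code and comments were adjusted here to aid understanding. You can also skip this step and run the test directly.)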
- SenderWithCallback.class
package com.alibaba.rabbit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.UUID;

@Component
public class SenderWithCallback {

    Logger log = LoggerFactory.getLogger(SenderWithCallback.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        // Register the publisher confirm and return callbacks
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }

    public void send() {
        String exchange = "exchange-rabbit-springboot-advance5";
        String routingKey = "product";
        String unRoutingKey = "norProduct";

        // 1. Send a message that cannot be routed; triggers ReturnCallback and ConfirmCallback.
        //    The Chinese text in the message and log strings means "sending a message."
        String message = LocalDateTime.now().toString() + "发送一条消息.";
        rabbitTemplate.convertAndSend(exchange, unRoutingKey, message, new CorrelationData("unRouting-" + UUID.randomUUID().toString()));
        log.info("发送一条消息,exchange:[{}],routingKey:[{}],message:[{}]", exchange, unRoutingKey, message);

        // 2. Send a message that can be routed; triggers ConfirmCallback
        rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData("Routing-" + UUID.randomUUID().toString()));
        log.info("发送一条消息,exchange:[{}],routingKey:[{}],message:[{}]", exchange, routingKey, message);

        // 3. Send a test message straight to the queue named "queue" (through the default exchange)
        //    so the listener has something to consume
        rabbitTemplate.convertAndSend("queue", "test queue message.");
    }
}
- RabbitReturnCallback.class
package com.alibaba.rabbit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * ReturnCallback implementation.
 * Triggered when a message reaches the exchange but no queue matches it.
 * It runs before ConfirmCallback.
 */
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    Logger log = LoggerFactory.getLogger(RabbitReturnCallback.class);

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("message:" + message + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }
}
- RabbitConfirmCallback.class
package com.alibaba.rabbit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * After the producer sends a message and it reaches RabbitMQ, the broker sends back an acknowledgement.
 * This acknowledgement is what the official documentation calls ConfirmCallback; the producer uses a
 * callback class to listen for the confirmations returned by RabbitMQ.
 * In Spring AMQP, the confirmation callback is registered by setting the ConfirmCallback property of
 * RabbitTemplate, and the callback logic lives in a class that implements ConfirmCallback.
 */
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {

    Logger log = LoggerFactory.getLogger(RabbitConfirmCallback.class);

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // ack == true means the broker received the message; it does not by itself mean the message reached a queue
        log.error("correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
    }
}
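The two callbacks only log what they receive. Because the return callback runs before the confirm callback, and the unroutable message in this demo is still confirmed with `ack:true`, a confirm alone does not say whether a message reached a queue. The following is a minimal sketch of one way to combine the two signals per correlation ID. `PublishTracker` and its `Outcome` values are hypothetical names, and the class uses plain Java collections so that it runs without Spring.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker for illustration; not part of the demo project.
final class PublishTracker {
    enum Outcome { ROUTED, RETURNED, NACKED, UNTRACKED }

    // Correlation IDs that were sent but not yet confirmed.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    // Correlation IDs for which a return (for example NO_ROUTE) was seen.
    private final Set<String> returned = ConcurrentHashMap.newKeySet();

    /** Call just before publishing a message with this correlation ID. */
    void sent(String id) {
        pending.add(id);
    }

    /** Call from the return callback with the returned message's correlation ID. */
    void onReturn(String id) {
        if (id != null) {
            returned.add(id);
        }
    }

    /** Call from the confirm callback; a null or unknown ID yields UNTRACKED. */
    Outcome onConfirm(String id, boolean ack) {
        if (id == null || !pending.remove(id)) {
            return Outcome.UNTRACKED;
        }
        boolean wasReturned = returned.remove(id);
        if (!ack) {
            return Outcome.NACKED;
        }
        return wasReturned ? Outcome.RETURNED : Outcome.ROUTED;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PublishTracker tracker = new PublishTracker();
        tracker.sent("unRouting-1");
        tracker.sent("Routing-1");
        tracker.onReturn("unRouting-1");
        System.out.println(tracker.onConfirm("unRouting-1", true));
        System.out.println(tracker.onConfirm("Routing-1", true));
        System.out.println(tracker.onConfirm(null, true));
    }
}
```

In the demo, `sent(id)` would be called in `SenderWithCallback.send()` before `convertAndSend`, and `onConfirm(correlationData == null ? null : correlationData.getId(), ack)` would be called in `RabbitConfirmCallback.confirm`. The null check matters because a message sent without a `CorrelationData` argument, like the third send in the demo, arrives in the confirm callback with `correlationData` set to null.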
Test run
- Start the program, then request the send operation from a browser: http://localhost:8080/test/send
- Output
2019-06-07 12:42:43.693 INFO 2752 --- [nio-8080-exec-1] com.alibaba.rabbit.SenderWithCallback : 发送一条消息,exchange:[exchange-rabbit-springboot-advance5],routingKey:[norProduct],message:[2019-06-07T12:42:43.612发送一条消息.]
2019-06-07 12:42:43.727 ERROR 2752 --- [124.156.22:5672] com.alibaba.rabbit.RabbitReturnCallback : message:(Body:'2019-06-07T12:42:43.612发送一条消息.' MessageProperties [headers={spring_returned_message_correlation=unRouting-75d1580a-c93d-4d9c-bf06-2ed05ad8e464}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),replyCode:312,replyText:NO_ROUTE,exchange:exchange-rabbit-springboot-advance5,routingKey:norProduct
2019-06-07 12:42:43.728 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:CorrelationData [id=unRouting-75d1580a-c93d-4d9c-bf06-2ed05ad8e464],ack:true,cause:null
2019-06-07 12:42:43.753 INFO 2752 --- [nio-8080-exec-1] com.alibaba.rabbit.SenderWithCallback : 发送一条消息,exchange:[exchange-rabbit-springboot-advance5],routingKey:[product],message:[2019-06-07T12:42:43.612发送一条消息.]
2019-06-07 12:42:43.891 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:CorrelationData [id=Routing-9ee37a35-0f8e-4a0d-b00b-2fc0fab574b4],ack:true,cause:null
2019-06-07 12:42:43.892 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:null,ack:true,cause:null
Receiver : test queue message.
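The `RabbitReturnCallback` line in the output above shows that a returned message carries the sender's correlation ID in the `spring_returned_message_correlation` header, together with reply code 312 (`NO_ROUTE`). The following is a minimal sketch of extracting both so that a return can be matched to the original send. `ReturnInfo` is a hypothetical helper, and it takes a plain `Map` so that it runs without Spring.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for illustration; not part of the demo project.
final class ReturnInfo {
    // Header name as it appears in the returned-message log line.
    static final String CORRELATION_HEADER = "spring_returned_message_correlation";
    // AMQP reply code logged together with the reply text NO_ROUTE.
    static final int NO_ROUTE = 312;

    private ReturnInfo() {}

    /** Returns the correlation ID stored in the headers, or null if there is none. */
    static String correlationId(Map<String, Object> headers) {
        Object value = headers == null ? null : headers.get(CORRELATION_HEADER);
        return value == null ? null : value.toString();
    }

    /** True if the reply code says that no queue matched the routing key. */
    static boolean isUnroutable(int replyCode) {
        return replyCode == NO_ROUTE;
    }

    public static void main(String[] args) {
        // Placeholder correlation ID for illustration only.
        Map<String, Object> headers = Collections.singletonMap(CORRELATION_HEADER, "unRouting-example-id");
        System.out.println(correlationId(headers) + " unroutable=" + isUnroutable(312));
    }
}
```

Inside `returnedMessage`, the headers would come from `message.getMessageProperties().getHeaders()` and the reply code from the `replyCode` parameter.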