如未看第一章的建议先看第一章基本用法:SpringBoot使用RabbitMQ,这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。
一、yml文件配置
server:
port: 8080
spring:
application:
name: rabbit-confirm
rabbitmq:
template:
# 使用return-callback时必须设置mandatory为true
mandatory: true
# 消息发送到交换机确认机制,是否确认回调
publisher-confirms: true
# 消息发送到交换机确认机制,是否返回回调
publisher-returns: true
listener:
simple:
# 并发消费者初始化值
concurrency: 5
# 最大值
max-concurrency: 10
# 每个消费者每次监听时可拉取处理的消息数量
prefetch: 20
# 确认模式设置为手动签收
acknowledge-mode: manual
二、定义配置类
/**
* @author Gjing
**/
@Configuration
public class ConfirmConfiguration {
/**
* 声明confirm.message队列
*/
@Bean
public Queue confirmQueue() {
return new Queue("confirm.message");
}
/**
* 声明一个名为exchange-2的交换机
*/
@Bean
public TopicExchange exchange2() {
return new TopicExchange("exchange-2");
}
/**
* 将confirm.message的队列绑定到exchange-2交换机
*/
@Bean
public Binding bindMessage1() {
return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");
}
}
三、定义生产者
/**
* @author Gjing
**/
@Component
@Slf4j
public class ConfirmProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败:correlationData: {},cause: {}", correlationData, cause);
}else {
log.info("消息发送成功:correlationData: {},ack: {}", correlationData, ack);
}
};
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message) {
// 构建回调返回的数据
CorrelationData correlationData = new CorrelationData();
correlationData.setId(TimeUtil.localDateTimeToStamp(LocalDateTime.now()) + "");
Message message1 = MessageBuilder.withBody(message.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
// 将CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,然后人工处理
.setCorrelationId(correlationData.getId())
.build();
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
}
}
四、定义消费者
/**
* @author Gjing
**/
@Component
@Slf4j
public class ConfirmConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true")
,exchange = @Exchange(value = "exchange-2",type = "topic")
,key = "confirm.message"))
public void receive(String message, Message message1, Channel channel) throws IOException {
log.info("消费者收到消息:{}", message);
long deliverTag = message1.getMessageProperties().getDeliveryTag();
//第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型
//为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
channel.basicAck(deliverTag, false);
}
}
五、创建controller调用
/**
* @author Gjing
**/
@RestController
public class ConfirmController {
@Resource
private ConfirmProducer confirmProducer;
@PostMapping("/confirm-message")
public void confirmMessage() {
confirmProducer.send("hello confirm message");
}
}
六、执行结果
以上为个人理解,如有误欢迎各位指正