4.SpringBoot整合Spring-AMPQ
4.1.什么是Spring-AMQP
Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等. 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发. 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter.
4.2.引入AMQP-starter依赖
<!-- 代码库 --> <repositories> <repository> <id>maven-ali</id> <url>http://maven.aliyun.com/nexus/content/groups/public//</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> <updatePolicy>always</updatePolicy> <checksumPolicy>fail</checksumPolicy> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>public</id> <name>aliyun nexus</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories>
<!--引入AMQP--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.3.SpringBoot整合RabbitMQ编码
yml配置文件修改
#消息队列 spring: rabbitmq: host: 10.211.63.14 port: 5672 virtual-host: /dev password: 123456 username: admin
RabbitMQConfig配置类
@Configuration public class RabbitMQConfig{ public static final String EXCHANGE_NAME = "exchange_order"; public static final String QUEUE = "order_queue"; /** * 交换机 * @return */ @Bean public Exchange orderExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(ture).build(); } /** * 队列 * @return */ @Bean public Queue orderQueue(){ return QueueBuilder.durable(QUEUE).build(); } /** * 交换机和队列绑定关系 */ @Bean public Binding orderBinding(Queue queue,Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs(); } }
生产者发送消息
rabbitTemplate.converAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单");
消费者监听消息
@Component @RabbitMQListener(queue = "order_queue") public class OrderMQListener{ /** * RabbitHandler 会自动匹配 消息类型(消息自动确认) * @param msg * @param message * @throws IOException */ @RabbitHandler public void messageHandler(String body,Message message){ long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message.toString()); System.out.println("监听到消息:消息内容:"+message.getBody()); } }
5.RabbitMQ的消息可靠性投递
5.1.什么是RabbitMQ的消息可靠性投递
- 保证mq节点成功接收到消息,消息发送端需要接收到mq服务端接受到消息的确认应答,完善消息补偿机制,发送失败的消息可以二次感知,并进行二次处理。
5.2.RabbitMQ消息投递路径
- 生产者->交换机->队列->消费者
5.3.通过两个点控制消息的可靠性投递
- 生产者到交换机:confirmCallback
- 交换机到队列:returnCallback
**注意:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体的新跟那个效率会变低,吞吐量下降严重,不是非常重要的消息不建议开启消息确认机制
5.4.RabbitMQ消费可靠性投递confirmCallback实战
生产者到交换机
通过confirmCallback
生产者投递消息后,如果Broker收到消费后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种法师是方式可靠性投递的核心。
开启confirmCallback
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发回调 spring.rabbitmq.publisher-confirms=true #新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值事发布消息成功到交换机后触发回调方法 spring.rabbitmq.publisher-confirm-type: correlated
编码实战
核心API:setConfirmCallback() 、confirm(配置,是否接到消息,失败的原因)
@Test void testConfirmCallback(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback======>"); System.out.println("correlationData======>"+correlationData); System.out.println("ack======>"+ack); System.out.println("cause======>"+cause); if(ack){ System.out.println("发送成功"); //更新数据库的状态,状态为成功 }else { System.out.println("发送失败,记录到日志或者数据库"); //更新数据库的状态,状态为失败 } } }); //数据库新增一个消息记录,状态是发送,发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单"); }
**模拟异常:**修改交换机名称
5.5.RabbitMQ消费可靠性投递returnCallback实战
交换机到队列
- 通过returnCallback
- 消息从交换机发送到对应的队列失败时,触发
- 两种模式:
- 交换机到队列不成功,则丢弃消息(默认)
- 交换机到队列不成功,发挥生产者,触发returnCallback
#配置为true,则交换机处理消息到路由失败,会返回给生产者 spring.rabbitmq.template.mandatory=true #或者在temlate对象上设置 template.setMandatory(true);
第一步:开启returnCallback配置
#新版 spring.rabbitmq.publisher-returns=true
第二步:修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败会返回给生产者 spring.rabbitmq.template.mandatory=true
编码实战
@Test void testReturnCallback(){ //publisher-returns为true则交换机处理消息到路由失败,返回给生产者 //mandatory为true则消息未被路由到任何一个queue,则回退一条消息给生产者 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("ReturnedMessage:" + returned.toString()); int replyCode = returned.getReplyCode(); System.out.println("_______________________"); System.out.println("replyCode:" + replyCode); } }); //数据库新增一个消息记录,状态是发送,发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new", "中台收到一条新订单"); } }
5.6.Rabbitmq的消息确机制ACK讲解
背景:消费者从broker中监听消息,需要确保消息被合理的处理掉
RabbitMQ的ACK介绍
- 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
- 消费者在处理消息出现了网络不稳定、服务器异常等现象,按摩就不会有ACK反馈,RabbitMQ回认为i这个消息没有正常消费,会将消息重新放入队列中。
- 只有当消费者正确的发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
- 消息的ACK确认机制默认是打开的,消息如违背进行ACK消息确认机制,这条消息将被锁定Unacked
确认方式
- 自动确认(默认)
- 手动确认(manual)
spring: rabbitmq: #开启手动确认消息,如果消息重新入队,进行重试 listener: simple: acknowledge-mode: manual
5.7.编码实战
@Component @RabbitListener(queues = "order_queue") public class OrderMQListener { /** * 处理器,适配器,加上@RabbitHandler注解 * 加上Channel这个参数 */ @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long tag = message.getMessageProperties().getDeliveryTag(); System.out.println("message:"+message.toString()); System.out.println("=============="); System.out.println("消息标识tag:"+tag); System.out.println("消息体body:"+body); //第一个参数是该消息的index,第二个是是否批量操作 channel.basicAck(tag,false); //第一个参数是index,第二个是否批量,第三个是失败后是否重新返回给生产者重新投递 //channel.basicNack(msgTag,false,true); } }
deliveryTage介绍
- 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
basicNack和basicReject介绍
- basicReject一次只能拒绝接收一个消息,可以设置是否requeue
- basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue