1.SpringBoot集成RabbitMQ
1.1 依赖及配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring: # 用于接收设备发送的数据 rabbitmq: host: xxx.xx.xxx.xxx port: 5672 username: guest password: guest mq-name: test # 确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated # 确认消息已发送到队列 publisher-returns: true
1.2 消息监听与发送
- 数据获取
@Component @Slf4j public class RabbitMessageQueueReceiver { @Autowired private ConfigProperties configProperties; @Autowired private AsyncConfig asyncConfig; @Autowired private DataGsmEquipComparisonManager dataGsmEquipComparisonManager; @RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")}, ackMode = "MANUAL") @RabbitHandler() public void receive(String msg, Channel channel, Message message) throws IOException, InterruptedException { // 获取消息体 String jsonString = new String(message.getBody()); // 处理数据格式 Map<String, Object> dataMap = dealMessageData(jsonString); try { asyncConfig.taskExecutor().execute(() -> { // 根据数据类型处理消息【这里大家根据实际情况进行处理】 DealMessageByType.getInstance().dispose(dataMap); }); channel.basicQos(5); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e) { e.printStackTrace(); log.error("error message:" + jsonString); try { channel.basicQos(5); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e1) { e1.printStackTrace(); } } }
- 数据发送
@Component @Log4j public class RabbitMessageQueueSender { public RabbitTemplate rabbitTemplate; public boolean sendMessage(String exchange, String routingKey, String message) { try { rabbitTemplate.convertAndSend(exchange, routingKey, message); } catch (Exception e) { e.printStackTrace(); return false; } return true; } }
- 确认机制(消息发送到服务回调)
@Component @Slf4j public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { private RabbitTemplate rabbitTemplate; @PostConstruct public void run() { if (rabbitTemplate != null) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String failCause) { if (ack) { log.info("消息发送成功"); } else { log.info("消息发送失败,进行容错处理"); } log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause); } @Override public void returnedMessage(ReturnedMessage returned) { log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned); } }
2.设置RabbitMQ启动总开关
SpringBoot 项目集成了 RabbitMQ 但是有时候又用不到它,比如说:
- 开发跟 RabbitMQ 服务无关接口时,此时 MQ 服务如果未启动,会有报错信息不断打印出来。
- 不同的用户部署时,有可能用不到 RabbitMQ,此时没有部署 MQ,启动项目时不能报错。
核心报错信息:
WARN o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed Caused by: java.net.ConnectException: Connection refused: connect
详细报错信息:
[2023-03-16 11:18:11.456] traceId= [RMI TCP Connection(8)-xxx.xxx.xx.xxx] WARN o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61) at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602) at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725) at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:252) at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173) at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146) at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2126) at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49) at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44) at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130) at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74) at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468) at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829) at sun.reflect.GeneratedMethodAccessor212.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) at sun.rmi.transport.Transport$1.run(Transport.java:200) at sun.rmi.transport.Transport$1.run(Transport.java:197) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.Transport.serviceCall(Transport.java:196) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394) at java.net.Socket.connect(Socket.java:606) at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173) at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640) at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615) at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565) ... 50 common frames omitted
2.1 总开关配置
添加spring.rabbitmq.enable
配置作为总开关:
spring: # 用于接收设备发送的数据 rabbitmq: # rabbitmq 的自定义配置 enable 用于开启或关闭 rabbitmq 服务(false关闭,true开启) enable: true host: 172.81.205.216 port: 5672 username: guest password: guest mq-name: ZRTZ_QUEUE_EFENCE_DEVICE_OBTAIN_STATUS # 确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated # 确认消息已发送到队列 publisher-returns: true
2.2 关闭自动配置
@EnableRabbit @SpringBootApplication(exclude = {RabbitAutoConfiguration.class}) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
2.3 根据开关进行配置
/** * 用于管理 RabbitAutoConfiguration 是否配置 */ @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true") public class RabbitMessageQueueEnableAutoConfig extends RabbitAutoConfiguration { }
2.4 消息监听与发送
- 监听开关
@Component @Slf4j @Data public class RabbitMessageQueueReceiverConfig { @Value("${spring.rabbitmq.enable}") private boolean enable; @Bean public RabbitMessageQueueReceiver initRabbitMessageQueueReceiver() { if (enable) { RabbitMessageQueueReceiver rabbitMessageQueueReceiver = new RabbitMessageQueueReceiver(); log.info("【------已启用------】RabbitMessageQueueReceiver"); return rabbitMessageQueueReceiver; } else { log.info("【------不启用------】RabbitMessageQueueReceiver"); return null; } } } // 监听代码【去掉@Component】 // @Component @Log4j public class RabbitMessageQueueSender { public RabbitTemplate rabbitTemplate; public boolean sendMessage(String exchange, String routingKey, String message) { try { rabbitTemplate.convertAndSend(exchange, routingKey, message); } catch (Exception e) { e.printStackTrace(); return false; } return true; } }
- 消息发送及回调【添加 (required = false) 防止接口被调用出错】
// 消息发送 @Component @Log4j public class RabbitMessageQueueSender { @Autowired(required = false) public RabbitTemplate rabbitTemplate; public boolean sendMessage(String exchange, String routingKey, String message) { try { if (rabbitTemplate != null) { rabbitTemplate.convertAndSend(exchange, routingKey, message); } else { return false; } } catch (Exception e) { e.printStackTrace(); return false; } return true; } } // 发送回调 @Component @Slf4j public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired(required = false) private RabbitTemplate rabbitTemplate; @PostConstruct public void run() { if (rabbitTemplate != null) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String failCause) { if (ack) { log.info("消息发送成功"); } else { log.info("消息发送失败,进行容错处理"); } log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause); } @Override public void returnedMessage(ReturnedMessage returned) { log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned); } }
3.总结
- 关闭自动配置。
- 根据自定义的标志进行bean对象装配。
- 防止未装配导致的报错。