Maven 依赖:
<!-- RabbitMQ support: Spring Boot starter for Spring AMQP -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 的 yml 配置(位于 spring 节点下):
# RabbitMQ broker connection and publisher-confirm settings.
# Spring Boot binds these properties under the `spring.rabbitmq` prefix.
spring:
  rabbitmq:
    # Placeholder value: replace with the broker's IP address or host name.
    host: IP地址
    port: 5672
    username: guest
    password: guest
    # Publisher confirms: the broker's ack/nack for each published message is
    # delivered to the RabbitTemplate ConfirmCallback together with the
    # CorrelationData supplied at send time.
    publisher-confirm-type: correlated
    # Publisher returns: a message that reached the exchange but could not be
    # routed to any queue is handed back to the RabbitTemplate ReturnCallback.
    publisher-returns: true
消息的生产者:
package com.sinochem.agency.rabbitmq.provider; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @author : jiagang * @date : Created in 2021/9/26 14:03 */ @RestController public class SendMessageTestController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "hello word"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //将消息携带绑定键值:testRouting 发送到交换机directExchange rabbitTemplate.convertAndSend("directExchange", "testRouting", map); return "success"; } }
消息的消费者端:
package com.sinochem.agency.rabbitmq.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** * @author : jiagang * @date : Created in 2021/9/26 14:58 */ @Component @RabbitListener(bindings = @QueueBinding( exchange = @Exchange("directExchange"), // 交换机(直连) key = "testRouting", // 路由key value = @Queue("testQueue"))) //监听队列的名称 public class TestReceiver{ @RabbitHandler public void process(@Payload Map message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception{ System.out.println("testReceiver消费者收到消息 : " + message.toString()); channel.basicAck(deliveryTag,true); } }
发送方回调配置类(ConfirmCallback / ReturnCallback):
package com.sinochem.agency.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbit 消息确认回调 * @author : jiagang * @date : Created in 2021/9/26 15:14 */ @Configuration public class RabbitConfirmCallbackConfig { @Autowired private RabbitTemplate rabbitTemplate; @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回应码:"+replyCode); System.out.println("ReturnCallback: "+"回应信息:"+replyText); System.out.println("ReturnCallback: "+"交换机:"+exchange); System.out.println("ReturnCallback: "+"路由键:"+routingKey); } }); return rabbitTemplate; } }
消费方配置类(手动 ack 确认):
package com.sinochem.agency.rabbitmq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbit 消费者 消息确认机制配置 * @author : jiagang * @date : Created in 2021/9/26 15:26 */ @Configuration public class MessageListenerConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
文章持续更新,可以关注下方公众号或者微信搜一搜「 最后一支迷迭香 」第一时间阅读,获取更完整的链路资料。