RabbitMQ整合Spring AMQP
添加依赖
<!-- RabbitMQ Java client, pinned to version 3.6.5. -->
<!-- NOTE(review): spring-boot-starter-amqp normally brings in amqp-client transitively;
     confirm this explicit pin is compatible with the Spring Boot version in use. -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

<!-- Spring Boot starter for Spring AMQP (version managed by the Boot parent/BOM). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ配置类。注意:rabbitAdmin 与 rabbitTemplate 方法的参数 ConnectionFactory connectionFactory 由 Spring 按类型注入;当容器中存在多个 ConnectionFactory 类型的 Bean 时,参数名需要与连接工厂 Bean 的名称(即 connectionFactory 方法名)保持一致,否则会注入失败。
@Configuration @ComponentScan({"com.bfxy.spring.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("192.168.11.76:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding003() { return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } }
RabbitAdmin使用案例
该类封装了对 RabbitMQ 的管理操作,可以用来创建队列、交换机,以及它们之间的绑定。
/**
 * Spring Boot test class for the RabbitMQ examples.
 * The class body is left open here; the test methods shown in the later snippets belong to it.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void contextLoads() {
    }

    /**
     * Declares three exchanges and three queues through RabbitAdmin, binds each queue
     * to its exchange, and finally purges the topic queue.
     */
    @Test
    public void testAdmin() throws Exception {
        String directExchangeName = "test.direct";
        String directQueueName = "test.direct.queue";
        String topicQueueName = "test.topic.queue";

        // Exchanges
        rabbitAdmin.declareExchange(new DirectExchange(directExchangeName, false, false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        // Queues
        rabbitAdmin.declareQueue(new Queue(directQueueName, false));
        rabbitAdmin.declareQueue(new Queue(topicQueueName, false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        // Direct binding, built with the Binding constructor
        Binding directBinding = new Binding(
                directQueueName,
                Binding.DestinationType.QUEUE,
                directExchangeName,
                "direct",
                new HashMap<>());
        rabbitAdmin.declareBinding(directBinding);

        // Topic binding: queue and exchange objects are created inline and linked
        // with the routing key "user.#"
        Binding topicBinding = BindingBuilder
                .bind(new Queue(topicQueueName, false))
                .to(new TopicExchange("test.topic", false, false))
                .with("user.#");
        rabbitAdmin.declareBinding(topicBinding);

        // Fanout binding
        Binding fanoutBinding = BindingBuilder
                .bind(new Queue("test.fanout.queue", false))
                .to(new FanoutExchange("test.fanout", false, false));
        rabbitAdmin.declareBinding(fanoutBinding);

        // Purge the messages held by the topic queue
        rabbitAdmin.purgeQueue(topicQueueName, false);
    }
消息模板RabbitTemplate类
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作
该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口 ConfirmCallback、返回值确认接口 ReturnCallback 等等。同样我们需要将其注入到 Spring 容器中,然后直接使用。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); } @Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
SimpleMessageListenerContainer简单消息监听器
该类可以监听多个队列,设置事务特性、事务管理器、事务属性、事务容量(并发)、回滚消息等。
设置消费者标签生成策略,是否独占模式,消费者属性等。
设置具体的监听器、消息转换器等。
配置类中注入
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1);//当前消费者数量 container.setMaxConcurrentConsumers(5);//最大消费者数量 container.setDefaultRequeueRejected(false);//是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO);//自动签收 container.setExposeListenerChannel(true); //消费端标签策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //监听消息 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); } }); }
MessageListenerAdapter消息监听适配器
//1 适配器方式. 默认是有自己的方法名字的:handleMessage // 可以自己指定一个方法的名字: consumeMessage // 也可以添加一个转换器: 从字节数组转换为String MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage");//修改默认方法 adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter);
转换器
/**
 * Plain delegate whose methods are the targets of a MessageListenerAdapter.
 *
 * <p>Besides the adapter's default {@code handleMessage}, it provides the
 * {@code consumeMessage}, {@code method1} and {@code method2} targets that the adapter
 * configurations in this tutorial refer to by name. Every method only prints the
 * payload to {@code System.err}.
 */
public class MessageDelegate {

    /** Default adapter method; receives the raw body. */
    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + decode(messageBody));
    }

    /** Custom-named method for raw (unconverted) bodies. */
    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + decode(messageBody));
    }

    /** Custom-named method for bodies already converted to text. */
    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

    /** Target for the queue-to-method mapping entry named "method1". */
    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + messageBody);
    }

    /** Target for the queue-to-method mapping entry named "method2". */
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + messageBody);
    }

    /** Decodes a body as UTF-8 so the result does not depend on the JVM default charset. */
    private static String decode(byte[] body) {
        return new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
/**
 * MessageConverter that turns objects into their {@code toString()} bytes and turns
 * bodies whose content type contains "text" back into Strings.
 * UTF-8 is used in both directions so that encoding and decoding agree regardless of
 * the JVM default charset.
 */
public class TextMessageConverter implements MessageConverter {

    /**
     * Wraps {@code object.toString()} (UTF-8 encoded) and the given properties in a Message.
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] body = object.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return new Message(body, messageProperties);
    }

    /**
     * Returns the body as a String when the content type contains "text";
     * otherwise returns the raw byte array unchanged.
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != null && contentType.contains("text")) {
            return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        }
        return message.getBody();
    }
}
发送消息测试方法
@Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); }
适配器方式还支持让队列名称和方法名称一一匹配:在 MessageDelegate 中写上对应的方法,再使用 setQueueOrTagToMethodName 方法建立映射即可。
//2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setMessageConverter(new TextMessageConverter()); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter);
MessageConverter消息转换器
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter。
Json转换器: Jackson2JsonMessageConverter: 可以进行java对象的转换功能
DefaultJackson2JavaTypeMapper 映射器:可以进行 java 对象的映射关系
自定义二进制转换器:比如图片类型、PDF、PPT、 流媒体
Json转换器:
// 1.1 支持json格式的转换器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
MessageDelegate添加对应的方法
1. public void consumeMessage(Map messageBody) { 2. System.err.println("map方法, 消息内容:" + messageBody); 3. }
测试类,首先创建Order类
@Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("消息订单"); order.setContent("描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
java对象转换只要添加这两句就可以了
// Type mapper that lets the JSON converter turn a message into a concrete Java class.
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
// Recent Spring AMQP versions only trust java.util and java.lang by default, so the
// package of the entity named in the type id header has to be trusted explicitly;
// otherwise the converter rejects the class.
javaTypeMapper.setTrustedPackages("com.bfxy.spring.entity");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
测试方法添加
messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");
全局转换器
//全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter); return container;