Use a SimpleMessageListenerContainer to configure which queues are consumed, then set a MessageListener on it that contains the actual message-handling logic.
- Listening to multiple queues
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // setQueueNames takes a String array (varargs), so several queue names can be passed at once
    container.setQueueNames("zhihao.debug", "zhihao.error", "zhihao.info");
    container.setMessageListener((MessageListener) message -> {
        // Prints "==== received a message from queue <name> ====", then the properties and the body
        System.out.println("====接收到" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    });
    return container;
}
- Adding listened queues dynamically at runtime
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
        // After 20 seconds, start listening on zhihao.error as well
        TimeUnit.SECONDS.sleep(20);
        container.addQueueNames("zhihao.error");
        // After another 20 seconds, add zhihao.debug
        TimeUnit.SECONDS.sleep(20);
        container.addQueueNames("zhihao.debug");
        TimeUnit.SECONDS.sleep(20);
        context.close();
    }
}
- Registering the SimpleMessageListenerContainer as a bean
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.debug");
    container.setMessageListener((MessageListener) message -> {
        // Branch on the queue the message was consumed from; each branch prints
        // "==== received a message from queue <name> ====", the properties, and the body
        if ("zhihao.debug".equals(message.getMessageProperties().getConsumerQueue())) {
            System.out.println("====接收到" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        } else if ("zhihao.error".equals(message.getMessageProperties().getConsumerQueue())) {
            System.out.println("====接收到" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        } else if ("zhihao.info".equals(message.getMessageProperties().getConsumerQueue())) {
            System.out.println("====接收到" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        }
    });
    return container;
}
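The listener above chooses a branch by comparing the consumer queue name. When the per-queue logic starts to differ, one possible alternative is a lookup table keyed by queue name. The following is a minimal plain-JDK sketch of that idea, not Spring AMQP code: `QueueDispatcher`, `register`, and `dispatch` are hypothetical names introduced for illustration, and in a real listener the queue name would come from `message.getMessageProperties().getConsumerQueue()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper (not a Spring AMQP class): routes a message body to a
// handler chosen by the name of the queue the message was consumed from.
final class QueueDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Associates a handler with a queue name; returns this so calls can be chained.
    QueueDispatcher register(String queue, Function<String, String> handler) {
        handlers.put(queue, handler);
        return this;
    }

    // Runs the handler registered for the queue, or reports that none exists.
    String dispatch(String queue, String body) {
        Function<String, String> handler = handlers.get(queue);
        if (handler == null) {
            return "no handler for " + queue;
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        QueueDispatcher dispatcher = new QueueDispatcher()
                .register("zhihao.debug", body -> "DEBUG: " + body)
                .register("zhihao.error", body -> "ERROR: " + body);
        System.out.println(dispatcher.dispatch("zhihao.error", "disk full"));
        System.out.println(dispatcher.dispatch("zhihao.info", "started"));
    }
}
```

With a table like this, supporting a queue that is added at runtime only requires registering one more handler, rather than editing the if/else chain.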
- Removing listened queues dynamically at runtime
// Stop listening on a queue at runtime
container.removeQueueNames("zhihao.debug");
- Post-processors
Add an after-receive post-processor:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.miao.order");
    // Post-processor: adds a "desc" header to every received message
    container.setAfterReceivePostProcessors(message -> {
        message.getMessageProperties().getHeaders().put("desc", 10);
        return message;
    });
    container.setMessageListener((MessageListener) message -> {
        // Prints "==== message received ====", then the properties and the body
        System.out.println("====接收到消息=====");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    });
    return container;
}
- Startup
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
        // Print the first configured queue name
        System.out.println(container.getQueueNames()[0]);
        TimeUnit.SECONDS.sleep(30);
        context.close();
    }
}
- Console output (the first line means "message received"; the last line is the message body, "message 2")
====接收到消息=====
MessageProperties [headers={desc=10}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=zhihao.miao.order, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-2xCE8upxgGgf-u1haCwt6A, consumerQueue=zhihao.miao.order]
消息2
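The setAfterReceivePostProcessors method lets you post-process each received message before the listener handles it; in the output above, the headers contain the desc=10 entry that the post-processor added.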
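To make the idea of a post-processing step concrete, here is a minimal plain-JDK sketch in which a "message" is reduced to its header map and each processor is a function from headers to headers. It is an illustration under stated assumptions, not Spring AMQP's implementation: `PostProcessorChain`, `applyAll`, `demo`, and the `processed` header are hypothetical, and the sketch simply assumes processors run one after another in list order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a chain of after-receive post-processors.
// A message is modeled only by its header map.
final class PostProcessorChain {

    // Applies each processor, in list order, starting from a copy of the headers.
    static Map<String, Object> applyAll(Map<String, Object> headers,
                                        List<UnaryOperator<Map<String, Object>>> processors) {
        Map<String, Object> current = new HashMap<>(headers);
        for (UnaryOperator<Map<String, Object>> processor : processors) {
            current = processor.apply(current);
        }
        return current;
    }

    // Builds a two-step chain and runs it on an empty header map.
    static Map<String, Object> demo() {
        List<UnaryOperator<Map<String, Object>>> processors = Arrays.asList(
                headers -> { headers.put("desc", 10); return headers; },          // same header as in the text
                headers -> { headers.put("processed", true); return headers; });  // hypothetical second step
        return applyAll(new HashMap<>(), processors);
    }

    public static void main(String[] args) {
        // The resulting map contains both the "desc" and the "processed" entries.
        System.out.println(demo());
    }
}
```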
- Setting the consumer's consumer tag and arguments
// Field of the enclosing configuration class, used to number the consumer tags
int count = 0;

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.miao.order");
    // Set the consumer tag
    container.setConsumerTagStrategy(queue -> "order_queue_" + (++count));
    // Set the consumer arguments
    Map<String, Object> args = new HashMap<>();
    args.put("module", "订单模块"); // "order module"
    args.put("fun", "发送消息");    // "send message"
    container.setConsumerArguments(args);
    container.setMessageListener((MessageListener) message -> {
        // Prints "==== message received ====", then the properties and the body
        System.out.println("====接收到消息=====");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    });
    return container;
}
container.setConsumerTagStrategy sets the consumer's consumer tag, and container.setConsumerArguments sets the consumer's arguments.
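The tag strategy in the example increments a plain `int` field. If the strategy can be called from more than one thread (an assumption about how the container creates its consumers, which seems plausible once several concurrent consumers are configured), an atomic counter avoids duplicate or skipped numbers. The sketch below is plain JDK; `OrderQueueTags` is a hypothetical class, and its method mirrors the shape of the strategy lambda (queue name in, tag out).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: generates sequential consumer tags in a thread-safe way.
final class OrderQueueTags {
    private final AtomicInteger counter = new AtomicInteger();

    // Takes the queue name and returns the next tag: order_queue_1, order_queue_2, ...
    // The queue name is unused here, as in the lambda from the text.
    String createConsumerTag(String queue) {
        return "order_queue_" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        OrderQueueTags tags = new OrderQueueTags();
        System.out.println(tags.createConsumerTag("zhihao.miao.order")); // order_queue_1
        System.out.println(tags.createConsumerTag("zhihao.miao.order")); // order_queue_2
    }
}
```

In the bean definition this could be wired in with a method reference, for example `container.setConsumerTagStrategy(tags::createConsumerTag)`, where `tags` is an instance of the hypothetical class above.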
- Setting concurrent consumers with setConcurrentConsumers
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.miao.order");
    // Start with 5 concurrent consumers and allow at most 10
    container.setConcurrentConsumers(5);
    container.setMaxConcurrentConsumers(10);
    container.setMessageListener((MessageListener) message -> {
        // Prints "==== message received ====", then the properties and the body
        System.out.println("====接收到消息=====");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    });
    return container;
}
Startup
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
        // Change the number of concurrent consumers at runtime (still within the maximum of 10)
        container.setConcurrentConsumers(7);
        TimeUnit.SECONDS.sleep(30);
        context.close();
    }
}
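setConcurrentConsumers sets the number of consumers that consume concurrently and can be changed at runtime. setMaxConcurrentConsumers sets the upper limit on the number of concurrent consumers.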