如果不想用通用的rabbitmq配置 那么就可以使用SimpleMessageListener自定义消费者
在此队列中 配置为此方法中的配置 而不是走rabbitmq的通用配置
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import org.springframework.context.annotation.Configuration; @Configuration public class SimpleMessageListener { @Autowired BusinessConfig businessConfig; @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 添加队列 可以添加多个队列 //container.setQueues(businessConfig.Queue()); container.setExposeListenerChannel(true); // 设置当前的消费者数量 // container.setMaxConcurrentConsumers(5); container.setConcurrentConsumers(1); // 设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("1 receive msg : " + JSONObject.parseObject(new String(body))); // 不读取消息并且将当前消息抛弃掉,消息队列中删除当前消息 // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 不读取消息,消息队列中保留当前消息未被查看状态 // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 确认消息成功消费,删除消息队列中的消息 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息成功消费,删除消息队列中的消息,他跟上面貌似一样 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } }); return container; } @Bean public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 监听多个queue //container.setQueues(businessConfig.Queue()); // 设置当前消费者数量 container.setConcurrentConsumers(1); // 设置最大的消费者数量 container.setMaxConcurrentConsumers(5); // 设置不要重回队列 container.setDefaultRequeueRejected(false); // 设置自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置消费端tag策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 设置监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息处理 String msg = new String(message.getBody(), "UTF-8"); System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);; } }); return container; } }