通过上文可以使用rabbitmq了,但是有这样一个特殊需求,想不使用通用的配置,而是该队列使用特殊的配置,那么就可以使用SimpleMessageListener,来自定义消费者。
1.代码实现
可以看到下文两个方法配置了两种应答方式,方法一为手动应答,方法二为自动应答。
1.手动应答SimpleMessageListener
如正规方式一样,使用手动应答,需要在接收到消息后手动发送ack/nack。核心代码如下:container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import org.springframework.context.annotation.Configuration; @Configuration public class SimpleMessageListener { @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 添加队列 可以添加多个队列 container.setExposeListenerChannel(true); // 设置当前的消费者数量 // container.setMaxConcurrentConsumers(5); container.setConcurrentConsumers(1); // 设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("1 receive msg : " + JSONObject.parseObject(new String(body))); // 确认消息成功消费,删除消息队列中的消息,他跟上面貌似一样 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } }); return container; } }
2.自动应答SimpleMessageListener
该方法会直接应答,核心代码如下:container.setAcknowledgeMode(AcknowledgeMode.AUTO);
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import org.springframework.context.annotation.Configuration; @Configuration public class SimpleMessageListener { @Bean public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 设置当前消费者数量 container.setConcurrentConsumers(1); // 设置最大的消费者数量 container.setMaxConcurrentConsumers(5); // 设置不要重回队列 container.setDefaultRequeueRejected(false); // 设置自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置消费端tag策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 设置监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息处理 String msg = new String(message.getBody(), "UTF-8"); System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);; } }); return container; } }