五分钟带你玩转rabbitmq(六)SimpleMessageListener

简介: 五分钟带你玩转rabbitmq(六)SimpleMessageListener


如果不想用通用的rabbitmq配置 那么就可以使用SimpleMessageListener自定义消费者

在此队列中 配置为此方法中的配置 而不是走rabbitmq的通用配置

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SimpleMessageListener {
    @Autowired
    BusinessConfig businessConfig;
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 添加队列 可以添加多个队列
        //container.setQueues(businessConfig.Queue());
        container.setExposeListenerChannel(true);
        // 设置当前的消费者数量
        // container.setMaxConcurrentConsumers(5);
        container.setConcurrentConsumers(1);
        // 设置确认模式手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("1  receive msg : " + JSONObject.parseObject(new String(body)));
                // 不读取消息并且将当前消息抛弃掉,消息队列中删除当前消息
                // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                // 不读取消息,消息队列中保留当前消息未被查看状态
                // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                // 确认消息成功消费,删除消息队列中的消息
                // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                // 确认消息成功消费,删除消息队列中的消息,他跟上面貌似一样
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            }
        });
        return container;
    }
    @Bean
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 监听多个queue
        //container.setQueues(businessConfig.Queue());
        // 设置当前消费者数量
        container.setConcurrentConsumers(1);
        // 设置最大的消费者数量
        container.setMaxConcurrentConsumers(5);
        // 设置不要重回队列
        container.setDefaultRequeueRejected(false);
        // 设置自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 设置消费端tag策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + System.currentTimeMillis();
            }
        });
        // 设置监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // 消息处理
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg
                    + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);;
            }
        });
        return container;
    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件
RabbitMQ中的SimpleMessageListener
RabbitMQ中的SimpleMessageListener
150 0
RabbitMQ中的SimpleMessageListener
|
13天前
|
消息中间件 RocketMQ Apache
消息队列 MQ产品使用合集之如何修改proxy的端口
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5天前
|
消息中间件 Java 双11
RocketMQ:揭秘电商巨头背后的消息队列秘密
**RocketMQ概览:**高性能分布式消息队列,适用于有序消息、事务处理、流计算、消息推送、日志处理及Binlog分发。在双11等高流量场景下证明了其性能、稳定性和低延迟。Java开发,利于扩展,性能超RabbitMQ,支持死信队列,但可能有集成兼容性问题。适合Java开发者,为电商等场景优化,每秒处理大量消息。
27 3
RocketMQ:揭秘电商巨头背后的消息队列秘密
|
12天前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
12天前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
12天前
|
消息中间件 设计模式 网络安全
消息队列 MQ操作报错合集之broker启用controller配置时,遇到报错,是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
12天前
|
消息中间件 Apache RocketMQ
消息队列 MQ操作报错合集之设置了controller后,有一主一从,但只显示一个,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
12天前
|
消息中间件 测试技术 Apache
消息队列 MQ产品使用合集之在测试环境中拥有大量的topic会有什么影响
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
12天前
|
消息中间件 存储 网络性能优化
消息队列 MQ产品使用合集之一个设备的离线消息的数量限制是多少
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
4天前
|
消息中间件
RabbitMQ是一个功能强大的开源消息代理软件,用于处理消息队列
RabbitMQ是一个功能强大的开源消息代理软件,用于处理消息队列
8 0

热门文章

最新文章