Rabbitmq direct模式保证一个队列只对应一个消费者

简介: Rabbitmq direct模式保证一个队列只对应一个消费者


  1. rabbitMQ生产者生产的消息是有序进入任务队列的;但多个消费者的情况下无论是否ack,都是无序的,不考虑任务时长;
  2. rabbitMQ分发消息的时候采用round-robin模式,依次分配,并非一个一个分配;
  3. rabbitMQ为防止重复消费,必须实现幂等型,即每个消费者必须能够查询到任务的执行状态
  4. rabbitMQ要想顺序消费,必须一个任务队列只有一个消费者,必要时需要拆分任务队列
  5. rabbitMQ防止消息丢失,必须把交换机和任务队列以及对应的类型消息进行持久化
  6. rabbitMQ通过设置每个消费者同时处理消息的最大个数,来进行负载均衡
  7. rabbitMQ通过设置消息的存活时间(TTL),超时后,就会被发送到队列的死信交换机,被再次路由,此时再次路由到的队列就被称为死信队列(Dead Letter Queue)。需要注意,死信交换机和死信交换机都是基于其用途来描述的,它们实际上也是普通的交换机和普通的队列。如果队列没有指定DLX或者无法被路由到一个DLQ,则队列中过期的消息会被直接丢弃(特殊场景,订单15分钟后未付款,就关闭)
  8. rabbitMQ消息分发有轮训模式和公平模式,对于金融行情的使用需要用轮训模式,保证每个消费者消费的数据都一样
  9. rabbitMQ在connection上面抽象出来很多channel
  10. RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储

RabbitMQ提供了四种Exchange:fanout,direct,topic,header

header模式在实际使用中较少

性能排序:fanout > direct >> topic。比例大约为11:10:6

一 、消息分发

官网的示例中介绍了默认情况下rabbitMqRabbitMQ会一个一个的发送信息给下一个消费者(consumer),而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。

二、消息应答-保证消息被正确接受并处理

默认情况下消费者C1接收到消息1无论是否正常接受和处理都会立即应答rabbit服务器,然后消息1就会从队列中被删除,假如C1突然出现异常状况导致消息1没有被处理完毕,那么消息1就处理失败了,也不会有其他消费者去处理消息1。事实上我们希望的是消息1如果没有被C1正确处理完毕,那么就发送给其他消费者处理,为了达到这个目的,只需要做两件事情,第一关闭rabbitMq的自动应答机制,第二消费者正确处理完消息后手动应答。

//第二个参数autoAck设置成false表示关闭自动应答
channel.basicConsume(QUEUE_NAME, false, consumer);  
while (true)
{  
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(hashCode + " [x] Received '" + message + "'");
    doWork(message);
    System.out.println(hashCode + " [x] Done");
    //手动应答,第二个参数multiple表示是否批量应答,很明显现在不是批量应答
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false );
}  

三、消费者负载均衡

默认情况下rabitMq会把队列里面的消息立即发送到消费者,无论该消费者有多少消息没有应答,也就是说即使发现消费者来不及处理,新的消费者加入进来也没有办法处理已经堆积的消息,因为那些消息已经被发送给老消费者了。

chanel.basicQos(prefetchCount)

prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。

这样做的好处是,如果系统处于高峰期,消费者来不及处理,消息会堆积在队列中,新启动的消费者可以马上从队列中取到消息开始工作。

代码

每个队列与交换机都绑定了一个key,为BindingKey,此时我们模拟用户下单,订单创建成功后,只对用户发送 qq 和 email 邮件提醒

  • 注解方式配置格式
// 声明队列并绑定到指定交换机
bindings = @QueueBinding(
     value = @Queue("声明队列的属性信息"),
      exchange = @Exchange("声明交换机和属性信息"),
     key = "绑定的BindingKey"
  )
  1. 消费者 DirectEmailConsumer
/**
 *  @QueueBinding (队列,交换机,交换机与队列的BindingKey)
 *     声明创建队列 email_direct_Queue
 *     声明创建交换机direct_order_exchange
 *     绑定交换机与队列的关系,BindingKey = “email”
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false")
        ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false")
        ,key = "email"
))
public class DirectEmailConsumer {
    // 接收消息
    @RabbitHandler
    public void receiveMess(String message){
        System.out.println("EmailConsumer direct 接收到订单消息————>"+message);
    }
}
  1. 消费者 DirectQqConsumer 类
    代码与上述一致,只是创建的队列和交换机绑定的key不一样
/** 
 *  @QueueBinding (队列,交换机,交换机与队列的BindingKey)
 *     声明创建队列 qq_direct_Queue
 *     声明创建交换机direct_order_exchange(不存在则创建,不会多次创建)
 *     绑定交换机与队列的关系,BindingKey = “qq”
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "qq_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false")
        ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false")
        ,key = "qq"
))
public class DirectQqConsumer {
    // 接收消息
    @RabbitHandler
    public void receiveMess(String message){
        System.out.println("QqConsumer direct 接收到订单消息————>"+message);
    }
}
  1. 消费者 DirectSmsConsumer 类
    代码与上述一致,只是创建的队列和交换机绑定的key不一样
/**
 *  @QueueBinding (队列,交换机,交换机与队列的BindingKey)
 *     声明创建队列 sms_direct_Queue
 *     声明创建交换机direct_order_exchange(不存在则创建,不会多次创建)
 *     绑定交换机与队列的关系,BindingKey = “sms”
 )
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false")
        ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false")
        ,key = "sms"
))
public class DirectSmsConsumer {
    // 接收消息
    @RabbitHandler
    public void receiveMess(String message){
        System.out.println("SmsConsumer direct 接收到订单消息————>"+message);
    }
}
  1. 运行主程序,开启消费者监听

    图形化界面查看队列创建信息
  2. 生产者 Producer
    声明交换机、队列等信息
@Configuration
public class RabbitMQConfiguration {
    // 1.声明fanout广播模式的交换机
    @Bean
    public FanoutExchange getExchange(){
        /**
         * @params1 :交换机名称
         * @params2 :是否持久化
         * @params4 :是否自动删除
         */
        return new FanoutExchange("fanout_order_exchange",true,false);
    }
    // 2.声明三个队列队列:emailQueue、smsQueue、qqQueue
    @Bean
    public Queue getEmailQueue(){
        /**
         * @params1 :队列名称
         * @params2 :队列是否持久化(如果是,则重启服务不会丢失)
         * @params3 :是否是独占队列(如果是,则仅限于此连接)
         * @params4 :是否自动删除(最后一条消息消费完毕,队列是否自动删除)
         */
        return new Queue("email_fanout_Queue",true,false,false);
    }
    @Bean
    public Queue getSMSQueue(){
        return new Queue("sms_fanout_Queue",true,false,false);
    }
    @Bean
    public Queue getQqQueue(){
        return new Queue("qq_fanout_Queue",true,false,false);
    }
    // 3.绑定交换机与队列的关系
    @Bean
    public Binding getEmailBinding(){
        return BindingBuilder.bind(getEmailQueue()).to(getExchange());
    }
    @Bean
    public Binding getSMSBinding(){
        return BindingBuilder.bind(getSMSQueue()).to(getExchange());
    }
    @Bean
    public Binding getQQBinding(){
        return BindingBuilder.bind(getQqQueue()).to(getExchange());
    }
}
  1. 创建订单服务,模拟下单
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate template;
    /**
     * 模拟用户创建订单
     * @param userId  客户ID
     * @param productId 产品ID
     * @param num 数量
     */
    public void createOrder(String userId, String productId, int num){
        // 1.根据商品ID查询库存是否充足
        // 2.生成订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功....");
        // 3.将订单id封装成MQ消息,投递到交换机
        /**@params1 :交换机名称
         * @params2 :路由key/队列名称
         * @params3 :消息内容
         * 注:指定RoutingKey=qq和email
         * 交换机direct_order_exchange与绑定的队列的BindingKey匹配的队列才会接收到
         */
        template.convertAndSend("direct_order_exchange","qq",orderId);
        template.convertAndSend("direct_order_exchange","email",orderId);
    }
}
  1. 测试类进行测试
@SpringBootTest
class RabbitOrderSpringbootProducerApplicationTests {
    @Autowired
    private OrderService orderService;
    @Test
    void contextLoads() {
        orderService.createOrder("1001","96",1);
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
6天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
38 3
|
3月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
229 6
|
4月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
321 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
4月前
|
消息中间件 JSON Java
|
5月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
146 2
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
86 2
|
4月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
107 0