- rabbitMQ生产者生产的消息是有序进入任务队列的;但多个消费者的情况下无论是否ack,都是无序的,不考虑任务时长;
- rabbitMQ分发消息的时候采用round-robin模式,依次分配,并非一个一个分配;
- rabbitMQ为防止重复消费,必须实现幂等型,即每个消费者必须能够查询到任务的执行状态
- rabbitMQ要想顺序消费,必须一个任务队列只有一个消费者,必要时需要拆分任务队列
- rabbitMQ防止消息丢失,必须把交换机和任务队列以及对应的类型消息进行持久化
- rabbitMQ通过设置每个消费者同时处理消息的最大个数,来进行负载均衡
- rabbitMQ通过设置消息的存活时间(TTL),超时后,就会被发送到队列的死信交换机,被再次路由,此时再次路由到的队列就被称为死信队列(Dead Letter Queue)。需要注意,死信交换机和死信交换机都是基于其用途来描述的,它们实际上也是普通的交换机和普通的队列。如果队列没有指定DLX或者无法被路由到一个DLQ,则队列中过期的消息会被直接丢弃(特殊场景,订单15分钟后未付款,就关闭)
- rabbitMQ消息分发有轮训模式和公平模式,对于金融行情的使用需要用轮训模式,保证每个消费者消费的数据都一样
- rabbitMQ在connection上面抽象出来很多channel
- RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchange:fanout,direct,topic,header
header模式在实际使用中较少
性能排序:fanout > direct >> topic。比例大约为11:10:6
一 、消息分发
官网的示例中介绍了默认情况下rabbitMqRabbitMQ会一个一个的发送信息给下一个消费者(consumer),而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。
二、消息应答-保证消息被正确接受并处理
默认情况下消费者C1接收到消息1无论是否正常接受和处理都会立即应答rabbit服务器,然后消息1就会从队列中被删除,假如C1突然出现异常状况导致消息1没有被处理完毕,那么消息1就处理失败了,也不会有其他消费者去处理消息1。事实上我们希望的是消息1如果没有被C1正确处理完毕,那么就发送给其他消费者处理,为了达到这个目的,只需要做两件事情,第一关闭rabbitMq的自动应答机制,第二消费者正确处理完消息后手动应答。
//第二个参数autoAck设置成false表示关闭自动应答 channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(hashCode + " [x] Received '" + message + "'"); doWork(message); System.out.println(hashCode + " [x] Done"); //手动应答,第二个参数multiple表示是否批量应答,很明显现在不是批量应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false ); }
三、消费者负载均衡
默认情况下rabitMq会把队列里面的消息立即发送到消费者,无论该消费者有多少消息没有应答,也就是说即使发现消费者来不及处理,新的消费者加入进来也没有办法处理已经堆积的消息,因为那些消息已经被发送给老消费者了。
chanel.basicQos(prefetchCount)
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。
这样做的好处是,如果系统处于高峰期,消费者来不及处理,消息会堆积在队列中,新启动的消费者可以马上从队列中取到消息开始工作。
代码
每个队列与交换机都绑定了一个key,为BindingKey,此时我们模拟用户下单,订单创建成功后,只对用户发送 qq 和 email 邮件提醒
- 注解方式配置格式
// 声明队列并绑定到指定交换机 bindings = @QueueBinding( value = @Queue("声明队列的属性信息"), exchange = @Exchange("声明交换机和属性信息"), key = "绑定的BindingKey" )
- 消费者 DirectEmailConsumer
/** * @QueueBinding (队列,交换机,交换机与队列的BindingKey) * 声明创建队列 email_direct_Queue * 声明创建交换机direct_order_exchange * 绑定交换机与队列的关系,BindingKey = “email” */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "email_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false") ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false") ,key = "email" )) public class DirectEmailConsumer { // 接收消息 @RabbitHandler public void receiveMess(String message){ System.out.println("EmailConsumer direct 接收到订单消息————>"+message); } }
- 消费者 DirectQqConsumer 类
代码与上述一致,只是创建的队列和交换机绑定的key不一样
/** * @QueueBinding (队列,交换机,交换机与队列的BindingKey) * 声明创建队列 qq_direct_Queue * 声明创建交换机direct_order_exchange(不存在则创建,不会多次创建) * 绑定交换机与队列的关系,BindingKey = “qq” */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "qq_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false") ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false") ,key = "qq" )) public class DirectQqConsumer { // 接收消息 @RabbitHandler public void receiveMess(String message){ System.out.println("QqConsumer direct 接收到订单消息————>"+message); } }
- 消费者 DirectSmsConsumer 类
代码与上述一致,只是创建的队列和交换机绑定的key不一样
/** * @QueueBinding (队列,交换机,交换机与队列的BindingKey) * 声明创建队列 sms_direct_Queue * 声明创建交换机direct_order_exchange(不存在则创建,不会多次创建) * 绑定交换机与队列的关系,BindingKey = “sms” ) */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "sms_direct_Queue",durable = "true",exclusive = "false",autoDelete = "false") ,exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT,durable = "true",autoDelete = "false") ,key = "sms" )) public class DirectSmsConsumer { // 接收消息 @RabbitHandler public void receiveMess(String message){ System.out.println("SmsConsumer direct 接收到订单消息————>"+message); } }
- 运行主程序,开启消费者监听
图形化界面查看队列创建信息 - 生产者 Producer
声明交换机、队列等信息
@Configuration public class RabbitMQConfiguration { // 1.声明fanout广播模式的交换机 @Bean public FanoutExchange getExchange(){ /** * @params1 :交换机名称 * @params2 :是否持久化 * @params4 :是否自动删除 */ return new FanoutExchange("fanout_order_exchange",true,false); } // 2.声明三个队列队列:emailQueue、smsQueue、qqQueue @Bean public Queue getEmailQueue(){ /** * @params1 :队列名称 * @params2 :队列是否持久化(如果是,则重启服务不会丢失) * @params3 :是否是独占队列(如果是,则仅限于此连接) * @params4 :是否自动删除(最后一条消息消费完毕,队列是否自动删除) */ return new Queue("email_fanout_Queue",true,false,false); } @Bean public Queue getSMSQueue(){ return new Queue("sms_fanout_Queue",true,false,false); } @Bean public Queue getQqQueue(){ return new Queue("qq_fanout_Queue",true,false,false); } // 3.绑定交换机与队列的关系 @Bean public Binding getEmailBinding(){ return BindingBuilder.bind(getEmailQueue()).to(getExchange()); } @Bean public Binding getSMSBinding(){ return BindingBuilder.bind(getSMSQueue()).to(getExchange()); } @Bean public Binding getQQBinding(){ return BindingBuilder.bind(getQqQueue()).to(getExchange()); } }
- 创建订单服务,模拟下单
@Service public class OrderService { @Autowired private RabbitTemplate template; /** * 模拟用户创建订单 * @param userId 客户ID * @param productId 产品ID * @param num 数量 */ public void createOrder(String userId, String productId, int num){ // 1.根据商品ID查询库存是否充足 // 2.生成订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功...."); // 3.将订单id封装成MQ消息,投递到交换机 /**@params1 :交换机名称 * @params2 :路由key/队列名称 * @params3 :消息内容 * 注:指定RoutingKey=qq和email * 交换机direct_order_exchange与绑定的队列的BindingKey匹配的队列才会接收到 */ template.convertAndSend("direct_order_exchange","qq",orderId); template.convertAndSend("direct_order_exchange","email",orderId); } }
- 测试类进行测试
@SpringBootTest class RabbitOrderSpringbootProducerApplicationTests { @Autowired private OrderService orderService; @Test void contextLoads() { orderService.createOrder("1001","96",1); } }