高性能消息中间件 RabbitMQ(五)

简介: 高性能消息中间件 RabbitMQ(五)

六、RabbitMQ高级特性

6.1 消费端限流

之前我们说MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置限流机制

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取20条消息消费,签收后不满20条才会继续拉取消息。
        prefetch: 20

3、消费者接受消息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class OosConsumer {
    @RabbitListener(queues = "boot_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        //1.获取消息
        System.out.println("当前时间:"+new String(message.getBody()));
        //2.模拟业务处理
        Thread.sleep(2000);
        //3.签收消息
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }
}

20230619

6.2 限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下:

1.生产者批量发送消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

2.消费端配置不公平分发

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

3、编写两个消费者消费相同的队列信息

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "boot_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
    // 消费者2
    @RabbitListener(queues = "boot_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
消费者2:send message...1
消费者1:send message...0
19:53:21.676 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.867 seconds (JVM running for 1.259)
消费者1:send message...3
消费者1:send message...4
消费者1:send message...2
消费者1:send message...5
消费者1:send message...6
消费者2:send message...7
消费者1:send message...8
消费者1:send message...9

发现消费者1消费的要比消费者2消费的多。能者多劳。

6.3 设置队列所有消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

1、在创建队列时设置其存活时间:

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
                .ttl(10000)      //队列存活时间10s单位毫秒
                .build();
    }

2、生产者生产消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("boot_topic_exchange", "message", "send message..."+i);
        }
    }
}

十秒后,未被消费的消息会被移除。

6.4 设置单条消息存活时间

1、在消息发送的时候设置发送时间

/*发送消息并设置消息的存活时间*/
    @Test
    public void testSend() {
        //1.创建消息属性
        MessageProperties messageProperties = new MessageProperties();
        //2.设置存活时间,单位毫秒
        messageProperties.setExpiration("10000");
        //3.创建消息对象
        Message message = new Message(("send message……").getBytes(), messageProperties);
        //4.发送消息
        rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
    }

注意:

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

6.5 优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

1、设置队列的优先级

// 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return QueueBuilder
                .durable(QUEUE_NAME)// 队列名
               // .ttl(10000)      //队列中消息存活时间10s单位毫秒
                .maxPriority(10)  //设置队列的优先级越大优先级越高,最大255,推荐最大不超过10
                .build();
    }

2、编写生产者发送有优先级的消息

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) { // i为5时消息的优先级较高
                //1.创建消息属性
                MessageProperties messageProperties = new MessageProperties();
                //2.设置消息优先级
                messageProperties.setPriority(9);
                //3.创建消息对象
                Message message = new Message(("send message……" + i).getBytes(), messageProperties);
                rabbitTemplate.convertAndSend("boot_topic_exchange","message",message);
            }else {
                rabbitTemplate.convertAndSend("boot_topic_exchange","message","send message……" + i);
            }
        }
    }
}

3、编写消费者测试是否是第五条消息最先被消费

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //1.获取消息
        System.out.println(new String(message.getBody()));
        //2.手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.0)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :Starting DemoApplication using Java 1.8.0_341 on ZHANGJIN with PID 26080 (D:\Java\code\springbootcode\sb_rabbitMQ_consumer\target\classes started by 张锦 in D:\Java\code\springbootcode\sb_rabbitMQ)
17:47:14.858 INFO  ---  [main           ] com.zj.DemoApplication                            :No active profile set, falling back to 1 default profile: "default"
17:47:15.482 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Attempting to connect to: [192.168.66.100:5672]
17:47:15.498 INFO  ---  [main           ] o.s.a.rabbit.connection.CachingConnectionFactory  :Created new connection: rabbitConnectionFactory#2f2bf0e2:0/SimpleConnection@27f0ad19 [delegate=amqp://MQzhang@192.168.66.100:5672/, localPort= 53985]
17:47:15.529 INFO  ---  [main           ] com.zj.DemoApplication                            :Started DemoApplication in 0.893 seconds (JVM running for 1.338)
send message……5
send message……0
send message……1
send message……2
send message……3
send message……4
send message……6
send message……7
send message……8
send message……9

第五条消息首先被消费。

七、RabbitMQ死信队列

7.1 概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

7.2 代码实现

1、创建死信交换机和死信队列

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    //死信交换机和死信队列
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    //普通交换机和普通队列
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";
    // 创建死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE) //死信交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    // 创建死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)   //死信队列名称
                .build();
    }
    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
                                 @Qualifier(DEAD_QUEUE) Queue queue){
         return BindingBuilder
                 .bind(queue)
                 .to(exchange)
                 .with("dead")     //交换机路由键
                 .noargs();
    }
    //创建普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE) //普通交换机类型和名称
                .durable(false)   //是否持久化
                .build();
    }
    //创建普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)   //普通信队列名称
                .deadLetterExchange(DEAD_EXCHANGE)  //绑定死信交换机,因为队列中的无法消费的信息会被放到死信交换机上。
                .deadLetterRoutingKey("dead")   //死信队列路由关键字
                .ttl(10000)  //消息存活时间
                .maxLength(10)  //消息最大长度
                .build();
    }
    //普通交换机绑定普通对列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
                                 @Qualifier(NORMAL_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("normal")
                .noargs();
    }
}

2.创建生产者发送消息(测试存活时间过期变成死信)

@Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }

十秒后

消息全部去了死信队列。

2.创建生产者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.存活时间过期
//        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        //2.超过队列长度变成死信
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
        }
    }
}

因为设置了普通队列的长度,所以超出队列长度的那部分就去了死信队列。也设置了队列的存活时间,因此普通队列的消息在10秒后变成了死信。

2.创建生产者和消费者(超过队列长度变成死信)

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        //存活时间过期或者超过消息的长度时消息会变成死信队列,消息被消费者退回后队列没有签收消息会变成死信
        //1.生产者拒签消息,消息变成死信。
        rabbitTemplate.convertAndSend("normal_exchange","normal","普通队列消息");
    }
}
@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "normal_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
       //拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
    }
}

拒签消息,消息变成了死信。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
52 0
|
6月前
|
消息中间件 数据库
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
51 0
|
6月前
|
消息中间件
消息中间件系列教程(17) -RabbitMQ-死信队列
消息中间件系列教程(17) -RabbitMQ-死信队列
130 0
|
6月前
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
40 0
|
6月前
|
消息中间件
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
50 0
|
6月前
|
消息中间件 缓存 API
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
101 0
|
6月前
|
消息中间件 Java Maven
消息中间件系列教程(13) -RabbitMQ-SpringBoot集成RabbitMQ
消息中间件系列教程(13) -RabbitMQ-SpringBoot集成RabbitMQ
44 0
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列

热门文章

最新文章