RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)

简介: RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)

前言


本文通过实战代码,Spring整合RabbitMQ,项目分二个模块,consumer和produle。


提示:以下是本篇文章正文内容,下面案例可供参考


一、项目代码


1.生产者


1.项目架构图:


00a901dd73544dc5bbea39f852143cde.png

代码如下(示例):


2.pom.xml依赖:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sky</groupId>
    <artifactId>spring-rabbitmq-produle</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


3.spring-rabbitmq-producer.xml:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
    默认交换机类型为direct,名字为:"",路由键为队列的名称
    -->
    <!--
        id:bean的名称
        name:queue的名称
        auto-declare:自动创建
        auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
        durable:是否持久化
    -->
    <rabbit:queue id="spring_queue" name="spring_queue"    auto-declare="true"/>
    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange"  auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"  />
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!-- 定义队列-->
    <rabbit:queue id="spring_direct_queue" name="spring_direct_queue"  auto-declare="true"/>
    <!--
      定义 Routing  路由模式 交互机
    -->
    <rabbit:direct-exchange name="spring_direct_exchange" >
        <rabbit:bindings>
            <!--direct 类型的交换机绑定队列  key :路由key  queue:队列名称-->
            <rabbit:binding queue="spring_direct_queue" key="direct"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one"  auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_three" name="spring_topic_queue_three" auto-declare="true"/>
    <!--
      声明  topic 类型的交换机
    -->
    <rabbit:topic-exchange id="spring_topic_exchange"  name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="one.*"  queue="spring_topic_queue_one"/>
            <rabbit:binding pattern="two.#" queue="spring_topic_queue_two"/>
            <rabbit:binding pattern="three.#" queue="spring_topic_queue_three"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>


4.rabbitmq.properties


rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring

说明:这里免费提供rabbitmq连接方式给大家使用学习


5.ProducerTest:


package com.sky.springrabbitmqprodule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 简单模式发消息
     */
    @Test
    public void testHelloWorld(){
        rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
    }
    /**
     * 广播模式发消息
     */
    @Test
    public void testFanout(){
        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
    }
    /**
     * 路由模式发消息
     */
    @Test
    public void testDirect(){
        rabbitTemplate.convertAndSend("spring_direct_exchange","direct","spring Direct....");
    }
    /**
     * 通配符模式发消息
     */
    @Test
    public void testTopics(){
        rabbitTemplate.convertAndSend("spring_topic_exchange","one.onekey","spring topic one....");
        rabbitTemplate.convertAndSend("spring_topic_exchange","two.twokey.topic","spring topic two....");
        rabbitTemplate.convertAndSend("spring_topic_exchange","three.threekey.topic","spring topic three....");
    }
}


2.消费者


1.项目架构图


4c0e74f4576642668837418c2109dd22.png

代码如下(示例):


2.pom.xml依赖:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sky</groupId>
    <artifactId>spring-rabbitmq-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


3.spring-rabbitmq-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
        <!--简单模式-->
<!--    <bean id="springQueueListener" class="com.sky.springrabbitmqconsumer.listener.SpringQueueListener"/>-->
        <!--广播模式-->
<!--    <bean id="fanoutListener1" class="com.sky.springrabbitmqconsumer.listener.FanoutListener"/>-->
<!--    <bean id="fanoutListener2" class="com.sky.springrabbitmqconsumer.listener.FanoutListener2"/>-->
        <!--路由模式-->
<!--    <bean id="springDirectQueue" class="com.sky.springrabbitmqconsumer.listener.SpringDirectQueue"/>-->
    <!--通配符模式-->
    <bean id="topicListenerOne" class="com.sky.springrabbitmqconsumer.listener.TopicListenerOne"/>
    <bean id="topicListenerTwo" class="com.sky.springrabbitmqconsumer.listener.TopicListenerTwo"/>
    <bean id="topicListenerThree" class="com.sky.springrabbitmqconsumer.listener.TopicListenerThree"/>
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
            <!--简单模式-->
<!--       <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>-->
            <!--广播模式-->
<!--        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
<!--        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
            <!--路由模式-->
<!--        <rabbit:listener ref="springDirectQueue" queue-names="spring_direct_queue"/>-->
        <!--通配符模式-->
        <rabbit:listener ref="topicListenerOne" queue-names="spring_topic_queue_one"/>
        <rabbit:listener ref="topicListenerTwo" queue-names="spring_topic_queue_two"/>
        <rabbit:listener ref="topicListenerThree" queue-names="spring_topic_queue_three"/>
    </rabbit:listener-container>
</beans>


4.rabbitmq.properties


rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring

说明:配置和生产者的一致


5.ConsumerTest


package com.sky.springrabbitmqconsumer.test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerTest {
    public static void main(String[] args) {
        //初始化IOC容器
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");
    }
}


6.FanoutListener


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


7.FanoutListener2


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


8.SpringDirectQueue


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringDirectQueue implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


9.SpringQueueListener


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


10.TopicListenerOne


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerOne implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


11.TopicListenerTwo


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerTwo implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}


12.TopicListenerThree


package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerThree implements MessageListener {
    @Override
    public void onMessage(Message message) {
        //打印消息
        System.out.println(new String(message.getBody()));
    }
}

上面就是这个项目的所有代码了,下面就是Demo演示内容。


二、项目演示


演示简单模式:


消费者取消注释:


96d0029485884e868cd2989f24191914.png


消费者启动服务:

0801c916ee624ac49fa6b60676c0ffe1.png

生产者发送消息:

b5d3c0ca059d4137b93c368b509e13d4.png

消费者查看消息:


8be6d45e67ae42909e57d2d3dd496819.png


演示广播模式:


消费者取消注释:

31864d928f234ca7b2a2ef1f86860176.png

消费者启动服务:

547589d862224182a73a1e9464100ec4.png

生产者发送消息:

c2ef05911e3845ada160c5084f1638e3.png

消费者查看消息:

45bf09abca194425873de30bf087b138.png


演示路由模式:


消费者取消注释:

f786fe9e186a491e98daadabcd444245.png

消费者启动服务:

c73f4e835bf84af2b3ad354bfb96cfe5.png

生产者发送消息:

54f85280c9734eea99096fbb5bd1a674.png

消费者查看消息:

af9a5451cf0f48cb9ee939a0d6f1aff0.png


演示通配符模式:


消费者取消注释:

fa2a929ff48b44c0833ff39d8c6c6d96.png

消费者启动服务:

0cee29c49fad43b7bafc7762a9901164.png

生产者发送消息:

9542b7c3971e48aa95b53f31ad986ea8.png

消费者查看消息:

f2fa1a47e8814ecf9fa6f6d7e4f9f145.png


三、消息可靠性投递


消息可靠性实现需要保证以下几点:

  • 持久化

exchange要持久化

queue要持久化

message要持久化

  • 生产方确认Confirm
  • 消费方确认Ack
  • Broker高可用

1.rabbitmq 整个消息投递的路径


producer—>rabbitmq broker—>exchange—>queue—>consumer


消息从producer 到 exchange 则会返回一个 confirmCallback 。

消息从exchange–>queue 投递失败则会返回一个 returnCallback 。


2.实现消息可靠性投递的步骤


生产者设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。

生产者设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。

生产者使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

生产者使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认(none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失)

消费者如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。


3.具体实现可靠消息投递的代码


说明:基于上述Spring整合RabbitMQ的代码进行改动


生产者


第一处改动:设置确认模式和退回模式

f7def5a8d91f486d895b8cb733dd9db0.png

代码:

publisher-confirms="true"
publisher-returns="true"

第二处改动:声明队列和交互机的bean

0df0ba09f1114bffb9631bea11dae6ff.png

代码:

    <!--消息可靠性投递(生产端)-->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>


第三处改动:编写Confirm测试方法

    //测试Confirm 模式
    @Test
    public void testConfirm() {
        //定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关配置信息
             * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了....");
                //ack 为  true表示 消息已经到达交换机
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //如果没有投递到交换机中去就会接收失败,比如:将交换机名称写错
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });
        //进行消息发送
        rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
        //进行睡眠操作
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


第四处改动:编写Return测试方法

    //测试 return模式
    @Test
    public void testReturn() {
        //设置交换机处理失败消息的模式为true的时候,消息达到不了队列时,会将消息重新返回给生产者
        rabbitTemplate.setMandatory(true);
        //定义回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");
                System.out.println("message:"+message);
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                //处理业务
            }
        });
        //进行消息发送
        rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
        //进行睡眠操作
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


消费者


第一处改动:

监听器:AckListener

package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1、获取消息的id
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //2、获取消息
            System.out.println("message:"+new String(message.getBody()));
            //3、进行业务处理
            System.out.println("=====进行业务处理====");
            //模拟出现异常
            int  i = 5/0;
            //4、进行消息签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //拒绝签收
             /*
              * 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
              */
            System.out.println("=====业务处理异常,消息重新回到queue,broker会重新发送该消息给消费端====");
            channel.basicNack(deliveryTag, true, true);
        }
    }
}


第二处改动:

f6688f9012bd42a58e3d1af119a446f8.png


原来是通过声明一个个的bean对象,现在改为了扫描某个包下面的类

<context:component-scan base-package="com.sky.springrabbitmqconsumer.listener" />


第三处改动:

8a40be5756c54ae98235433f98fb25af.png

在rabbit:listener-container标签中设置acknowledge属性改为手动确认,(限流设置:prefetch属性改为每次抓取2条消息,并且监听自定义的ackListener)


4.具体实现可靠消息投递的演示


正常发消息Demo演示


启动生产者Confirm模式:

af76bfd574734ea5915e891f1f2cfd48.png

启动消费者:

3a844fe723344bd4b53a757088dc9e85.png

启动生产者Return模式:

1677f2e8bca645589b86c279ff0d1d61.png

消费者的控制台就会不停的打印:

7a5e085871d74d00acacd46d21975cc6.png


异常发消息Demo演示


生产者Confirm模式:

c888809a1e7c4de080443f889d5d8707.png

生产者Return模式:

cdac027ee5944c958725101b883eba85.png


四、消息在消费端限流


1.限流示例图


00d88be7424f42be829e403d66c357e8.png


2.实现步骤


  • 在rabbit:listener-container中配置 prefetch属性设置消费端一次拉取多少消息
  • 消费端的确认模式一定为手动确认:acknowledge=“manual”


3.具体实现消费端限流代码


消费者


第一处修改:监听器:QosListener

package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
          //获取到的消息
        System.out.println(new String(message.getBody()));
        Thread.sleep(3000);
        //处理业务逻辑
        //进行消息的签收,第二个参数:true表示之前没签收的都给他签收掉
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}


第二处修改:

a21f5404889045ba866a25c778af6733.png

    <!--定义监听器容器
      acknowledge="manual":手动签收
        自动确认:acknowledge="none"
        手动确认:acknowledge="manual"
        根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
      prefetch="1":每次抓取多少条消息。只有消息确认签收了,才会拉取下一条,否则不会拉取消息
    -->
<rabbit:listener-container connection-factory="connectionFactory"
                               auto-declare="true"
                               acknowledge="manual"
                               prefetch="2">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>


生成者


批量发送消息测试方法

    //批量发送消息,让消费者每次拉去指定的数量
    @Test
    public void  testQos(){
        for (int i = 0; i < 10; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
        }
    }


4.具体实现消费端限流Demo演示


启动消费者

f342fced535c48b989f3846e980e9ae0.png


启动生产者

0714df18761040fb997b2f2ae01d0405.png


查看消费者控制台日志

608f06a0b7b44524a0343bf0756a8a60.png

说明:每隔3秒打印一条消息


异常情况,消费未进行签收


1c8b698e7f9f40c4b99b2b8001d1923d.png

重启消费者和生产者发消息,这个时候会看到,原本发送的十条消息,实际只有二条消息打印在消费者的控制台上面,因为prefetch属性配置了2,所以一次性拉取了二条。

d8313cb7dabc477299371cf06089f431.png

<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">


五、TTL


1.业务场景


3c0e5935d8c74ac39f1939a9ce9b100b.png比如我们在下订单的时候,如果超过30分钟未支付,就取消这个订单,把当前商品的库存加回去。


2.定义


TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。


3.实现步骤


  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。


4.通过RabbitMQ管理控制台页面实现Demo


1.创建消息

70d400cbf6c94c0e839bb41fe184afb5.png


2.创建交换机

72ad23db39e241c29edc0990cf45645b.png


3.将交换机和消息绑定

8956d315fa834df3b0b50fdf6021d87b.png


4.发送消息

c3ed700bd3e0445e9e27698e35170e19.png

超过5秒没有消费者消费,就自动失效了。


5.通过代码实现TTL


添加ttl队列

    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指队列的过期时间-->
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
     <rabbit:topic-exchange name="test_exchange_ttl" >
         <rabbit:bindings>
             <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:topic-exchange>


发送消息测试方法

    //ttl测试
    @Test
    public void  testTtl(){
        for (int i = 0; i < 10; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend("test_exchange_confirm", "ttl.test", "message confirm....");
        }
    }


启动测试方法


8eaf2d0fbdc84aca8e094600bfb68e29.png

318c411729b648fa9c16cdf6da09363d.png

等待10秒

3dbe99ad37644bcf92348e06b5b13d47.png


六、死信队列


1.定义


死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。


6c0c9ce1b29f472bad9c1243362d420f.png

说明:死信交换机和死信队列和普通的没有区别,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。


2.消息成为死信的三种情况


  • 队列消息长度到达限制;
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间未被消费;


3.队列绑定死信交换机


给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key


4.代码实现


在spring-rabbitmq-producer.xml中添加队列和交换机

    <!--
        死信队列:
            1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
            2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
            3. 正常队列绑定死信交换机
                设置两个参数:
                    * x-dead-letter-exchange:死信交换机名称
                    * x-dead-letter-routing-key:发送给死信交换机的routingkey
    -->
    <!--
        1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    -->
     <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
         <!--3. 正常队列绑定死信交换机-->
         <rabbit:queue-arguments>
             <!--3.1 x-dead-letter-exchange:死信交换机名称-->
             <entry key="x-dead-letter-exchange" value="exchange_dlx" />
             <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
             <entry key="x-dead-letter-routing-key" value="dlx.test" />
             <!--4.1 设置队列的过期时间 ttl-->
             <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
             <!--4.2 设置队列的长度限制 max-length -->
             <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
         </rabbit:queue-arguments>
     </rabbit:queue>
     <!--    正常的交换机绑定正常的队列-->
     <rabbit:topic-exchange name="test_exchange_dlx">
         <rabbit:bindings>
             <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:topic-exchange>
    <!--
       2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
   -->
     <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
     <rabbit:topic-exchange name="exchange_dlx">
         <rabbit:bindings>
             <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:topic-exchange>


启动生产者:

96b60aadf1c34c0c8272d4a669a0ae29.png


RabbitMQ管控页面查看


1a87183b5b28422d85d4fcbf49bc5d69.png

32826ed1e08041d2a3f1bc82cbe6171b.png

消息拒收同理


七、延迟队列


1.定义


延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。在RabbitMQ中并未提供延迟队列功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。


2.场景


c25038af80d541fd9b91f74a92e3be1c.png

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。


3.具体实现


1.生产者


在spring-rabbitmq-producer.xml添加以下代码

    <!--
        延迟队列:
            1. 定义正常交换机(order_exchange)和队列(order_queue)
            2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3. 绑定,设置正常队列过期时间为30分钟
    -->
    <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
        <!--3. 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
<!--    订单业务的交换机和队列-->
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>


发消息测试

    /*
     * 测试延时消息
     * */
    @Test
    public  void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2021年10月...");
        //2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }



2.消费者


spring-rabbitmq-consumer.xml配置

<!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
          <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>


添加监听器OrderListener

package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}


4.Demo演示


启动生产者

4f9920d7443a4a5092bc15455de02bcd.png

启动消费者

f31499e501114355bd149688087fca88.png

说明:过了十秒之后才发送消息


八、消息积压


1.场景


  • 消费者宕机消息积压
  • 消费者消费能力不足
  • 发送这发送流量太大


2.解决方案


上线更多的消费者,进行正常消费,上线专门的队列消费访问,先将消息批量取出来,记录到数据库中,再慢慢处理。


九、消息幂等性


1.定义


幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。


2.解决方案


消息幂等性保障–乐观锁机制

504e044c89fd4885bbe8729adc8d7f2c.png

除此之外,我还提供了项目地址提供给大家clone,地址链接:[https://gitee.com/java_wxid/liao](https://gitee.com/java_wxid/liao),项目中的spring-rabbitmq-produle改为了spring-rabbitmq-producer,原意是产品生产消息给消费者消费,怕大家把produle搞错,改为了生产者,更加通俗易懂。


总结


提示:以上就是今天要讲的内容,本文使用Springboot对Rabbitmq进行了整合,并且提供了简单模式,广播模式,路由模式,通配符模式四种模式的Demo演示和代码。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
207 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
101 0
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
97 5
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
90 7
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
72 4
下一篇
DataWorks