SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息

简介: RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

安装消息中间件

Windows安装ErLang

https://github.com/erlang/otp/releases/tag/OTP-25.0
在这里插入图片描述

在这里插入图片描述

Windows安装RabbitMq

https://www.rabbitmq.com/install-windows.html
在这里插入图片描述
在这里插入图片描述

安装RabbitMq UI界面

在这里插入图片描述
打开RabbitMQ Command Prompt 进入命令行

# 查看mq服务状态
rabbitmqctl.bat status

在这里插入图片描述

# 安装ui界面
rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

访问 http://localhost:15672/
默认账号密码guest/guest
在这里插入图片描述

安装延时消息插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
在这里插入图片描述
将 ez文件拷贝到安装目录rabbitmq_server-3.10.2\plugins下

# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

SpringBoot整合

这里我直接用我先前建好的微服务
order-service作为消息发送者,storage-service作为消息接收者
在这里插入图片描述
在这里插入图片描述

消息发送端order-service

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672
    # 消息确认(ACK)
    publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
    publisher-returns: true #确认消息已发送到队列(Queue)

RabbitMqConfig

package top.fate.config;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 11:41
 */
@Configuration
public class RabbitMqConfig {

    private static final Logger LOG = LogManager.getLogger();

    public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
    public static final String DIRECT_EXCHANGE = "direct_exchange"; //交换器名称
    public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键

    public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
    public static final String DELAY_EXCHANGE = "delay_exchange"; //交换器名称
    public static final String DELAY_ROUTING_KEY = "delay_routing_key"; //路由键

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        //确认消息送到交换机(Exchange)回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
        {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause)
            {
                LOG.info("\n确认消息送到交换机(Exchange)结果:");
                LOG.info("相关数据:" + correlationData);
                LOG.info("是否成功:" + ack);
                LOG.info("错误原因:" + cause);
            }
        });

        //确认消息送到队列(Queue)回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
        {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage)
            {
                LOG.info("\n确认消息送到队列(Queue)结果:");
                LOG.info("发生消息:" + returnedMessage.getMessage());
                LOG.info("回应码:" + returnedMessage.getReplyCode());
                LOG.info("回应信息:" + returnedMessage.getReplyText());
                LOG.info("交换机:" + returnedMessage.getExchange());
                LOG.info("路由键:" + returnedMessage.getRoutingKey());
            }
        });
        return rabbitTemplate;
    }

    /**
     * Json转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Direct交换器
     */
    @Bean
    public DirectExchange directExchange()
    {
        /**
         * 创建交换器,参数说明:
         * String name:交换器名称
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         */
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    /**
     * 队列
     */
    @Bean
    public Queue directQueue()
    {
        /**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */
        return new Queue(DIRECT_QUEUE, true, false, false, null);
    }

    /**
     * 绑定
     */
    @Bean
    Binding directBinding(DirectExchange directExchange, Queue directQueue)
    {
        //将队列和交换机绑定, 并设置用于匹配键:routingKey路由键
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    /******************************延时队列******************************/

    @Bean
    public CustomExchange delayExchange()
    {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayQueue()
    {
        Queue queue = new Queue(DELAY_QUEUE, true);
        return queue;
    }

    @Bean
    public Binding delaybinding(Queue delayQueue, CustomExchange delayExchange)
    {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }

}

实体对象

package top.fate.entity;

import lombok.Data;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:26
 */
@Data
public class TestEntity {

    private String username;
    private String password;

    public TestEntity(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public TestEntity() {
    }
}

生产者服务接口

package top.fate.service;

import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:29
 */
public interface ProducerService {

    /**
     * 发送json格式数据
     *
     * @param o
     */
    void sendTestJson(Object o);

    /**
     * 延时发送map格式数据
     *
     * @param map
     */
    void sendDelayTestMap(Map map);
}

生产者服务实现类

package top.fate.service.impl;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.fate.config.RabbitMqConfig;
import top.fate.service.ProducerService;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:30
 */
@Service
public class ProducerServiceImpl implements ProducerService {

    private static final Logger LOG = LogManager.getLogger();

    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送json格式数据
     *
     * @param o
     */
    @Override
    public void sendTestJson(Object o) {

        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY, o);
        LOG.info("json格式的数据发送成功 发送时间为" + formatter.format(new Date()));
    }

    /**
     * 延时发送map格式数据
     *
     * @param map
     */
    @Override
    public void sendDelayTestMap(Map map) {
        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.DELAY_ROUTING_KEY, map, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {

                message.getMessageProperties().setHeader("x-delay", 5000);

                return message;
            }
        });
        LOG.info("map格式的数据发送成功 发送时间为" + formatter.format(new Date()));
    }
}

测试Controller

package top.fate.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.fate.entity.TestEntity;
import top.fate.service.ProducerService;

import java.util.HashMap;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:43
 */
@RestController
@RequestMapping(value = "producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    @GetMapping("sendObject")
    public void sendObject(){
        producerService.sendTestJson(new TestEntity("user","123456"));
    }

    @GetMapping("sendMap")
    public void sendMap(){
        Map map = new HashMap();
        map.put("user1",new TestEntity("user1","123"));
        map.put("user2",new TestEntity("user2","123"));
        map.put("user3",new TestEntity("user3","123"));
        producerService.sendDelayTestMap(map);
    }
}

消息接收端storage-service

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMqConfig

package top.fate.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.fate.service.impl.AckReceiver;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:46
 */
@Configuration
public class RabbitMqConfig {

    public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
    public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称

    /**
     * 消息接收确认处理类
     */
    @Autowired
    private AckReceiver ackReceiver;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * 客户端配置
     * 配置手动确认消息、消息接收确认
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer()
    {
        //消费者数量,默认10
        int DEFAULT_CONCURRENT = 10;

        //每个消费者获取最大投递数量 默认50
        int DEFAULT_PREFETCH_COUNT = 50;

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(DEFAULT_CONCURRENT);
        container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);

        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //添加队列,可添加多个队列
        container.addQueues(new Queue(DIRECT_QUEUE,true));
        container.addQueues(new Queue(DELAY_QUEUE,true));

        //设置消息处理类
        container.setMessageListener(ackReceiver);

        return container;
    }
}

消息接收接口

package top.fate.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;

import java.io.IOException;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:52
 */
public interface ConsumerReceiver {

    void receiverJson(Message message, Channel channel) throws IOException;

    void receiverMap(Message message, Channel channel) throws IOException;
}

消息接收实现类

package top.fate.service.impl;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import top.fate.entity.TestEntity;
import top.fate.service.ConsumerReceiver;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:54
 */
@Service
public class ConsumerReceiverImpl implements ConsumerReceiver {


    private static final Logger LOG = LogManager.getLogger();

    @Override
    public void receiverJson(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //将JSON格式数据转换为实体对象
            TestEntity testEntity = JSON.parseObject(message.getBody(), TestEntity.class);

            LOG.info("接收者收到JSON格式消息:");
            System.out.println("账号:" + testEntity.getUsername());
            System.out.println("密码:" + testEntity.getPassword());

            /**
             * 确认消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             */
            channel.basicAck(deliveryTag, true);

            /**
             * 否定消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            //channel.basicNack(deliveryTag, true, false);
        }
        catch (Exception e)
        {
            /**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag, false);

            e.printStackTrace();
        }
    }

    @Override
    public void receiverMap(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //将JSON格式数据转换为Map对象
            Map map = JSON.parseObject(message.getBody(), Map.class);

            LOG.info("接收者收到Map格式消息:");

            LOG.info(map.get("user1"));
            LOG.info(map.get("user2"));
            LOG.info(map.get("user3"));

            //确认消息
            channel.basicAck(deliveryTag, true);

            //否定消息
            //channel.basicNack(deliveryTag, true, false);
        }
        catch (Exception e)
        {
            //拒绝消息
            channel.basicReject(deliveryTag, false);

            e.printStackTrace();
        }
    }
}

消息分发处理类

package top.fate.service.impl;

import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.fate.config.RabbitMqConfig;
import top.fate.service.ConsumerReceiver;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:50
 */
@Service
public class AckReceiver implements ChannelAwareMessageListener {

    private static final Logger LOG = LogManager.getLogger();

    /**
     * 用户消息接收类
     */
    @Autowired
    private ConsumerReceiver consumerReceiver;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        //时间格式
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        LOG.info("消息接收成功,接收时间:" + dateFormat.format(new Date()) + "\n");

        //获取队列名称
        String queueName = message.getMessageProperties().getConsumerQueue();

        //接收用户信息Json格式数据
        if (queueName.equals(RabbitMqConfig.DIRECT_QUEUE))
        {
            consumerReceiver.receiverJson(message, channel);
        }

        //延时接收用户信息Map格式数据
        if (queueName.equals(RabbitMqConfig.DELAY_QUEUE))
        {
            consumerReceiver.receiverMap(message, channel);
        }
        //多个队列的处理,则如上述代码,继续添加方法....
    }
}

启动测试

在这里插入图片描述
项目启动的时候创建交换机绑定路由key,及创建队列
在这里插入图片描述
在这里插入图片描述

  • 普通消息

http://localhost:8082/producer/sendObject

在这里插入图片描述
在这里插入图片描述

  • 延时消息

http://localhost:8082/producer/sendMap
在这里插入图片描述
在这里插入图片描述

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
前端开发 Java
SpringBoot接收数组参数
SpringBoot接收数组参数
|
6月前
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
201 0
|
5月前
|
JSON 前端开发 Java
Springboot接收ajax提交JSON数组
Springboot接收ajax提交JSON数组
|
3月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
4月前
|
JSON 前端开发 Java
SpringBoot 入门 参数接收 必传参数 数组 集合 时间接收
SpringBoot 入门 参数接收 必传参数 数组 集合 时间接收
|
8月前
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
137 0
|
5月前
|
消息中间件
RabbitMQ接收消息时打印出来的是数字
RabbitMQ接收消息时打印出来的是数字
|
6月前
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
45 0
|
6月前
|
XML JSON Java
Spring Boot 学习研究笔记(十五) @RequestMapping 注解及参数接收、校验详解(1)
Spring Boot 学习研究笔记(十五) @RequestMapping 注解及参数接收、校验详解
123 0
|
7月前
|
移动开发 小程序 Android开发
支付宝小程序IOS连接mqtt接收AMQJS0007E Socket error
支付宝小程序IOS连接mqtt接收AMQJS0007E Socket error
65 0