精通SpringBoot——第八篇:整合RabbitMQ消息队列

简介: springboot 整合消息队列RabbitMQ

最近由于个人原因,好几天没有更新博客文章了。今天来和朋友们一起学习下,SpringBoot怎么整合RabbitMQ。目前消息组件大致有三种:.activemq, rabbitmq, kafka。这三者各有优缺点,RabbitMQ相比之下是处于其他二者之间的一个消息组件。RabbitMQ依赖于erlang,在linux下安装的话,要先安装erlang环境。下面来看看怎么SpringBoot 怎么整合RabbitMQ吧。

  1. 想要使用RabbitMQ ,pom依赖是少不了的~
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.再来看看application.yml文件的内容

spring:
  rabbitmq:
    username: rabbit
    password: 123456
    host: localhost
    port: 5672
    virtual-host: /
    #手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
    listener:
      simple:
        acknowledge-mode: manual

RabbitMQConfig的内容

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String DEFAULT_MAIL_QUEUE = "dev.mail.register.default.queue";

    public static final String MANUAL_MAIL_QUEUE = "dev.mail.register.manual.queue";

    @Bean
    public Queue defaultMailQueue (){
        // Queue queue = new  Queue(Queue名称,消息是否需要持久化处理)
        return new Queue(DEFAULT_MAIL_QUEUE, true);
    }

    @Bean
    public Queue manualMailQueue(){
        return new Queue(MANUAL_MAIL_QUEUE, true);
    }
}

搞两个监听器(使用@RabbitListener注解)来监听下这两种消息 (怎么感觉自己现在说话一股土味儿,最近吃土吃多了么~ 好吧,写的代码估计也是土味的吧)


import com.developlee.rabbitmq.config.RabbitMQConfig;
import com.developlee.rabbitmq.entity.MailEntity;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
@Component
public class MailHandler {

    private static final Logger logger = LoggerFactory.getLogger(MailHandler.class);

    /**
     * <p>TODO 该方案是 spring-boot-data-amqp 默认的方式,不太推荐。具体推荐使用  listenerManualAck()</p>
     * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
     * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易将磁盘空间耗完
     * 解决方案:手动ACK,或者try-catch 然后在 catch 里面讲错误的消息转移到其它的系列中去
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * <p>
     *
     * @param mail 监听的内容
     */
    @RabbitListener(queues = {RabbitMQConfig.DEFAULT_MAIL_QUEUE})
    public void listenerAutoAck(MailEntity mail, Message message, Channel channel) {
        //TODO  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info("listenerAutoAck 监听的消息-{}", mail.toString());
            //TODO  通知MQ 消息已被成功消费,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            //处理失败, 重新压入MQ.
            try {
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    @RabbitListener(queues = {RabbitMQConfig.MANUAL_MAIL_QUEUE})
    public void listenerManualAck(MailEntity mail, Message message, Channel channel) {
        logger.info("listenerManualAck 监听的消息-{}", mail.toString());
        try {
            //TODO  通知MQ 消息已被成功消费,可以ACK了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //如果报错,容错处理,
        }
    }
}

再来一波测试代码,测试下......

import com.developlee.rabbitmq.config.RabbitMQConfig;
import com.developlee.rabbitmq.entity.MailEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Lee
 * @// TODO 2018/6/22-11:20
 * @description
 */
@RestController
@RequestMapping(value = "/mail")
public class MailController {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MailController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_MAIL_QUEUE, mailEntity);
     * 对应 {@link MailHandler#listenerAutoAck};
     * this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_MAIL_QUEUE, mailEntity);
     * 对应 {@link MailHandler#listenerManualAck};
     */
    @GetMapping("/default")
    public void defaultMailMsg() {
        MailEntity mailEntity = new MailEntity();
        mailEntity.setId("1");
        mailEntity.setName("First Mail Message");
        mailEntity.setTitle("RabbitMQ with Spring boot!");
        mailEntity.setContent("Come on! Let's study Micro-Service together!");
        this.rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_MAIL_QUEUE, mailEntity);
        this.rabbitTemplate.convertAndSend(RabbitMQConfig.MANUAL_MAIL_QUEUE, mailEntity);
    }
}

MailEntity.java

import java.io.Serializable;

public class MailEntity implements Serializable {

    private static final long serialVersionUID = -2164058270260403154L;

    private String id;
    private String name;
    private String title;
    private String content;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

启动项目 ,浏览器地址栏输入http://localhost:8080/mail。 something you will find in your heart。

今天台风暴雨,明天一样。但后天可能会天晴,也许会有彩虹。—— By 一个挣扎的程序猿。

最后,以上示例代码可在我的github.com中找到。
我的个人公众号:developlee的潇洒人生。
关注了也不一定更新,更新就不得了了。
qrcode_for_gh_2bd3f44efa21_258

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
39 6
|
3月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
189 7
|
3月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
114 9
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
3月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
3月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
82 4
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
60 0
手撸MQ消息队列——循环数组

相关产品

  • 云消息队列 MQ