消息队MQ

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
云原生网关 MSE Higress,422元/月
简介: 消息队MQ

文章描述

😊 @ 作者:Lion J
`💖 @ 主页:
https://blog.csdn.net/weixin_69252724`
🎉 @ 主题: 消息队列MQ_rabbitMQ搭建
⏱️ @ 创作时间:2024年03月9日
————————————————


举一个 电商的例子
在开发的一个场景中,用户下订单给订单服务,订单服务调用库存服务减产库存情况, 订单服务再下订单, 下完订单再通知用户订单信息

在这里插入图片描述


一、MQ是什么?

MQ 全称(Message Queue)又名消息队列,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。

言简意赅的说,就是 将服务中间某个行为步骤先抽取到一个容器,让容器去操作,不影响当前的服务

二、常见MQ中间件

ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,
开发比较灵活,采用 C 语言 实现,实际上只是一个 socket 库的重新封装,如果做为消息队列使用,需要开发大量的代码。 ZeroMQ 仅提供非持久性的队列,也就是说如果 down 机,数据将会丢失。
RabbitMQ: 使用 erlang 语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
ActiveMQ: 历史悠久的 Apache 开源项目。已经在很多产品中得到应用,实现
了 JMS1.1 规范,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ: 阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来 很简单。
Kafka: Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

三、RocletMQ环境搭建

rabbitMQ搭建

  1. 下载解压
    https://rocketmq.apache.org/download/
  2. 配置环境变量
ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-4.9.3
NAMESRV_ADDR =127.0.0.1:9876
  1. 启动Name Server

进入到bin目录输入命令:
mqnamesrv.cmd

  1. 启动Broker

进入到 bin 目录输入命令:
mqbroker.cmd -n 127.0.0.1:9876 atuoCreateTopicEnable=true

控制台安装启动

  1. 解压

在这里插入图片描述

  1. 修改其 src/main/resources 中的 application.properties 配置文件

在这里插入图片描述

  1. 在解压目录 rocketmq-console 的 pom.xml 中添加如下 JAXB 依赖。
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
  1. 打包_命令行进入到 rocketmq-console

mvn clean package -Dmaven.test.skip=true

  1. 打包后,进入 target 目录

启动控制台 java -jar rocketmq-console-ng-1.0.0.jar

  1. 访问

http://127.0.0.1:6060
在这里插入图片描述

四、RocketMQ架构

在这里插入图片描述

其中Broker是RocketMQ的核心, 当Broker启动后, 就会向NameServer中注册自身消息, 然后Producer在NameServer中获取Broker的信息,然后向Broker发送投递消息; 消费者Consumer在NameServer中获取Broker消息之后就会从Broker中接收消息

NameServer,Broker,Producer,Consumer。
Broker(邮递员) Broker 是 RocketMQ 的核心,负责消息的接收,存储,投递等功能.
NameServer(邮局) 消息队列的协调者,Broker 向它注册路由信息,同时Producer 和 Consumer 向其获取路由信息
Producer(寄件人) 消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消 息
Consumer(收件人) 消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息
Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对 Topic 来发送和接收消息
Message Queue(邮件) 为了提高性能和吞吐量,引入了 Message Queue,一个 Topic 可以设置一个或多个 Message Queue,这样消息就可以并行往各个Message Queue 发送消息,消费者也可以并行的从多个 Message Queue 读取消息 Message Message 是消息的载体。
Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

五、java消息发送和接收演示

消息发送者

        public class MQProducerTest {
            public static void main(String[] args) throws Exception {
//1. 创建消息生产者, 指定生产者所属的组名
                DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定 Nameserver 地址
                producer.setNamesrvAddr("192.168.109.131:9876");
//3. 启动生产者
                producer.start();
//4. 创建消息对象,指定主题、标签和消息体
                Message msg = new Message("myTopic", "myTag",
                        ("RocketMQ Message").getBytes());
//5. 发送消息
                SendResult sendResult = producer.send(msg, 10000);
                System.out.println(sendResult);
//6. 关闭生产者
                producer.shutdown();
            }
        }

消息接收

        public class MQConsumerTest {
            public static void main(String[] args) throws Exception {
//1. 创建消息消费者, 指定消费者所属的组名
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
//2. 指定 Nameserver 地址
                consumer.setNamesrvAddr("192.168.109.131:9876");
//3. 指定消费者订阅的主题和标签
                consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
                                                                            msgs,
                                                                    ConsumeConcurrentlyContext
                                                                            context) {
                        System.out.println("Receive New Messages: " + msgs);//返回消费状态
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
//5. 启动消息消费者
                consumer.start();
                System.out.println("Consumer Started.");
            }
        }

六、案例

在这里插入图片描述

订单微服务发送消息

  1. 添加rocketmq依赖
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
  1. 添加配置

在这里插入图片描述

  1. 编写测试代码
@Autowired
private RocketMQTemplate rocketMQTemplate;
rocketMQTemplate.convertAndSend("order-topic", order);

用户微服务接收消息

  1. 添加依赖
        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
  1. 修配置文件

在这里插入图片描述

  1. 编写消息接收服务
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        System.out.println("收到一个订单信息:"+ JSON.toJSONString(order)+",接下来发送短信");
    }
}
  1. 启动服务,执行下单操作,观看后台输出

在这里插入图片描述

七、发送不同类型消息

RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送

可靠同步发送: 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方 式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送: 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送 方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知 启动转码服务,转码完成后通知推送转码结果等。
单向发送: 单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不 等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

● 同步消息

//同步消息
//参数一: topic
//参数二: 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一
                条同步消息");
                System.out.println(sendResult);

●发送异步消息

//参数一: topic
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1","这是一条异步消息",new

    SendCallback() {
        @Override
        public void onSuccess (SendResult sendResult){
            System.out.println(sendResult);
        }
        @Override
        public void onException (Throwable throwable){
            System.out.println(throwable);
        }
    });
//让线程不要终止
Thread.sleep(30000000)

●单向消息

rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 Java Kafka
MQ四兄弟:如何保证消息顺序性
在分布式系统中,消息队列(MQ)是确保组件间高效通信的关键。RabbitMQ、RocketMQ、Kafka和Pulsar通过不同机制保证消息顺序性:RabbitMQ依赖单一队列和消费者模式;RocketMQ使用MessageQueueSelector;Kafka基于Partition和Key;Pulsar通过分区主题和键路由。这些系统的核心思想是将相同特征的消息发送到同一队列或分区,并按先进先出原则消费,从而确保消息顺序性。
37 0
|
6月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
|
6月前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 Java
消息队列-死信队列
消息队列-死信队列
82 0
|
7月前
|
消息中间件 存储 监控
【消息中间件】详解mq消息积压
【消息中间件】详解mq消息积压
251 0
|
8月前
|
消息中间件 存储 缓存
【面试问题】MQ 如何保证消息的顺序性?
【1月更文挑战第27天】【面试问题】MQ 如何保证消息的顺序性?
|
8月前
|
消息中间件 缓存 监控
mq如何保证消息顺序性
mq如何保证消息顺序性
142 0
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
201 1
|
消息中间件 关系型数据库 MySQL
如何保证MQ中消息的顺序性?
如何保证MQ中消息的顺序性?
107 1
|
消息中间件 监控 NoSQL
RocketMq普通消息,死信队列,消息幂等性(redis)
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
263 0