Apache RocketMQ

简介: Apache RocketMQ 是一款分布式消息中间件,它是由阿里巴巴集团开发并开源的项目。RocketMQ 的设计目标是为了满足高吞吐量、低延迟、可靠性高等特性的分布式消息传递场景。RocketMQ 支持发布/订阅模式和点对点模式,可以广泛应用于电商、金融、物流、移动互联网等领域。

Apache RocketMQ 是一款分布式消息中间件,它是由阿里巴巴集团开发并开源的项目。RocketMQ 的设计目标是为了满足高吞吐量、低延迟、可靠性高等特性的分布式消息传递场景。RocketMQ 支持发布/订阅模式和点对点模式,可以广泛应用于电商、金融、物流、移动互联网等领域。

RocketMQ 的主要特点包括:

高吞吐量:RocketMQ 采用多线程异步化设计,能够实现高吞吐量的消息传递。
低延迟:RocketMQ 采用零拷贝技术和堆外内存技术,能够实现低延迟的消息传递。
可靠性高:RocketMQ 采用主从架构和双写机制,能够保证消息的可靠性。
消息顺序性:RocketMQ 支持消息的顺序发送和顺序消费,能够保证消息的顺序性。
分布式事务:RocketMQ 支持分布式事务,能够实现分布式事务的一致性。
扩展性强:RocketMQ 支持水平扩展和垂直扩展,可以根据业务需求进行灵活扩展。
RocketMQ 的架构包括 NameServer、Broker、Producer、Consumer 等四个组件。其中,NameServer 是 RocketMQ 的路由中心,用于管理 Broker 节点信息和 Topic 路由信息;Broker 是 RocketMQ 的消息存储节点,用于存储和转发消息;Producer 和 Consumer 分别是消息的发送方和接收方。

RocketMQ 还提供了多种语言的客户端 SDK,包括 Java、C++、Python、Go 等,方便开发者进行消息的发送和接收,同时也提供了多种运维工具和监控工具,方便管理和监控 RocketMQ 集群的运行情况。

-

-
RocketMQ 的使用可以分为以下几个步骤:

下载和安装 RocketMQ
您可以从 RocketMQ 的官方网站下载最新版本的 RocketMQ,然后解压并安装到本地。

启动 NameServer 和 Broker
在启动 RocketMQ 之前,需要先启动 NameServer 和 Broker。NameServer 用于管理 Broker 节点信息和 Topic 路由信息,而 Broker 则用于存储和转发消息。以下是启动 NameServer 和 Broker 的命令:

Copy

启动 NameServer

sh bin/mqnamesrv

启动 Broker

sh bin/mqbroker -n localhost:9876
创建消息生产者和消费者
在使用 RocketMQ 进行消息传递之前,需要先创建消息生产者和消费者。以下是创建 Java 版本的消息生产者和消费者的示例代码:

消息生产者:

java
Copy
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定 Producer Group 名称
DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");

    // 指定 NameServer 地址
    producer.setNamesrvAddr("localhost:9876");

    // 启动生产者实例
    producer.start();

    // 创建消息实例,指定 Topic、Tag 和消息内容
    Message message = new Message("my_topic", "my_tag", "Hello, RocketMQ!".getBytes());

    // 发送消息
    producer.send(message);

    // 关闭生产者实例
    producer.shutdown();
}

}
消息消费者:

java
Copy
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,并指定 Consumer Group 名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");

    // 指定 NameServer 地址
    consumer.setNamesrvAddr("localhost:9876");

    // 订阅 Topic 和 Tag
    consumer.subscribe("my_topic", "my_tag");

    // 注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            // 处理消息
            for (MessageExt message : messages) {
                System.out.println(new String(message.getBody()));
            }

            // 返回消费状态
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    // 启动消费者实例
    consumer.start();
}

}
在上述示例代码中,消息生产者使用 DefaultMQProducer 类来创建消息实例并发送消息,消息消费者使用 DefaultMQPushConsumer 类来订阅消息并注册消息监听器,当有新消息到达时,会自动调用消息监听器中的 consumeMessage 方法进行消息处理。

关闭生产者和消费者
在消息传递结束后,需要关闭生产者和消费者。以下是关闭生产者和消费者的示例代码:

java
Copy
// 关闭生产者实例
producer.shutdown();

// 关闭消费者实例
consumer.shutdown();
这些示例代码仅仅是一个简单的演示,实际使用时需要根据业务需求进行相应的配置和调整。同时,RocketMQ 还提供了多种高级特性,例如消息顺序性、事务消息、消息过滤等,需要根据实际需求进行使用。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
162 2
|
3月前
|
消息中间件 人工智能 Apache
Apache RocketMQ 中文社区全新升级!
RocketMQ 中文社区升级发布只是起点,我们将持续优化体验细节,推出更多功能和服务,更重要的是提供更多全面、深度、高质量的内容。
534 12
|
4月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
280 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
4月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
176 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
|
4月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
238 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
3月前
|
消息中间件 安全 API
Apache RocketMQ ACL 2.0 全新升级
RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。
13602 3
|
4月前
|
消息中间件 Cloud Native Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(7)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
108 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(7)
|
4月前
|
消息中间件 Cloud Native Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
115 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
|
4月前
|
消息中间件 Apache 数据安全/隐私保护
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(3)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
113 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(3)
|
4月前
|
消息中间件 Apache RocketMQ
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(5)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
114 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(5)

推荐镜像

更多