开发者如何使用云消息队列 RocketMQ 版

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 【10月更文挑战第12天】开发者如何使用云消息队列 RocketMQ 版

云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。它适用于各种应用场景,如异步通信、系统解耦、流量削峰等。

以下是如何在阿里云上使用 RocketMQ 的详细步骤,包括创建实例、配置生产者(Producer)和消费者(Consumer),并附上示例代码。

1. 创建阿里云 RocketMQ 实例

  1. 登录阿里云控制台
    打开阿里云官网,登录你的阿里云账号。在产品标签处找到云消息队列 RocketMQ 版

image.png

  1. 创建消息队列 RocketMQ 实例
  • 在控制台首页,搜索“消息队列 RocketMQ”。

image.png

  • 点击“创建实例”,选择实例版本,商品类型(serverless按累积量、按量付费、包年包月)。

image.png

  • 配置实例的基本信息,如实例名称、地域、VPC 网络等。
  • 完成支付并等待实例创建完成。
  1. 获取实例信息
  • 实例创建完成后,进入实例详情页面。
  • 记录实例的接入点(Endpoint)、Namespace、Access Key ID 和 Access Key Secret。

2. 配置生产者(Producer)

生产者负责将消息发送到 RocketMQ。

2.1 引入依赖

如果你使用的是 Maven 项目,可以在 pom.xml 中添加以下依赖:

xml复制代码
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>

2.2 编写生产者代码

java复制代码
import com.aliyun.openservices.ons.api.ONSFactory;  
import com.aliyun.openservices.ons.api.Producer;  
import com.aliyun.openservices.ons.api.PropertyKeyConst;  
import com.aliyun.openservices.ons.api.SendResult;  
import com.aliyun.openservices.ons.api.exception.ONSClientException;  
public class RocketMQProducer {  
public static void main(String[] args) {  
// 阿里云 AccessKey ID 和 AccessKey Secret  
String accessKeyId = "yourAccessKeyId";  
String accessKeySecret = "yourAccessKeySecret";  
// 阿里云 RocketMQ 实例的接入点  
String producerGroup = "yourProducerGroup";  
String onsAddr = "yourOnsAddr"; // 例如:http://onsaddr-internet.aliyun.com/rocketmq/onsaddr4client-internet  
// 创建生产者实例  
Producer producer = ONSFactory.createProducer(producerGroup, onsAddr,  
new PropertyKeyConst.ProducerIdKey(accessKeyId),  
new PropertyKeyConst.SecretKeyKey(accessKeySecret));  
try {  
// 启动生产者实例  
            producer.start();  
// 发送消息  
for (int i = 0; i < 10; i++) {  
String topic = "yourTopic";  
String tags = "yourTag";  
String key = "yourKey" + i;  
String body = "Hello RocketMQ " + i;  
SendResult sendResult = producer.send(new com.aliyun.openservices.ons.api.message.Message(topic, tags, key, body.getBytes()));  
                System.out.printf("%s%n", sendResult);  
            }  
// 关闭生产者实例  
            producer.shutdown();  
        } catch (ONSClientException e) {  
            e.printStackTrace();  
        }  
    }  
}

3. 配置消费者(Consumer)

消费者负责从 RocketMQ 接收消息。

3.1 引入依赖

同样,在 pom.xml 中添加依赖:

xml复制代码
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>

3.2 编写消费者代码

java复制代码
import com.aliyun.openservices.ons.api.ONSFactory;  
import com.aliyun.openservices.ons.api.Consumer;  
import com.aliyun.openservices.ons.api.PropertyKeyConst;  
import com.aliyun.openservices.ons.api.MessageListener;  
import com.aliyun.openservices.ons.api.exception.ONSClientException;  
public class RocketMQConsumer {  
public static void main(String[] args) {  
// 阿里云 AccessKey ID 和 AccessKey Secret  
String accessKeyId = "yourAccessKeyId";  
String accessKeySecret = "yourAccessKeySecret";  
// 阿里云 RocketMQ 实例的接入点  
String consumerGroup = "yourConsumerGroup";  
String onsAddr = "yourOnsAddr"; // 例如:http://onsaddr-internet.aliyun.com/rocketmq/onsaddr4client-internet  
// 创建消费者实例  
Consumer consumer = ONSFactory.createConsumer(consumerGroup, onsAddr,  
new PropertyKeyConst.ConsumerIdKey(accessKeyId),  
new PropertyKeyConst.SecretKeyKey(accessKeySecret));  
try {  
// 订阅主题和标签  
            consumer.subscribe("yourTopic", "*");  
// 注册消息监听器  
            consumer.registerMessageListener(new MessageListener() {  
@Override
public Action consume(com.aliyun.openservices.ons.api.message.Message message, ConsumeContext context) {  
                    System.out.printf("Receive New Messages: %s %n", new String(message.getBody()));  
return Action.CommitMessage;  
                }  
            });  
// 启动消费者实例  
            consumer.start();  
// 保持消费者运行  
            System.out.printf("Consumer Started.%n");  
        } catch (ONSClientException e) {  
            e.printStackTrace();  
        }  
    }  
}

4. 运行代码

  1. 配置环境
  • 确保你的阿里云账号有正确的权限。
  • 确保你的 Maven 项目已经正确引入了依赖。
  1. 运行生产者
  • 运行 RocketMQProducer 类,它将发送消息到指定的 RocketMQ 主题。
  1. 运行消费者
  • 运行 RocketMQConsumer 类,它将接收并处理来自 RocketMQ 的消息。

5. 监控和管理

阿里云提供了丰富的监控和管理功能,你可以在 RocketMQ 实例详情页面查看消息队列的状态、消费情况、延迟等。

总结

通过以上步骤,你可以在阿里云上创建并使用 RocketMQ 实例,实现消息的发送和接收。RocketMQ 提供了高性能、高可靠的消息传递服务,适用于各种分布式系统架构。希望这个指南能帮助你快速上手阿里云 RocketMQ。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
22天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
28天前
|
消息中间件 物联网 Java
开发者如何使用云消息队列 MQTT 版
【10月更文挑战第14天】开发者如何使用云消息队列 MQTT 版
45 7
|
29天前
|
消息中间件 Serverless 数据安全/隐私保护
开发者如何使用云消息队列 RabbitMQ 版
【10月更文挑战第13天】开发者如何使用云消息队列 RabbitMQ 版
69 6
|
27天前
|
消息中间件 Java Kafka
开发者如何使用云消息队列 Kafka 版
【10月更文挑战第15天】开发者如何使用云消息队列 Kafka 版
62 5
|
消息中间件 存储 Cloud Native
云原生开源开发者沙龙「微服务X消息队列专场」
云原生开源开发者沙龙「微服务X消息队列专场」
|
6月前
|
消息中间件 PHP 开发工具
阿里云OpenAPI RocketMQ 5.0的PHP收发消息文档可以在阿里云开发者门户中找到
【1月更文挑战第22天】【1月更文挑战第108篇】阿里云OpenAPI RocketMQ 5.0的PHP收发消息文档可以在阿里云开发者门户中找到
124 6
|
消息中间件 数据安全/隐私保护 RocketMQ
《开发者评测》之RocketMQ从入门到精通获奖名单
RocketMQ从入门到精通活动获奖名单正式公布!
113 0
|
消息中间件 RocketMQ 开发者
《开发者评测》之RocketMQ六大场景评测获奖名单
RocketMQ六大场景评测活动获奖名单出炉啦!
462 1
《开发者评测》之RocketMQ六大场景评测获奖名单
|
消息中间件 Cloud Native 开发者
云原生开源开发者沙龙「微服务X消息队列专场」
「8月27日深圳」云原生开源开发者沙龙微服务X消息队列专场
782 1
云原生开源开发者沙龙「微服务X消息队列专场」
|
消息中间件 Cloud Native 开发者
关于云原生开源开发者沙龙「微服务X消息队列专场」的延期通知
关于云原生开源开发者沙龙「微服务X消息队列专场」的延期通知

相关产品

  • 云消息队列 MQ