操作手册
【实践】快速体验云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
场景简介
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
背景知识
本场景主要涉及以下云产品和服务:
云消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。RocketMQ自诞生以来一直服务阿里集团十余年,历经多次双十一万亿级数据洪峰稳定性验证。
前提条件
云起实验室将在您的账号下开通本次实操资源,资源按量付费,需要您自行承担本次实操的云资源费用。
本实验使用云消息队列RocketMQ版Serverless系列实例,实验总费用不超过0.1元。如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
进入实操前,请确保阿里云账号满足以下条件:
创建实验资源
在实验页面,勾选我已阅读并同意《阿里云云起实践平台服务协议》和我已授权阿里云云起实践平台创建、读取及释放实操相关资源后,单击开始实操。
创建资源需要5分钟左右的时间,请您耐心等待。
在云产品资源列表,您可以查看本场景涉及的云产品资源信息。
获取接入点
在云产品资源列表的消息队列RocketMQ版区域,单击管理。
在实例详情页面的TCP协议接入点区域,可以查看实例的接入点信息。
VPC专有网络接入点:使用 VPC 专有网络访问云消息队列RocketMQ版时使用。云消息队列RocketMQ版默认提供的接入点。
公网接入点:使用公网访问云消息队列RocketMQ版时使用该接入点。仅当开启公网访问时显示。
说明本实验以查看公网接入点为例,后续内容会使用到公网接入点。
获取账号密码
客户端接入云消息队列RocketMQ版服务端时,需要根据接入方式配置实例用户名密码。
使用公网访问云消息队列RocketMQ版服务端:需要配置实例的用户名密码。
使用VPC网络访问云消息队列 RocketMQ 版服务端:无需配置实例的用户名密码,系统会根据VPC接入点智能识别用户身份(使用VPC内网访问时系统会按照产生的内网流量计算,云消息队列RocketMQ版不收取费用,在私网连接(PrivateLink)侧进行计费)。
本实验以公网访问为例,查看如何获取RocketMQ版Serverless系列实例的账号密码。
在左侧导航栏中,单击访问控制。
在IP白名单页签,单击智能身份识别。
在智能身份识别页签,可以查看到实例的用户名和密码。
说明本实验后续内容需要使用到实例的用户名和密码。
创建Topic
现在我们在RocketMQ实例中创建一个Topic资源。
在左侧导航栏中,单击Topic管理。
在Topic管理页面,单击创建Topic。
在创建Topic面板,填写Topic名称和描述,在本实验中将名称和描述设置为YUNQI-RMQTopic,消息类型选择为普通消息,然后单击确定,一个Topic便创建完成了。
创建订阅组(Group)
拥有一个Topic后,我们再创建一个订阅组(Group)。订阅组将被用于消息消费过程。
在左侧导航栏中,单击Group管理。
在Group管理页面,单击创建Group。
在创建Group面板,填写Group ID,在本实验中将Group ID设置为test-group。其他参数保持默认即可,然后单击确定。此时,一个订阅组便创建完成了。
收发消息
为方便体验,我们选择在控制台进行消息的发送,编写消费者代码并运行,以消费控制台发送的那条消息。
发送消息。
在左侧导航栏中,单击Topic管理。
在Topic管理页面,找到您创建的Topic,单击其右侧操作列下的快速体验。
在快速体验的消息生产和消费面板中,发送方式选择控制台,填入消息内容,单击确定。
发送成功后,这条消息便已进入您实例所在的存储中,您可单击这里查看其消息轨迹。
接收消息。
编写消费者代码,本实验将说明如何在IntelliJ IDEA中完成消费者的启动。本实验将从0开始教您从零开始构建一个Java项目。若您已有一定开发经验,请您根据真实情况选择性跳过。
首先,安装IntelliJ IDEA,选择社区版(Community)进行下载并安装。
打开IntelliJ IDEA,单击New Project。
新建一个Java工程。
在运行代码前,请在您的工程中添加pom依赖。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency> </dependencies>
添加完成后,pom.xml文件如下图所示。
创建一个名为PushConsumerExample的类,复制并粘贴下方代码,将代码中endpoints、topic、consumerGroup、instanceId、userName、passWord六个成员变量的值修改为您的RocketMQ实例相关信息,然后并运行。
import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "{实例接入点,本实验需要填写公网接入点,如rmq-cn-xxx.cn-zhangjiakou.rmq.aliyuncs.com:8080}"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "{Topic名称,如YUNQI-RMQTopic}"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "{Group ID, 如test-group}"; String instanceId = "{实例id,如rmq-cn-xxx}"; String userName = "{账号名}"; String passWord = "{密码}"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace(instanceId) .setCredentialProvider(new StaticSessionCredentialsProvider(userName, passWord)) .build(); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) //设置消费监听器。 .setMessageListener(messageView -> { //处理消息并返回消费结果。 // LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close(); } }
启动后,消费成功即可拿到之前在控制台发送的消息。
可观测能力
阿里云云消息队列RocketMQ版的可观测能力多样,细粒度的有消息级别的查询、轨迹查询。粗粒度的有仪表盘,能够在实例维度查看消息的生产、发送、堆积等情况。
消息查询&轨迹。
针对我们刚刚发送的消息,可以在消息查询功能,查询该消息的具体内容、查看消息轨迹,并可指定消费者进行消费能力验证等。
在左侧导航栏中,单击消息查询。
在消息查询页面,查询方式选择按Topic查询,Topic选择YUNQI-RMQTopic,单击查询。
在消息查询页面,找到目标消息,单击其右侧操作列下的详情,即可查询该消息的具体内容。
在消息查询页面,找到目标消息,单击其右侧操作列下的消息轨迹,即可查询该消息的具体轨迹。
消息轨迹功能,能够支持对特定消息进行全生命周期的展示,包括其生产者、存储时间、存储 ID、投递事件、消费者等信息。通过该可观测能力,我们能够十分清晰地了解消息收发的细节。
在消息查询页面,找到目标消息,选择其右侧操作列下的更多 > 消费验证,即可指定消费者进行消费能力验证。
在消息查询页面,找到目标消息,选择其右侧操作列下的更多 > 下载消息,即可下载消息内容。
仪表盘。
相对于消息查询功能,仪表盘属于粗粒度的可观测能力。该能力可以展现实例维度、Topic维度、Group维度的整体情况,包括但不限于收发速率、堆积情况等数据。且依托于Grafana的可视化能力,这些指标的展示都是十分直观且灵活的。
在左侧导航栏中,单击仪表盘。
在服务管理角色对话框中,单击授权。
说明如果您的阿里云账号已授权,请您跳过此步骤。
在仪表盘页面,可以查看到实例维度、Topic维度、Group维度的整体情况。在消费者区域,我们可以看到刚刚测试的消息在何时进入实例,消费延迟时间等信息。
其它拓展能力以及参考文档
开源RocketMQ在GitHub社区中不断迭代成长,定期发布版本,您可以在社区内查看最新特性、提出Bug,甚至参与Bug的修复。
Apache RocketMQ的官网为:
Apache RocketMQ的开源Github社区为:
Apache RocketMQ的中文学习社区为:
Apache RocketMQ的中国开发者钉钉群为:21982288
此外,阿里云云消息队列RocketMQ版的更多特性、教程、最佳实践均可在官方文档中找到。基于Serverless系列可以让体验成本可控,若对其它消息队列特性感兴趣,请自行上手尝试。
阿里云云消息队列RocketMQ版的官方文档为:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/?spm=5176.234368.J_5253785160.5.2e4b4ef6ujTZNV
清理资源
在完成实验后,如果无需继续使用资源,选择不保留资源,单击结束实操。在结束实操对话框中,单击确定。
在完成实验后,如果需要继续使用资源,选择付费保留资源,单击结束实操。在结束实操对话框中,单击确定。请随时关注账户扣费情况,避免发生欠费。