RocketMQ中使用Java客户端发送消息和消费的应用

简介: 本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示

RocketMQ中使用Java客户端发送消息和消费的应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。


2. 启动RocketMQ集群

本步骤将带您启动RocketMQ集群。

说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。

  1. 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01
./restart.sh

返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。

  1. 启动broker。

2.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

2.2执行如下命令,修改broker配置项。

vim ./conf/broker.conf

2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。

2.4执行如下命令,启动broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard
./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 验证集群启动情况。

在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster

返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。


3. 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。

本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。

说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?


4. 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?

分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。

  • 消费者说明

消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性


5. 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。

本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可。


6. 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。

一个事物消息大致的生命周期如下图。

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。


7. 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景。

本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。

建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?


8. 如何有选择性的消费消息

有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置,支持消息和属性过滤。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

#是否支持重试消息也过滤
filterSupportRetry=true
#支持属性过滤
enablePropertyFilter=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。


9. 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

本步骤将指导您如何使用ACL客户端生产消费消息。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

aclEnable=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
人工智能 算法 Java
Java与AI驱动区块链:构建智能合约与去中心化AI应用
区块链技术和人工智能的融合正在开创去中心化智能应用的新纪元。本文深入探讨如何使用Java构建AI驱动的区块链应用,涵盖智能合约开发、去中心化AI模型训练与推理、数据隐私保护以及通证经济激励等核心主题。我们将完整展示从区块链基础集成、智能合约编写、AI模型上链到去中心化应用(DApp)开发的全流程,为构建下一代可信、透明的智能去中心化系统提供完整技术方案。
347 3
|
5月前
|
存储 数据采集 搜索推荐
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
本篇文章探讨了 Java 大数据在智慧文旅景区中的创新应用,重点分析了如何通过数据采集、情感分析与可视化等技术,挖掘游客情感需求,进而优化景区服务。文章结合实际案例,展示了 Java 在数据处理与智能推荐等方面的强大能力,为文旅行业的智慧化升级提供了可行路径。
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
|
5月前
|
机器学习/深度学习 数据采集 数据可视化
Java 大视界 -- 基于 Java 的大数据可视化在城市空气质量监测与污染溯源中的应用(216)
本文探讨Java大数据可视化在城市空气质量监测与污染溯源中的创新应用,结合多源数据采集、实时分析与GIS技术,助力环保决策,提升城市空气质量管理水平。
Java 大视界 -- 基于 Java 的大数据可视化在城市空气质量监测与污染溯源中的应用(216)
|
5月前
|
存储 监控 数据可视化
Java 大视界 -- 基于 Java 的大数据可视化在企业生产运营监控与决策支持中的应用(228)
本文探讨了基于 Java 的大数据可视化技术在企业生产运营监控与决策支持中的关键应用。面对数据爆炸、信息孤岛和实时性不足等挑战,Java 通过高效数据采集、清洗与可视化引擎,助力企业构建实时监控与智能决策系统,显著提升运营效率与竞争力。
|
5月前
|
Java 大数据 数据处理
Java 大视界 -- 基于 Java 的大数据实时数据处理在工业互联网设备协同制造中的应用与挑战(222)
本文探讨了基于 Java 的大数据实时数据处理在工业互联网设备协同制造中的应用与挑战。文章分析了传统制造模式的局限性,介绍了工业互联网带来的机遇,并结合实际案例展示了 Java 在多源数据采集、实时处理及设备协同优化中的关键技术应用。同时,也深入讨论了数据安全、技术架构等挑战及应对策略。
|
5月前
|
数据采集 搜索推荐 Java
Java 大视界 -- Java 大数据在智能教育虚拟学习环境构建与用户体验优化中的应用(221)
本文探讨 Java 大数据在智能教育虚拟学习环境中的应用,涵盖多源数据采集、个性化推荐、实时互动优化等核心技术,结合实际案例分析其在提升学习体验与教学质量中的成效,并展望未来发展方向与技术挑战。
|
3月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
210 8
|
4月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
805 12
|
4月前
|
安全 Java API
Java SE 与 Java EE 区别解析及应用场景对比
在Java编程世界中,Java SE(Java Standard Edition)和Java EE(Java Enterprise Edition)是两个重要的平台版本,它们各自有着独特的定位和应用场景。理解它们之间的差异,对于开发者选择合适的技术栈进行项目开发至关重要。
670 1
|
5月前
|
设计模式 XML 安全
Java枚举(Enum)与设计模式应用
Java枚举不仅是类型安全的常量,还具备面向对象能力,可添加属性与方法,实现接口。通过枚举能优雅实现单例、策略、状态等设计模式,具备线程安全、序列化安全等特性,是编写高效、安全代码的利器。

相关产品

  • 云消息队列 MQ