RocketMQ中使用Java客户端发送消息和消费的应用

简介: 本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示

RocketMQ中使用Java客户端发送消息和消费的应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。


2. 启动RocketMQ集群

本步骤将带您启动RocketMQ集群。

说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。

  1. 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01
./restart.sh

返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。

  1. 启动broker。

2.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

2.2执行如下命令,修改broker配置项。

vim ./conf/broker.conf

2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。

2.4执行如下命令,启动broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard
./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 验证集群启动情况。

在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster

返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。


3. 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。

本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。

说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?


4. 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?

分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。

  • 消费者说明

消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性


5. 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。

本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可。


6. 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。

一个事物消息大致的生命周期如下图。

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。


7. 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景。

本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。

建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?


8. 如何有选择性的消费消息

有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置,支持消息和属性过滤。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

#是否支持重试消息也过滤
filterSupportRetry=true
#支持属性过滤
enablePropertyFilter=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。


9. 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

本步骤将指导您如何使用ACL客户端生产消费消息。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

aclEnable=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
人工智能 Java API
MCP客户端调用看这一篇就够了(Java版)
本文详细介绍了MCP(Model Context Protocol)客户端的开发方法,包括在没有MCP时的痛点、MCP的作用以及如何通过Spring-AI框架和原生SDK调用MCP服务。文章首先分析了MCP协议的必要性,接着分别讲解了Spring-AI框架和自研SDK的使用方式,涵盖配置LLM接口、工具注入、动态封装工具等步骤,并提供了代码示例。此外,还记录了开发过程中遇到的问题及解决办法,如版本冲突、服务连接超时等。最后,文章探讨了框架与原生SDK的选择,认为框架适合快速构建应用,而原生SDK更适合平台级开发,强调了两者结合使用的价值。
9408 33
MCP客户端调用看这一篇就够了(Java版)
|
5月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4206 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
8月前
|
消息中间件 存储 数据采集
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
487 106
|
6月前
|
存储 网络协议 Java
Java获取客户端IP问题:返回127.0.0.1
总结:要解决Java获取客户端IP返回127.0.0.1的问题,首先要找出原因,再采取合适的解决方案。请参考上述方案来改进代码,确保在各种网络环境下都能正确获取客户端IP地址。希望本文对您有所帮助。
471 25
|
8月前
|
消息中间件 人工智能 自然语言处理
基于 RocketMQ 事件驱动架构的 AI 应用实践
基于 RocketMQ 事件驱动架构的 AI 应用实践
289 2
|
8月前
|
消息中间件 存储 前端开发
MQ有什么应用场景
MQ有什么应用场景
|
11月前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
347 7
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
316 3
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
1140 0
探索 RocketMQ:企业级消息中间件的选择与应用

相关产品

  • 云消息队列 MQ
  • 下一篇
    oss云网关配置