RocketMQ中使用Java客户端发送消息和消费的应用

简介: 本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示

RocketMQ中使用Java客户端发送消息和消费的应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。


2. 启动RocketMQ集群

本步骤将带您启动RocketMQ集群。

说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。

  1. 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01
./restart.sh

返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。

  1. 启动broker。

2.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

2.2执行如下命令,修改broker配置项。

vim ./conf/broker.conf

2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。

2.4执行如下命令,启动broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard
./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 验证集群启动情况。

在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster

返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。


3. 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。

本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。

说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?


4. 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?

分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。

  • 消费者说明

消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性


5. 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。

本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可。


6. 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。

一个事物消息大致的生命周期如下图。

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。


7. 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景。

本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。

建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?


8. 如何有选择性的消费消息

有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置,支持消息和属性过滤。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

#是否支持重试消息也过滤
filterSupportRetry=true
#支持属性过滤
enablePropertyFilter=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。


9. 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

本步骤将指导您如何使用ACL客户端生产消费消息。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

aclEnable=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
23天前
|
存储 Java
Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。
【10月更文挑战第19天】本文详细介绍了Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。HashMap以其高效的插入、查找和删除操作著称,而TreeMap则擅长于保持元素的自然排序或自定义排序,两者各具优势,适用于不同的开发场景。
38 1
|
15天前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
基于开源框架Spring AI Alibaba快速构建Java应用
|
8天前
|
SQL Java 数据库连接
从理论到实践:Hibernate与JPA在Java项目中的实际应用
本文介绍了Java持久层框架Hibernate和JPA的基本概念及其在具体项目中的应用。通过一个在线书店系统的实例,展示了如何使用@Entity注解定义实体类、通过Spring Data JPA定义仓库接口、在服务层调用方法进行数据库操作,以及使用JPQL编写自定义查询和管理事务。这些技术不仅简化了数据库操作,还显著提升了开发效率。
20 3
|
18天前
|
SQL 监控 Java
技术前沿:Java连接池技术的最新发展与应用
本文探讨了Java连接池技术的最新发展与应用,包括高性能与低延迟、智能化管理和监控、扩展性与兼容性等方面。同时,结合最佳实践,介绍了如何选择合适的连接池库、合理配置参数、使用监控工具及优化数据库操作,为开发者提供了一份详尽的技术指南。
28 7
|
16天前
|
SQL Java 数据库连接
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率。本文介绍了连接池的工作原理、优势及实现方法,并提供了HikariCP的示例代码。
30 3
|
17天前
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
24 4
|
16天前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
33 2
|
17天前
|
缓存 Java 数据库连接
Hibernate:Java持久层框架的高效应用
通过上述步骤,可以在Java项目中高效应用Hibernate框架,实现对关系数据库的透明持久化管理。Hibernate提供的强大功能和灵活配置,使得开发者能够专注于业务逻辑的实现,而不必过多关注底层数据库操作。
11 1
|
21天前
|
移动开发 前端开发 JavaScript
java家政系统成品源码的关键特点和技术应用
家政系统成品源码是已开发完成的家政服务管理软件,支持用户注册、登录、管理个人资料,家政人员信息管理,服务项目分类,订单与预约管理,支付集成,评价与反馈,地图定位等功能。适用于各种规模的家政服务公司,采用uniapp、SpringBoot、MySQL等技术栈,确保高效管理和优质用户体验。

相关产品

  • 云消息队列 MQ