RocketMQ中使用Java客户端发送消息和消费的应用

简介: 本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示

RocketMQ中使用Java客户端发送消息和消费的应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。


2. 启动RocketMQ集群

本步骤将带您启动RocketMQ集群。

说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。

  1. 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01
./restart.sh

返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。

  1. 启动broker。

2.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

2.2执行如下命令,修改broker配置项。

vim ./conf/broker.conf

2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。

2.4执行如下命令,启动broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard
./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 验证集群启动情况。

在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster

返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。


3. 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。

本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。

说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?


4. 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?

分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。

  • 消费者说明

消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性


5. 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。

本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可。


6. 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。

一个事物消息大致的生命周期如下图。

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。


7. 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景。

本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。

建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?


8. 如何有选择性的消费消息

有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置,支持消息和属性过滤。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

#是否支持重试消息也过滤
filterSupportRetry=true
#支持属性过滤
enablePropertyFilter=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。


9. 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

本步骤将指导您如何使用ACL客户端生产消费消息。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

aclEnable=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5天前
|
安全 Java 大数据
探索Java的奇妙世界:语言特性与实际应用
探索Java的奇妙世界:语言特性与实际应用
|
7天前
|
安全 Java 调度
Java线程:深入理解与实战应用
Java线程:深入理解与实战应用
28 0
|
3天前
|
设计模式 Java API
Java 可扩展 API 设计:打造灵活的应用架构
【4月更文挑战第27天】设计可扩展的 API 是构建灵活、易于维护的应用程序架构的关键。Java 提供了丰富的工具和技术来实现这一目标,使开发者能够构建具有高度可扩展性的应用程序。
20 4
|
3天前
|
Java
【专栏】Java中的反射机制与应用实例
【4月更文挑战第27天】本文探讨了Java反射机制,该机制允许程序在运行时获取类信息、动态创建对象、调用方法和访问属性。反射通过Class、Constructor、Method和Field类实现。文中列举了反射的应用场景,如动态创建对象、调用方法、访问属性和处理注解,并提供了相关实例代码演示。
|
1天前
|
Java
Java中的条件语句结构在编程中的应用
Java中的条件语句结构在编程中的应用
4 0
|
1天前
|
敏捷开发 机器学习/深度学习 Java
Java中的异常处理机制深入理解与实践:持续集成在软件测试中的应用探索自动化测试在敏捷开发中的关键作用
【4月更文挑战第29天】在Java编程中,异常处理是一个重要的概念。它允许开发者在程序执行过程中遇到错误或异常情况时,能够捕获并处理这些异常,从而保证程序的稳定运行。本文将详细介绍Java中的异常处理机制,包括异常的分类、异常的处理方式以及自定义异常等内容。 【4月更文挑战第29天】 随着敏捷开发和DevOps文化的兴起,持续集成(CI)已成为现代软件开发周期中不可或缺的一环。本文将探讨持续集成在软件测试领域内的关键作用、实施策略以及面临的挑战。通过对自动化构建、测试用例管理、及时反馈等核心要素的详细分析,揭示持续集成如何提高软件质量和加速交付过程。 【4月更文挑战第29天】 在当今快速发
|
2天前
|
弹性计算 运维 Java
Serverless 应用引擎产品使用之在Serverless 应用引擎中,将 Java 应用从 ECS 迁移到 SAE如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
22 2
|
2天前
|
监控 搜索推荐 算法
Java排序:原理、实现与应用
【4月更文挑战第28天】本文探讨了Java中的排序算法,包括原理和实现。Java利用Comparator接口进行元素比较,通过Arrays和Collections类的sort方法对数组和列表进行排序。示例展示了使用这些方法的基本代码。此外,还讨论了冒泡排序算法和自定义排序场景,以适应不同需求。理解这些排序机制有助于提升程序效率。
8 1
|
3天前
|
监控 Java API
Java 模块化设计:概念与实战应用
【4月更文挑战第27天】模块化设计是现代软件开发的关键,它帮助开发者构建可管理、可维护的大型系统。Java 平台的模块化支持始于 Java 9,引入了一种全新的模块系统。
12 3
|
3天前
|
设计模式 消息中间件 Java
Java 设计模式:探索发布-订阅模式的原理与应用
【4月更文挑战第27天】发布-订阅模式是一种消息传递范式,被广泛用于构建松散耦合的系统。在 Java 中,这种模式允许多个对象监听和响应感兴趣的事件。
19 2

相关产品

  • 云消息队列 MQ