RocketMQ中使用Java客户端发送消息和消费的应用
1. 实验环境说明
实验环境
- 体验手册。
a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。
b.实验操作手册。
- 云产品资源。
a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。
b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。
- 体验报告。
小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。
- 实验环境。
实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。
- 实验体验时间。
体验时间一般为一个小时。
- 实验环境功能栏。
功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。
- 编写实验报告。
填写实验报告,帮助大家快速通过实验了解RocketMQ。
实验帮助
如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。
2. 启动RocketMQ集群
本步骤将带您启动RocketMQ集群。
说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。
- 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01 ./restart.sh
返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。
- 启动broker。
2.1执行如下命令,进入broker目录。
cd /usr/local/services/5-rocketmq/broker-01
2.2执行如下命令,修改broker配置项。
vim ./conf/broker.conf
2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。
说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。
2.4执行如下命令,启动broker。
./restart.sh
返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。
- 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard ./restart.sh
返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。
- 验证集群启动情况。
在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster。
返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。
3. 如何发送和消费并发消息
并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。
本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。
说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。
cd /data/demos git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?
4. 如何发送和消费顺序消息
顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。
什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?
分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。
本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
- 生产者说明
生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。
- 消费者说明
消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性
5. 如何发送和消费延迟消息
延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。
本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
- 生产者说明
生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。
- 消费者说明: 消费者按照并发消息消费即可。
6. 如何发送和消费事务消息
事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。
一个事物消息大致的生命周期如下图。
概括为如下几个重要点:
- 生产者发送half消息(事物消息)
- Broker存储half消息
- 生产者处理本地事物,处理成功后commit事物
- 消费者消费到事物消息
本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。
生产者端的主要代码包含3个步骤:
6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。
这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。
6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。
6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。
7. 生产者消费者如何同步发送、消费消息(Request-Reply)
request-reply模式,可以满足目前类似RPC同步调用的场景。
本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。
建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。
生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。
消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。
一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?
8. 如何有选择性的消费消息
有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 修改broker配置,支持消息和属性过滤。
3.1执行如下命令,进入broker目录。
cd /usr/local/services/5-rocketmq/broker-01
3.2执行如下命令,编辑配置文件broker.conf。
vim conf/broker.conf
3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项。添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。
#是否支持重试消息也过滤 filterSupportRetry=true #支持属性过滤 enablePropertyFilter=true
添加完成后的文件内容如下。
3.4执行如下命令,重启broker。
./restart.sh
返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。
- 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
以下分别介绍生产者和消费者主要demo代码。
- 生产者
在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。
- 消费者
tag过滤消费时,在订阅topic时, 也添加上tag订阅
SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。
9. 如何使用ACL客户端生产消费消息
ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL。
本步骤将指导您如何使用ACL客户端生产消费消息。
- 启动一个RocketMQ集群。
说明:已启动一个RocketMQ集群,本操作请您忽略。
- 进入/data/demos目录,并下载全部java代码Demo。
说明:已下载java代码Demo,本操作请您忽略。
- 修改broker配置。
3.1执行如下命令,进入broker目录。
cd /usr/local/services/5-rocketmq/broker-01
3.2执行如下命令,编辑配置文件broker.conf。
vim conf/broker.conf
3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项。添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。
aclEnable=true
添加完成后的文件内容如下。
3.4执行如下命令,重启broker。
./restart.sh
返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。
- 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
- 执行如下命令,打包Demo代码。
mvn clean package
- 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime
返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。
- Demo代码说明。
如果您想查看全部代码,您可以查看实验本地或者访问Demo代码。
带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:
static RPCHook getAclRPCHook(String accessKey, String secretKey) { return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); }
在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。
实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6