mq messageQuence消息队列
作用:解耦 流量削峰 处理数据
问题:
一致性
可用性
复杂性
2.mq产品比较
特性 |
avtiveMQ |
RabbitMQ |
RocketMQ |
kafka |
开发语音 |
java |
erLang |
java |
scala |
单机吞吐量 |
万级 |
万级 |
10w |
10w |
时效性 |
ms |
us |
ms |
ms |
可用性 |
高(主从架构) |
高(主从架构) |
非常高(分布式) |
非常高(分布式) |
功能特性 |
比较成熟,文档多 |
性能好 |
扩展性好 |
大数据应用 |
3.快速入门
3.1 安装
下载源码包或者二进制包
源码包需要手动编译
二进制包可以直接解压使用
3.2 常用命令
#启动namesrvnohup sh mqnamesrv & #查看启动日志tail -f ~/logs/rocketmqlogs/namesrv.log #启动brokernohup sh mqbroker & #查看启动日志tail -f ~/logs/rocketmqlogs/broker.log #停止namesrvsh mqshutdown namesrv #停止brokersh mqshutdown broker
3.3 常见问题
1.启动失败
启动日志出现启动失败
shutdown hook done, consuming time total(ms): 2114
原因:有可能是内存不够 rocketmq默认内存需要4g(4.7版本)
解决办法:修改启动配置
修改bin目录下的runserver.sh
修改属性
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改bin目录下的runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
3.4简单demo
启动生产者发送消息
#生产者exportNAMESRV_ADDR=localhost:9876 sh tools.sh org.apache.rocketmq.example.quickstart.Producer #消费者exportNAMESRV_ADDR=localhost:9876 sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
3.5 rocketmq各角色说明
producer: 生产者,消息发送方
consumer: 消费者,消息接收方
Broker: 消息存储和发送服务
NameSrv: 管理Broker地址
Topic: 主题
Message Quene
3.6 集群
3.6.1 集群模式
单Master模式
多Master模式
多Master多Slave模式(异步)
多Master多Slave模式(同步)
3.6.2 双主双从(同步)集群搭建
3.6.2.1 环境配置
简述: 部署到两台虚拟机
192.168.164.128 Master1 Slave2
192.168.164.129 Master2 Slave1
修改host文件 ip和域名映射
vim /etc/hosts #nameserver192.168.164.128 rocketmq-nameserver1 192.168.164.129 rocketmq-nameserver2 #broker192.168.164.128 rocketmq-master1 192.168.164.128 rocketmq-slave2 192.168.164.129 rockermq-masert2 192.168.164.129 rocketmq-slave1 #重启网卡systemctl restart network
关闭防火墙
systemctl stop firewalld systemctl status firewalld
配置rocketmq环境变量
vim /etc/profile #追加exportROCKETMQ_HOME=/home/rocketmq exportPATH=$PATH:$ROCKETMQ_HOME/bin
建立rocketmq日志目录
#Master1节点mkdir-p /home/rocketmq/store/broker-a mkdir /home/rocketmq/store/broker-a/commitlog mkdir /home/rocketmq/store/broker-a/consumequeue mkdir /home/rocketmq/store/broker-a/index mkdir /home/rocketmq/store/broker-b-s mkdir /home/rocketmq/store/broker-b-s/commitlog mkdir /home/rocketmq/store/broker-b-s/consumequeue mkdir /home/rocketmq/store/broker-b-s/index #Master2节点mkdir-p /home/rocketmq/store/broker-b mkdir /home/rocketmq/store/broker-b/commitlog mkdir /home/rocketmq/store/broker-b/consumequeue mkdir /home/rocketmq/store/broker-b/index mkdir /home/rocketmq/store/broker-a-s mkdir /home/rocketmq/store/broker-a-s/commitlog mkdir /home/rocketmq/store/broker-a-s/consumequeue mkdir /home/rocketmq/store/broker-a-s/index
3.6.2.2 修改双主双从配置文件
Master1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a.properties # Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# brokerClusterName=DefaultCluster# brokerName=broker-a# brokerId=0# deleteWhen=04# fileReservedTime=48# brokerRole=SYNC_MASTER# flushDiskType=ASYNC_FLUSH#所属的集群名字brokerClusterName=rocketmq-cluster #broker名字 不同的broker填的不一样brokerName=broker-a #brokerId 0表示Master 非0表示SlavebrokerId=0#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=10911#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-a #commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-a/commitlog #消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-a/consumequeue #消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-a/index #checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-a/checkpoint #abort文件存储路径abortFile=/home/rocketmq/store/broker-a/abort #限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SYNC_MASTER #刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
Slave2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties #所属的集群名字brokerClusterName=rocketmq-cluster #broker名字 不同的broker填的不一样brokerName=broker-b #brokerId 0表示Master 非0表示SlavebrokerId=1#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=11011#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-b-s #commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-b-s/commitlog #消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-b-s/consumequeue #消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-b-s/index #checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-b-s/checkpoint #abort文件存储路径abortFile=/home/rocketmq/store/broker-b-s/abort #限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE #刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
Master2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b.properties # Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# brokerClusterName=DefaultCluster# brokerName=broker-a# brokerId=0# deleteWhen=04# fileReservedTime=48# brokerRole=SYNC_MASTER# flushDiskType=ASYNC_FLUSH#所属的集群名字brokerClusterName=rocketmq-cluster #broker名字 不同的broker填的不一样brokerName=broker-b #brokerId 0表示Master 非0表示SlavebrokerId=0#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=10911#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-b #commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-b/commitlog #消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-b/consumequeue #消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-b/index #checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-b/checkpoint #abort文件存储路径abortFile=/home/rocketmq/store/broker-b/abort #限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SYNC_MASTER #刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
Slave1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties #所属的集群名字brokerClusterName=rocketmq-cluster #broker名字 不同的broker填的不一样brokerName=broker-a #brokerId 0表示Master 非0表示SlavebrokerId=1#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=11011#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-a-s #commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-a-s/commitlog #消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-a-s/consumequeue #消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-a-s/index #checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-a-s/checkpoint #abort文件存储路径abortFile=/home/rocketmq/store/broker-a-s/abort #限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE #刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
3.6.2.3 修改broker启动文件
修改启动配置文件,防止内存要求过大导致无法启动问题
vim /home/rocketmq/bin/runbroker.sh 修改内存 vim /home/rocketmq/bin/runserver.sh 修改内存
3.6.2.4 启动namesrv集群
两台服务器分别执行
nohup sh mqnamesrv &
3.6.2.5 启动broker集群
Master1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a.properties &
Slave2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
Master2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b.properties &
Slave1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
3.7 mqadmin
格式: mqadmin command args
3.7.1 Topic
3.8 rocketConsole搭建
下载源码 修改配置application.properties nameserver 地址
mvn编译
java -jar 启动jar包
3.9 消息发送与消费
引入jar包
3.9.1 发送消息基本流程
- 创建生产者,指定生产者组名
- 指定nameserver地址
- 启动producer
- 创建消息对象,指定Topic,tag和消息体(内容)
- 发送消息
- 关闭producer
3.9.2 消费消息基本流程
- 创建消费者,指定消费者组名
- 指定nameserver地址
- 订阅Topic,tag
- 设置回调,处理消息
- 启动消费者(消费者是一直在监听的)
3.9.3 基本消息
3.9.3.1 消息发送(生产者)
3.9.3.1.1 发送同步消息
//创建生产者对象 指定group 如果group不存在 指定group name可以直接创建,如果存在需////要//setProducerGroupDefaultMQProducerproducer=newDefaultMQProducer(); //指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;"); producer.setProducerGroup("baseGroup"); //启动producerproducer.start(); //循环发送十条消息for (inti=0; i<10; i++) { //创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes()); //发送消息SendResultsend=producer.send(message); //打印消息发送后的返回值StringmsgId=send.getMsgId(); // System.out.println("msgId====="+msgId);SendStatussendStatus=send.getSendStatus(); // System.out.println("sendStatus====="+sendStatus);intqueueId=send.getMessageQueue().getQueueId(); // System.out.println("queueId====="+queueId);System.out.println("发送结果"+send); //线程休眠1sTimeUnit.SECONDS.sleep(1); } //关闭producerproducer.shutdown();
3.9.3.1.2 发送异步消息
//创建生产者对象 指定groupDefaultMQProducerproducer=newDefaultMQProducer(); //指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;"); producer.setProducerGroup("baseGroup"); //启动producerproducer.start(); //循环发送十条消息for (inti=0; i<10; i++) { //创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes()); //发送消息producer.send(message, newSendCallback() { publicvoidonSuccess(SendResultsendResult) { System.out.println("发送结果"+sendResult); } publicvoidonException(Throwablethrowable) { System.out.println("发送失败"); } }); //线程休眠1sTimeUnit.SECONDS.sleep(1); } //关闭producerproducer.shutdown();
3.9.3.1.3 发送单项消息
无返回结果
//创建生产者对象 指定groupDefaultMQProducerproducer=newDefaultMQProducer(); //指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;"); producer.setProducerGroup("baseGroup"); //启动producerproducer.start(); //循环发送十条消息for (inti=0; i<10; i++) { //创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes()); producer.sendOneway(message); //线程休眠1sTimeUnit.SECONDS.sleep(1); } //关闭producerproducer.shutdown();
3.9.3.2 消息消费(消费者)
3.9.3.2.1 push方式消费
//创建消费者对象DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("baseConsumer"); consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876"); //订阅topic,tagconsumer.subscribe("baseTopic", "baseTag"); //监听消息consumer.registerMessageListener(newMessageListenerConcurrently() { publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list, ConsumeConcurrentlyContextconsumeConcurrentlyContext) { for (MessageExtmessageExt : list) { System.out.println(newString(messageExt.getBody())); } returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("consumer启动");
3.9.3.2.2 消费模式
- 广播模式 所有订阅Topic的消费者都会消费消息
- 负载均衡模式 同一个组的消费者会瓜分Topic消息
3.9.4 顺序消息
业务需要按照顺序发送和消费消息
rocketmq自带顺序消息
3.9.4.1 原理
使用队列的数据结构实现顺序消息 先进先出
生产消息时把消息发送到broker的同一队列
消费消息时同一个队列的消息由同一个线程处理
3.9.4.2 生产顺序消息
try { DefaultMQProducerproducer=newDefaultMQProducer(); producer.setProducerGroup("orderGroup"); producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876"); List<OrderEntity>list=OrderEntity.builds(); for (OrderEntityo : list) { Messagemessage=newMessage("orderTopic", "orderTag", o.toString().getBytes()); SendResultsend=producer.send(message, newMessageQueueSelector() { //指定消息发送使用的队列,同一个orderId的消息统一发送到一个queue中publicMessageQueueselect(List<MessageQueue>list, Messagemessage, Objecto) { longl= (Long) o%list.size(); returnlist.get((int)l); } }, o.getOrderId()); System.out.println(send); } }catch (Exceptione){ e.printStackTrace(); }finally { producer.shutdown(); }
3.9.4.3 消费顺序消息
DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer(); consumer.setConsumerGroup("orderGroup"); consumer.setInstanceName("orderConsumer"); try { consumer.subscribe("orderTopic", "*"); } catch (MQClientExceptione) { e.printStackTrace(); } consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876"); //注册一个顺序消息的监听器defaultMQPushConsumer.registerMessageListener(newMessageListenerOrderly(){ publicConsumeOrderlyStatusconsumeMessage(List<MessageExt>list, ConsumeOrderlyContextconsumeOrderlyContext) { for (inti=0; i<list.size(); i++) { System.out.println(newString(list.get(i).getBody())); } returnConsumeOrderlyStatus.SUCCESS; } }); try { defaultMQPushConsumer.start(); } catch (MQClientExceptione) { e.printStackTrace(); }
3.9.5 延时消息
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1就是1s
2就是5s
...
共18个级别
3.9.5.1 发送消息
//核心代码message.setDelayTimeLevel(2); for (inti=0; i<10; i++) { //创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("orderTopic", "orderTag", (("hello world")+(i+1)).getBytes()); //设置延时消息message.setDelayTimeLevel(3); //发送消息System.out.println("开始时间==="+System.currentTimeMillis()); SendResultsend=producer.send(message); //打印消息发送后的返回值StringmsgId=send.getMsgId(); // System.out.println("msgId====="+msgId);SendStatussendStatus=send.getSendStatus(); // System.out.println("sendStatus====="+sendStatus);intqueueId=send.getMessageQueue().getQueueId(); // System.out.println("queueId====="+queueId);System.out.println("发送结果"+send); //线程休眠1sTimeUnit.SECONDS.sleep(1); }
3.9.5.2 消费消息
正常消费
3.9.6 批量消息
3.9.6.1 生产消息
Messagemessage=newMessage("orderTopic", "orderTag", (("hello world")+ (2)).getBytes()); Messagemessage1=newMessage("orderTopic", "orderTag", (("hello world")+(3)).getBytes()); Messagemessage2=newMessage("orderTopic", "orderTag", (("hello world")+(4)).getBytes()); Messagemessage3=newMessage("orderTopic", "orderTag", (("hello world")+(5)).getBytes()); List<Message>msgs=newArrayList<>(); msgs.add(message); msgs.add(message1); msgs.add(message2); msgs.add(message3); ///发送消息System.out.println("开始时间==="+System.currentTimeMillis()); SendResultsend=producer.send(msgs);
3.9.6.2 消费消息
正常消费
3.9.7 过滤消息
3.9.7.1 tag过滤
在订阅tag时可以通过*或者||指定订阅的tag
consumer.subscribe("baseTopic", "baseTag || orderTag"); consumer.subscribe("baseTopic", "*");
3.9.7.2 sql过滤
生产消息时可以给消息指定一个特定的属性
Messagemessage=newMessage("orderTopic", "orderTag", (("hello world") + (2)).getBytes()); message.putUserProperty("type", i+"");
这样在消费者订阅消息时可以通过简单sql过滤
consumer.subscribe("orderTopic", MessageSelector.bySql("type>1"));
批量消息不能使用过滤
3.9.8 事务消息
3.9.8.1 概述
具体查看
发送事务消息时有三种状态
- 提交 提交后消息才能被消费
- 回滚 回滚后消息被删除
- 中间 没有提交或回滚的状态,这个状态下rocketmq会去回查本地事务,确定要提交还是回滚
3.9.8.2 发送消息
TransactionMQProducertransactionMQProducer=newTransactionMQProducer("transactionGroup"); transactionMQProducer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;"); transactionMQProducer.setInstanceName("transactionProducer"); transactionMQProducer.setTransactionListener(newTransactionListener() { //发送半消息后触发,是提交还是回滚publicLocalTransactionStateexecuteLocalTransaction(Messagemessage, Objecto) { System.out.println("message: "+message.getUserProperty("type")+"开始检查事务状态"); if(StringUtils.equals("commit",message.getUserProperty("type"))){ returnLocalTransactionState.COMMIT_MESSAGE; }elseif (StringUtils.equals("rollback",message.getUserProperty("type"))){ returnLocalTransactionState.ROLLBACK_MESSAGE; }else{ returnLocalTransactionState.UNKNOW; } } //当消息状态处于中间状态时,rocketmq会回调此方法,验证此消息是发送还是回滚publicLocalTransactionStatecheckLocalTransaction(MessageExtmessageExt) { System.out.println("message: "+messageExt.getMsgId()+"触发回查"); returnLocalTransactionState.COMMIT_MESSAGE; } }); transactionMQProducer.start(); Messagemessage=newMessage("transactionTopic", "transcationTag", "hello world message0".getBytes()); message.putUserProperty("type", "commit"); Messagemessage1=newMessage("transactionTopic", "transcationTag", "hello world message1".getBytes()); message1.putUserProperty("type", "rollback"); Messagemessage2=newMessage("transactionTopic", "transcationTag", "hello world message2".getBytes()); SendResultsend=transactionMQProducer.sendMessageInTransaction(message,null); System.out.println("发送结果=="+send); Thread.sleep(2000); SendResultsend1=transactionMQProducer.sendMessageInTransaction(message1,null); System.out.println("发送结果=="+send1); Thread.sleep(2000); SendResultsend2=transactionMQProducer.sendMessageInTransaction(message2,null); System.out.println("发送结果=="+send2); Thread.sleep(100000); transactionMQProducer.shutdown();
3.9.8.3 限制
1.不支持延时消息和批量消息
2.事务消息可能被多次回查,可能出现重复消费情况
3.通过设置用户属性来设置服务端的回查间隔时间
message2.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,"5");