消息中间件-RocketMq

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: RocketMq 消息队列

mq messageQuence消息队列


作用:解耦 流量削峰 处理数据


问题:


一致性


可用性


复杂性


2.mq产品比较

特性

avtiveMQ

RabbitMQ

RocketMQ

kafka

开发语音

java

erLang

java

scala

单机吞吐量

万级

万级

10w

10w

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(分布式)

非常高(分布式)

功能特性

比较成熟,文档多

性能好

扩展性好

大数据应用


3.快速入门


3.1 安装


下载源码包或者二进制包


源码包需要手动编译


二进制包可以直接解压使用


3.2 常用命令

#启动namesrvnohup sh mqnamesrv &
#查看启动日志tail -f ~/logs/rocketmqlogs/namesrv.log
#启动brokernohup sh mqbroker &
#查看启动日志tail -f ~/logs/rocketmqlogs/broker.log
#停止namesrvsh mqshutdown namesrv
#停止brokersh mqshutdown broker



3.3 常见问题


1.启动失败


启动日志出现启动失败

shutdown hook done, consuming time total(ms): 2114


原因:有可能是内存不够 rocketmq默认内存需要4g(4.7版本)


解决办法:修改启动配置


修改bin目录下的runserver.sh


修改属性

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


修改bin目录下的runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"


3.4简单demo


启动生产者发送消息

#生产者exportNAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
#消费者exportNAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer


3.5 rocketmq各角色说明


producer: 生产者,消息发送方


consumer: 消费者,消息接收方


Broker: 消息存储和发送服务


NameSrv: 管理Broker地址


Topic: 主题


Message Quene


3.6 集群


3.6.1 集群模式


单Master模式


多Master模式


多Master多Slave模式(异步)


多Master多Slave模式(同步)


3.6.2 双主双从(同步)集群搭建


3.6.2.1 环境配置


简述: 部署到两台虚拟机


192.168.164.128 Master1 Slave2


192.168.164.129 Master2 Slave1


修改host文件 ip和域名映射
vim /etc/hosts
#nameserver192.168.164.128 rocketmq-nameserver1
192.168.164.129 rocketmq-nameserver2
#broker192.168.164.128 rocketmq-master1
192.168.164.128 rocketmq-slave2
192.168.164.129 rockermq-masert2
192.168.164.129 rocketmq-slave1
#重启网卡systemctl restart network



关闭防火墙
systemctl stop firewalld
systemctl status firewalld



配置rocketmq环境变量
vim /etc/profile
#追加exportROCKETMQ_HOME=/home/rocketmq
exportPATH=$PATH:$ROCKETMQ_HOME/bin


建立rocketmq日志目录
#Master1节点mkdir-p /home/rocketmq/store/broker-a
mkdir /home/rocketmq/store/broker-a/commitlog
mkdir /home/rocketmq/store/broker-a/consumequeue
mkdir /home/rocketmq/store/broker-a/index
mkdir /home/rocketmq/store/broker-b-s
mkdir /home/rocketmq/store/broker-b-s/commitlog
mkdir /home/rocketmq/store/broker-b-s/consumequeue
mkdir /home/rocketmq/store/broker-b-s/index
#Master2节点mkdir-p /home/rocketmq/store/broker-b
mkdir /home/rocketmq/store/broker-b/commitlog
mkdir /home/rocketmq/store/broker-b/consumequeue
mkdir /home/rocketmq/store/broker-b/index
mkdir /home/rocketmq/store/broker-a-s
mkdir /home/rocketmq/store/broker-a-s/commitlog
mkdir /home/rocketmq/store/broker-a-s/consumequeue
mkdir /home/rocketmq/store/broker-a-s/index


3.6.2.2 修改双主双从配置文件


Master1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements.  See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License.  You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# brokerClusterName=DefaultCluster# brokerName=broker-a# brokerId=0# deleteWhen=04# fileReservedTime=48# brokerRole=SYNC_MASTER# flushDiskType=ASYNC_FLUSH#所属的集群名字brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样brokerName=broker-a
#brokerId 0表示Master 非0表示SlavebrokerId=0#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=10911#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-a
#commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-a/commitlog
#消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-a/consumequeue
#消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-a/index
#checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-a/checkpoint
#abort文件存储路径abortFile=/home/rocketmq/store/broker-a/abort
#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SYNC_MASTER
#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128



Slave2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties
#所属的集群名字brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样brokerName=broker-b
#brokerId 0表示Master 非0表示SlavebrokerId=1#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=11011#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-b-s
#commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-b-s/commitlog
#消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-b-s/consumequeue
#消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-b-s/index
#checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-b-s/checkpoint
#abort文件存储路径abortFile=/home/rocketmq/store/broker-b-s/abort
#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE
#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128



Master2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements.  See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License.  You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# brokerClusterName=DefaultCluster# brokerName=broker-a# brokerId=0# deleteWhen=04# fileReservedTime=48# brokerRole=SYNC_MASTER# flushDiskType=ASYNC_FLUSH#所属的集群名字brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样brokerName=broker-b
#brokerId 0表示Master 非0表示SlavebrokerId=0#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=10911#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-b
#commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-b/commitlog
#消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-b/consumequeue
#消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-b/index
#checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-b/checkpoint
#abort文件存储路径abortFile=/home/rocketmq/store/broker-b/abort
#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SYNC_MASTER
#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128



Slave1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties
#所属的集群名字brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样brokerName=broker-a
#brokerId 0表示Master 非0表示SlavebrokerId=1#nameserver地址 配置多个分号隔开namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数defaultTopicQueueNums=4#自动创建TopicautoCreateTopicEnable=true#自动创建订阅组autoCreateSubscriptionGroup=true#对外服务的监听端口listenPort=11011#删除文件时间点 凌晨四点deleteWhen=04#文件保留时间 默认48小时fileReservedTime=120#文件默认大小 1gmapedFileSizeCommitLog=1024000000#每个文件默认存储30w条消息mapedFileSizeConsumeQuene=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/home/rocketmq/store/broker-a-s
#commitLog存储路径storePathCommitLog=/home/rocketmq/store/broker-a-s/commitlog
#消息队列存储路径storePathConsumeQueue=/home/rocketmq/store/broker-a-s/consumequeue
#消息队列索引存储路径storePathIndex=/home/rocketmq/store/broker-a-s/index
#checkPoint 文件存储路径storeCheckpoint=/home/rocketmq/store/broker-a-s/checkpoint
#abort文件存储路径abortFile=/home/rocketmq/store/broker-a-s/abort
#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE
#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128



3.6.2.3 修改broker启动文件


修改启动配置文件,防止内存要求过大导致无法启动问题

vim /home/rocketmq/bin/runbroker.sh
修改内存
vim /home/rocketmq/bin/runserver.sh
修改内存



3.6.2.4 启动namesrv集群


两台服务器分别执行

nohup sh mqnamesrv &


3.6.2.5 启动broker集群


Master1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a.properties &



Slave2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties &



Master2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b.properties &


Slave1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties &


3.7 mqadmin


格式: mqadmin command args


3.7.1 Topic


3.8 rocketConsole搭建


下载源码 修改配置application.properties nameserver 地址


mvn编译


java -jar 启动jar包


3.9 消息发送与消费


引入jar包



3.9.1 发送消息基本流程


  1. 创建生产者,指定生产者组名
  2. 指定nameserver地址
  3. 启动producer
  4. 创建消息对象,指定Topic,tag和消息体(内容)
  5. 发送消息
  6. 关闭producer


3.9.2 消费消息基本流程


  1. 创建消费者,指定消费者组名
  2. 指定nameserver地址
  3. 订阅Topic,tag
  4. 设置回调,处理消息
  5. 启动消费者(消费者是一直在监听的)


3.9.3 基本消息


3.9.3.1 消息发送(生产者)


3.9.3.1.1 发送同步消息
//创建生产者对象 指定group 如果group不存在 指定group name可以直接创建,如果存在需////要//setProducerGroupDefaultMQProducerproducer=newDefaultMQProducer();
//指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
producer.setProducerGroup("baseGroup");
//启动producerproducer.start();
//循环发送十条消息for (inti=0; i<10; i++) {
//创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
//发送消息SendResultsend=producer.send(message);
//打印消息发送后的返回值StringmsgId=send.getMsgId();
//            System.out.println("msgId====="+msgId);SendStatussendStatus=send.getSendStatus();
//            System.out.println("sendStatus====="+sendStatus);intqueueId=send.getMessageQueue().getQueueId();
//            System.out.println("queueId====="+queueId);System.out.println("发送结果"+send);
//线程休眠1sTimeUnit.SECONDS.sleep(1);
}
//关闭producerproducer.shutdown();



3.9.3.1.2 发送异步消息
//创建生产者对象 指定groupDefaultMQProducerproducer=newDefaultMQProducer();
//指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
producer.setProducerGroup("baseGroup");
//启动producerproducer.start();
//循环发送十条消息for (inti=0; i<10; i++) {
//创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
//发送消息producer.send(message, newSendCallback() {
@OverridepublicvoidonSuccess(SendResultsendResult) {
System.out.println("发送结果"+sendResult);
                }
@OverridepublicvoidonException(Throwablethrowable) {
System.out.println("发送失败");
                }
            });
//线程休眠1sTimeUnit.SECONDS.sleep(1);
        }
//关闭producerproducer.shutdown();

3.9.3.1.3 发送单项消息


无返回结果

//创建生产者对象 指定groupDefaultMQProducerproducer=newDefaultMQProducer();
//指定nameserver 集群分号隔开producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
producer.setProducerGroup("baseGroup");
//启动producerproducer.start();
//循环发送十条消息for (inti=0; i<10; i++) {
//创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
producer.sendOneway(message);
//线程休眠1sTimeUnit.SECONDS.sleep(1);
        }
//关闭producerproducer.shutdown();

 

3.9.3.2 消息消费(消费者)


3.9.3.2.1 push方式消费
//创建消费者对象DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("baseConsumer");
consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
//订阅topic,tagconsumer.subscribe("baseTopic", "baseTag");
//监听消息consumer.registerMessageListener(newMessageListenerConcurrently() {
@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list, ConsumeConcurrentlyContextconsumeConcurrentlyContext) {
for (MessageExtmessageExt : list) {
System.out.println(newString(messageExt.getBody()));
                }
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
consumer.start();
System.out.println("consumer启动");

 


3.9.3.2.2 消费模式


  1. 广播模式 所有订阅Topic的消费者都会消费消息
  2. 负载均衡模式 同一个组的消费者会瓜分Topic消息


3.9.4 顺序消息


业务需要按照顺序发送和消费消息


rocketmq自带顺序消息


3.9.4.1 原理


使用队列的数据结构实现顺序消息   先进先出


生产消息时把消息发送到broker的同一队列


消费消息时同一个队列的消息由同一个线程处理


3.9.4.2 生产顺序消息

try {
DefaultMQProducerproducer=newDefaultMQProducer();
producer.setProducerGroup("orderGroup");
producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
List<OrderEntity>list=OrderEntity.builds();
for (OrderEntityo : list) {
Messagemessage=newMessage("orderTopic", "orderTag", o.toString().getBytes());
SendResultsend=producer.send(message, newMessageQueueSelector() {
//指定消息发送使用的队列,同一个orderId的消息统一发送到一个queue中@OverridepublicMessageQueueselect(List<MessageQueue>list, Messagemessage, Objecto) {
longl= (Long) o%list.size();
returnlist.get((int)l);
            }
        }, o.getOrderId());
System.out.println(send);
    }
}catch (Exceptione){
e.printStackTrace();
}finally {
producer.shutdown();
}



3.9.4.3 消费顺序消息

DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer();
consumer.setConsumerGroup("orderGroup");
consumer.setInstanceName("orderConsumer");
try {
consumer.subscribe("orderTopic", "*");
} catch (MQClientExceptione) {
e.printStackTrace();
}
consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
//注册一个顺序消息的监听器defaultMQPushConsumer.registerMessageListener(newMessageListenerOrderly(){
@OverridepublicConsumeOrderlyStatusconsumeMessage(List<MessageExt>list, ConsumeOrderlyContextconsumeOrderlyContext) {
for (inti=0; i<list.size(); i++) {
System.out.println(newString(list.get(i).getBody()));
        }
returnConsumeOrderlyStatus.SUCCESS;
    }
});
try {
defaultMQPushConsumer.start();
} catch (MQClientExceptione) {
e.printStackTrace();
}



3.9.5 延时消息


messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h


1就是1s


2就是5s


...


共18个级别


3.9.5.1 发送消息



//核心代码message.setDelayTimeLevel(2);
for (inti=0; i<10; i++) {
//创建消息对象 指定topic tag 和消息体Messagemessage=newMessage("orderTopic", "orderTag", (("hello world")+(i+1)).getBytes());
//设置延时消息message.setDelayTimeLevel(3);
//发送消息System.out.println("开始时间==="+System.currentTimeMillis());
SendResultsend=producer.send(message);
//打印消息发送后的返回值StringmsgId=send.getMsgId();
//            System.out.println("msgId====="+msgId);SendStatussendStatus=send.getSendStatus();
//            System.out.println("sendStatus====="+sendStatus);intqueueId=send.getMessageQueue().getQueueId();
//            System.out.println("queueId====="+queueId);System.out.println("发送结果"+send);
//线程休眠1sTimeUnit.SECONDS.sleep(1);
 }


3.9.5.2 消费消息


正常消费


3.9.6 批量消息


3.9.6.1 生产消息

Messagemessage=newMessage("orderTopic", "orderTag", (("hello world")+ (2)).getBytes());
Messagemessage1=newMessage("orderTopic", "orderTag", (("hello world")+(3)).getBytes()); 
Messagemessage2=newMessage("orderTopic", "orderTag", (("hello world")+(4)).getBytes());
Messagemessage3=newMessage("orderTopic", "orderTag", (("hello world")+(5)).getBytes());
List<Message>msgs=newArrayList<>();
msgs.add(message);
msgs.add(message1);
msgs.add(message2);
msgs.add(message3);
///发送消息System.out.println("开始时间==="+System.currentTimeMillis());
SendResultsend=producer.send(msgs);


3.9.6.2 消费消息


正常消费


3.9.7 过滤消息


3.9.7.1 tag过滤


在订阅tag时可以通过*或者||指定订阅的tag

consumer.subscribe("baseTopic", "baseTag || orderTag");
consumer.subscribe("baseTopic", "*");


3.9.7.2 sql过滤


生产消息时可以给消息指定一个特定的属性

Messagemessage=newMessage("orderTopic", "orderTag", (("hello world") + (2)).getBytes());
message.putUserProperty("type", i+"");



这样在消费者订阅消息时可以通过简单sql过滤

consumer.subscribe("orderTopic", MessageSelector.bySql("type>1"));


批量消息不能使用过滤


3.9.8 事务消息



3.9.8.1 概述


具体查看


发送事务消息时有三种状态


  1. 提交 提交后消息才能被消费
  2. 回滚 回滚后消息被删除
  3. 中间 没有提交或回滚的状态,这个状态下rocketmq会去回查本地事务,确定要提交还是回滚


3.9.8.2 发送消息

TransactionMQProducertransactionMQProducer=newTransactionMQProducer("transactionGroup");
transactionMQProducer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
transactionMQProducer.setInstanceName("transactionProducer");
transactionMQProducer.setTransactionListener(newTransactionListener() {
//发送半消息后触发,是提交还是回滚@OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemessage, Objecto) {
System.out.println("message: "+message.getUserProperty("type")+"开始检查事务状态");
if(StringUtils.equals("commit",message.getUserProperty("type"))){
returnLocalTransactionState.COMMIT_MESSAGE;
                }elseif (StringUtils.equals("rollback",message.getUserProperty("type"))){
returnLocalTransactionState.ROLLBACK_MESSAGE;
                }else{
returnLocalTransactionState.UNKNOW;
                }
            }
//当消息状态处于中间状态时,rocketmq会回调此方法,验证此消息是发送还是回滚@OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmessageExt) {
System.out.println("message: "+messageExt.getMsgId()+"触发回查");
returnLocalTransactionState.COMMIT_MESSAGE;
            }
        });
transactionMQProducer.start();
Messagemessage=newMessage("transactionTopic", "transcationTag", "hello world message0".getBytes());
message.putUserProperty("type", "commit");
Messagemessage1=newMessage("transactionTopic", "transcationTag", "hello world message1".getBytes());
message1.putUserProperty("type", "rollback");
Messagemessage2=newMessage("transactionTopic", "transcationTag", "hello world message2".getBytes());
SendResultsend=transactionMQProducer.sendMessageInTransaction(message,null);
System.out.println("发送结果=="+send);
Thread.sleep(2000);
SendResultsend1=transactionMQProducer.sendMessageInTransaction(message1,null);
System.out.println("发送结果=="+send1);
Thread.sleep(2000);
SendResultsend2=transactionMQProducer.sendMessageInTransaction(message2,null);
System.out.println("发送结果=="+send2);
Thread.sleep(100000);
transactionMQProducer.shutdown();



3.9.8.3 限制


1.不支持延时消息和批量消息


2.事务消息可能被多次回查,可能出现重复消费情况


3.通过设置用户属性来设置服务端的回查间隔时间

message2.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,"5");


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
10天前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
10天前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
3月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
242 3
|
2月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
2月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
2月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
3月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
3月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
164 11
|
3月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章

相关产品

  • 云消息队列 MQ
  • 下一篇
    无影云桌面