消息中间件-RocketMq-阿里云开发者社区

开发者社区> 微服务> 正文
登录阅读全文

消息中间件-RocketMq

简介: RocketMq 消息队列

mq messageQuence消息队列


作用:解耦 流量削峰 处理数据


问题:


一致性


可用性


复杂性


2.mq产品比较

特性

avtiveMQ

RabbitMQ

RocketMQ

kafka

开发语音

java

erLang

java

scala

单机吞吐量

万级

万级

10w

10w

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(分布式)

非常高(分布式)

功能特性

比较成熟,文档多

性能好

扩展性好

大数据应用


3.快速入门


3.1 安装


下载源码包或者二进制包


源码包需要手动编译


二进制包可以直接解压使用


3.2 常用命令

#启动namesrv
nohup sh mqnamesrv &
#查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#启动broker
nohup sh mqbroker &
#查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
#停止namesrv
sh mqshutdown namesrv
#停止broker
sh mqshutdown broker



3.3 常见问题


1.启动失败


启动日志出现启动失败

shutdown hook done, consuming time total(ms): 2114


原因:有可能是内存不够 rocketmq默认内存需要4g(4.7版本)


解决办法:修改启动配置


修改bin目录下的runserver.sh


修改属性

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


修改bin目录下的runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"


3.4简单demo


启动生产者发送消息

#生产者
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
#消费者
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer


3.5 rocketmq各角色说明


producer: 生产者,消息发送方


consumer: 消费者,消息接收方


Broker: 消息存储和发送服务


NameSrv: 管理Broker地址


Topic: 主题


Message Quene


3.6 集群


3.6.1 集群模式


单Master模式


多Master模式


多Master多Slave模式(异步)


多Master多Slave模式(同步)


3.6.2 双主双从(同步)集群搭建


3.6.2.1 环境配置


简述: 部署到两台虚拟机


192.168.164.128 Master1 Slave2


192.168.164.129 Master2 Slave1


修改host文件 ip和域名映射
vim /etc/hosts
#nameserver
192.168.164.128 rocketmq-nameserver1
192.168.164.129 rocketmq-nameserver2
#broker
192.168.164.128 rocketmq-master1
192.168.164.128 rocketmq-slave2
192.168.164.129 rockermq-masert2
192.168.164.129 rocketmq-slave1
#重启网卡
systemctl restart network



关闭防火墙
systemctl stop firewalld
systemctl status firewalld



配置rocketmq环境变量
vim /etc/profile
#追加
export ROCKETMQ_HOME=/home/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin


建立rocketmq日志目录
#Master1节点
mkdir -p /home/rocketmq/store/broker-a
mkdir /home/rocketmq/store/broker-a/commitlog
mkdir /home/rocketmq/store/broker-a/consumequeue
mkdir /home/rocketmq/store/broker-a/index
mkdir /home/rocketmq/store/broker-b-s
mkdir /home/rocketmq/store/broker-b-s/commitlog
mkdir /home/rocketmq/store/broker-b-s/consumequeue
mkdir /home/rocketmq/store/broker-b-s/index
#Master2节点
mkdir -p /home/rocketmq/store/broker-b
mkdir /home/rocketmq/store/broker-b/commitlog
mkdir /home/rocketmq/store/broker-b/consumequeue
mkdir /home/rocketmq/store/broker-b/index
mkdir /home/rocketmq/store/broker-a-s
mkdir /home/rocketmq/store/broker-a-s/commitlog
mkdir /home/rocketmq/store/broker-a-s/consumequeue
mkdir /home/rocketmq/store/broker-a-s/index


3.6.2.2 修改双主双从配置文件


Master1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# brokerClusterName=DefaultCluster
# brokerName=broker-a
# brokerId=0
# deleteWhen=04
# fileReservedTime=48
# brokerRole=SYNC_MASTER
# flushDiskType=ASYNC_FLUSH
#所属的集群名字
brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样
brokerName=broker-a
#brokerId 0表示Master 非0表示Slave
brokerId=0
#nameserver地址 配置多个分号隔开
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#自动创建Topic
autoCreateTopicEnable=true
#自动创建订阅组
autoCreateSubscriptionGroup=true
#对外服务的监听端口
listenPort=10911
#删除文件时间点 凌晨四点
deleteWhen=04
#文件保留时间 默认48小时
fileReservedTime=120
#文件默认大小 1g
mapedFileSizeCommitLog=1024000000
#每个文件默认存储30w条消息
mapedFileSizeConsumeQuene=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/home/rocketmq/store/broker-a
#commitLog存储路径
storePathCommitLog=/home/rocketmq/store/broker-a/commitlog
#消息队列存储路径
storePathConsumeQueue=/home/rocketmq/store/broker-a/consumequeue
#消息队列索引存储路径
storePathIndex=/home/rocketmq/store/broker-a/index
#checkPoint 文件存储路径
storeCheckpoint=/home/rocketmq/store/broker-a/checkpoint
#abort文件存储路径
abortFile=/home/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128



Slave2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties
#所属的集群名字
brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样
brokerName=broker-b
#brokerId 0表示Master 非0表示Slave
brokerId=1
#nameserver地址 配置多个分号隔开
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#自动创建Topic
autoCreateTopicEnable=true
#自动创建订阅组
autoCreateSubscriptionGroup=true
#对外服务的监听端口
listenPort=11011
#删除文件时间点 凌晨四点
deleteWhen=04
#文件保留时间 默认48小时
fileReservedTime=120
#文件默认大小 1g
mapedFileSizeCommitLog=1024000000
#每个文件默认存储30w条消息
mapedFileSizeConsumeQuene=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/home/rocketmq/store/broker-b-s
#commitLog存储路径
storePathCommitLog=/home/rocketmq/store/broker-b-s/commitlog
#消息队列存储路径
storePathConsumeQueue=/home/rocketmq/store/broker-b-s/consumequeue
#消息队列索引存储路径
storePathIndex=/home/rocketmq/store/broker-b-s/index
#checkPoint 文件存储路径
storeCheckpoint=/home/rocketmq/store/broker-b-s/checkpoint
#abort文件存储路径
abortFile=/home/rocketmq/store/broker-b-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128



Master2 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# brokerClusterName=DefaultCluster
# brokerName=broker-a
# brokerId=0
# deleteWhen=04
# fileReservedTime=48
# brokerRole=SYNC_MASTER
# flushDiskType=ASYNC_FLUSH
#所属的集群名字
brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样
brokerName=broker-b
#brokerId 0表示Master 非0表示Slave
brokerId=0
#nameserver地址 配置多个分号隔开
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#自动创建Topic
autoCreateTopicEnable=true
#自动创建订阅组
autoCreateSubscriptionGroup=true
#对外服务的监听端口
listenPort=10911
#删除文件时间点 凌晨四点
deleteWhen=04
#文件保留时间 默认48小时
fileReservedTime=120
#文件默认大小 1g
mapedFileSizeCommitLog=1024000000
#每个文件默认存储30w条消息
mapedFileSizeConsumeQuene=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/home/rocketmq/store/broker-b
#commitLog存储路径
storePathCommitLog=/home/rocketmq/store/broker-b/commitlog
#消息队列存储路径
storePathConsumeQueue=/home/rocketmq/store/broker-b/consumequeue
#消息队列索引存储路径
storePathIndex=/home/rocketmq/store/broker-b/index
#checkPoint 文件存储路径
storeCheckpoint=/home/rocketmq/store/broker-b/checkpoint
#abort文件存储路径
abortFile=/home/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128



Slave1 修改配置文件
vim /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties
#所属的集群名字
brokerClusterName=rocketmq-cluster
#broker名字 不同的broker填的不一样
brokerName=broker-a
#brokerId 0表示Master 非0表示Slave
brokerId=1
#nameserver地址 配置多个分号隔开
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#发送消息时默认创建服务器上不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
#自动创建Topic
autoCreateTopicEnable=true
#自动创建订阅组
autoCreateSubscriptionGroup=true
#对外服务的监听端口
listenPort=11011
#删除文件时间点 凌晨四点
deleteWhen=04
#文件保留时间 默认48小时
fileReservedTime=120
#文件默认大小 1g
mapedFileSizeCommitLog=1024000000
#每个文件默认存储30w条消息
mapedFileSizeConsumeQuene=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/home/rocketmq/store/broker-a-s
#commitLog存储路径
storePathCommitLog=/home/rocketmq/store/broker-a-s/commitlog
#消息队列存储路径
storePathConsumeQueue=/home/rocketmq/store/broker-a-s/consumequeue
#消息队列索引存储路径
storePathIndex=/home/rocketmq/store/broker-a-s/index
#checkPoint 文件存储路径
storeCheckpoint=/home/rocketmq/store/broker-a-s/checkpoint
#abort文件存储路径
abortFile=/home/rocketmq/store/broker-a-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128



3.6.2.3 修改broker启动文件


修改启动配置文件,防止内存要求过大导致无法启动问题

vim /home/rocketmq/bin/runbroker.sh
修改内存
vim /home/rocketmq/bin/runserver.sh
修改内存



3.6.2.4 启动namesrv集群


两台服务器分别执行

nohup sh mqnamesrv &


3.6.2.5 启动broker集群


Master1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a.properties &



Slave2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b-s.properties &



Master2
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-b.properties &


Slave1
nohup sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a-s.properties &


3.7 mqadmin


格式: mqadmin command args


3.7.1 Topic


3.8 rocketConsole搭建


下载源码 修改配置application.properties nameserver 地址


mvn编译


java -jar 启动jar包


3.9 消息发送与消费


引入jar包



3.9.1 发送消息基本流程


  1. 创建生产者,指定生产者组名
  2. 指定nameserver地址
  3. 启动producer
  4. 创建消息对象,指定Topic,tag和消息体(内容)
  5. 发送消息
  6. 关闭producer


3.9.2 消费消息基本流程


  1. 创建消费者,指定消费者组名
  2. 指定nameserver地址
  3. 订阅Topic,tag
  4. 设置回调,处理消息
  5. 启动消费者(消费者是一直在监听的)


3.9.3 基本消息


3.9.3.1 消息发送(生产者)


3.9.3.1.1 发送同步消息
//创建生产者对象 指定group 如果group不存在 指定group name可以直接创建,如果存在需////要//setProducerGroup
DefaultMQProducer producer = new DefaultMQProducer();
//指定nameserver 集群分号隔开
producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
producer.setProducerGroup("baseGroup");
//启动producer
producer.start();
//循环发送十条消息
for (int i = 0; i < 10; i++) {
    //创建消息对象 指定topic tag 和消息体
    Message message = new Message("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
    //发送消息
    SendResult send = producer.send(message);
    //打印消息发送后的返回值
    String msgId = send.getMsgId();
    //            System.out.println("msgId====="+msgId);
    SendStatus sendStatus = send.getSendStatus();
    //            System.out.println("sendStatus====="+sendStatus);
    int queueId = send.getMessageQueue().getQueueId();
    //            System.out.println("queueId====="+queueId);
    System.out.println("发送结果"+send);
    //线程休眠1s
    TimeUnit.SECONDS.sleep(1);
}
//关闭producer
producer.shutdown();



3.9.3.1.2 发送异步消息
//创建生产者对象 指定group
        DefaultMQProducer producer = new DefaultMQProducer();
        //指定nameserver 集群分号隔开
        producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
        producer.setProducerGroup("baseGroup");
        //启动producer
        producer.start();
        //循环发送十条消息
        for (int i = 0; i < 10; i++) {
            //创建消息对象 指定topic tag 和消息体
            Message message = new Message("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
            //发送消息
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果"+sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送失败");
                }
            });
            //线程休眠1s
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭producer
        producer.shutdown();

 

3.9.3.1.3 发送单项消息


无返回结果

//创建生产者对象 指定group
        DefaultMQProducer producer = new DefaultMQProducer();
        //指定nameserver 集群分号隔开
        producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
        producer.setProducerGroup("baseGroup");
        //启动producer
        producer.start();
        //循环发送十条消息
        for (int i = 0; i < 10; i++) {
            //创建消息对象 指定topic tag 和消息体
            Message message = new Message("baseTopic", "baseTag", (("hello world")+(i+1)).getBytes());
            producer.sendOneway(message);
            //线程休眠1s
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭producer
        producer.shutdown();

   

3.9.3.2 消息消费(消费者)


3.9.3.2.1 push方式消费
//创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseConsumer");
        consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
        //订阅topic,tag
        consumer.subscribe("baseTopic", "baseTag");
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println(new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer启动");

   


3.9.3.2.2 消费模式


  1. 广播模式 所有订阅Topic的消费者都会消费消息
  2. 负载均衡模式 同一个组的消费者会瓜分Topic消息


3.9.4 顺序消息


业务需要按照顺序发送和消费消息


rocketmq自带顺序消息


3.9.4.1 原理


使用队列的数据结构实现顺序消息   先进先出


生产消息时把消息发送到broker的同一队列


消费消息时同一个队列的消息由同一个线程处理


3.9.4.2 生产顺序消息

try {
    DefaultMQProducer producer = new DefaultMQProducer();
    producer.setProducerGroup("orderGroup");
    producer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
    List<OrderEntity> list = OrderEntity.builds();
    
    for (OrderEntity o : list) {
        Message message = new Message("orderTopic", "orderTag", o.toString().getBytes());
        SendResult send = producer.send(message, new MessageQueueSelector() {
            //指定消息发送使用的队列,同一个orderId的消息统一发送到一个queue中
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                long l = (Long) o % list.size();
                return list.get((int)l);
            }
        }, o.getOrderId());
        System.out.println(send);
    }
}catch (Exception e){
    e.printStackTrace();
}finally {
    producer.shutdown();
}



3.9.4.3 消费顺序消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("orderGroup");
consumer.setInstanceName("orderConsumer");
try {
    consumer.subscribe("orderTopic", "*");
} catch (MQClientException e) {
    e.printStackTrace();
}
consumer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129:9876");
//注册一个顺序消息的监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly(){
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (int i = 0; i < list.size(); i++) {
            System.out.println(new String(list.get(i).getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
try {
    defaultMQPushConsumer.start();
} catch (MQClientException e) {
    e.printStackTrace();
}



3.9.5 延时消息


messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h


1就是1s


2就是5s


...


共18个级别


3.9.5.1 发送消息



//核心代码
message.setDelayTimeLevel(2);

for (int i = 0; i < 10; i++) {
     //创建消息对象 指定topic tag 和消息体
     Message message = new Message("orderTopic", "orderTag", (("hello world")+(i+1)).getBytes());
     //设置延时消息
     message.setDelayTimeLevel(3);
     //发送消息
     System.out.println("开始时间==="+System.currentTimeMillis());
     SendResult send = producer.send(message);
     //打印消息发送后的返回值
     String msgId = send.getMsgId();
     //            System.out.println("msgId====="+msgId);
     SendStatus sendStatus = send.getSendStatus();
     //            System.out.println("sendStatus====="+sendStatus);
     int queueId = send.getMessageQueue().getQueueId();
     //            System.out.println("queueId====="+queueId);
     System.out.println("发送结果"+send);
     //线程休眠1s
     TimeUnit.SECONDS.sleep(1);
 }


3.9.5.2 消费消息


正常消费


3.9.6 批量消息


3.9.6.1 生产消息

Message message = new Message("orderTopic", "orderTag", (("hello world")+ (2)).getBytes());
Message message1 = new Message("orderTopic", "orderTag", (("hello world")+(3)).getBytes()); 
Message message2 = new Message("orderTopic", "orderTag", (("hello world")+(4)).getBytes());
Message message3 = new Message("orderTopic", "orderTag", (("hello world")+(5)).getBytes());
List<Message> msgs = new ArrayList<>();
msgs.add(message);
msgs.add(message1);
msgs.add(message2);
msgs.add(message3);
///发送消息
System.out.println("开始时间==="+System.currentTimeMillis());
SendResult send = producer.send(msgs);


3.9.6.2 消费消息


正常消费


3.9.7 过滤消息


3.9.7.1 tag过滤


在订阅tag时可以通过*或者||指定订阅的tag

consumer.subscribe("baseTopic", "baseTag || orderTag");
consumer.subscribe("baseTopic", "*");


3.9.7.2 sql过滤


生产消息时可以给消息指定一个特定的属性

Message message = new Message("orderTopic", "orderTag", (("hello world") + (2)).getBytes());
message.putUserProperty("type", i+"");



这样在消费者订阅消息时可以通过简单sql过滤

consumer.subscribe("orderTopic", MessageSelector.bySql("type>1"));


批量消息不能使用过滤


3.9.8 事务消息


image


3.9.8.1 概述


具体查看


发送事务消息时有三种状态


  1. 提交 提交后消息才能被消费
  2. 回滚 回滚后消息被删除
  3. 中间 没有提交或回滚的状态,这个状态下rocketmq会去回查本地事务,确定要提交还是回滚


3.9.8.2 发送消息

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("transactionGroup");
        transactionMQProducer.setNamesrvAddr("192.168.164.128:9876;192.168.164.129;");
        transactionMQProducer.setInstanceName("transactionProducer");
        transactionMQProducer.setTransactionListener(new TransactionListener() {
            //发送半消息后触发,是提交还是回滚
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("message: "+message.getUserProperty("type")+"开始检查事务状态");
                if(StringUtils.equals("commit",message.getUserProperty("type"))){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if (StringUtils.equals("rollback",message.getUserProperty("type"))){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }else{
                    return LocalTransactionState.UNKNOW;
                }
            }
            //当消息状态处于中间状态时,rocketmq会回调此方法,验证此消息是发送还是回滚
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("message: "+messageExt.getMsgId()+"触发回查");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionMQProducer.start();
        Message message = new Message("transactionTopic", "transcationTag", "hello world message0".getBytes());
        message.putUserProperty("type", "commit");
        Message message1 = new Message("transactionTopic", "transcationTag", "hello world message1".getBytes());
        message1.putUserProperty("type", "rollback");
        Message message2 = new Message("transactionTopic", "transcationTag", "hello world message2".getBytes());
        SendResult send = transactionMQProducer.sendMessageInTransaction(message,null);
        System.out.println("发送结果=="+send);
        Thread.sleep(2000);
        SendResult send1 = transactionMQProducer.sendMessageInTransaction(message1,null);
        System.out.println("发送结果=="+send1);
        Thread.sleep(2000);
        SendResult send2 = transactionMQProducer.sendMessageInTransaction(message2,null);
        System.out.println("发送结果=="+send2);
        Thread.sleep(100000);
        transactionMQProducer.shutdown();



3.9.8.3 限制


1.不支持延时消息和批量消息


2.事务消息可能被多次回查,可能出现重复消费情况


3.通过设置用户属性来设置服务端的回查间隔时间

message2.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,"5");


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享: