RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息

简介: RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息


广播消息

广播消息概述

广播消息就是向所有用户发送消息。 如果我们希望所有订阅者都能收到有关某个主题的消息,可以使用广播消息。

举个例子 生产者发送10条消息,有2个订阅者,则这两个订阅者会分别收到10条消息, 而与广播模式相对应的集群模式这是 2个订阅者一共收到10条消息。

Rocketmq 消费者默认是集群的方式消费的,使用广播模式进行消费需要显示设置

核心:消费端设置消息模型 consumer.setMessageModel(MessageModel.BROADCASTING);


演示步骤

  • 启动2个或者2个以上的消费者
  • 启动生产者发送消息
  • 观察2个消费者的消息接收情况 :两个Consumer收到了同样的消息,OK.

生产者:

package com.artisan.rocketmq.broadcast;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 19:22
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        for (int i = 0; i < 4; i++){
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    ("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消费者:

package com.artisan.rocketmq.broadcast;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 19:27
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //广播,全量消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt ext : msgs){
                    System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

测试结果:

生产者:

消费者1:

消费者2:


延时消息

概述

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。


使用场景

举个例子: 电商系统,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。


延时机制

org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

当前支持的延迟时间

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

分别对应级别

1 2 3....................

设置消息时延

Message message = new Message;
message.setDelayTimeLevel(3)

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。


实现原理

延迟队列的核心思路: 【利用中间队列临时存储】—>所有的延迟消息由producer消息发憷之后,都会存放在一个topic下 (SHCEDULE_TOPIC_XXXX), 不同的延迟级别对应不同的队列序号,当延迟时间到了之后,由定时线程读取转换为普通的消息存到真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。


示例

生产者:

package com.artisan.rocketmq.schedule;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.Date;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 17:23
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        int totalMessagesToSend = 3;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            //延时消费  6-->2分钟
            message.setDelayTimeLevel(6);
            // Send the message
            producer.send(message);
        }
        System.out.printf("message send is completed .%n" + new Date());
        producer.shutdown();
    }
}

消费者:

package com.artisan.rocketmq.schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Date;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 17:23
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println(new Date() + "Receive message[msgId=" + message.getMsgId() + "] "
                            + "message content is :" + new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        //System.out.printf("Consumer Started.%n");
    }
}

设置的延迟level为6 ,对应的时间间隔是两分钟,OK。


批量消息

批量消息概述

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息

此外,这一批消息的总大小不应超过4MB。rocketmq建议每次批量消息大小大概在1MB。当消息大小超过4MB时,需要将消息进行分割

示例

生产者

package com.artisan.rocketmq.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 21:27
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class BatchProducer {
    public static void main(String[] args) throws Exception {
        /**
         * rocketMq 支持消息批量发送
         * 同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时任务。
         * <strong> 同一批次消息建议大小不超过~1M </strong>,消息最大不能超过4M,需要
         * 对msg进行拆分
         */
        DefaultMQProducer producer = new DefaultMQProducer("batch_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        ListSplitter splitter = new ListSplitter(messages);
        /**
         * 对批量消息进行拆分
         */
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

消息拆分

package com.artisan.rocketmq.batch;
import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 21:35
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        //遍历消息准备拆分
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

消费者

package com.artisan.rocketmq.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 21:38
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        consumer.subscribe("BatchTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

代码

请移步: https://github.com/yangshangwei/rocketmqMaster


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
64 0
|
6月前
|
消息中间件 存储 JSON
RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读
RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读
73 0
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 SQL RocketMQ
RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息
RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息
146 0
|
6月前
|
消息中间件 监控 Shell
RocketMQ-初体验RocketMQ(03)_RocketMQ多机集群部署
RocketMQ-初体验RocketMQ(03)_RocketMQ多机集群部署
83 0
|
6月前
|
消息中间件 Java RocketMQ
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
65 0
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
|
6月前
|
消息中间件 API RocketMQ
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
53 0
|
6月前
|
消息中间件 存储 Java
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
196 0
|
消息中间件 运维 算法
RabbitMQ高阶使用延时任务
RabbitMQ高阶使用延时任务
168 0
|
消息中间件 中间件 数据库
RocketMQ 事务消息初体验
事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景、功能原理、实战例子三个模块慢慢为你揭开事务消息的神秘面纱。
5356 8