RocketMQ-消息消费模式 顺序消费

简介: RocketMQ-消息消费模式 顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到




集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条




Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的

如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。

多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作


广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:
生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据




顺序消费

实现生产顺序:12345消费顺序12345

哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程
假设我们没有实现顺序消费的时候
创建生产者
1.创建实体类

@Setter
@Getter
public class OrderStep {
    private long orderId;
    private String desc;
    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
}

2.创建测试类

public class OrderUtil {
    public static List<OrderStep> buildOrders(){
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}

3.创建生产者类

public class Producer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
            producer.setNamesrvAddr("43.143.161.59:9876");
            producer.start();
            String topic = "orderTopic";
            List<OrderStep> orderSteps = OrderUtil.buildOrders();
            for(OrderStep step:orderSteps){
                Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
                producer.sendOneway(msg);
            }
            producer.shutdown();
        }
}

创建消费者类

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了


如何改实现顺序消费

生产者类

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "orderTopic";
        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        //设置队列选择器
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                System.out.println("队列个数"+list.size());
                Long orderId = (Long) o;
                int index = (int)(orderId % list.size());
                return list.get(index);
            }
        };
        for(OrderStep step:orderSteps){
            Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
            //指定消息选择器,换入的参数
            producer.send(msg,selector,step.getOrderId());
        }
        producer.shutdown();
    }
}

消费者类

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        //从什么地方开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //一个队列对应一个线程
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for(MessageExt msg:list){
                    System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
13天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
43 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
89 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
79 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
下一篇
开通oss服务