RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage

简介: RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage


顺序消息的概念

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。

RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序


顺序消费的原理

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);

而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


消费状态

package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Suspend current queue a moment
     * 不能跳过消息,等待一下
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

演示

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

Producer

package com.artisan.rocketmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 16:38
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // 订单列表
        List<OrderStep> orderList = buildOrders();
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // 加个时间前缀
            String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//订单id
            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        producer.shutdown();
    }
    /**
     * 生成模拟订单数据
     */
    private static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("购物车");
        orderList.add(orderDemo);
        return orderList;
    }
    /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;
        public long getOrderId() {
            return orderId;
        }
        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }
        public String getDesc() {
            return desc;
        }
        public void setDesc(String desc) {
            this.desc = desc;
        }
        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }
}

日志


Consumer

package com.artisan.rocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-10 16:38
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        /**
         * 设置消费位置
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        // 有序消费 MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                Random random = new Random();
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序
                    try {
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + ", queueId=" + msg.getQueueId()
                                + ", content:" + new String(msg.getBody(),
                                RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

运行日志


代码

请移步:https://github.com/yangshangwei/rocketmqMaster

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
文字识别 小程序 安全
印刷文字识别操作报错合集之微信小程序调用API时路径总是返回不对,该如何处理
在使用印刷文字识别(OCR)服务时,可能会遇到各种错误。例如:1.Java异常、2.配置文件错误、3.服务未开通、4.HTTP错误码、5.权限问题(403 Forbidden)、6.调用拒绝(Refused)、7.智能纠错问题、8.图片质量或格式问题,以下是一些常见错误及其可能的原因和解决方案的合集。
|
4月前
|
文字识别 前端开发 API
印刷文字识别操作报错合集之通过HTTPS连接到OCR服务的API时报错,该如何处理
在使用印刷文字识别(OCR)服务时,可能会遇到各种错误。例如:1.Java异常、2.配置文件错误、3.服务未开通、4.HTTP错误码、5.权限问题(403 Forbidden)、6.调用拒绝(Refused)、7.智能纠错问题、8.图片质量或格式问题,以下是一些常见错误及其可能的原因和解决方案的合集。
|
4月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
4月前
|
DataWorks 关系型数据库 MySQL
DataWorks操作报错合集之调用CreateQualityRule API时,BlockType参数为0,会报错:"blockType less than minimum",该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
前端开发 JavaScript API
惊天揭秘!AJAX与Fetch API如何让你的前后端交互秒变‘神级操作’!
【7月更文挑战第15天】在Web开发中,AJAX和Fetch API革新了前后端交互,告别了表单提交带来的页面刷新。AJAX利用XMLHttpRequest实现部分页面更新,开启无刷新时代;Fetch API作为现代替代,以其简洁和Promise支持简化异步操作。从AJAX的先驱地位到Fetch API的进化,两者提升了Web应用的性能和用户体验,成为现代开发的必备技能。
49 2
|
4月前
|
搜索推荐 API UED
资源部署及场景API调用体验过程的引导与操作流畅性
资源部署及场景API调用体验过程的引导与操作流畅性
|
4月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
77 1
|
4月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 缓存 Apache
消息队列 MQ使用问题之对于Grpc参数的调优,该如何操作
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章