四.RocketMQ极简入门-RocketMQ顺序消息发送

简介: RocketMQ极简入门-RocketMQ顺序消息发送

前言

在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。

顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?

全局有序消息

在RocketMQ中消息分为全局有序和局部有序消息,全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列存放消息,一个消费者从这一个队列消费消息就能保证顺序,即:单线程执行,可以通过 producer.setDefaultTopicQueueNums(1);来指定队列数量。

下面我们使用一个订单来模拟顺序消息,订单状态有:创建 ,支付,发货。需要按照顺序发送和消费消息

订单实体

public class Order {
   
   
    private Long id;
    private String name;
    private String status;

    public Order() {
   
   
    }

    public Order(Long id, String name, String status) {
   
   
        this.id = id;
        this.name = name;
        this.status = status;
    }

    public Long getId() {
   
   
        return id;
    }

    public void setId(Long id) {
   
   
        this.id = id;
    }

    public String getName() {
   
   
        return name;
    }

    public void setName(String name) {
   
   
        this.name = name;
    }

    public String getStatus() {
   
   
        return status;
    }

    public void setStatus(String status) {
   
   
        this.status = status;
    }
}

发送者

生产者通过 producer.setDefaultTopicQueueNums(1); 把队列数量设置成1,然后正常发送消息

public class Producer {
   
   

    //演示消息同步发送
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
   
   
        //生产者
        DefaultMQProducer producer = new DefaultMQProducer("order-producer");

        //设置name server地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //队列数量,1个
        producer.setDefaultTopicQueueNums(1);
        //启动
        producer.start();

        for (long i = 0 ; i < 4 ; i++){
   
   

            Order order = new Order(i,"订单"+i,"创建");

            //添加内容
            byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

            Message message = new Message("order-topic","order",bytes);

            message.setKeys("key-"+i);

            //执行发送第一个消息
            SendResult result = producer.send(message);

            System.out.println(result);

            //====================================================================
            order.setStatus("支付");
            //添加内容
            bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
            message = new Message("order-topic","order",bytes);
            message.setKeys("key-"+i);

            //执行发送
            result = producer.send(message);

            System.out.println(result);

            //====================================================================
            order.setStatus("发货");
            //添加内容
            bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
            message = new Message("order-topic","order",bytes);
            message.setKeys("key-"+i);

            //执行发送
            result = producer.send(message);
             //打印结果
            System.out.println(result);
        }

        producer.shutdown();
    }
}

发送结果如下

SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC450000, offsetMsgId=AC1028C700002A9F0000000000638E1D, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=48]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4E0001, offsetMsgId=AC1028C700002A9F0000000000638EF8, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=49]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4F0002, offsetMsgId=AC1028C700002A9F0000000000638FD3, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC510003, offsetMsgId=AC1028C700002A9F00000000006390AE, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=51]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC550004, offsetMsgId=AC1028C700002A9F0000000000639189, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=52]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC560005, offsetMsgId=AC1028C700002A9F0000000000639264, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=53]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC580006, offsetMsgId=AC1028C700002A9F000000000063933F, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=54]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC590007, offsetMsgId=AC1028C700002A9F000000000063941A, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=55]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5B0008, offsetMsgId=AC1028C700002A9F00000000006394F5, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=56]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5D0009, offsetMsgId=AC1028C700002A9F00000000006395D0, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=57]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5F000A, offsetMsgId=AC1028C700002A9F00000000006396AB, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=58]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC62000B, offsetMsgId=AC1028C700002A9F0000000000639786, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=59]

消费者

消费者设置 MessageListenerOrderly 进行顺序消费

public class Consumer {
   
   
    public static void main(String[] args) throws MQClientException {
   
   
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
        //从开始位置消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //广播模式
        //最大线程1个
        //defaultMQPushConsumer.setConsumeThreadMax(1);
        //defaultMQPushConsumer.setConsumeThreadMin(1);
        //同时只拉取一个消息
        //defaultMQPushConsumer.setPullBatchSize(1);

        //订阅
        defaultMQPushConsumer.subscribe("order-topic","order");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
   
   
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   
   
                msgs.forEach(message->{
   
   
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeOrderlyStatus.SUCCESS;
            }

        });

        defaultMQPushConsumer.start();
    }
}

消费结果如下

MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=48, sysFlag=0, bornTimestamp=1632010570822, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570826, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638E1D, commitLogOffset=6524445, bodyCRC=543694636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=49, KEYS=key-0, CONSUME_START_TIME=1632010570828, UNIQ_KEY=7F000001244418B4AAC25E78BC450000, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=49, sysFlag=0, bornTimestamp=1632010570830, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570830, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638EF8, commitLogOffset=6524664, bodyCRC=400232688, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=50, KEYS=key-0, CONSUME_START_TIME=1632010570832, UNIQ_KEY=7F000001244418B4AAC25E78BC4E0001, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=50, sysFlag=0, bornTimestamp=1632010570831, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570832, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638FD3, commitLogOffset=6524883, bodyCRC=1884939776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, KEYS=key-0, CONSUME_START_TIME=1632010570835, UNIQ_KEY=7F000001244418B4AAC25E78BC4F0002, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=51, sysFlag=0, bornTimestamp=1632010570833, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570836, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006390AE, commitLogOffset=6525102, bodyCRC=1061325741, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570839, UNIQ_KEY=7F000001244418B4AAC25E78BC510003, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=52, sysFlag=0, bornTimestamp=1632010570837, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570837, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639189, commitLogOffset=6525321, bodyCRC=150045809, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570841, UNIQ_KEY=7F000001244418B4AAC25E78BC550004, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=53, sysFlag=0, bornTimestamp=1632010570838, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570839, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639264, commitLogOffset=6525540, bodyCRC=1869836929, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=55, KEYS=key-1, CONSUME_START_TIME=1632010570844, UNIQ_KEY=7F000001244418B4AAC25E78BC560005, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=54, sysFlag=0, bornTimestamp=1632010570840, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570840, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063933F, commitLogOffset=6525759, bodyCRC=507328046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=56, KEYS=key-2, CONSUME_START_TIME=1632010570845, UNIQ_KEY=7F000001244418B4AAC25E78BC580006, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=55, sysFlag=0, bornTimestamp=1632010570841, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570842, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063941A, commitLogOffset=6525978, bodyCRC=697186802, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=57, KEYS=key-2, CONSUME_START_TIME=1632010570847, UNIQ_KEY=7F000001244418B4AAC25E78BC590007, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=56, sysFlag=0, bornTimestamp=1632010570843, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570844, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006394F5, commitLogOffset=6526197, bodyCRC=1309462274, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=58, KEYS=key-2, CONSUME_START_TIME=1632010570850, UNIQ_KEY=7F000001244418B4AAC25E78BC5B0008, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=57, sysFlag=0, bornTimestamp=1632010570845, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570846, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006395D0, commitLogOffset=6526416, bodyCRC=18326191, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=59, KEYS=key-3, CONSUME_START_TIME=1632010570851, UNIQ_KEY=7F000001244418B4AAC25E78BC5D0009, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=58, sysFlag=0, bornTimestamp=1632010570847, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570848, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006396AB, commitLogOffset=6526635, bodyCRC=916761971, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570853, UNIQ_KEY=7F000001244418B4AAC25E78BC5F000A, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=59, sysFlag=0, bornTimestamp=1632010570850, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570850, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639786, commitLogOffset=6526854, bodyCRC=1361468291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570855, UNIQ_KEY=7F000001244418B4AAC25E78BC62000B, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"发货"}

局部有序消息

还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。

比如:保证同一个订单ID的生成、付款、发货消息按照顺序消费即可实现原理:

  • 把同一个订单ID的消息放入同一个MessageQueue
  • 保证这个MessageQueue只有一个消费者不被并发处理 ,这个需要使用到 MessageQueueSelector 来保证同一个订单的消息在同一个队列
    在这里插入图片描述

    发送者

    使用 MessageQueueSelector 消息队列选择器来把消息路由到不同的队列,下面案例就是把同一个订单的消息:创建,支付,发货 路由到同一个队列,达到局部消费的目的。
    ```java
    //演示消息同步发送
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    //生产者
    DefaultMQProducer producer = new DefaultMQProducer("order-producer2");

    //设置name server地址
    producer.setNamesrvAddr("127.0.0.1:9876");

    //发送消息超时时间
    producer.setSendMsgTimeout(1000);
    //启动
    producer.start();

    for (long i = 0 ; i < 4 ; i++){

    Order order = new Order(i,"订单"+i,"创建");

    //添加内容
    byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);

    Message message = new Message("order-topic","order2",bytes);
    message.setKeys("key-"+i);

    //执行发送
    SendResult result = producer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long id = (Long) arg;
            //使用取模算法确定id存放到哪个队列
            int index =(int) (id % mqs.size());
            //index就是要存放的队列的索引
            return mqs.get(index);
        }
        //把订单ID作为参数,作为选择器的基础数据
    },order.getId());
    //打印结果
    System.out.println(result);

    //====================================================================
    order.setStatus("支付");
    //添加内容
    bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
    message = new Message("order-topic","order2",bytes);
    message.setKeys("key-"+i);

    //执行发送
    result = producer.send(message,new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long id = (Long) arg;
            //使用取模算法确定id存放到哪个队列
            int index =(int) (id % mqs.size());
            //index就是要存放的队列的索引
            return mqs.get(index);
        }
    },order.getId());
    //打印结果
    System.out.println(result);

    //====================================================================
    order.setStatus("发货");
    //添加内容
    bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
    message = new Message("order-topic","order2",bytes);
    message.setKeys("key-"+i);

    //执行发送
    result = producer.send(message,new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long id = (Long) arg;
            //使用取模算法确定id存放到哪个队列
            int index =(int) (id % mqs.size());
            //index就是要存放的队列的索引
            return mqs.get(index);
        }
    },order.getId());
    //打印结果
    System.out.println(result);


}

producer.shutdown();

}


## 消费者
消费者一样可通过  MessageListenerOrderly 进行顺序消费

```java
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
        //从开始位置消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //广播模式
        //最大线程1个
        //defaultMQPushConsumer.setConsumeThreadMax(1);
        //defaultMQPushConsumer.setConsumeThreadMin(1);
        //同时只拉取一个消息
        //defaultMQPushConsumer.setPullBatchSize(1);

        //订阅
        defaultMQPushConsumer.subscribe("order-topic","order");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                msgs.forEach(message->{
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeOrderlyStatus.SUCCESS;
            }

        });

        defaultMQPushConsumer.start();
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
6月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
180 16
|
5月前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
62 0
分享一下rocketmq入门小知识
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
263 2
EMQ
|
8月前
|
安全 网络性能优化
MQTT 5.0 报文(Packets)入门指南
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
EMQ
734 11
MQTT 5.0 报文(Packets)入门指南
|
6月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
108 1
|
7月前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
63 0
|
7月前
|
消息中间件 存储 前端开发
RabbitMQ在Java中的完美实现:从入门到精通
本文由木头左介绍如何在Java项目中使用RabbitMQ。RabbitMQ是开源的AMQP实现,支持多种客户端,适合分布式系统中的消息传递。首先需安装Erlang和RabbitMQ,接着在Java项目中添加RabbitMQ客户端库依赖。通过创建连接工厂和连接,建立与RabbitMQ的通信,并展示了创建连接和通道的代码示例。
|
8月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
1062 0
|
8月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
196 0
下一篇
开通oss服务