详解RocketMQ使用

简介: 详解RocketMQ使用

1.环境

RocketMQ版本:4.7.1

Maven依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
 

2.生产者、消费者的模式

生产者有三种生产模式:

  • 同步,生产者等待broker的响应后再往下走。
  • 异步,生产者不等待broker的响应,继续往下走,broker的响应通过事件监听的方式,触发回调函数,通知生产者。
  • 单向,生产者只管发,不管broker的响应,这种模式没办法做消息丢失后的补救

消费者有两种消费模式:

  • 主动拉取
  • 等待推送

生产者示例:

public static void main(String[] args) throws MQClientException, InterruptedException {
    //创建消费者,创建的时候可以指定该消费者属于哪个消费者组
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //指定name server的地址
        producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        producer.start();
    //发送一千条信息
        for (int i = 0; i < 1000; i++) {
            try {
              //消息,topic为TopicTest,后面跟的一串是tag
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //同步发送
                SendResult sendResult = producer.send(msg);
        /**异步发送,通过自定义回调函数的方式来触发响应
        producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                }**/
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

消费者示例:

推模式:

public static void main(String[] args) throws InterruptedException, MQClientException {
    //创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        //设置name server
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

拉模式:

private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
 
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
 
        consumer.shutdown();
    }
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;
 
        return 0;
    }
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }
 
}

3.顺序消息

有些场景中我们需要保证消息的有序性,比如以下场景:

1.开通会员,将积分初始化为10分

2.完成赚取积分的操作,+5分

3.完成违规操作,-10分

最后剩余积分5分

如果消息乱序为321,最后积分为10分。


生产者将消息顺序的发到一个MessageQueue上,然后消费者去一个MessageQueue上拿消息,就能保证消息的顺序性。


RocketMQ支持生产者、消费者在生产、消费的时候指定MessageQueue,但是具体怎样利用这一点来实现顺序消费,需要开发人员去手写。


生产者:

for (int i = 0; i < 100; i++) {
                for(int j=0;i<5;i++) {
                    //每一个订单用同一个orderId
                    int orderId = i;
                    Message msg =
                            new Message("TopicTest","order_"+orderId, "KEY" + orderId,
                                    ("order_" + orderId+"step"+j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        /**
                         * 回调函数
                         * @param mqs broker中的所有MessageQueue
                         * @param msg 发送的消息
                         * @param arg 就是传过来的orderId
                         * @return
                         */
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            //每个订单的message用同一个orderId,必然会选中同一个MessageQueue,自然会顺序存进去
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
            }

消费者:

//MessageListenerOrderly这种类型的监听器,会让consumer一直去读一个messagequeue的内容,一直读完为止
consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("收到的消息内容:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


4.广播消息

默认情况下,topic中的一条消息只会被一个消费者所消费。广播模式,topic中的一条消息,可以被订阅该topic的所有消费者消费。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
           ConsumeConcurrentlyContext context) {
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
    });
    consumer.start();
    System.out.printf("Broadcast Consumer Started.%n");
    }

5.延迟消息

延迟消息,即指定消息在MQ的一个驻留时间,过多少时间后,过了驻留时间,消费者才能消费到消息。

延迟消息可以用来做定时任务。

API上来说,就是在生产者端,给消息指定一个延迟级别即可:

//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//3即第三个等级,10s
msg.setDelayTimeLevel(3);

很明显上面的延迟级别都是定死的,如果我们要用来做定时任务,需要自定义延迟级别。开源版的RocketMQ不支持自定义延迟级别,只有商业版(阿里云上部署的RocketMQ)才支持。


开源版的RocketMQ只能自己去改源码,所以这里也是各个公司做自己定制化的RocketMQ时,一个改造的重点

6.批量消息

RocketMQ支持生产者批量发送消息,以此去减少生产者的网络IO,批量消息只和生产者有关,消费者仍然是按照一条一条的去消费的。

public class SimpleBatchProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();
 
        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
 
        producer.send(messages);
    }
}

7.过滤消息

通过tag,消费者可以消费同一个topic下自己感兴趣的消息

生产者:

public class TagFilterProducer {
 
    public static void main(String[] args) throws Exception {
 
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
 
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
 
        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
 
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
 
        producer.shutdown();
    }

消费者:

public class TagFilterConsumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
 
        consumer.subscribe("TagFilterTest", "TagA || TagC");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.printf("Consumer Started.%n");
    }
}

8.事务消息

事务消息,即发送到MQ的消息能和本地事务一起被回滚。

MQ消息和业务之间其实会存在类似于分布式事务的一致性问题,举个例子:


以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。很明显这几个步骤的调用都应该是在一个事务中的,这些调用都可以用MQ做成移步的,那么问题就来了。这些业务中一旦有一个业务失败,其它业务应该同时失败,但是消息已经发出去了怎么办?


这时候事务消息就派上用场了。


事务消息的生产者在发送消息时,会将消息转为一个half(半消息),并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC这个Topic,这个Topic对消费者是不可见的,当本地事务执行成功后会向MQ发送commit信号,MQ再将消息转为原本的Topic,当本地事务执行失败后会向MQ发送roll_back信号,MQ会丢弃对应消息。如果本地事务没有执行完,可以向MQ发送一个unknown信号,MQ收到unknow信号后,会让对应的消息等待,并且定期回查状态为unknown的消息。


自定义本地事务的提交逻辑和检查逻辑:

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
 
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
 
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
 
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
 
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
 
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
 
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
 
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 Dubbo
RocketMQ(二)
RocketMQ(二)
|
3月前
|
消息中间件 Java 大数据
RocketMQ
【8月更文挑战第29天】RocketMQ
75 15
|
6月前
|
消息中间件 监控 API
RocketMQ 5.0
RocketMQ 5.0 是一款分布式消息中间件,由阿里巴巴开源,提供了高性能、高可靠、高可扩展性的消息传递服务。它采用发布/订阅模式,支持多种消息协议,如 JMS、MQTT 等,可用于构建企业级应用的异步消息处理、系统解耦、流量削峰等场景。
155 4
|
4月前
|
消息中间件 Java RocketMQ
【RocketMQ系列九】SpringCloudStream整合RocketMQ
【RocketMQ系列九】SpringCloudStream整合RocketMQ
374 1
|
4月前
|
消息中间件 存储 Java
【RocketMQ系列一】初识RocketMQ
【RocketMQ系列一】初识RocketMQ
47 1
|
6月前
|
消息中间件 存储 Cloud Native
RocketMQ的初步认识
RocketMQ的初步认识
55 0
|
消息中间件 存储 JSON
RocketMQ使用总结
RocketMQ使用总结
181 0
|
消息中间件 存储 运维
RocketMq
RocketMq
121 0
|
消息中间件 存储 缓存
RocketMQ参数约束和建议
Apache RocketMQ 系统中存在很多自定义参数和资源命名,您在使用 Apache RocketMQ 时建议参考如下说明规范系统设置,避对某些具体参数设置不合理导致应用出现异常。
228 0
|
消息中间件 监控 Java
RocketMq介绍
RocketMq介绍
250 0