RocketMQ高手之路系列之五:RocketMQ之消息发送(二)

简介: 本文主要介绍了消息发送前的准备,包括了消息的有效性验证以及路由信息查找以确认消息投递到哪个具体的Broker节点之上。下篇文章将正式阐述消息的发送。

引言

在上篇博文中,我们介绍了消息发送之前,消费生产者启动的流程。生产者启动后,就正式进入消息发送的的流程。本文主要阐述消息的发送的初步流程。

PS:消息生产者的代码模块在cilent模块中。如下:

image.png

  • 消息发送基本流程
  • 总结

一、消息发送基本流程

在介绍消息发送流程之前,我们先来看下RocketMQ的架构图,如下所示:

image.png

架构图中的各个模块的大致作用在前几篇文章中已经介绍过了,这里不再进行赘述。主要通过架构图让大家对RocketMQ中的消息发送有个整体的理解。

消息的发送方式主要包括三种方式:

(1)同步方式;

(2)异步方式:

(3)Oneway方式;

消息的发送流程主要步骤为:消息验证、路由信息查找以及消息发送三个基本步骤。在DefaultMQProducer中可以查看到对应的消息发送方法,如下所示:

 @Override
 public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }
...
@Override
public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }
...
@Override
public void send(Message msg,
        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

大致的调用方式如下图所示:

image.png

消息发动的方式默认采用同步的方式进行,同时默认的超时时间为3s。在发送消息时,我们首先需要知道该消息要发送到哪里,这就像我们寄快递,需要先明确收件人是谁。那么在RocketMQ发送消息前会对消息进行基本的消息验证。确认消息是否符合发送要求。

Validators.checkMessage(msg, this.defaultMQProducer);

具体的代码如下所示:

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        //消息不能为空
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // 主题不能为空
        Validators.checkTopic(msg.getTopic());
        // 消息的body不能为空
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
    // 消息的body长度不能为0
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
    // 消息的body长度不能唱过最大长度4M
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }

消息的校验没有问题则会调用tryToFindTopicPublishInfo(msg.getTopic())方法。我们需要获取主题的路由信息,通过路由信息我们才知道消息需要被投递到哪个具体的Broker节点之上。

我们来一起看下查找主题的路由信息方法,如下所示:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
         //然后从nameServer上更新topic路由信息   
         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
         //然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {      
//第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

在生产者中如果缓存了topic路由信息,路由信息中如果包含了消息队列,那么会进行路由信息的返回。如果没有缓存或者没有队列信息,那么就会向NameServer查询topic的路由信息。TopicPublishInfo属性如下所示:

public class TopicPublishInfo {
  //是否是顺序消息
    private boolean orderTopic = false;
    //是否包含主题路由信息
    private boolean haveTopicRouterInfo = false;
    //主题队列的消息队列
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // 没选择一次消息队列,值自增1
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
    public boolean isOrderTopic() {
        return orderTopic;
    }
    public void setOrderTopic(boolean orderTopic) {
        this.orderTopic = orderTopic;
    }
    public boolean ok() {
        return null != this.messageQueueList && !this.messageQueueList.isEmpty();
    }
    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }
    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }
    public ThreadLocalIndex getSendWhichQueue() {
        return sendWhichQueue;
    }
    public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
        this.sendWhichQueue = sendWhichQueue;
    }
    public boolean isHaveTopicRouterInfo() {
        return haveTopicRouterInfo;
    }
    public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
        this.haveTopicRouterInfo = haveTopicRouterInfo;
    }
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
    public int getQueueIdByBroker(final String brokerName) {
        for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
            final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
            if (queueData.getBrokerName().equals(brokerName)) {
                return queueData.getWriteQueueNums();
            }
        }
        return -1;
    }
    @Override
    public String toString() {
        return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
    }
    public TopicRouteData getTopicRouteData() {
        return topicRouteData;
    }
    public void setTopicRouteData(final TopicRouteData topicRouteData) {
        this.topicRouteData = topicRouteData;
    }
}

二、总结

本文主要介绍了消息发送前的准备,包括了消息的有效性验证以及路由信息查找以确认消息投递到哪个具体的Broker节点之上。下篇文章将正式阐述消息的发送。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
371 1
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
79 0
|
消息中间件 存储
RabbitMQ如何保证消息发送成功
RabbitMQ如何保证消息发送成功
137 0
RabbitMQ如何保证消息发送成功
|
消息中间件 数据可视化 API
RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送
233 1
|
消息中间件 存储 Java
Spring Boot 中的 RabbitMQ 消息发送配置
Spring Boot 中的 RabbitMQ 消息发送配置
|
消息中间件 存储 中间件
消息中间件-RocketMQ入门 消息发送的三种方式
消息中间件-RocketMQ入门 消息发送的三种方式
171 0
|
消息中间件 RocketMQ
RocketMQ极简入门-RocketMQ顺序消息发送
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。 顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?
119 0
|
消息中间件 Java RocketMQ
四.RocketMQ极简入门-RocketMQ顺序消息发送
RocketMQ极简入门-RocketMQ顺序消息发送
|
消息中间件 数据可视化 API
三.RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ极简入门-RocketMQ普通消息发送
|
存储 消息中间件 缓存
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理