开发者社区> 慕枫技术笔记> 正文

RocketMQ高手之路系列之五:RocketMQ之消息发送(二)

简介: 本文主要介绍了消息发送前的准备,包括了消息的有效性验证以及路由信息查找以确认消息投递到哪个具体的Broker节点之上。下篇文章将正式阐述消息的发送。
+关注继续查看

引言

在上篇博文中,我们介绍了消息发送之前,消费生产者启动的流程。生产者启动后,就正式进入消息发送的的流程。本文主要阐述消息的发送的初步流程。

PS:消息生产者的代码模块在cilent模块中。如下:

image.png

  • 消息发送基本流程
  • 总结

一、消息发送基本流程

在介绍消息发送流程之前,我们先来看下RocketMQ的架构图,如下所示:

image.png

架构图中的各个模块的大致作用在前几篇文章中已经介绍过了,这里不再进行赘述。主要通过架构图让大家对RocketMQ中的消息发送有个整体的理解。

消息的发送方式主要包括三种方式:

(1)同步方式;

(2)异步方式:

(3)Oneway方式;

消息的发送流程主要步骤为:消息验证、路由信息查找以及消息发送三个基本步骤。在DefaultMQProducer中可以查看到对应的消息发送方法,如下所示:

 @Override
 public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }
...
@Override
public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }
...
@Override
public void send(Message msg,
        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

大致的调用方式如下图所示:

image.png

消息发动的方式默认采用同步的方式进行,同时默认的超时时间为3s。在发送消息时,我们首先需要知道该消息要发送到哪里,这就像我们寄快递,需要先明确收件人是谁。那么在RocketMQ发送消息前会对消息进行基本的消息验证。确认消息是否符合发送要求。

Validators.checkMessage(msg, this.defaultMQProducer);

具体的代码如下所示:

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        //消息不能为空
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // 主题不能为空
        Validators.checkTopic(msg.getTopic());

        // 消息的body不能为空
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        // 消息的body长度不能为0
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        // 消息的body长度不能唱过最大长度4M
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }

消息的校验没有问题则会调用tryToFindTopicPublishInfo(msg.getTopic())方法。我们需要获取主题的路由信息,通过路由信息我们才知道消息需要被投递到哪个具体的Broker节点之上。

我们来一起看下查找主题的路由信息方法,如下所示:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
         //然后从nameServer上更新topic路由信息   
         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
         //然后再从本地缓存变量topicPublishInfoTable中再get一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {      
//第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

在生产者中如果缓存了topic路由信息,路由信息中如果包含了消息队列,那么会进行路由信息的返回。如果没有缓存或者没有队列信息,那么就会向NameServer查询topic的路由信息。TopicPublishInfo属性如下所示:

public class TopicPublishInfo {
    //是否是顺序消息
    private boolean orderTopic = false;
    //是否包含主题路由信息
    private boolean haveTopicRouterInfo = false;
    //主题队列的消息队列
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // 没选择一次消息队列,值自增1
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    
    private TopicRouteData topicRouteData;

    public boolean isOrderTopic() {
        return orderTopic;
    }

    public void setOrderTopic(boolean orderTopic) {
        this.orderTopic = orderTopic;
    }

    public boolean ok() {
        return null != this.messageQueueList && !this.messageQueueList.isEmpty();
    }

    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }

    public ThreadLocalIndex getSendWhichQueue() {
        return sendWhichQueue;
    }

    public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
        this.sendWhichQueue = sendWhichQueue;
    }

    public boolean isHaveTopicRouterInfo() {
        return haveTopicRouterInfo;
    }

    public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
        this.haveTopicRouterInfo = haveTopicRouterInfo;
    }

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

    public int getQueueIdByBroker(final String brokerName) {
        for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
            final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
            if (queueData.getBrokerName().equals(brokerName)) {
                return queueData.getWriteQueueNums();
            }
        }

        return -1;
    }

    @Override
    public String toString() {
        return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
    }

    public TopicRouteData getTopicRouteData() {
        return topicRouteData;
    }

    public void setTopicRouteData(final TopicRouteData topicRouteData) {
        this.topicRouteData = topicRouteData;
    }
}

二、总结

本文主要介绍了消息发送前的准备,包括了消息的有效性验证以及路由信息查找以确认消息投递到哪个具体的Broker节点之上。下篇文章将正式阐述消息的发送。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,大概有三种登录方式:
9831 0
方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消息
方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消息
1108 0
一文看懂RocketMQ生产者发送消息源码解析(中)
一文看懂RocketMQ生产者发送消息源码解析
96 0
RocketMQ 消息发送system busy、broker busy原因分析与解决方案
1、现象 最近收到很多RocketMQ使用者,反馈生产环境中在消息发送过程中偶尔会出现如下4个错误信息之一:1)[REJECTREQUEST]system busy, start flow control for a while2)too many requests and system thre.
1731 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
20531 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
13838 0
一文看懂RocketMQ生产者发送消息源码解析(上)
一文看懂RocketMQ生产者发送消息源码解析
8 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18805 0
RocketMQ高手之路系列之五:RocketMQ之消息发送(二)
本文主要介绍了消息发送前的准备,包括了消息的有效性验证以及路由信息查找以确认消息投递到哪个具体的Broker节点之上。下篇文章将正式阐述消息的发送。
38 0
+关注
慕枫技术笔记
InfoQ签约作者、CSDN博客专家、专注于架构设计、微服务以及云原生技术分享
126
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载