RocketMQ高手之路系列之四:RocketMQ之消息发送(一)

简介: 本文主要叙述了消息的结构以及在消息发送之前,客户端启动的流程是怎样的,而客户端实例的创建时消息发送的前提。在下一篇文章中,将介绍具体的消息发送流程。

引言

前面章节介绍了RocketMQ的路由管理模块,它所解决的问题是如何让消息的发 送者以及消费者找到正确的地址信息。本章主要阐述消息在发送之前,如何进行客户端启动的。

  • 消息格式
  • 启动生产者
  • 总结

一、消息格式

要想弄清楚消息发送的过程,我们需要了解消息的格式是怎样的。在common模块中对message进行了定义,如下所示:

image.png

以下为message的一个全属性构造函数:

 public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;
        if (tags != null && tags.length() > 0)
            this.setTags(tags);
        if (keys != null && keys.length() > 0)
            this.setKeys(keys);
        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

二、启动生产者

消息生产者相关代码都是在client模块中,DefaultMQProducer该类为默认的消息生产者实现类,它继承了ClientConfig 同时实现了MQProducer 接口。对于RocketMQ,生产者就是发送消息的客户端,所以在发送消息前需要初始化对应的实例来进行消息的发送。

public class DefaultMQProducer extends ClientConfig implements MQProducer {
...
}

类图关系如下所示:

image.png

DefaultMQProducer的相关属性如下所示:

image.png

    /**
     * 生产者所属组
     */
  private String producerGroup;
    /**
     * 默认的topicKey
     */
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    /**
     * 默认主题中的Broker队列的数量
     */
    private volatile int defaultTopicQueueNums = 4;
    /**
     * 消息发送默认超时时间为3s
     */
    private int sendMsgTimeout = 3000;
    /**
     * 消息体大小超过默认值则需要进行压缩,此处默认为4K
     */
    private int compressMsgBodyOverHowmuch = 1024 * 4;
    /**
     * 同步方式发送消息的重试次数,默认为为2次,总共会执行3次
     */
    private int retryTimesWhenSendFailed = 2;
    /**
     * 异方式发送消息的重试次数,默认为为2次
     */
    private int retryTimesWhenSendAsyncFailed = 2;
    /**
     * 消息重试时选择另外一个Broker,是否不等待存储结果就进行返回,默认为false
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;
    /**
     * 允许发送消息的最大消息长度,默认为4M
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

接下来,我们来一起看下消息生产者实例到底是怎么进行启动的。首先看下涉及到的主要方法调用如下所示:

image.png

我们来具体看下代码实现,首先是

public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

后面调用defaultMQProducerImpl中的start方法,具体分析如下所示:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
        //检查producerGroup是否符合要求
                this.checkConfig();
        //修改生产者的instanceName为进程的ID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
        //创建MQClientInstance实例
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
        //进行注册
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    //想所有的Broker发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

那么在MQClientManager中进行clientId进行生成,源码如如下所示:

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    //创建clientId
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            //维护缓存表,一个clientId对应一个MQClientInstance
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }

clientId的组成如下所示,它的组成为IP + instanceName + unitName

  public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
        return sb.toString();
    }

三、总结

本文主要叙述了消息的结构以及在消息发送之前,客户端启动的流程是怎样的,而客户端实例的创建时消息发送的前提。在下一篇文章中,将介绍具体的消息发送流程。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
371 1
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
79 0
|
消息中间件 存储
RabbitMQ如何保证消息发送成功
RabbitMQ如何保证消息发送成功
137 0
RabbitMQ如何保证消息发送成功
|
消息中间件 数据可视化 API
RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送
233 1
|
消息中间件 存储 Java
Spring Boot 中的 RabbitMQ 消息发送配置
Spring Boot 中的 RabbitMQ 消息发送配置
|
消息中间件 存储 中间件
消息中间件-RocketMQ入门 消息发送的三种方式
消息中间件-RocketMQ入门 消息发送的三种方式
171 0
|
消息中间件 RocketMQ
RocketMQ极简入门-RocketMQ顺序消息发送
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。 顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?
119 0
|
消息中间件 Java RocketMQ
四.RocketMQ极简入门-RocketMQ顺序消息发送
RocketMQ极简入门-RocketMQ顺序消息发送
|
消息中间件 数据可视化 API
三.RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ极简入门-RocketMQ普通消息发送
|
存储 消息中间件 缓存
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理
从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(3): 消息读写队列,消息存储,消息发送,消息消费关联流程和原理