RocketMQ中生产者发消息前为啥一定要调用start()方法?

简介: RocketMQ中生产者发消息前为啥一定要调用start()方法?

前言

我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("42.192.50.8:9876");
try {
    producer.start();
    producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.shutdown();
}
复制代码

上述代码中,在消息发送之前调用了start()方法,如果不调用start()方法,直接发送消息,那么会出现以下报错:


image.png


报错消息里面很明显地告知我们,目前这个DefaultMQProducer状态没有准备好,还不能发送消息。为了一探究竟,我们得去看看start()里面究竟做了什么操作呢?

一探究竟

我们根据源码一路走下来,可以追踪到DefaultMQProducerImpl.start(final boolean startFactory)这个方法:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 创建MQClientInstance
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 注册Producer到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 启动MQClientInstance实例
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
复制代码

上述代码主要做了以下几点:

1.创建MQClientInstance实例;

2.注册Producer到MQClientInstance实例中;

3.启动MQClientInstance实例;

MQClientInstance实例并不是每次都会创建的,它创建出来也会缓存的MQClientManager中,不过根据源码来看的话,每次创建Producer都会对应创建一个新的MQClientInstance实例,所以一般情况下不建议一个应用服务中重复创建Producer

最终start()方法的关键实现逻辑还是需要进入MQClientInstance.start()中:

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果namesrv地址为null,那么就需要自己找namesrv地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 开启一个请求响应渠道,没猜错的话,应该是netty实现的
                    this.mQClientAPIImpl.start();
                    // 开启定时任务
                    this.startScheduledTask();
                    // 开启拉消息服务
                    this.pullMessageService.start();
                    // 开启负载均衡服务
                    this.rebalanceService.start();
                    // 再开启一个默认生产者,这个生产者不需要启动MQClientInstance实例
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
复制代码

看样子,这才是start()方法真正要做的事情:

1.找namesrv地址,应该是后面需要使用namesrv地址查询对应的broker

2.开启Netty客户端的初始化,包括与namesrv建立信道;另外开启两个定时任务,一个清除列表中过期的请求,第二个就是筛选可用的namesrv服务;

3.开启一些定时任务;包括如果没有设置namesrv地址的话,会从指定站点拉namesrv地址;清除下线broker并发送心跳给所有的broker等工作;

4.因为当前是生产者,所以pullMessageService很快就结束;

5.生产者不需要做负载均衡,所以rebalanceService很快也结束;

6.给默认创建的生产者执行一下start()方法,其实啥也没做;

上述大多数任务都是给消费者使用的,作为生产者,唯一起作用的就是前三步,查找namesrv地址、第二步与namesrv建立通信以及第三步对broker的一些定时清理工作;不过没有发生消息之前,是不会从远程获取任何数据的。所以综上所述,start()方法里面只做了以下两件事情:

1.与namesrv建立通信渠道,它甚至都没有从namesrv获取任何数据;

2.启动一些定时任务,包括清理下线的broker;

小结

虽然在生产者中,start()方法里面真正做的事情比较少,但是却是非常有必要的。发送消息之前,我们没有使用start()方法,导致消息发送失败,是因为生产者与namesrv之间的通信渠道没有建立。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件 Kubernetes Java
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
21 2
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
|
4天前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
4天前
|
消息中间件 RocketMQ
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
29 1
|
6月前
|
消息中间件 存储 缓存
RocketMQ 生产者那些事
这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。
RocketMQ 生产者那些事
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
625 0
|
消息中间件 RocketMQ 数据格式
你肯定不知道RocketMQ生产者是如何规避故障Broker的
你肯定不知道RocketMQ生产者是如何规避故障Broker的
119 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
481 1
|
消息中间件 RocketMQ
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
217 0
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
|
存储 消息中间件 RocketMQ
RocketMQ学习Broker流程、生产者和存储流程联系
放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。
121 0
RocketMQ学习Broker流程、生产者和存储流程联系
|
4天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总: