rocketmq发送消息底层分析

简介: 企业中一般都会封装rocketmq 同步 异步 单向方法,你只需要配置好nameserver地址 topic tag 消息体等,然后调用封装方法进行发送即可。流程差不多如下1、导入mq依赖

企业中一般都会封装rocketmq 同步 异步 单向方法,你只需要配置好nameserver地址 topic tag 消息体等,然后调用封装方法进行发送即可。

流程差不多如下

1、导入mq依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

2、消息发送者步骤

创建消息生成者producer,指定组名

指定Nameserver mq地址

启动生产者

创建消息对象,指定Topic、Tag和消息体

发送消息

关闭生产者


3、消费者步骤

创建消费者Consumer,指定组名

指定Nameserver地址

订阅主题Topic和Tag

(listener监听消息,去消费先到的消息)

设置回调函数,处理消息

启动消费者

同步 异步 单向底层区别

//同步
            SendResult send = producer.send(msg);
            //异步
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("===" + sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("e:" + throwable);
                }
            });
            //单向
            producer.sendOneway(msg);

异步相对同步来说send方法多了些参数SendCallback sendCallback。而这个sendCallback重写的方法有成功和失败的方法。

单向相对同步来说换了一个方法sendOneway.

在跟踪源码去看到

异步


同步

具体实现方法

一下源码传参异步,单向的区别在

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

以及switch里面

switch(communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
return sendResult;
}
default:
break label122;
}


具体如下

sendDefaultImpl方法(核心:所有同步、异步、单向)

    private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //校验 Producer 处于运行状态
        this.makeSureStateOK();
        //校验消息格式
        Validators.checkMessage(msg, this.defaultMQProducer);
        long invokeID = this.random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        //获取Topic路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            //发送的队列
            MessageQueue mq = null;
            //异常
            Exception exception = null;
            //最后一次发送结果
            SendResult sendResult = null;
            //计算调用发送消息到成功为止的最大次数:同步、单向调用一次 ;异步3次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            //第几次发送
            int times = 0;
            //存储每次发送消息现在broker
            String[] brokersSent = new String[timesTotal];
            while(true) {
                label122: {
                    String info;
                    if (times < timesTotal) {
                        info = null == mq ? null : mq.getBrokerName();
                        //选择要发送的消息队列
                        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                        if (mqSelected != null) {
                            mq = mqSelected;
                            brokersSent[times] = mqSelected.getBrokerName();
                            long endTimestamp;
                            try {
                                beginTimestampPrev = System.currentTimeMillis();
                                long costTime = beginTimestampPrev - beginTimestampFirst;
                                if (timeout >= costTime) {
                                    //Kernel内核的意思,调用发送消息核心方法
                                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                    endTimestamp = System.currentTimeMillis();
                                    //更新Broker可用性信息,在选择发送到的消息队列时,会参考Broker发送消息的延迟。源码多次操作Broker,可见不懂Broker就无法真正理解mq源码。
                                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                    switch(communicationMode) {
                                    case ASYNC:
                                        return null;
                                    case ONEWAY:
                                        return null;
                                    case SYNC:
                                        if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            return sendResult;
                                        }
                                    default:
                                        break label122;
                                    }
                                }
                                callTimeout = true;
                            } catch (RemotingException var26) {
                                //当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。
                               //这也是我们在生产环境看到可能出现多次重复发送的情况。
                                //更新Broker可用信息,更新继续循环
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                                this.log.warn(msg.toString());
                                exception = var26;
                                break label122;
                            } catch (MQClientException var27) {
                               //更新Broker可用信息,继续循环
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                                this.log.warn(msg.toString());
                                exception = var27;
                                break label122;
                            } catch (MQBrokerException var28) {
                                //更新Broker可用信息,部分情况下异常返回,结束循环
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var28);
                                this.log.warn(msg.toString());
                                exception = var28;
                                switch(var28.getResponseCode()) {
                                case 1:
                                case 14:
                                case 16:
                                case 17:
                                case 204:
                                case 205:
                                    break label122;
                                //如果有发送结果,返回,没有,抛出异常
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
                                    throw var28;
                                }
                            } catch (InterruptedException var29) {
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var29);
                                this.log.warn(msg.toString());
                                this.log.warn("sendKernelImpl exception", var29);
                                this.log.warn(msg.toString());
                                throw var29;
                            }
                        }
                    }
                    if (sendResult != null) {
                        return sendResult;
                    }
                    info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                    info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                    MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                    if (callTimeout) {
                        throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                    }
                    if (exception instanceof MQBrokerException) {
                        mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                    } else if (exception instanceof RemotingConnectException) {
                        mqClientException.setResponseCode(10001);
                    } else if (exception instanceof RemotingTimeoutException) {
                        mqClientException.setResponseCode(10002);
                    } else if (exception instanceof MQClientException) {
                        mqClientException.setResponseCode(10003);
                    }
                    throw mqClientException;
                }
                ++times;
            }
        } else {
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null != nsList && !nsList.isEmpty()) {
                throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
            } else {
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
        }
    }

但是异步是在该sendDefaultImpl方法之前做一个异步处理

ExecutorService :那么mq异步用了线程池来提高异步请求的效率!!

    @Deprecated
    public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException {
        final long beginStartTime = System.currentTimeMillis();
        ExecutorService executor = this.getCallbackExecutor();
        try {
            executor.submit(new Runnable() {
                public void run() {
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeout > costTime) {
                        try {
                            DefaultMQProducerImpl.this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                        } catch (Exception var4) {
                            sendCallback.onException(var4);
                        }
                    } else {
                        sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                    }
                }
            });
        } catch (RejectedExecutionException var9) {
            throw new MQClientException("executor rejected ", var9);
        }
    }


NettyRemotingClient定义了多线程



并没有发现是哪种特殊的多线程,因此是通用的多线程。就好比map和hashmap,最后用了map一样。

倒回来看,这里是多线程执行,大大提高了运行效率,并且代码严谨和规范,有判定条件,不同程度的try catch


线程池执行sendSelectImpl方法,传了msg,async异步,sendCallback(成功和失败的方法,发送消息不用等待响应,不管有没有成功或失败,生产者还会发送消息),timeout - costTime(最大等待时间默认3000-消耗时间)

大概看一下入参的情况


这里timeTotal默认是1,异步的时候是3,影响后面循环次数。


那么for循环里面要做什么事情呢

选择消息要发送的队列,准备发送

调用核心发送消息方法

更新Broker可用性信息

上面这个类sendDefaultImpl,有详细注释。

其实还可以继续深入下去,可以模仿rocketmq,自己写一个自定义mq框架那也是很厉害了!!!


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 缓存 物联网
MQTT常见问题之MQTT发送消息到阿里云服务器被拒如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
69 0
|
2月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
15天前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
68 0
RocketMQ—一次连接namesvr失败的案例分析
|
3月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
66 0
|
2月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
80 1
|
3月前
|
存储 缓存 物联网
MQTT常见问题之MQTT发送消息过多内存不够处理不过来如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
570 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
10月前
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何发送消息
3分钟白话RocketMQ系列—— 如何发送消息
240 0
|
3月前
|
消息中间件 Oracle Java
【RocketMq】Broker 启动脚本分析
【RocketMq】Broker 启动脚本分析
73 0