RocketMQ发送消息原理(含事务消息)

简介: 本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。

前言

上一篇文章已经介绍了RocketMQ的功能,架构,从本文开始,我们将开始深入源码层面,一步一步学习RocketMQ设计原理。

在消息队列中,生产者负责发送消息到Broker,本文分享RocketMQ发送消息的实现原理以及一些注意的事项。

一、生产者端的发送流程

一般来说我们的业务应用端是生产者,负责和Broker和nameserver通信完成消息投递的功能。

image.png

在源码中,发送消息的主逻辑在 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl中,阅读起来不难,逻辑比较清晰

image.png

我把发送端的主线流程理了一张流程图。

主要步骤

1、根据Topic从nameserver或本地获取路由信息,包含topic的队列信息,broker信息等

在RocketMQ的生产者端,使用ConcurrentHashMap将topic关联的队列信息进行缓存

private final ConcurrentMap<String/* topic */, TopicPublishInfo> 
topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

2、根据重试次数,循环发送消息

3、使用生产者负载均衡策略(默认轮训),查找需要把消息发送哪个队列

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
   
   
        if (lastBrokerName == null) {
   
   
            return selectOneMessageQueue();
        } else {
   
   
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
   
   
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
   
   
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

这里设计比较巧妙,使用到ThreadLocal记录上次发送的队列索引,消费者轮训负载均衡策略也使用到了该技巧。

image.png

4、消息内容组装成RemotingCommand对象,包括请求头和请求体

image.png

5、分oneway,sync,async的方式进行发送

image.png

6、如果是async,oneway会获取令牌再发送

7、组装请求头,调用netty组件发送消息,最终调用Channel将消息内容写到socket

image.png

8、发送结果处理,这里只有同步发送模式才直接处理结果

image.png

如果是异步,会在发送时指定一个回调函数,在回调函数中处理结果。

image.png

  • 回调接口
public interface SendCallback {
   
   
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}

二、Broker端接收发送消息请求与处理流程

broker这边入口是netty监听客户端消息的地方,在NettyServerHandler

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
   
   

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
   
   
        //接收客户端消息入口
        processMessageReceived(ctx, msg);
    }
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
   
   
    final RemotingCommand cmd = msg;
    if (cmd != null) {
   
   
        switch (cmd.getType()) {
   
   
            case REQUEST_COMMAND:
                //消息请求    
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

紧接着发送消息的请求处理器

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
   
   
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
   
   
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                //解析消息头
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
   
   
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                //存储消息前回调函数
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                //消息处理
                if (requestHeader.isBatch()) {
   
   
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
   
   
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }
                //存储消息后回调函数
                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

消息存储逻辑在org.apache.rocketmq.store.CommitLog#putMessage 最终会将消息写入到commitlog

org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
   
   
        // 记录消息存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());

        // Back to Results
        AppendMessageResult result = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        //写消息加锁
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
   
   
            //.....
            //消息内容存储到commitlog
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //....
        } finally {
   
   
            putMessageLock.unlock();
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        //刷盘处理  
        handleDiskFlush(result, putMessageResult, msg);
        //高可用处理,将消息同步到从节点
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

上面追加消息到commitlog,其实还没有真正的持久化

result = mappedFile.appendMessage(msg, this.appendMessageCallback);

而是通过刷盘机制刷新数据到磁盘,判断刷盘方式,如果是同步刷盘,立即刷新缓冲数据到磁盘

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
   
   
        // Synchronization flush 同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
   
   
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
   
   
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
   
   
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
   
   
                service.wakeup();
            }
        }
        // Asynchronous flush 异步刷盘
        else {
   
   
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
   
   
                flushCommitLogService.wakeup();
            } else {
   
   
                commitLogService.wakeup();
            }
        }
    }

这个地方,同步刷盘,设计者使用了多个线程来实现刷盘功能,具有超时的能力,如果指定时间没有刷盘完成会立即返回,不会阻塞请求。

最终调用 java.nio.MappedByteBuffer#force0sun.nio.ch.FileDispatcherImpl#force0 从缓冲写入磁盘。


public int flush(final int flushLeastPages) {
   
   
        if (this.isAbleToFlush(flushLeastPages)) {
   
   
            if (this.hold()) {
   
   
                int value = getReadPosition();

                try {
   
   
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
   
   
                        //真正的刷新到磁盘
                        this.fileChannel.force(false);
                    } else {
   
   
                        //真正的刷新到磁盘
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
   
   
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
   
   
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

通过同步树刷盘异步刷盘可用在一定程度上保证消息不丢失,rocketmq还支持集群模式,主从同步模式支持同步或异步,实现数据在多个节点上备份。

//等5s,如果slave未返回,则超时
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
   
   
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
   
   
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
   
   //判断开关有没有开
                // Determine whether to wait 判断一下slave是否正常
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
   
   
                    //master里面最新的消息偏移量
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
   
   
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
   
   
                    //从节点与主节点同步的数据 超过256M了,slave不行了
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

    }

我将broker端接收发送消息请求后的总体处理过程整理到下面流程图了

image.png

核心步骤总结如下:

1、netty接收到请求,转发到SendMessageProcessor处理器

2、消息头解码

3、判断是是重试消息,并且判断是否达到最大重试次数,如果到了则转换topic和队列,加入死信队列

4、是否是事务消息,转换内置的事务消息topic和queueId

5、不是事务消息,判断是否是延迟消息,是延迟消息,转换成延迟消息topic(SCHEDULE_TOPIC_XXXX)和队列(延迟等级-1,延迟等级从1开始到18)

6、创建或获取消息文件,bytebuffer

7、通过bytebuffer写入缓冲

8、如果是SYNC_FLUSH刷盘方式,立即刷盘 ,刷盘类型有同步和异步两种

9、如果Broker的角色设置主从同步是SYNC_MASTER, 需要同步到从节点,这里是RocketMQ实现高可用的关键

如果是事务消息,发送流程是怎样的?

在Rocketmq中,事务消息是用来保证本地事务和发送消息逻辑同时成功的一种机制。

事务消息标记存在消息的properties,第一步是将properties解码

public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";

然后将消息发送到一个内置的topic里

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
   
   
        //暂存真实topic和队列id到properties    
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        //topic 转换成内置事务消息topic    
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

在客户端发送事务消息

首先需要实现一个事务消息监听器,实现TransactionListener接口,分别实现本地事务逻辑,检查本地事务状态的逻辑

public class TransactionListenerImpl implements TransactionListener {
   
   
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
   
   
        //执行本地事务逻辑
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    //查询本地事务执行结果
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
   
   
        System.out.println("check msg" + msg.getMsgId() +"===" + LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
   
   
            switch (status) {
   
   
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

然后通过事务消息发送器发送消息

public static void main(String[] args) throws MQClientException, InterruptedException {
   
   

        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
   
   
            @Override
            public Thread newThread(Runnable r) {
   
   
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        //指定事务消息发送监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);

        producer.start();

        String[] tags = new String[]{
   
   "TagA", "TagB", "TagC", "TagD", "TagE"};
        try {
   
   
            Message msg =
                    new Message("TopicTest1234", tags[ tags.length], "KEY",
                            ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
   
   
            e.printStackTrace();
        }
        producer.shutdown();
    }

看完源码,我们再看发送事务消息流程图就容易理解了。

image.png

事务消息流程总结

1、客户端使用同步发送半消息,broker将半消息存储到内置的事务消息topic和队列,这个时候事务消息不能被消费者消费到。

2、客户端接收发送消息返回,执行本地事务,然后发送本地事务执行结果到broker

3、broker如果发现成功,将半消息转移到真实topic和队列删除半消息,这个时候事务消息可被消费者消费,如果回滚,直接删除半消息

4、broker启用一个线程,扫描事务消息topic里的队列里面的消息,判断是否需要检查事务状态(最大检查15次)

5、通过oneway方式向客户端发起查询事务状态请求,客户端查询状态,客户端通过oneway发送事务状态到broker,broker执行第3步骤。 oneway的请求是不等待结果的,只管发请求, 这里使用oneway是为了提高回查效率,避免阻塞,同时客户端接收到事务状态查询请求后,会主动将事务状态发送给broker。

���息发送篇总结

本文分析了消息发送和broker处理消息发送请求的实现,得出结论

1、生产者发送消息会发送到指定的topic队列,默认采用轮训算法实现发送的负载均衡

2、发送消息类型有3种,分别是同步,异步,单次(oneway),其中同步会重试3次

3、broker存储消息采用mmap机制实现零拷贝,刷盘机制支持同步刷盘和异步刷盘

4、broker主从复制模式支持同步复制,异步复制

5、事务消息采用内置topic+消息回查机制实现本地事务和发送逻辑的事务一致性。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
4天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
16天前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
41 2
|
2月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
2月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
2月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
2月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
2月前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
51 0
|
3月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
3月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决