RocketMQ主题的自动创建机制

简介: 问题在学习RocketMQ的时候,有几个疑问。如果主题不存在,client把消息发给谁呢?当发送消息给不存在的主题时,主题是什么时候创建的呢?

猜测


当我执行下面代码时,主题不存在,那么什么时候创建的主题"TopicTest202112151152"呢?


  Message msg = new Message("TopicTest202112151152" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg,1000000000);


其实我当时猜测的是可能发现主题不存在时先给服务器发个消息,让其创建主题,然后再发送消息。

结果是:发送消息的时候创建主题


问题1:client发送消息,主题不存在给谁发?


源码跟踪


以下面一段代码为例,要给“TopicTest202112151154”主题发送消息,发送的内容是时间字符串,跟producer.send方法


// Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    // Launch the instance.
    producer.start();
    // Create a message instance, specifying topic, tag and message body.
    Message msg =
        new Message(
            "TopicTest202112151154",
            "TagA",
            (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Call send message to deliver message to one of brokers.
    SendResult sendResult = producer.send(msg, 1000000000);
    System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();

跟到DefaultMQProducerImpl###sendDefaultImpl方法


private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      //...
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      //....
      //...发送消息
    }


跟到DefaultMQProducerImpl###tryToFindTopicPublishInfo方法


private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //首先从本地缓存中获取,因为主题不存在,所以返回null
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //然后从NameServer获取,因为主题不存在,所以返回一个不Ok的TopicPublishInfo 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //因为TopicPublishInfo不Ok
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //重新获取主题,该方法是重点,跟进去
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }


跟到MQClientInstance###updateTopicRouteInfoFromNameServer方法

在该方法中获取默认的主题“TBW102”主题在NameServer的路由信息,把新主题的路由信息参考“TBW102”复制一份,此时在客户端上已经认为新主题已经创建好,不过在服务器端是没有创建好改主题的。


    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        //获取默认主题defaultMQProducer.getCreateTopicKey(),即TBW102的路由信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                            //省略。。。
                    }
                    //然后按照TBW102的topicRouteData把新主题的topicRouteData创建出来,此时客户端就有了新主题的路由信息(实际是TBW102的路由信息)   
        return false;
    }


  此时客户端就有新主题的路由信息了,但是路由信息对应的broker上是没有该主题的信息的,不过客户端此时已经知道把消息发给哪个IP了。


问题回答


客户端如果获取的主题信息不存在,会根据“TBW102”主题的信息创建新主题,然后把该新主题的信息存储到客户端本地,此时客户端知道给哪个IP发数据了,然后客户端就会和那个IP的Netty建立连接,然后发数据,Ok了。


问题2:broker收到消息后发现主题不存在,什么时候创建?


从哪开始打断点


首先你要会Netty,这样按照常理你就能知道逻辑在SimpleChannelInboundHandler里。

那么去哪找SimpleChannelInboundHandler呢,应该先找到NettyServer。NettyServer应该在Broker的启动源码里去找。


BrokerController###start方法里有下面的代码


if (this.remotingServer != null) {
            this.remotingServer.start();
        }


remotingServer的实现类选择NettyRemotingServer,里面的start方法里有如下代码


 ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });


其中serverHandler就是MQ自定义的方法,顺藤摸瓜,就找到了NettyServerHandler的channelRead0方法


NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)


   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }


开始跟源码


当客户端发送消息时,broker的断点会停在下面的processRequestCommand这一行


NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)


   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }


NettyRemotingAbstract###processRequestCommand方法

RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd)把任务提交,会到下面代码里的run匿名类里


public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
      Runnable run =
          new Runnable() {
            @Override
            public void run() {
              try {
                doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                final RemotingResponseCallback callback =
                    new RemotingResponseCallback() {
                      @Override
                      public void callback(RemotingCommand response) {
                        doAfterRpcHooks(
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        if (!cmd.isOnewayRPC()) {
                          if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                              System.out.println(response);
                              ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                              log.error("process request over, but response failed", e);
                              log.error(cmd.toString());
                              log.error(response.toString());
                            }
                          } else {
                          }
                        }
                      }
                    };
                if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                  AsyncNettyRequestProcessor processor =
                      (AsyncNettyRequestProcessor) pair.getObject1();
                  processor.asyncProcessRequest(ctx, cmd, callback);
                } else {
                  NettyRequestProcessor processor = pair.getObject1();
                  RemotingCommand response = processor.processRequest(ctx, cmd);
                  callback.callback(response);
                }
              } catch (Throwable e) {
                log.error("process request exception", e);
                log.error(cmd.toString());
                if (!cmd.isOnewayRPC()) {
                  final RemotingCommand response =
                      RemotingCommand.createResponseCommand(
                          RemotingSysResponseCode.SYSTEM_ERROR,
                          RemotingHelper.exceptionSimpleDesc(e));
                  response.setOpaque(opaque);
                  ctx.writeAndFlush(response);
                }
              }
            }
          };
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }
            try {
                 //使用线程池把任务提交
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
    }


然后跟SendMessageProcessor###asyncProcessRequest方法


public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
    }


然后跟SendMessageProcessor###asyncProcessRequest方法


public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    //走这个分支
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }


然后跟SendMessageProcessor###asyncSendMessage方法

方法里有一个preSend方法


private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        //省略
    }


然后跟SendMessageProcessor###asyncSendMessage方法


private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                    SendMessageRequestHeader requestHeader) {
       //省略
        //检查主题的问题
        super.msgCheck(ctx, requestHeader, response);
        //省略
    }


跟进AbstractSendMessageProcessor###msgCheck方法


 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
            //省略
            //broker上创建主题,跟进去
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
           //省略
    }


TopicConfigManager###createTopicInSendMessageMethod

该方法会创建主题并且持久化,此时主题在broker中存在但是NameServer不存在


public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                            //创建新主题的topic信息
                            topicConfig = new TopicConfig(topic);
                            int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
                            if (queueNums < 0) {
                                queueNums = 0;
                            }
                            topicConfig.setReadQueueNums(queueNums);
                            topicConfig.setWriteQueueNums(queueNums);
                            int perm = defaultTopicConfig.getPerm();
                            perm &= ~PermName.PERM_INHERIT;
                            topicConfig.setPerm(perm);
                            topicConfig.setTopicSysFlag(topicSysFlag);
                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                        } 
                    } 
                    if (topicConfig != null) {
                        //持久化
                        this.persist();
                    }
                } 
        return topicConfig;
    }


###ConfigManager###persist方法


public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            //我的值是C:\Users\25682\store\config\topics.json
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }


MixAll###string2File


//str为最新的全部topic信息
public static void string2File(final String str, final String fileName) throws IOException {
        //先把str存到topics.json.tmp里
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
        //把topics.json里的数据存储到topics.json.bk里
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }
        //删除topics.json
        File file = new File(fileName);
        file.delete();
        //把topics.json.tmp重命名为topics.json
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }


TBW102主题的作用


Producer 在发送消息时,默认情况下,不需要提前创建好 Topic,如果 Topic 不存在,Broker 会自动创建 Topic。但是新创建的 Topic 它的权限是什么?读写队列数是多少呢?这个时候就需要用到TBW102

了,RocketMQ 会基于该 Topic 的配置创建新的 Topic。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 安全 物联网
MQTT常见问题之新增自定义主题后平台侧收不到发布的数据如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
消息中间件 弹性计算 物联网
MQTT常见问题之发布MQTT主题消息失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
消息中间件 存储 监控
|
传感器 物联网
详解MQTT主题和通配符
详解MQTT主题和通配符
872 0
详解MQTT主题和通配符
|
7月前
|
消息中间件 存储 运维
|
7月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
105 0
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
105 0
|
5月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
71 0