Initial guess
When I run the code below, the topic does not exist yet. So at what point is the topic "TopicTest202112151152" created?
Message msg = new Message("TopicTest202112151152" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
SendResult sendResult = producer.send(msg, 1000000000);
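My guess at the time was that, on finding the topic missing, the client would first send a request asking the server to create the topic, and only then send the message.
The actual result: the topic is created while the message is being sent.
Question 1: when the client sends a message and the topic does not exist, whom does it send to?
Tracing the source
Take the code below as an example. It sends a message to the topic "TopicTest202112151154" whose body is a timestamp string. Step into the producer.send method.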
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
// Launch the instance.
producer.start();
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message(
    "TopicTest202112151154",
    "TagA",
    (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg, 1000000000);
System.out.printf("%s%n", sendResult);
// Shut down once the producer instance is no longer in use.
producer.shutdown();
Step into DefaultMQProducerImpl###sendDefaultImpl
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //...
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    //....
    //... send the message
}
Step into DefaultMQProducerImpl###tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // First look in the local cache; the topic does not exist, so this returns null
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Then ask the NameServer; the topic does not exist, so we end up with a TopicPublishInfo that is not ok
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    // The TopicPublishInfo is not ok, so the else branch runs
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // Fetch the route again, this time via the default topic; this call is the key step, step into it
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
Step into MQClientInstance###updateTopicRouteInfoFromNameServer
This method fetches the NameServer route info of the default topic "TBW102" and copies it as the route info of the new topic. From the client's point of view the new topic now exists, but it has not yet been created on the server side.
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        // Fetch the route info of the default topic defaultMQProducer.getCreateTopicKey(), i.e. TBW102
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
            defaultMQProducer.getCreateTopicKey(), 1000 * 3);
        // omitted...
    }
    // Then build the new topic's topicRouteData from TBW102's topicRouteData.
    // The client now has route info for the new topic (in reality TBW102's route info).
    return false;
}
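The client now has route info for the new topic. The brokers listed in that route info know nothing about the topic yet, but the client already knows which address to send the message to.
Answer
If the topic the client looks up does not exist, the client builds route info for the new topic from that of "TBW102" and stores it in its local cache. It then knows which address to send to, opens a Netty connection to that address, and sends the message.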
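The lookup order described above can be sketched as follows. This is a minimal simulation, not RocketMQ code: the class name RouteFallbackSketch, the two maps, and the broker address are hypothetical stand-ins for the NameServer and for the client's topicPublishInfoTable.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RouteFallbackSketch {
    // Hypothetical stand-in for the NameServer: topic -> broker addresses.
    public static final Map<String, List<String>> NAME_SERVER = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the client-side cache (topicPublishInfoTable).
    public static final Map<String, List<String>> LOCAL_CACHE = new ConcurrentHashMap<>();
    public static final String DEFAULT_TOPIC = "TBW102";

    public static List<String> findRoute(String topic) {
        // 1. Local cache first.
        List<String> route = LOCAL_CACHE.get(topic);
        if (route != null) {
            return route;
        }
        // 2. Ask the NameServer for the topic itself.
        route = NAME_SERVER.get(topic);
        // 3. Unknown topic: borrow the route of the default topic.
        if (route == null) {
            route = NAME_SERVER.get(DEFAULT_TOPIC);
        }
        // Cache whatever was found under the NEW topic's name.
        if (route != null) {
            LOCAL_CACHE.put(topic, route);
        }
        return route;
    }

    public static void main(String[] args) {
        // Made-up broker address, for illustration only.
        NAME_SERVER.put(DEFAULT_TOPIC, List.of("127.0.0.1:10911"));
        System.out.println(findRoute("TopicTest202112151154"));
        // The NameServer still has no entry for the new topic.
        System.out.println(NAME_SERVER.containsKey("TopicTest202112151154"));
    }
}
```

Note that only the client cache is updated here, which mirrors the point of the walkthrough: after this step the client has somewhere to send the message even though no server knows the topic yet.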
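Question 2: when the broker receives the message and finds that the topic does not exist, when does it create the topic?
Where to set the breakpoint
Some familiarity with Netty helps here: with it, you would expect the logic to live in a SimpleChannelInboundHandler.
So where is that SimpleChannelInboundHandler? Find the Netty server first, and look for the Netty server in the broker's startup code.
BrokerController###start contains the following code: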
if (this.remotingServer != null) {
    this.remotingServer.start();
}
The implementation of remotingServer to follow is NettyRemotingServer. Its start method contains the following code:
ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler
                    );
            }
        });
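Here serverHandler is RocketMQ's own handler. Following it leads to the channelRead0 method of NettyServerHandler, which calls processMessageReceived.
In NettyRemotingAbstract###processMessageReceived, set a conditional, thread-level breakpoint on the processRequestCommand call, with the condition cmd.code == 310 (RequestCode.SEND_MESSAGE_V2 = 310).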
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
Start tracing
When the client sends a message, the broker stops at the breakpoint on the processRequestCommand call in NettyRemotingAbstract###processMessageReceived, shown in the previous section.
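NettyRemotingAbstract###processRequestCommand
The line RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd) wraps the work as a task and submits it to a thread pool. Execution then continues in the anonymous Runnable named run in the code below.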
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();
    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(
                                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        System.out.println(response);
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(
                            RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };
        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }
        try {
            // Submit the task to the thread pool
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // omitted
        }
    }
    // omitted
}
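The core of processRequestCommand is a table lookup with a default fallback, followed by a hand-off to the executor paired with the matched processor. A minimal sketch of that pattern, using plain JDK types only: the class ProcessorTableSketch, its Entry type, and the string-based "processors" are hypothetical simplifications, not RocketMQ classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ProcessorTableSketch {
    // Hypothetical pairing of a handler with the thread pool that runs it.
    public static final class Entry {
        public final Function<String, String> processor;
        public final ExecutorService executor;

        public Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    public static final Map<Integer, Entry> TABLE = new HashMap<>();
    public static Entry defaultEntry;

    // Look up by request code, fall back to the default entry, run on that entry's pool.
    public static Future<String> dispatch(int code, String body) {
        Entry matched = TABLE.get(code);
        final Entry entry = matched == null ? defaultEntry : matched;
        if (entry == null) {
            throw new IllegalStateException("no processor for code " + code);
        }
        Callable<String> task = () -> entry.processor.apply(body);
        return entry.executor.submit(task);
    }

    // Convenience wrapper that blocks for the result.
    public static String dispatchAndWait(int code, String body) {
        try {
            return dispatch(code, body).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService sendPool = Executors.newFixedThreadPool(2);
        ExecutorService defaultPool = Executors.newSingleThreadExecutor();
        TABLE.put(310, new Entry(b -> "send:" + b, sendPool));
        defaultEntry = new Entry(b -> "default:" + b, defaultPool);
        System.out.println(dispatchAndWait(310, "hello")); // prints "send:hello"
        System.out.println(dispatchAndWait(999, "hello")); // prints "default:hello"
        sendPool.shutdown();
        defaultPool.shutdown();
    }
}
```

Giving each request code its own executor means a slow handler for one kind of request does not block the others, which is presumably why the pair carries an ExecutorService alongside the processor.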
Next, step into SendMessageProcessor###asyncProcessRequest
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request,
    RemotingResponseCallback responseCallback) throws Exception {
    asyncProcessRequest(ctx, request)
        .thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
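The three-argument method above chains the response callback onto a CompletableFuture and runs it on a dedicated executor. A minimal JDK-only sketch of that chaining follows. The class AsyncCallbackSketch and its string-based request and response are hypothetical, chosen only to make the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class AsyncCallbackSketch {
    // Stand-in for the two-argument asyncProcessRequest: produces the response asynchronously.
    public static CompletableFuture<String> process(String request) {
        return CompletableFuture.supplyAsync(() -> "ack:" + request);
    }

    // Stand-in for the three-argument overload: hands the response to a callback
    // on the given executor once the future completes.
    public static CompletableFuture<Void> processWithCallback(
            String request, Consumer<String> callback, ExecutorService callbackPool) {
        return process(request).thenAcceptAsync(callback, callbackPool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        processWithCallback("hello",
                response -> System.out.println(Thread.currentThread().getName() + " got " + response),
                pool)
            .join(); // wait so the demo does not exit before the callback runs
        pool.shutdown();
    }
}
```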
Then step into the two-argument overload of SendMessageProcessor###asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // This branch is taken
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}
Then step into SendMessageProcessor###asyncSendMessage
The method calls preSend:
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,
    SendMessageRequestHeader requestHeader) {
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    // omitted
}
Then step into SendMessageProcessor###preSend
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageRequestHeader requestHeader) {
    // omitted
    // Check the topic
    super.msgCheck(ctx, requestHeader, response);
    // omitted
}
Step into AbstractSendMessageProcessor###msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    // omitted
    // Create the topic on the broker; step into this call
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
        requestHeader.getTopic(),
        requestHeader.getDefaultTopic(),
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
        requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
    // omitted
}
TopicConfigManager###createTopicInSendMessageMethod
This method creates the topic and persists it. At this point the topic exists on the broker but not on the NameServer.
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
    // omitted (the enclosing blocks opened here are closed below)
            if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                // Build the TopicConfig of the new topic
                topicConfig = new TopicConfig(topic);
                int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
                if (queueNums < 0) {
                    queueNums = 0;
                }
                topicConfig.setReadQueueNums(queueNums);
                topicConfig.setWriteQueueNums(queueNums);
                int perm = defaultTopicConfig.getPerm();
                perm &= ~PermName.PERM_INHERIT;
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);
                topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
            }
        }
        if (topicConfig != null) {
            // Persist
            this.persist();
        }
    }
    return topicConfig;
}
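The two calculations in this method, the queue count and the permission mask, are easy to check in isolation. The sketch below reproduces only that arithmetic. The class TopicInheritSketch is hypothetical, the permission constants are declared locally and assumed to mirror the bit flags in PermName, and the sample queue counts are made up.

```java
public class TopicInheritSketch {
    // Declared locally for the example; assumed to mirror RocketMQ's PermName bit flags.
    public static final int PERM_READ = 0x1 << 2;
    public static final int PERM_WRITE = 0x1 << 1;
    public static final int PERM_INHERIT = 0x1;

    // The new topic gets the smaller of the client's requested queue count
    // and the template topic's write queue count, never below zero.
    public static int queueNums(int clientDefaultTopicQueueNums, int templateWriteQueueNums) {
        int queueNums = Math.min(clientDefaultTopicQueueNums, templateWriteQueueNums);
        return Math.max(queueNums, 0);
    }

    // The new topic copies the template's permissions with the inherit bit cleared,
    // so it cannot itself serve as a template for further topics.
    public static int inheritedPerm(int templatePerm) {
        return templatePerm & ~PERM_INHERIT;
    }

    public static void main(String[] args) {
        int templatePerm = PERM_INHERIT | PERM_READ | PERM_WRITE; // 7
        System.out.println(queueNums(4, 8));             // prints 4
        System.out.println(queueNums(16, 8));            // prints 8
        System.out.println(inheritedPerm(templatePerm)); // prints 6
    }
}
```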
ConfigManager###persist
public synchronized void persist() {
    String jsonString = this.encode(true);
    if (jsonString != null) {
        // On my machine this is C:\Users\25682\store\config\topics.json
        String fileName = this.configFilePath();
        try {
            MixAll.string2File(jsonString, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }
}
MixAll###string2File
// str is the latest full set of topic info
public static void string2File(final String str, final String fileName) throws IOException {
    // First write str to topics.json.tmp
    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);
    // Save the current contents of topics.json to topics.json.bak
    String bakFile = fileName + ".bak";
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }
    // Delete topics.json
    File file = new File(fileName);
    file.delete();
    // Rename topics.json.tmp to topics.json
    file = new File(tmpFile);
    file.renameTo(new File(fileName));
}
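The write-to-temp, back-up, then swap sequence can be tried out with java.nio.file. The sketch below follows the same three steps but is not the RocketMQ implementation: the class SafeWriteSketch is hypothetical, it works in a temporary directory, and it uses Files.move with REPLACE_EXISTING in place of the separate delete and renameTo calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SafeWriteSketch {
    public static void writeWithBackup(Path target, String content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Path bak = target.resolveSibling(target.getFileName() + ".bak");
        // 1. Write the new content to the .tmp file first.
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // 2. Keep the previous content, if any, in the .bak file.
        if (Files.exists(target)) {
            Files.copy(target, bak, StandardCopyOption.REPLACE_EXISTING);
        }
        // 3. Swap the .tmp file into place.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    // Writes two versions and returns {current content, backup content}.
    public static String[] demo() {
        try {
            Path dir = Files.createTempDirectory("topics-sketch");
            Path target = dir.resolve("topics.json");
            writeWithBackup(target, "{\"v\":1}");
            writeWithBackup(target, "{\"v\":2}");
            Path bak = dir.resolve("topics.json.bak");
            return new String[] {
                new String(Files.readAllBytes(target), StandardCharsets.UTF_8),
                new String(Files.readAllBytes(bak), StandardCharsets.UTF_8)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("current: " + result[0]); // prints current: {"v":2}
        System.out.println("backup:  " + result[1]); // prints backup:  {"v":1}
    }
}
```

Writing to a temporary file first means a crash during the write leaves the previous topics.json untouched, and the .bak copy gives one more version to fall back to.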
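What the TBW102 topic is for
By default, a producer does not need to create a topic before sending messages: if the topic does not exist, the broker creates it automatically. But what permissions does the new topic get, and how many read and write queues? This is where TBW102 comes in: RocketMQ creates the new topic based on that topic's configuration.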