开发者社区> 问答> 正文

canal.mq.partitionsNum 配置后,rocketmq的queue都发了消息,现象好

canal server 版本为 1.1.5-snapshot

canal中的instance配置文件中,配置了

canal.mq.topic=customer_info canal.mq.partitionsNum=4 canal.mq.partitionHash=scb_customer.customer_info:sid,scb_customer.label_customer:sid

新增customer_info表一条记录

Broker | 队列 | 消费者终端 | 代理者位点 | 消费者位点 | 差值 | 上次时间 broker-a | 0 | | 2 | 0 | 2 | 2020-12-10 22:34:24 broker-a | 1 | | 2 | 0 | 2 | 1970-01-01 08:00:00 broker-a | 2 | | 2 | 0 | 2 | 1970-01-01 08:00:00 broker-a | 3 | | 2 | 0 | 2 | 1970-01-01 08:00:00

上面的4个队列 都增加了消息;;;每新增一条记录,4个队列都插入消息

也就是会产生4个消息(只有1个消息的消息body里面的内容是正确的;其他3个消息 MessageBody 为 null )

原提问者GitHub用户gujiachun

展开
收起
山海行 2023-04-27 19:16:55 135 0
1 条回答
写回答
取消 提交回答
  • 原因应该找到了

    // 并发构造 MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor); // 串行分区 List flatMessages = MQMessageUtils.messageConverter(datas, message.getId()); // 初始化分区合并队列 if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) { List partitionFlatMessages = new ArrayList<>(); for (int i = 0; i < destination.getPartitionsNum(); i++) { partitionFlatMessages.add(new ArrayList<>()); }

            for (FlatMessage flatMessage : flatMessages) {
                FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                    partitionNum,
                    destination.getPartitionHash(),
                    mqProperties.isDatabaseHash());
                int length = partitionFlatMessage.length;
                for (int i = 0; i < length; i++) {
                    partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                }
            }
    
            ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
            for (int i = 0; i < partitionFlatMessages.size(); i++) {
                final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                **if (flatMessagePart != null) {**
                    final int index = i;
                    template.submit(() -> {
                        List<Message> messages = flatMessagePart.stream()
                            .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
                                SerializerFeature.WriteMapNullValue)))
                            .collect(Collectors.toList());
                        // 批量发送
                        sendMessage(messages, index);
                    });
                }
            }
    

    源码中 if (flatMessagePart != null) 这个判断 有问题;;这个是不可能为null的;调试后,[null];(有值,只是值为null而已)

    这个是上面的2个for循环导致的

    改了源码,修复了此bug了 改动的地方

    for (int i = 0; i < length; i++) { if (partitionFlatMessage[i] != null) {//增加null判断 partitionFlatMessages.get(i).add(partitionFlatMessage[i]); } }

    改动的第二个地方

    if (flatMessagePart != null && flatMessagePart.size() > 0) {//判断加上size要大于0 final int index = i; template.submit(() -> { List messages = flatMessagePart.stream() .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue))) .collect(Collectors.toList()); // 批量发送 sendMessage(messages, index); }); }

    原回答者GitHub用户gujiachun

    2023-04-28 14:03:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载