开发者社区> 问答> 正文
1
0
分享

CanalRocketMQProducer.java不支持批量发送MQ,并有空循环的问题

学习canal时,发现CanalRocketMQProducer的sendMessage不支持批量发送message。

CanalRocketMQProducer.java Line 141-148

1,当MQMessageUtils解析data,循环分配PartitionHash时,此时设计为收集到集合中,并批量发送比较合理

2,Line 149行,因为partitionFlatMessage中只有一行数据,此时的partitonFlatMessage不循环比较好。

刚学习JAVA,也不知道是不是看错了。

自己尝试修改了一下,有兴趣的人批评批评,瞎搞。

        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
        if (flatMessages != null) {
            Integer partitionsNum = destination.getPartitionsNum();
            if (partitionsNum == null) {
                partitionsNum = 1;
            }
            List<Message>[] producerMessages = new ArrayList[partitionsNum];
            for (int i = 0; i < partitionsNum; i++) {
                producerMessages[i] = new ArrayList();
            }
            for (FlatMessage flatMessage : flatMessages) {
                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                        partitionsNum,
                        destination.getPartitionHash());
                    int length = partitionFlatMessage.length;
                    for (int i = 0; i < length; i++) {
                        FlatMessage flatMessagePart = partitionFlatMessage[i];
                        if (flatMessagePart != null) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("flatMessagePart: {}, partition: {}",
                                    JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
                                    i);
                            }
                            final int index = i;
                            try {
                                Message message = new Message(topicName, JSON.toJSONString(flatMessagePart.getData(),
                                    SerializerFeature.WriteMapNullValue).getBytes());
                                producerMessages[index].add(message);
                            } catch (Exception e) {
                                logger.error("send flat message to hashed partition error", e);
                                throw e;
                            }
                        }
                    }
                } else {
                    try {
                        final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                        if (logger.isDebugEnabled()) {
                            logger.debug("send message: {} to topic: {} fixed partition: {}",
                                JSON.toJSONString(flatMessage.getData(), SerializerFeature.WriteMapNullValue),
                                topicName,
                                partition);
                        }
                        Message message = new Message(topicName, JSON.toJSONString(flatMessage,
                            SerializerFeature.WriteMapNullValue).getBytes());
                        producerMessages[partition].add(message);
                    } catch (Exception e) {
                        logger.error("send flat message to fixed partition error", e);
                        throw e;
                    }
                }
            }

            //获取rocketmq queue list, 分批发送消息,增强对消息的处理能力
            List<MessageQueue> msgQueues = this.defaultMQProducer.fetchPublishMessageQueues(topicName);
            for (int i=0; i<partitionsNum; i++) {
                final Integer index = i;
                MessageQueue msgQ;
                final int size = msgQueues.size();
                if (index > size) {
                    msgQ = msgQueues.get(index % size);
                } else {
                    msgQ = msgQueues.get(index);
                }
                MessageListSplitter splitter = new MessageListSplitter(producerMessages[index]);
                while (splitter.hasNext()) {
                    try {
                        List<Message>  listItem = splitter.next();
                        this.defaultMQProducer.send(listItem, msgQ);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //处理error
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("send message to rocket topic: {}", destination.getTopic());
            }
        }

原提问者GitHub用户newlipeng

展开
收起
Java工程师 2023-05-03 09:46:59 77 0 发布于北京
举报
飞天免费试用计划
领取免费云资源,开启云上实践第一步
全局流量管理 GTM
标准版 1个月
额度1个月内有效
云解析 DNS
旗舰版 1个月
额度1个月内有效
公共DNS(含HTTPDNS解析)
每月1000万次HTTP解析
不限时长
1 条回答
写回答
取消 提交回答
  • 你这版本看的有点老吧,默认已经做了相同分区的message合并,多个分区之间并行提交

    原回答者GitHub用户agapple

    2023-05-04 13:30:18 发布于北京 举报
    赞同 评论 打赏

    评论

    全部评论 (0)

    登录后可评论
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等