学习canal时,发现CanalRocketMQProducer的sendMessage不支持批量发送message。
CanalRocketMQProducer.java Line 141-148
1,当MQMessageUtils解析data,循环分配PartitionHash时,此时设计为收集到集合中,并批量发送比较合理
2,Line 149行,因为partitionFlatMessage中只有一行数据,此时的partitonFlatMessage不循环比较好。
刚学习JAVA,也不知道是不是看错了。
自己尝试修改了一下,有兴趣的人批评批评,瞎搞。
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
        if (flatMessages != null) {
            Integer partitionsNum = destination.getPartitionsNum();
            if (partitionsNum == null) {
                partitionsNum = 1;
            }
            List<Message>[] producerMessages = new ArrayList[partitionsNum];
            for (int i = 0; i < partitionsNum; i++) {
                producerMessages[i] = new ArrayList();
            }
            for (FlatMessage flatMessage : flatMessages) {
                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                        partitionsNum,
                        destination.getPartitionHash());
                    int length = partitionFlatMessage.length;
                    for (int i = 0; i < length; i++) {
                        FlatMessage flatMessagePart = partitionFlatMessage[i];
                        if (flatMessagePart != null) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("flatMessagePart: {}, partition: {}",
                                    JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
                                    i);
                            }
                            final int index = i;
                            try {
                                Message message = new Message(topicName, JSON.toJSONString(flatMessagePart.getData(),
                                    SerializerFeature.WriteMapNullValue).getBytes());
                                producerMessages[index].add(message);
                            } catch (Exception e) {
                                logger.error("send flat message to hashed partition error", e);
                                throw e;
                            }
                        }
                    }
                } else {
                    try {
                        final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                        if (logger.isDebugEnabled()) {
                            logger.debug("send message: {} to topic: {} fixed partition: {}",
                                JSON.toJSONString(flatMessage.getData(), SerializerFeature.WriteMapNullValue),
                                topicName,
                                partition);
                        }
                        Message message = new Message(topicName, JSON.toJSONString(flatMessage,
                            SerializerFeature.WriteMapNullValue).getBytes());
                        producerMessages[partition].add(message);
                    } catch (Exception e) {
                        logger.error("send flat message to fixed partition error", e);
                        throw e;
                    }
                }
            }
            //获取rocketmq queue list, 分批发送消息,增强对消息的处理能力
            List<MessageQueue> msgQueues = this.defaultMQProducer.fetchPublishMessageQueues(topicName);
            for (int i=0; i<partitionsNum; i++) {
                final Integer index = i;
                MessageQueue msgQ;
                final int size = msgQueues.size();
                if (index > size) {
                    msgQ = msgQueues.get(index % size);
                } else {
                    msgQ = msgQueues.get(index);
                }
                MessageListSplitter splitter = new MessageListSplitter(producerMessages[index]);
                while (splitter.hasNext()) {
                    try {
                        List<Message>  listItem = splitter.next();
                        this.defaultMQProducer.send(listItem, msgQ);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //处理error
                    }
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("send message to rocket topic: {}", destination.getTopic());
            }
        }
原提问者GitHub用户newlipeng
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。