学习canal时,发现CanalRocketMQProducer的sendMessage不支持批量发送message。
CanalRocketMQProducer.java Line 141-148
1,当MQMessageUtils解析data,循环分配PartitionHash时,此时设计为收集到集合中,并批量发送比较合理
2,Line 149行,因为partitionFlatMessage中只有一行数据,此时的partitonFlatMessage不循环比较好。
刚学习JAVA,也不知道是不是看错了。
自己尝试修改了一下,有兴趣的人批评批评,瞎搞。
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
if (flatMessages != null) {
Integer partitionsNum = destination.getPartitionsNum();
if (partitionsNum == null) {
partitionsNum = 1;
}
List<Message>[] producerMessages = new ArrayList[partitionsNum];
for (int i = 0; i < partitionsNum; i++) {
producerMessages[i] = new ArrayList();
}
for (FlatMessage flatMessage : flatMessages) {
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
partitionsNum,
destination.getPartitionHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
FlatMessage flatMessagePart = partitionFlatMessage[i];
if (flatMessagePart != null) {
if (logger.isDebugEnabled()) {
logger.debug("flatMessagePart: {}, partition: {}",
JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
i);
}
final int index = i;
try {
Message message = new Message(topicName, JSON.toJSONString(flatMessagePart.getData(),
SerializerFeature.WriteMapNullValue).getBytes());
producerMessages[index].add(message);
} catch (Exception e) {
logger.error("send flat message to hashed partition error", e);
throw e;
}
}
}
} else {
try {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
if (logger.isDebugEnabled()) {
logger.debug("send message: {} to topic: {} fixed partition: {}",
JSON.toJSONString(flatMessage.getData(), SerializerFeature.WriteMapNullValue),
topicName,
partition);
}
Message message = new Message(topicName, JSON.toJSONString(flatMessage,
SerializerFeature.WriteMapNullValue).getBytes());
producerMessages[partition].add(message);
} catch (Exception e) {
logger.error("send flat message to fixed partition error", e);
throw e;
}
}
}
//获取rocketmq queue list, 分批发送消息,增强对消息的处理能力
List<MessageQueue> msgQueues = this.defaultMQProducer.fetchPublishMessageQueues(topicName);
for (int i=0; i<partitionsNum; i++) {
final Integer index = i;
MessageQueue msgQ;
final int size = msgQueues.size();
if (index > size) {
msgQ = msgQueues.get(index % size);
} else {
msgQ = msgQueues.get(index);
}
MessageListSplitter splitter = new MessageListSplitter(producerMessages[index]);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
this.defaultMQProducer.send(listItem, msgQ);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("send message to rocket topic: {}", destination.getTopic());
}
}
原提问者GitHub用户newlipeng
你这版本看的有点老吧,默认已经做了相同分区的message合并,多个分区之间并行提交
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
你好,我是AI助理
可以解答问题、推荐解决方案等
评论
全部评论 (0)