- 发送限制
生产者进行消息发送时可以一次发送多条消息,这样可以提升发送效率,需注意以下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息
- 批量发送大小
默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:
- 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
- 方案二:在Producer端与Broker端修改属性
Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性
- 生产者业务接口
public interface BatchMessageService {
/**
* 发送批量消息
* @param messageList
*/
void sendBatchMessage(List<Message<String>> messageList);
}
- 生产者业务接口实现类
@Service
public class BatchMessageServiceImpl implements BatchMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(BatchMessageServiceImpl.class);
@Override
public void sendBatchMessage(List<Message<String>> messageList) {
//限制数据大小
ListSplitter splitter = new ListSplitter(1024 * 1024 * 1, messageList);
while (splitter.hasNext()) {
List<Message> nextList = splitter.next();
SendResult result = rocketMQTemplate.syncSend("batch-message-topic:sync-tags", nextList);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
} else {
logger.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
}
}
}
}
- 消息列表分割器类
public class ListSplitter<T> implements Iterator<List<T>> {
/**
* 分割数据大小
*/
private int sizeLimit;
/**
* 分割数据列表
*/
private final List<T> messages;
/**
* 分割索引
*/
private int currIndex;
public ListSplitter(int sizeLimit, List<T> messages) {
this.sizeLimit = sizeLimit;
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<T> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
T t = messages.get(nextIndex);
totalSize = totalSize + t.toString().length();
if (totalSize > sizeLimit) {
break;
}
}
List<T> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
- 消费者类
@Component
@RocketMQMessageListener(topic = "batch-message-topic", consumerGroup = "batch-consumer-group")
public class BatchMessageListener implements RocketMQListener<List<Message<String>>> {
private static final Logger logger = LoggerFactory.getLogger(BatchMessageListener.class);
@Override
public void onMessage(List<Message<String>> message) {
logger.info("接收到批量消息:{}", JSON.toJSONString(message));
}
}
- 测试
@Test
void batchMessage() {
List<Message<String>> messageList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
String uuid = UUID.randomUUID().toString();
Message<String> message = MessageBuilder.withPayload("hello" + i).setHeader(RocketMQHeaders.KEYS, uuid).build();
messageList.add(message);
}
batchMessageService.sendBatchMessage(messageList);
}