SpringBoot整合RocketMQ发送批量消息

简介: SpringBoot整合RocketMQ发送批量消息
  1. 发送限制

生产者进行消息发送时可以一次发送多条消息,这样可以提升发送效率,需注意以下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息

  1. 批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性

Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性

  1. 生产者业务接口
public interface BatchMessageService {

    /**
     * 发送批量消息
     * @param messageList
     */
    void sendBatchMessage(List<Message<String>> messageList);
}
  1. 生产者业务接口实现类
@Service
public class BatchMessageServiceImpl implements BatchMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(BatchMessageServiceImpl.class);

    @Override
    public void sendBatchMessage(List<Message<String>> messageList) {
        //限制数据大小
        ListSplitter splitter = new ListSplitter(1024 * 1024 * 1, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            SendResult result = rocketMQTemplate.syncSend("batch-message-topic:sync-tags", nextList);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
            } else {
                logger.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
            }
        }
    }
}
  1. 消息列表分割器类
public class ListSplitter<T> implements Iterator<List<T>> {

    /**
     * 分割数据大小
     */
    private int sizeLimit;

    /**
     * 分割数据列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }


    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
  1. 消费者类
@Component
@RocketMQMessageListener(topic = "batch-message-topic", consumerGroup = "batch-consumer-group")
public class BatchMessageListener implements RocketMQListener<List<Message<String>>> {

    private static final Logger logger = LoggerFactory.getLogger(BatchMessageListener.class);

    @Override
    public void onMessage(List<Message<String>> message) {
        logger.info("接收到批量消息:{}", JSON.toJSONString(message));
    }
}
  1. 测试
@Test
void batchMessage() {
    List<Message<String>> messageList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        String uuid = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder.withPayload("hello" + i).setHeader(RocketMQHeaders.KEYS, uuid).build();
        messageList.add(message);
    }
    batchMessageService.sendBatchMessage(messageList);
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
38 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1099 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
421 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
消息中间件 Java 中间件
SpringBoot学习笔记-10:第十章-SpringBoot 与消息
SpringBoot学习笔记-10:第十章-SpringBoot 与消息
144 0
SpringBoot学习笔记-10:第十章-SpringBoot 与消息
|
Java 应用服务中间件 Maven
传统maven项目和现在spring boot项目的区别
Spring Boot:传统 Web 项目与采用 Spring Boot 项目区别
520 0
传统maven项目和现在spring boot项目的区别