SpringBoot整合RocketMQ发送批量消息

简介: SpringBoot整合RocketMQ发送批量消息
  1. 发送限制

生产者进行消息发送时可以一次发送多条消息,这样可以提升发送效率,需注意以下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息

  1. 批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性

Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性

  1. 生产者业务接口
public interface BatchMessageService {

    /**
     * 发送批量消息
     * @param messageList
     */
    void sendBatchMessage(List<Message<String>> messageList);
}
  1. 生产者业务接口实现类
@Service
public class BatchMessageServiceImpl implements BatchMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(BatchMessageServiceImpl.class);

    @Override
    public void sendBatchMessage(List<Message<String>> messageList) {
        //限制数据大小
        ListSplitter splitter = new ListSplitter(1024 * 1024 * 1, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            SendResult result = rocketMQTemplate.syncSend("batch-message-topic:sync-tags", nextList);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
            } else {
                logger.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
            }
        }
    }
}
  1. 消息列表分割器类
public class ListSplitter<T> implements Iterator<List<T>> {

    /**
     * 分割数据大小
     */
    private int sizeLimit;

    /**
     * 分割数据列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }


    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
  1. 消费者类
@Component
@RocketMQMessageListener(topic = "batch-message-topic", consumerGroup = "batch-consumer-group")
public class BatchMessageListener implements RocketMQListener<List<Message<String>>> {

    private static final Logger logger = LoggerFactory.getLogger(BatchMessageListener.class);

    @Override
    public void onMessage(List<Message<String>> message) {
        logger.info("接收到批量消息:{}", JSON.toJSONString(message));
    }
}
  1. 测试
@Test
void batchMessage() {
    List<Message<String>> messageList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        String uuid = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder.withPayload("hello" + i).setHeader(RocketMQHeaders.KEYS, uuid).build();
        messageList.add(message);
    }
    batchMessageService.sendBatchMessage(messageList);
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java 数据安全/隐私保护
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(三)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
|
3月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
|
6月前
|
消息中间件 Java
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(二)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
6月前
|
消息中间件 存储 Java
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(一)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
6月前
|
消息中间件 Java 测试技术
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(四)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
9月前
|
消息中间件 RocketMQ
RocketMQ极简入门-RocketMQ消息批量发送
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB,如果超过可以有2种处理方案: 1.将消息进行切割成多个小于4M的内容进行发送 2.修改4M的限制改成更大 可以设置Producer的maxMessageSize属性 修改配置文件中的maxMessageSize属性
335 0
|
9月前
|
消息中间件 RocketMQ
六.RocketMQ极简入门-RocketMQ消息批量发送
RocketMQ极简入门-RocketMQ消息批量发送
|
消息中间件 Java 中间件
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
1509 0
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息