开发者社区> 游客bzudnnsbr7w4e> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

SpringBoot整合RocketMQ发送批量消息

简介: SpringBoot整合RocketMQ发送批量消息
+关注继续查看
  1. 发送限制

生产者进行消息发送时可以一次发送多条消息,这样可以提升发送效率,需注意以下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息

  1. 批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性

Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性

  1. 生产者业务接口
public interface BatchMessageService {

    /**
     * 发送批量消息
     * @param messageList
     */
    void sendBatchMessage(List<Message<String>> messageList);
}
  1. 生产者业务接口实现类
@Service
public class BatchMessageServiceImpl implements BatchMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(BatchMessageServiceImpl.class);

    @Override
    public void sendBatchMessage(List<Message<String>> messageList) {
        //限制数据大小
        ListSplitter splitter = new ListSplitter(1024 * 1024 * 1, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            SendResult result = rocketMQTemplate.syncSend("batch-message-topic:sync-tags", nextList);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
            } else {
                logger.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
            }
        }
    }
}
  1. 消息列表分割器类
public class ListSplitter<T> implements Iterator<List<T>> {

    /**
     * 分割数据大小
     */
    private int sizeLimit;

    /**
     * 分割数据列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }


    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
  1. 消费者类
@Component
@RocketMQMessageListener(topic = "batch-message-topic", consumerGroup = "batch-consumer-group")
public class BatchMessageListener implements RocketMQListener<List<Message<String>>> {

    private static final Logger logger = LoggerFactory.getLogger(BatchMessageListener.class);

    @Override
    public void onMessage(List<Message<String>> message) {
        logger.info("接收到批量消息:{}", JSON.toJSONString(message));
    }
}
  1. 测试
@Test
void batchMessage() {
    List<Message<String>> messageList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        String uuid = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder.withPayload("hello" + i).setHeader(RocketMQHeaders.KEYS, uuid).build();
        messageList.add(message);
    }
    batchMessageService.sendBatchMessage(messageList);
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
0 0
SpringBoot整合RocketMQ发送消息过滤
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。 对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
0 0
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
0 0
SpringBoot整合RocketMQ发送事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
0 0
SpringBoot整合RocketMQ发送顺序消息
严格按照消息的发送顺序进行消费的消息。默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列,而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的,如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性
0 0
SpringBoot整合RocketMQ发送普通消息
RocketMQ是一个统一消息引擎、轻量级数据处理平台
0 0
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
0 0
方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消息
方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消息
0 0
Rocketmq spring 上手:如何在优雅地Spring 中实现消息的发送和消费
Rocketmq spring 上手:如何在优雅地Spring 中实现消息的发送和消费
0 0
消息队列AMQP Spring Boot发送延时消息
消息队列AMQP Spring Boot发送延时消息
461 0
文章
问答
文章排行榜
最热
最新
相关电子书
更多
AliSQL 内核定制方案
立即下载
Egg— 企业级 Node 框架
立即下载
数据库上云经典案例分析
立即下载