开发者社区> 陌然浅笑-支> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

SpringBoot整合RocketMQ发送批量消息

简介: SpringBoot整合RocketMQ发送批量消息
+关注继续查看
  1. 发送限制

生产者进行消息发送时可以一次发送多条消息,这样可以提升发送效率,需注意以下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息

  1. 批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性

Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性

  1. 生产者业务接口
public interface BatchMessageService {

    /**
     * 发送批量消息
     * @param messageList
     */
    void sendBatchMessage(List<Message<String>> messageList);
}
  1. 生产者业务接口实现类
@Service
public class BatchMessageServiceImpl implements BatchMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(BatchMessageServiceImpl.class);

    @Override
    public void sendBatchMessage(List<Message<String>> messageList) {
        //限制数据大小
        ListSplitter splitter = new ListSplitter(1024 * 1024 * 1, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            SendResult result = rocketMQTemplate.syncSend("batch-message-topic:sync-tags", nextList);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
            } else {
                logger.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
            }
        }
    }
}
  1. 消息列表分割器类
public class ListSplitter<T> implements Iterator<List<T>> {

    /**
     * 分割数据大小
     */
    private int sizeLimit;

    /**
     * 分割数据列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }


    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
  1. 消费者类
@Component
@RocketMQMessageListener(topic = "batch-message-topic", consumerGroup = "batch-consumer-group")
public class BatchMessageListener implements RocketMQListener<List<Message<String>>> {

    private static final Logger logger = LoggerFactory.getLogger(BatchMessageListener.class);

    @Override
    public void onMessage(List<Message<String>> message) {
        logger.info("接收到批量消息:{}", JSON.toJSONString(message));
    }
}
  1. 测试
@Test
void batchMessage() {
    List<Message<String>> messageList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        String uuid = UUID.randomUUID().toString();
        Message<String> message = MessageBuilder.withPayload("hello" + i).setHeader(RocketMQHeaders.KEYS, uuid).build();
        messageList.add(message);
    }
    batchMessageService.sendBatchMessage(messageList);
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
SpringBoot——SpringBoot集成WebSocket实现简单的多人聊天室
SpringBoot——SpringBoot集成WebSocket实现简单的多人聊天室
248 0
SpringBoot——借助Maven多模块管理实现集成SSM、Dubbo、Thymeleaf的汇总案例
SpringBoot——借助Maven多模块管理实现集成SSM、Dubbo、Thymeleaf的汇总案例
61 0
基于SpringBoot 2.x开发的简易版图书管理系统(实现对图书的CRUD)
基于SpringBoot 2.x开发的简易版图书管理系统(实现对图书的CRUD)
69 0
Java项目-基于Springboot实现英语在线学习系统
本项目基本Springboot开发实现,并同时使用Springmvc+mybatis框架来进行开发实现,主要实现一个在线学习英语的基于B/S结构的学习系统。本英语学习项目是为了满足学生学习英语的需求而开发,在本系统中学生可以通过背单词,每日一句,听听力,看阅读等方式加深对英语的学习与了解。 本系统的用户角色分为前端用户和后端管理用户角色,前端用户可以在本系统中进行注册后登陆,注册后会向用户的邮箱发送一个激活账户的邮箱,用户登陆邮箱后在线激活账户方可登陆系统。登陆系统后可以在线学习英语单词 ,进行听力训练,进行在线阅读学习等操作。后台管理账户登陆系统后可以对学习单词 、书籍、用户等 信息进行相
67 0
SpringBoot+Dubbo项目简单搭建实现斐波那契第n项
step1 新建项目 方法1:直接在IDEA里新建如图: 方法2:在start.spring.io新建 step2 新建需要的包和接口以及实现类 step3 在两个项目的resource下新建配置文件 step4 代码编写 导入依赖 provider consumer 端口冲突更改 step5 运行
64 0
SpringBoot 结合 Mybatis 实现创建数据库表
SpringBoot 结合 Mybatis 实现创建数据库表
119 0
(Redis使用系列) Springboot 使用redis的List数据结构实现简单的排队功能场景 九
(Redis使用系列) Springboot 使用redis的List数据结构实现简单的排队功能场景 九
335 0
SpringBoot RabbitMQ实现消息队列 邮箱
SpringBoot RabbitMQ实现消息队列 邮箱
81 0
Springboot 自定义注解AOP实现时间参数格式转换
Springboot 自定义注解AOP实现时间参数格式转换
134 0
Springboot 自定义mybatis 拦截器,实现我们要的扩展
Springboot 自定义mybatis 拦截器,实现我们要的扩展
173 0
+关注
陌然浅笑-支
格物致知
文章
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载