SpringBoot整合RocketMQ发送普通消息

简介: RocketMQ是一个统一消息引擎、轻量级数据处理平台

消息发送分类

1、同步发送消息
生产者发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。此类消息可靠性最高,但消息发送效率低
2、异步发送消息
生产者发出消息后无需等待MQ返回ACK,直接发送下一条消息。此类消息可靠性可以得到保障,消息发送效率也可
3、单向发送消息
生产者仅负责发送消息,不等待、不处理MQ的ACK,此类方式MQ也不返回ACK,消息发送效率最高,但消息可靠性较差

配置信息

生产者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  producer:
    group: simple-producer-group
    send-message-timeout: 3000 #发送超时时间毫秒 默认3000
    retry-times-when-send-failed: 2 #同步发送失败时重试次数 默认2

消费者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  consumer:
    group: simple-consumer-group

生产者业务接口

public interface SimpleMessageService {

    /**
     * 发送消息
     * @param message
     */
    void sendMessage(String message);

    /**
     * 发送同步消息
     * @param id
     * @param message
     */
    void sendSyncMessage(String id, String message);

    /**
     * 发送异步消息
     * @param id
     * @param message
     */
    void sendAsyncMessage(String id, String message);

    /**
     * 发送单向消息
     * @param id
     * @param message
     */
    void sendOnewayMessage(String id, String message);
}

生产者业务接口实现类

@Service
public class SimpleMessageServiceImpl implements SimpleMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageServiceImpl.class);

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.convertAndSend("simple-message-topic", message);
        logger.info("发生消息成功!");
    }

    @Override
    public void sendSyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("simple-message-topic:sync-tags", strMessage);
        logger.info("发送简单同步消息成功!返回信息为:{}", JSON.toJSONString(result));
    }

    @Override
    public void sendAsyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.asyncSend("simple-message-topic:async-tags", strMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    logger.info("发送简单异步消息成功!返回信息为:{}", JSON.toJSONString(sendResult));
                }
            }
            @Override
            public void onException(Throwable throwable) {
                logger.error("发送简单异步消息失败!异常信息为:{}", throwable.getMessage());
            }
        });
    }

    @Override
    public void sendOnewayMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.sendOneWay("simple-message-topic:oneway-tags", strMessage);
    }
}
消费者类
@Component
@RocketMQMessageListener(topic = "simple-message-topic", consumerGroup = "${rocketmq.consumer.group}")
public class SimpleMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到消息:{}", message);
    }
}

测试

//测试同步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendSyncMessage(uuid, "hello world");
}
//测试异步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendAsyncMessage(uuid, "hello world");
    Thread.sleep(5000);
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
25天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
283 1
|
4月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
620 0
|
10月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
154 6
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
2079 3
|
消息中间件 Java Maven
|
2天前
|
搜索推荐 JavaScript Java
基于springboot的儿童家长教育能力提升学习系统
本系统聚焦儿童家长教育能力提升,针对家庭教育中理念混乱、时间不足、个性化服务缺失等问题,构建科学、系统、个性化的在线学习平台。融合Spring Boot、Vue等先进技术,整合优质教育资源,提供高效便捷的学习路径,助力家长掌握科学育儿方法,促进儿童全面健康发展,推动家庭和谐与社会进步。

热门文章

最新文章