SpringBoot整合RocketMQ发送普通消息

简介: RocketMQ是一个统一消息引擎、轻量级数据处理平台

消息发送分类

1、同步发送消息
生产者发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。此类消息可靠性最高,但消息发送效率低
2、异步发送消息
生产者发出消息后无需等待MQ返回ACK,直接发送下一条消息。此类消息可靠性可以得到保障,消息发送效率也可
3、单向发送消息
生产者仅负责发送消息,不等待、不处理MQ的ACK,此类方式MQ也不返回ACK,消息发送效率最高,但消息可靠性较差

配置信息

生产者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  producer:
    group: simple-producer-group
    send-message-timeout: 3000 #发送超时时间毫秒 默认3000
    retry-times-when-send-failed: 2 #同步发送失败时重试次数 默认2

消费者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  consumer:
    group: simple-consumer-group

生产者业务接口

public interface SimpleMessageService {

    /**
     * 发送消息
     * @param message
     */
    void sendMessage(String message);

    /**
     * 发送同步消息
     * @param id
     * @param message
     */
    void sendSyncMessage(String id, String message);

    /**
     * 发送异步消息
     * @param id
     * @param message
     */
    void sendAsyncMessage(String id, String message);

    /**
     * 发送单向消息
     * @param id
     * @param message
     */
    void sendOnewayMessage(String id, String message);
}

生产者业务接口实现类

@Service
public class SimpleMessageServiceImpl implements SimpleMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageServiceImpl.class);

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.convertAndSend("simple-message-topic", message);
        logger.info("发生消息成功!");
    }

    @Override
    public void sendSyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("simple-message-topic:sync-tags", strMessage);
        logger.info("发送简单同步消息成功!返回信息为:{}", JSON.toJSONString(result));
    }

    @Override
    public void sendAsyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.asyncSend("simple-message-topic:async-tags", strMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    logger.info("发送简单异步消息成功!返回信息为:{}", JSON.toJSONString(sendResult));
                }
            }
            @Override
            public void onException(Throwable throwable) {
                logger.error("发送简单异步消息失败!异常信息为:{}", throwable.getMessage());
            }
        });
    }

    @Override
    public void sendOnewayMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.sendOneWay("simple-message-topic:oneway-tags", strMessage);
    }
}
消费者类
@Component
@RocketMQMessageListener(topic = "simple-message-topic", consumerGroup = "${rocketmq.consumer.group}")
public class SimpleMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到消息:{}", message);
    }
}

测试

//测试同步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendSyncMessage(uuid, "hello world");
}
//测试异步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendAsyncMessage(uuid, "hello world");
    Thread.sleep(5000);
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1044 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
412 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
791 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67800 2
3 张图带你彻底理解 RocketMQ 事务消息