SpringBoot整合RocketMQ发送普通消息

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RocketMQ是一个统一消息引擎、轻量级数据处理平台

消息发送分类

1、同步发送消息
生产者发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。此类消息可靠性最高,但消息发送效率低
2、异步发送消息
生产者发出消息后无需等待MQ返回ACK,直接发送下一条消息。此类消息可靠性可以得到保障,消息发送效率也可
3、单向发送消息
生产者仅负责发送消息,不等待、不处理MQ的ACK,此类方式MQ也不返回ACK,消息发送效率最高,但消息可靠性较差

配置信息

生产者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  producer:
    group: simple-producer-group
    send-message-timeout: 3000 #发送超时时间毫秒 默认3000
    retry-times-when-send-failed: 2 #同步发送失败时重试次数 默认2

消费者

rocketmq:
  name-server: xx.xx.xx.xx:9876
  consumer:
    group: simple-consumer-group

生产者业务接口

public interface SimpleMessageService {

    /**
     * 发送消息
     * @param message
     */
    void sendMessage(String message);

    /**
     * 发送同步消息
     * @param id
     * @param message
     */
    void sendSyncMessage(String id, String message);

    /**
     * 发送异步消息
     * @param id
     * @param message
     */
    void sendAsyncMessage(String id, String message);

    /**
     * 发送单向消息
     * @param id
     * @param message
     */
    void sendOnewayMessage(String id, String message);
}

生产者业务接口实现类

@Service
public class SimpleMessageServiceImpl implements SimpleMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageServiceImpl.class);

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.convertAndSend("simple-message-topic", message);
        logger.info("发生消息成功!");
    }

    @Override
    public void sendSyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        SendResult result = rocketMQTemplate.syncSend("simple-message-topic:sync-tags", strMessage);
        logger.info("发送简单同步消息成功!返回信息为:{}", JSON.toJSONString(result));
    }

    @Override
    public void sendAsyncMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.asyncSend("simple-message-topic:async-tags", strMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    logger.info("发送简单异步消息成功!返回信息为:{}", JSON.toJSONString(sendResult));
                }
            }
            @Override
            public void onException(Throwable throwable) {
                logger.error("发送简单异步消息失败!异常信息为:{}", throwable.getMessage());
            }
        });
    }

    @Override
    public void sendOnewayMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.sendOneWay("simple-message-topic:oneway-tags", strMessage);
    }
}
消费者类
@Component
@RocketMQMessageListener(topic = "simple-message-topic", consumerGroup = "${rocketmq.consumer.group}")
public class SimpleMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到消息:{}", message);
    }
}

测试

//测试同步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendSyncMessage(uuid, "hello world");
}
//测试异步发送消息
@Test
void simpleMessage() {
    String uuid = UUID.randomUUID().toString();
    simpleMessageService.sendAsyncMessage(uuid, "hello world");
    Thread.sleep(5000);
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11月前
|
消息中间件 Java 数据安全/隐私保护
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(三)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1059 0
|
1月前
|
消息中间件 Java Maven
|
1月前
|
消息中间件 Java Kafka
|
2月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
141 1
|
4月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
|
11月前
|
消息中间件 存储 Java
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(一)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
103 0
|
11月前
|
消息中间件 Java 测试技术
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(四)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息
|
11月前
|
消息中间件 Java
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息(二)
搭建RabbitMQ消息服务,整合SpringBoot实现收发消息