【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息

简介: 【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息

本文将主要介绍在SpringBoot项目中如何集成RocketMQ以实现普通消息和事务消息的。

首先是分别创建生产者的springboot项目 springboot-rocketmq-producer,创建消费者的springboot项目 springboot-rocketmq-consumer。

1. 引入依赖

本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。

可以通过mvnrepository进行查看。https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot/2.2.2

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot</artifactId>
    <version>2.2.2</version>
</dependency>

2. 配置文件修改

在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:

rocketmq:
  name-server: 172.31.184.89:9876
  producer:
    group: feige-producer-group
  consumer:
    topic: my-spring-boot-topic

在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:

server:
  port: 8080
rocketmq:
  name-server: 172.31.184.89:9876
  consumer:
    group: feige-consumer-group
    topic: my-spring-boot-topic

3. 实现生产者

定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend 方法进行消息发送。

@Component
public class MyProducer {
  @Autowired
  private RocketMQTemplate rocketMQTemplate;
  /**
   * 发送普通消息
   *
   * @param topic   主题
   * @param message 消息
   */
  public void sendMessage(String topic, String message) {
    rocketMQTemplate.convertAndSend(topic, message);
  }
3.1. 编写生产者单元测试
@Autowired
  private MyProducer myProducer;
  @Value("${rocketmq.consumer.topic:}")
  private String consumerTopic;
  @Test
  void sendMessage() {
    myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");
  }

4.实现消费者

定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。

@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
public class MyConsumer implements RocketMQListener<String> {
  @Override
  public void onMessage(String s) {
    System.out.println("收到的消息是=" + s);
  }
}

5. 实现事务消息

在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。

5.1. 实现事务消息的生产者

在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。

/**
   * 发送事务消息
   *
   * @param topic 话题
   * @param msg   消息
   */
  public void sendTransactionMessage(String topic, String msg) throws InterruptedException {
    String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
    for (int i = 0; i < 10; i++) {
      // 2. 将topic和tag整合在一起,以:分割,
      String destination = topic + ":" + tags[i % tags.length];
        // 1.注意该message是org.springframework.messaging.Message
      Message<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i)
          .setHeader("destination", destination).build();
      // 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数
      rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
      Thread.sleep(10);
    }
  }

这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。

5.2. 实现本地事务消息

接着在定义生产者本地事务实现类  MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将  org.springframework.messaging.Message 转成 org.apache.rocketmq.common.message.Message

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    // 将消息转成rocketmq下的message
    org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);
    String tags = message.getTags();
    if (tags.equals("tagA")) {
      return RocketMQLocalTransactionState.COMMIT;
    } else if (tags.equals("tagB")) {
      return RocketMQLocalTransactionState.ROLLBACK;
    }
    return RocketMQLocalTransactionState.UNKNOWN;
  }
@Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    // 将消息转成rocketmq下的message
    String destination = (String) msg.getHeaders().get("destination");
    org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),
        "utf-8",destination, msg);
    String tags = message.getTags();
    if (tags.equals("tagC")) {
      return RocketMQLocalTransactionState.COMMIT;
    } else if (tags.equals("tagD")) {
      return RocketMQLocalTransactionState.ROLLBACK;
    }
    return RocketMQLocalTransactionState.UNKNOWN;
  }
}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
65 3
|
3月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
82 2
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
37 1
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
779 2
|
3月前
|
消息中间件 Java Maven
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
4月前
|
监控 druid Java
spring boot 集成配置阿里 Druid监控配置
spring boot 集成配置阿里 Druid监控配置
290 6