八.RocketMQ极简入门-在SpringBoot中使用RocketMQ

简介: RocketMQ极简入门-在SpringBoot中使用RocketMQ

前言

现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例

SpringBoot集成RocketMQ

导入依赖

这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

配置文件

rocketmq:
  name-server: 127.0.0.1:9876 

  #生产者配置
  producer:
    #生产者组名字
    group: "service-producer"
    # 消息最大长度 默认 1024 * 4 (4M)
    max-message-size: 4096
    # 发送消息超时时间,默认 3000
    send-message-timeout: 3000
    # 发送消息失败重试次数,默认2
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2

简单消息发送

生产者

使用 RocketMQTemplate 发送消息,使用@Autowared注入RocketMQTemplate即可使用,其中包含的方法有

  • public void sendOneWay(String destination, Object payload) :单向消息
  • public SendResult syncSend(String destination, Message<?> message) :同步消息
  • public void asyncSend(String destination, Message<?> message, SendCallback sendCallback):异步消息
  • public TransactionSendResult sendMessageInTransaction :事务消息

destination指的是消息的目的地,格式为: topicName:tags ,发送单向消息案例如:

 rocketMQTemplate.sendOneWay( "message:sms", "我是短信消息"));

同步消息是有发送结果的,同步消息发送如:

SendResult result = rocketMQTemplate.syncSend("message:sms", "我是短信消息");
//打印结果
System.out.println(result);

异步消息需要指定发送回调,SendCallback,异步消息发送如:

rocketMQTemplate.asyncSend("message:sms", "我是短信消息", new SendCallback() {
   
                    @Override
                    public void onSuccess(SendResult sendResult) {
   
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
   
                        e.printStackTrace();
                    }
                }
        );

消费者端

通过 RocketMQListener 监听器来监听消息,@RocketMQMessageListener注解来指定消费者组以及topic和tags。

@Slf4j
@Component
@RocketMQMessageListener(topic = "message",
        selectorExpression="sms"    //tags
                ,consumerGroup = "service-consumer"
        ,messageModel = MessageModel.CLUSTERING )
public class MessageConsumer implements RocketMQListener<MessageExt> {
   



    @Override
    @Transactional
    public void onMessage(MessageExt message) {
   
        String msg = new String(message.getBody(), CharsetUtil.UTF_8);
        log.info("消费者 {} ",msg);
    }
}

onMessage方法是自动ack消息,如果方法中出现异常,ack失败,消息将会重试消费。

事务消息

对事务的理解见上一篇《事务消息

事务监听器

通过实现 RocketMQLocalTransactionListener 编写本地事务监听器

@Component
//订单事务组
@RocketMQTransactionListener(txProducerGroup = "TX_GROUP")
@Slf4j
public class MyTransactionListener implements RocketMQLocalTransactionListener {
   

    //执行本地事务,返回commit事务消息才会被消费者消费,我们可以在该方法中对数据库做写操作保存本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
   

       log.info("执行本地事务,",msg,arg);

        //本地事务执行成功,返回commit
       return RocketMQLocalTransactionState.COMMIT;

        //本地事务执行失败,返回rollback ,事务消息不会被消费
        //return RocketMQLocalTransactionState.ROLLBACK;
    }

    //检查本地事务状态,MQ通过该方法回查本地事务状态
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
   
        byte[] payload = (byte[]) msg.getPayload();
        //回查本地事务状态,如果成功,返回commit
        return RocketMQLocalTransactionState.COMMIT;
        //回查本地事务状态,如果失败,返回rollback,事务消息不会被消费
        //return RocketMQLocalTransactionState.ROLLBACK;
    }
}

事务消息发送方

//构建消息
Message message =  MessageBuilder.withPayload("事务消息").build();
//发送下单的事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
      "TX-GROUP",    //事务组名字,需要和事务监听器的事务组名字一样。
      "txtopic:txtags",
      message, null);
if(result.getSendStatus() == SendStatus.SEND_OK){
   
    //消息发送成功
}

这里的message会传递给事务监听器的executeLocalTransaction方法中

事务消息消费方

消费者就是普通的消费者即可

@Slf4j
@Component
@RocketMQMessageListener(topic = "txtopic",
        selectorExpression="txtags"        ,consumerGroup = "service-consumer"
        ,messageModel = MessageModel.CLUSTERING
    )
public class MessageConsumer implements RocketMQListener<MessageExt> {
   



    @Override
    public void onMessage(MessageExt message) {
   
        String msg = new String(message.getBody(), CharsetUtil.UTF_8);
        //消费消息
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
18天前
|
消息中间件 Java 网络架构
|
1天前
|
小程序 前端开发 Java
SpringBoot+uniapp+uview打造H5+小程序+APP入门学习的聊天小项目
JavaDog Chat v1.0.0 是一款基于 SpringBoot、MybatisPlus 和 uniapp 的简易聊天软件,兼容 H5、小程序和 APP,提供丰富的注释和简洁代码,适合初学者。主要功能包括登录注册、消息发送、好友管理及群组交流。
8 0
SpringBoot+uniapp+uview打造H5+小程序+APP入门学习的聊天小项目
|
12天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
8天前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
14 0
分享一下rocketmq入门小知识
|
16天前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
70 2
|
18天前
|
消息中间件 Java Maven
|
25天前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
93 2
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
34 3
|
2月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
145 10
|
28天前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
45 0
下一篇
云函数