RocketMQ极简入门-在SpringBoot中使用RocketMQ

简介: 现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例

前言

现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例

SpringBoot集成RocketMQ

导入依赖

这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

配置文件

rocketmq:  name-server: 127.0.0.1:9876 #生产者配置  producer:#生产者组名字    group: "service-producer"# 消息最大长度 默认 1024 * 4 (4M)    max-message-size: 4096# 发送消息超时时间,默认 3000    send-message-timeout: 3000# 发送消息失败重试次数,默认2    retry-times-when-send-failed: 2    retry-times-when-send-async-failed: 2

简单消息发送

生产者

使用 RocketMQTemplate 发送消息,使用@Autowared注入RocketMQTemplate即可使用,其中包含的方法有

  • public void sendOneWay(String destination, Object payload) :单向消息
  • public SendResult syncSend(String destination, Message<?> message) :同步消息
  • public void asyncSend(String destination, Message<?> message, SendCallback sendCallback):异步消息
  • public TransactionSendResult sendMessageInTransaction :事务消息

destination指的是消息的目的地,格式为: topicName:tags ,发送单向消息案例如:

rocketMQTemplate.sendOneWay( "message:sms", "我是短信消息"));

同步消息是有发送结果的,同步消息发送如:

SendResultresult=rocketMQTemplate.syncSend("message:sms", "我是短信消息");
//打印结果System.out.println(result);

异步消息需要指定发送回调,SendCallback,异步消息发送如:

rocketMQTemplate.asyncSend("message:sms", "我是短信消息", newSendCallback() {
@OverridepublicvoidonSuccess(SendResultsendResult) {
System.out.println(sendResult);
                    }
@OverridepublicvoidonException(Throwablee) {
e.printStackTrace();
                    }
                }
        );

消费者端

通过 RocketMQListener 监听器来监听消息,@RocketMQMessageListener注解来指定消费者组以及topic和tags。

@Slf4j@Component@RocketMQMessageListener(topic="message",
selectorExpression="sms"//tags                ,consumerGroup="service-consumer"        ,messageModel=MessageModel.CLUSTERING )
publicclassMessageConsumerimplementsRocketMQListener<MessageExt> {
@Override@TransactionalpublicvoidonMessage(MessageExtmessage) {
Stringmsg=newString(message.getBody(), CharsetUtil.UTF_8);
log.info("消费者 {} ",msg);
    }
}

onMessage方法是自动ack消息,如果方法中出现异常,ack失败,消息将会重试消费。

事务消息

对事务的理解见上一篇《事务消息

事务监听器

通过实现 RocketMQLocalTransactionListener 编写本地事务监听器

@Component//订单事务组@RocketMQTransactionListener(txProducerGroup="TX_GROUP")
@Slf4jpublicclassMyTransactionListenerimplementsRocketMQLocalTransactionListener {
//执行本地事务,返回commit事务消息才会被消费者消费,我们可以在该方法中对数据库做写操作保存本地事务@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg, Objectarg) {
log.info("执行本地事务,",msg,arg);
//本地事务执行成功,返回commitreturnRocketMQLocalTransactionState.COMMIT;
//本地事务执行失败,返回rollback ,事务消息不会被消费//return RocketMQLocalTransactionState.ROLLBACK;    }
//检查本地事务状态,MQ通过该方法回查本地事务状态@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg) {
byte[] payload= (byte[]) msg.getPayload();
//回查本地事务状态,如果成功,返回commitreturnRocketMQLocalTransactionState.COMMIT;
//回查本地事务状态,如果失败,返回rollback,事务消息不会被消费//return RocketMQLocalTransactionState.ROLLBACK;    }
}

事务消息发送方

//构建消息Messagemessage=MessageBuilder.withPayload("事务消息").build();
//发送下单的事务消息TransactionSendResultresult=rocketMQTemplate.sendMessageInTransaction(
"TX-GROUP",   //事务组名字,需要和事务监听器的事务组名字一样。"txtopic:txtags",
message, null);
if(result.getSendStatus() ==SendStatus.SEND_OK){
//消息发送成功}

这里的message会传递给事务监听器的executeLocalTransaction方法中

事务消息消费方

消费者就是普通的消费者即可

@Slf4j@Component@RocketMQMessageListener(topic="txtopic",
selectorExpression="txtags"        ,consumerGroup="service-consumer"        ,messageModel=MessageModel.CLUSTERING    )
publicclassMessageConsumerimplementsRocketMQListener<MessageExt> {
@OverridepublicvoidonMessage(MessageExtmessage) {
Stringmsg=newString(message.getBody(), CharsetUtil.UTF_8);
//消费消息    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
95 0
|
1月前
|
NoSQL Java Redis
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
247 1
|
2月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
32 1
|
3月前
|
消息中间件 存储 安全
SpringBoot与RabbitMQ详解与整合
SpringBoot与RabbitMQ详解与整合
57 0
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
59 0
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
8天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
RabbitMQ入门指南(九):消费者可靠性
|
8天前
|
消息中间件 存储 Java
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
|
8天前
|
消息中间件 微服务
RabbitMQ入门指南(四):交换机与案例解析
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了交换机在RabbitMQ中的作用与类型、交换机案例(Fanout交换机、Direct交换机、Topic交换机)等内容。
|
8天前
|
消息中间件 存储 数据库
RabbitMQ入门指南(二):架构和管理控制台的使用
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ架构和管理控制台的使用等内容。
RabbitMQ入门指南(二):架构和管理控制台的使用