前言
现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例
SpringBoot集成RocketMQ
导入依赖
这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
配置文件
rocketmq name-server 127.0.0.19876 #生产者配置 producer#生产者组名字 group"service-producer"# 消息最大长度 默认 1024 * 4 (4M) max-message-size4096# 发送消息超时时间,默认 3000 send-message-timeout3000# 发送消息失败重试次数,默认2 retry-times-when-send-failed2 retry-times-when-send-async-failed2
简单消息发送
生产者
使用 RocketMQTemplate 发送消息,使用@Autowared注入RocketMQTemplate即可使用,其中包含的方法有
- public void sendOneWay(String destination, Object payload) :单向消息
- public SendResult syncSend(String destination, Message<?> message) :同步消息
- public void asyncSend(String destination, Message<?> message, SendCallback sendCallback):异步消息
- public TransactionSendResult sendMessageInTransaction :事务消息
destination指的是消息的目的地,格式为: topicName:tags ,发送单向消息案例如:
rocketMQTemplate.sendOneWay( "message:sms", "我是短信消息"));
同步消息是有发送结果的,同步消息发送如:
SendResultresult=rocketMQTemplate.syncSend("message:sms", "我是短信消息"); //打印结果System.out.println(result);
异步消息需要指定发送回调,SendCallback,异步消息发送如:
rocketMQTemplate.asyncSend("message:sms", "我是短信消息", newSendCallback() { publicvoidonSuccess(SendResultsendResult) { System.out.println(sendResult); } publicvoidonException(Throwablee) { e.printStackTrace(); } } );
消费者端
通过 RocketMQListener 监听器来监听消息,@RocketMQMessageListener注解来指定消费者组以及topic和tags。
topic="message", (selectorExpression="sms"//tags ,consumerGroup="service-consumer" ,messageModel=MessageModel.CLUSTERING ) publicclassMessageConsumerimplementsRocketMQListener<MessageExt> { publicvoidonMessage(MessageExtmessage) { Stringmsg=newString(message.getBody(), CharsetUtil.UTF_8); log.info("消费者 {} ",msg); } }
onMessage方法是自动ack消息,如果方法中出现异常,ack失败,消息将会重试消费。
事务消息
对事务的理解见上一篇《事务消息》
事务监听器
通过实现 RocketMQLocalTransactionListener 编写本地事务监听器
//订单事务组txProducerGroup="TX_GROUP") (publicclassMyTransactionListenerimplementsRocketMQLocalTransactionListener { //执行本地事务,返回commit事务消息才会被消费者消费,我们可以在该方法中对数据库做写操作保存本地事务publicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg, Objectarg) { log.info("执行本地事务,",msg,arg); //本地事务执行成功,返回commitreturnRocketMQLocalTransactionState.COMMIT; //本地事务执行失败,返回rollback ,事务消息不会被消费//return RocketMQLocalTransactionState.ROLLBACK; } //检查本地事务状态,MQ通过该方法回查本地事务状态publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg) { byte[] payload= (byte[]) msg.getPayload(); //回查本地事务状态,如果成功,返回commitreturnRocketMQLocalTransactionState.COMMIT; //回查本地事务状态,如果失败,返回rollback,事务消息不会被消费//return RocketMQLocalTransactionState.ROLLBACK; } }
事务消息发送方
//构建消息Messagemessage=MessageBuilder.withPayload("事务消息").build(); //发送下单的事务消息TransactionSendResultresult=rocketMQTemplate.sendMessageInTransaction( "TX-GROUP", //事务组名字,需要和事务监听器的事务组名字一样。"txtopic:txtags", message, null); if(result.getSendStatus() ==SendStatus.SEND_OK){ //消息发送成功}
这里的message会传递给事务监听器的executeLocalTransaction方法中
事务消息消费方
消费者就是普通的消费者即可
topic="txtopic", (selectorExpression="txtags" ,consumerGroup="service-consumer" ,messageModel=MessageModel.CLUSTERING ) publicclassMessageConsumerimplementsRocketMQListener<MessageExt> { publicvoidonMessage(MessageExtmessage) { Stringmsg=newString(message.getBody(), CharsetUtil.UTF_8); //消费消息 } }