五分钟带你玩转rocketMQ(三)spring boot整合rocketMQ

简介: 五分钟带你玩转rocketMQ(三)spring boot整合rocketMQ


一.新建spring boot项目

废话不多说。。。

二.配置application.properties

1. spring.application.name=rocketmq
2. server.port=8088
3. ###producer
4. #该应用是否启用生产者
5. rocketmq.producer.isOnOff=on
6. #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7. rocketmq.producer.groupName=${spring.application.name}
8. #mq的nameserver地址
9. rocketmq.producer.namesrvAddr=127.0.0.1:9876
10. #消息最大长度 默认1024*4(4M)
11. rocketmq.producer.maxMessageSize=4096
12. #发送消息超时时间,默认3000
13. rocketmq.producer.sendMsgTimeout=3000
14. #发送消息失败重试次数,默认2
15. rocketmq.producer.retryTimesWhenSendFailed=2
16. ###consumer
17. ##该应用是否启用消费者
18. rocketmq.consumer.isOnOff=on
19. rocketmq.consumer.groupName=${spring.application.name}
20. #mq的nameserver地址
21. rocketmq.consumer.namesrvAddr=127.0.0.1:9876
22. #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
23. rocketmq.consumer.topics=DemoTopic~*;
24. rocketmq.consumer.consumeThreadMin=20
25. rocketmq.consumer.consumeThreadMax=64
26. #设置一次消费消息的条数,默认为1条
27. rocketmq.consumer.consumeMessageBatchMaxSize=1

三.新建生产者

1. package cn.baocl.rocketmq.producer;
2. 
3. import com.alibaba.rocketmq.client.exception.MQClientException;
4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
5. import org.slf4j.Logger;
6. import org.slf4j.LoggerFactory;
7. import org.springframework.beans.factory.annotation.Value;
8. import org.springframework.boot.SpringBootConfiguration;
9. import org.springframework.context.annotation.Bean;
10. import org.springframework.util.StringUtils;
11. 
12. @SpringBootConfiguration
13. public class MQProducerConfiguration {
14. 
15. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
16. /**
17.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
18.      */
19. @Value("${rocketmq.producer.groupName}")
20. private String groupName;
21. @Value("${rocketmq.producer.namesrvAddr}")
22. private String namesrvAddr;
23. /**
24.      * 消息最大大小,默认4M
25.      */
26. @Value("${rocketmq.producer.maxMessageSize}")
27. private Integer maxMessageSize ;
28. /**
29.      * 消息发送超时时间,默认3秒
30.      */
31. @Value("${rocketmq.producer.sendMsgTimeout}")
32. private Integer sendMsgTimeout;
33. /**
34.      * 消息发送失败重试次数,默认2次
35.      */
36. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
37. private Integer retryTimesWhenSendFailed;
38. 
39. @Bean
40. public DefaultMQProducer getRocketMQProducer() throws Exception {
41. if (StringUtils.isEmpty(this.groupName)) {
42. throw new Exception("groupName is blank");
43.         }
44. if (StringUtils.isEmpty(this.namesrvAddr)) {
45. throw new Exception("nameServerAddr is blank");
46.         }
47.         DefaultMQProducer producer;
48.         producer = new DefaultMQProducer(this.groupName);
49.         producer.setNamesrvAddr(this.namesrvAddr);
50. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
51. //producer.setInstanceName(instanceName);
52.         producer.setVipChannelEnabled(false);
53. if(this.maxMessageSize!=null){
54.             producer.setMaxMessageSize(this.maxMessageSize);
55.         }
56. if(this.sendMsgTimeout!=null){
57.             producer.setSendMsgTimeout(this.sendMsgTimeout);
58.         }
59. //如果发送消息失败,设置重试次数,默认为2次
60. if(this.retryTimesWhenSendFailed!=null){
61.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
62.         }
63. 
64. try {
65.             producer.start();
66.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
67.                     , this.groupName, this.namesrvAddr));
68.         } catch (MQClientException e) {
69.             LOGGER.error(String.format("producer is error {}"
70.                     , e.getMessage(),e));
71. throw new Exception(e);
72.         }
73. return producer;
74.     }
75. }

4.新建消费者

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import org.slf4j.Logger;
8. import org.slf4j.LoggerFactory;
9. import org.springframework.beans.factory.annotation.Autowired;
10. import org.springframework.beans.factory.annotation.Value;
11. import org.springframework.boot.SpringBootConfiguration;
12. import org.springframework.context.annotation.Bean;
13. import org.springframework.util.StringUtils;
14. 
15. 
16. @SpringBootConfiguration
17. public class MQConsumerConfiguration {
18. 
19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
20. @Value("${rocketmq.consumer.namesrvAddr}")
21. private String namesrvAddr;
22. @Value("${rocketmq.consumer.groupName}")
23. private String groupName;
24. @Value("${rocketmq.consumer.consumeThreadMin}")
25. private int consumeThreadMin;
26. @Value("${rocketmq.consumer.consumeThreadMax}")
27. private int consumeThreadMax;
28. @Value("${rocketmq.consumer.topics}")
29. private String topics;
30. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
31. private int consumeMessageBatchMaxSize;
32. @Autowired
33. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
34. 
35. @Bean
36. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
37. if (StringUtils.isEmpty(groupName)){
38. throw new Exception("groupName is null !!!");
39.         }
40. if (StringUtils.isEmpty(namesrvAddr)){
41. throw new Exception("namesrvAddr is null !!!");
42.         }
43. if(StringUtils.isEmpty(topics)){
44. throw new Exception("topics is null !!!");
45.         }
46.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
47.         consumer.setNamesrvAddr(namesrvAddr);
48.         consumer.setConsumeThreadMin(consumeThreadMin);
49.         consumer.setConsumeThreadMax(consumeThreadMax);
50.         consumer.registerMessageListener(mqMessageListenerProcessor);
51. 
52. /**
53.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
54.          * 如果非第一次启动,那么按照上次消费的位置继续消费
55.          */
56.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
57. /**
58.          * 设置消费模型,集群还是广播,默认为集群
59.          */
60. //consumer.setMessageModel(MessageModel.CLUSTERING);
61. /**
62.          * 设置一次消费消息的条数,默认为1条
63.          */
64.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
65. try {
66. /**
67.              * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
68.              */
69.             String[] topicTagsArr = topics.split(";");
70. for (String topicTags : topicTagsArr) {
71.                 String[] topicTag = topicTags.split("~");
72.                 consumer.subscribe(topicTag[0],topicTag[1]);
73.             }
74.             consumer.start();
75.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
76.         }catch (MQClientException e){
77.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
78. throw new Exception(e);
79.         }
80. return consumer;
81.     }
82. }

5.新建处理类

1. package cn.baocl.rocketmq.processor;
2. 
3. 
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. import org.springframework.util.CollectionUtils;
12. 
13. import java.util.List;
14. 
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. /**
19.      *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
20.      *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
21.      */
22. @Override
23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
24. if(CollectionUtils.isEmpty(msgs)){
25.             logger.info("接受到的消息为空,不处理,直接返回成功");
26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
27.         }
28. MessageExt messageExt = msgs.get(0);
29.         logger.info("接受到的消息为:"+messageExt.toString());
30. if(messageExt.getTopic().equals("你的Topic")){
31. if(messageExt.getTags().equals("你的Tag")){
32. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
33. //TODO 获取该消息重试次数
34. int reconsume = messageExt.getReconsumeTimes();
35. if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
37.                 }
38. //TODO 处理对应的业务逻辑
39.             }
40.         }
41. // 如果没有return success ,consumer会重新消费该消息,直到return success
42. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43.     }
44. }

6.接口调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.SendCallback;
8. import com.alibaba.rocketmq.client.producer.SendResult;
9. import com.alibaba.rocketmq.common.message.Message;
10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
11. import org.slf4j.Logger;
12. import org.slf4j.LoggerFactory;
13. import org.springframework.beans.factory.annotation.Autowired;
14. import org.springframework.web.bind.annotation.RequestMapping;
15. import org.springframework.web.bind.annotation.RestController;
16. 
17. 
18. @RestController
19. @RequestMapping("/test")
20. public class TestControllor {
21. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
22. 
23. /**
24.      * 使用RocketMq的生产者
25.      */
26. @Autowired
27. private DefaultMQProducer defaultMQProducer;
28. 
29. @RequestMapping("/send")
30. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
31. String msg = "demo msg test";;
32.         logger.info("开始发送消息:" + msg);
33. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes());        
34. //默认3秒超时
35. SendResult sendResult = defaultMQProducer.send(sendMsg);
36.         logger.info("消息发送响应信息:" + sendResult.toString());
37.     }
38. }

如果出现org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.99.1:10909> fail  在生产者中加入

producer.setVipChannelEnabled(false);


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
7月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1055 3
|
5月前
|
消息中间件 Java Maven
|
5月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
78 0
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
413 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成

热门文章

最新文章