五分钟带你玩转rocketMQ(三)spring boot整合rocketMQ

简介: 五分钟带你玩转rocketMQ(三)spring boot整合rocketMQ


一.新建spring boot项目

废话不多说。。。

二.配置application.properties

1. spring.application.name=rocketmq
2. server.port=8088
3. ###producer
4. #该应用是否启用生产者
5. rocketmq.producer.isOnOff=on
6. #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7. rocketmq.producer.groupName=${spring.application.name}
8. #mq的nameserver地址
9. rocketmq.producer.namesrvAddr=127.0.0.1:9876
10. #消息最大长度 默认1024*4(4M)
11. rocketmq.producer.maxMessageSize=4096
12. #发送消息超时时间,默认3000
13. rocketmq.producer.sendMsgTimeout=3000
14. #发送消息失败重试次数,默认2
15. rocketmq.producer.retryTimesWhenSendFailed=2
16. ###consumer
17. ##该应用是否启用消费者
18. rocketmq.consumer.isOnOff=on
19. rocketmq.consumer.groupName=${spring.application.name}
20. #mq的nameserver地址
21. rocketmq.consumer.namesrvAddr=127.0.0.1:9876
22. #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
23. rocketmq.consumer.topics=DemoTopic~*;
24. rocketmq.consumer.consumeThreadMin=20
25. rocketmq.consumer.consumeThreadMax=64
26. #设置一次消费消息的条数,默认为1条
27. rocketmq.consumer.consumeMessageBatchMaxSize=1

三.新建生产者

1. package cn.baocl.rocketmq.producer;
2. 
3. import com.alibaba.rocketmq.client.exception.MQClientException;
4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
5. import org.slf4j.Logger;
6. import org.slf4j.LoggerFactory;
7. import org.springframework.beans.factory.annotation.Value;
8. import org.springframework.boot.SpringBootConfiguration;
9. import org.springframework.context.annotation.Bean;
10. import org.springframework.util.StringUtils;
11. 
12. @SpringBootConfiguration
13. public class MQProducerConfiguration {
14. 
15. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
16. /**
17.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
18.      */
19. @Value("${rocketmq.producer.groupName}")
20. private String groupName;
21. @Value("${rocketmq.producer.namesrvAddr}")
22. private String namesrvAddr;
23. /**
24.      * 消息最大大小,默认4M
25.      */
26. @Value("${rocketmq.producer.maxMessageSize}")
27. private Integer maxMessageSize ;
28. /**
29.      * 消息发送超时时间,默认3秒
30.      */
31. @Value("${rocketmq.producer.sendMsgTimeout}")
32. private Integer sendMsgTimeout;
33. /**
34.      * 消息发送失败重试次数,默认2次
35.      */
36. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
37. private Integer retryTimesWhenSendFailed;
38. 
39. @Bean
40. public DefaultMQProducer getRocketMQProducer() throws Exception {
41. if (StringUtils.isEmpty(this.groupName)) {
42. throw new Exception("groupName is blank");
43.         }
44. if (StringUtils.isEmpty(this.namesrvAddr)) {
45. throw new Exception("nameServerAddr is blank");
46.         }
47.         DefaultMQProducer producer;
48.         producer = new DefaultMQProducer(this.groupName);
49.         producer.setNamesrvAddr(this.namesrvAddr);
50. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
51. //producer.setInstanceName(instanceName);
52.         producer.setVipChannelEnabled(false);
53. if(this.maxMessageSize!=null){
54.             producer.setMaxMessageSize(this.maxMessageSize);
55.         }
56. if(this.sendMsgTimeout!=null){
57.             producer.setSendMsgTimeout(this.sendMsgTimeout);
58.         }
59. //如果发送消息失败,设置重试次数,默认为2次
60. if(this.retryTimesWhenSendFailed!=null){
61.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
62.         }
63. 
64. try {
65.             producer.start();
66.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
67.                     , this.groupName, this.namesrvAddr));
68.         } catch (MQClientException e) {
69.             LOGGER.error(String.format("producer is error {}"
70.                     , e.getMessage(),e));
71. throw new Exception(e);
72.         }
73. return producer;
74.     }
75. }

4.新建消费者

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import org.slf4j.Logger;
8. import org.slf4j.LoggerFactory;
9. import org.springframework.beans.factory.annotation.Autowired;
10. import org.springframework.beans.factory.annotation.Value;
11. import org.springframework.boot.SpringBootConfiguration;
12. import org.springframework.context.annotation.Bean;
13. import org.springframework.util.StringUtils;
14. 
15. 
16. @SpringBootConfiguration
17. public class MQConsumerConfiguration {
18. 
19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
20. @Value("${rocketmq.consumer.namesrvAddr}")
21. private String namesrvAddr;
22. @Value("${rocketmq.consumer.groupName}")
23. private String groupName;
24. @Value("${rocketmq.consumer.consumeThreadMin}")
25. private int consumeThreadMin;
26. @Value("${rocketmq.consumer.consumeThreadMax}")
27. private int consumeThreadMax;
28. @Value("${rocketmq.consumer.topics}")
29. private String topics;
30. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
31. private int consumeMessageBatchMaxSize;
32. @Autowired
33. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
34. 
35. @Bean
36. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
37. if (StringUtils.isEmpty(groupName)){
38. throw new Exception("groupName is null !!!");
39.         }
40. if (StringUtils.isEmpty(namesrvAddr)){
41. throw new Exception("namesrvAddr is null !!!");
42.         }
43. if(StringUtils.isEmpty(topics)){
44. throw new Exception("topics is null !!!");
45.         }
46.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
47.         consumer.setNamesrvAddr(namesrvAddr);
48.         consumer.setConsumeThreadMin(consumeThreadMin);
49.         consumer.setConsumeThreadMax(consumeThreadMax);
50.         consumer.registerMessageListener(mqMessageListenerProcessor);
51. 
52. /**
53.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
54.          * 如果非第一次启动,那么按照上次消费的位置继续消费
55.          */
56.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
57. /**
58.          * 设置消费模型,集群还是广播,默认为集群
59.          */
60. //consumer.setMessageModel(MessageModel.CLUSTERING);
61. /**
62.          * 设置一次消费消息的条数,默认为1条
63.          */
64.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
65. try {
66. /**
67.              * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
68.              */
69.             String[] topicTagsArr = topics.split(";");
70. for (String topicTags : topicTagsArr) {
71.                 String[] topicTag = topicTags.split("~");
72.                 consumer.subscribe(topicTag[0],topicTag[1]);
73.             }
74.             consumer.start();
75.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
76.         }catch (MQClientException e){
77.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
78. throw new Exception(e);
79.         }
80. return consumer;
81.     }
82. }

5.新建处理类

1. package cn.baocl.rocketmq.processor;
2. 
3. 
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. import org.springframework.util.CollectionUtils;
12. 
13. import java.util.List;
14. 
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. /**
19.      *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
20.      *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
21.      */
22. @Override
23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
24. if(CollectionUtils.isEmpty(msgs)){
25.             logger.info("接受到的消息为空,不处理,直接返回成功");
26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
27.         }
28. MessageExt messageExt = msgs.get(0);
29.         logger.info("接受到的消息为:"+messageExt.toString());
30. if(messageExt.getTopic().equals("你的Topic")){
31. if(messageExt.getTags().equals("你的Tag")){
32. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
33. //TODO 获取该消息重试次数
34. int reconsume = messageExt.getReconsumeTimes();
35. if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
37.                 }
38. //TODO 处理对应的业务逻辑
39.             }
40.         }
41. // 如果没有return success ,consumer会重新消费该消息,直到return success
42. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43.     }
44. }

6.接口调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.SendCallback;
8. import com.alibaba.rocketmq.client.producer.SendResult;
9. import com.alibaba.rocketmq.common.message.Message;
10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
11. import org.slf4j.Logger;
12. import org.slf4j.LoggerFactory;
13. import org.springframework.beans.factory.annotation.Autowired;
14. import org.springframework.web.bind.annotation.RequestMapping;
15. import org.springframework.web.bind.annotation.RestController;
16. 
17. 
18. @RestController
19. @RequestMapping("/test")
20. public class TestControllor {
21. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
22. 
23. /**
24.      * 使用RocketMq的生产者
25.      */
26. @Autowired
27. private DefaultMQProducer defaultMQProducer;
28. 
29. @RequestMapping("/send")
30. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
31. String msg = "demo msg test";;
32.         logger.info("开始发送消息:" + msg);
33. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes());        
34. //默认3秒超时
35. SendResult sendResult = defaultMQProducer.send(sendMsg);
36.         logger.info("消息发送响应信息:" + sendResult.toString());
37.     }
38. }

如果出现org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.99.1:10909> fail  在生产者中加入

producer.setVipChannelEnabled(false);


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
102 0
|
7月前
|
消息中间件 弹性计算 Java
Rocketmq-spring入门与实践
本场景带您体验如何在 Spring 生态中优雅地使用 Apache RocketMQ,感受最受欢迎业务开发框架与最受欢迎消息平台结合的魅力。
404 0
|
3月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息
|
7月前
|
消息中间件 XML Java
Kafka与Spring的整合使用
Kafka与Spring的整合使用
78 0
|
9月前
|
消息中间件 Java API
RocketMQ极简入门-在SpringBoot中使用RocketMQ
现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例
410 0
|
9月前
|
消息中间件 Java API
八.RocketMQ极简入门-在SpringBoot中使用RocketMQ
RocketMQ极简入门-在SpringBoot中使用RocketMQ
|
9月前
|
Java
SpringBoot整合RocketMQ
SpringBoot整合RocketMQ
304 0
|
11月前
|
消息中间件 存储 Java
Springboot整合RocketMq
Springboot整合RocketMq
|
消息中间件 Java RocketMQ
RocketMQ-Spring学习
们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。
1034 0
RocketMQ-Spring学习
|
消息中间件 Java 中间件
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
1509 0
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息