一.新建spring boot项目
废话不多说。。。
二.配置application.properties
1. spring.application.name=rocketmq 2. server.port=8088 3. ###producer 4. #该应用是否启用生产者 5. rocketmq.producer.isOnOff=on 6. #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 7. rocketmq.producer.groupName=${spring.application.name} 8. #mq的nameserver地址 9. rocketmq.producer.namesrvAddr=127.0.0.1:9876 10. #消息最大长度 默认1024*4(4M) 11. rocketmq.producer.maxMessageSize=4096 12. #发送消息超时时间,默认3000 13. rocketmq.producer.sendMsgTimeout=3000 14. #发送消息失败重试次数,默认2 15. rocketmq.producer.retryTimesWhenSendFailed=2 16. ###consumer 17. ##该应用是否启用消费者 18. rocketmq.consumer.isOnOff=on 19. rocketmq.consumer.groupName=${spring.application.name} 20. #mq的nameserver地址 21. rocketmq.consumer.namesrvAddr=127.0.0.1:9876 22. #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*; 23. rocketmq.consumer.topics=DemoTopic~*; 24. rocketmq.consumer.consumeThreadMin=20 25. rocketmq.consumer.consumeThreadMax=64 26. #设置一次消费消息的条数,默认为1条 27. rocketmq.consumer.consumeMessageBatchMaxSize=1
三.新建生产者
1. package cn.baocl.rocketmq.producer; 2. 3. import com.alibaba.rocketmq.client.exception.MQClientException; 4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer; 5. import org.slf4j.Logger; 6. import org.slf4j.LoggerFactory; 7. import org.springframework.beans.factory.annotation.Value; 8. import org.springframework.boot.SpringBootConfiguration; 9. import org.springframework.context.annotation.Bean; 10. import org.springframework.util.StringUtils; 11. 12. @SpringBootConfiguration 13. public class MQProducerConfiguration { 14. 15. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class); 16. /** 17. * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 18. */ 19. @Value("${rocketmq.producer.groupName}") 20. private String groupName; 21. @Value("${rocketmq.producer.namesrvAddr}") 22. private String namesrvAddr; 23. /** 24. * 消息最大大小,默认4M 25. */ 26. @Value("${rocketmq.producer.maxMessageSize}") 27. private Integer maxMessageSize ; 28. /** 29. * 消息发送超时时间,默认3秒 30. */ 31. @Value("${rocketmq.producer.sendMsgTimeout}") 32. private Integer sendMsgTimeout; 33. /** 34. * 消息发送失败重试次数,默认2次 35. */ 36. @Value("${rocketmq.producer.retryTimesWhenSendFailed}") 37. private Integer retryTimesWhenSendFailed; 38. 39. @Bean 40. public DefaultMQProducer getRocketMQProducer() throws Exception { 41. if (StringUtils.isEmpty(this.groupName)) { 42. throw new Exception("groupName is blank"); 43. } 44. if (StringUtils.isEmpty(this.namesrvAddr)) { 45. throw new Exception("nameServerAddr is blank"); 46. } 47. DefaultMQProducer producer; 48. producer = new DefaultMQProducer(this.groupName); 49. producer.setNamesrvAddr(this.namesrvAddr); 50. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName 51. //producer.setInstanceName(instanceName); 52. producer.setVipChannelEnabled(false); 53. if(this.maxMessageSize!=null){ 54. producer.setMaxMessageSize(this.maxMessageSize); 55. } 56. if(this.sendMsgTimeout!=null){ 57. producer.setSendMsgTimeout(this.sendMsgTimeout); 58. } 59. //如果发送消息失败,设置重试次数,默认为2次 60. if(this.retryTimesWhenSendFailed!=null){ 61. producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); 62. } 63. 64. try { 65. producer.start(); 66. LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]" 67. , this.groupName, this.namesrvAddr)); 68. } catch (MQClientException e) { 69. LOGGER.error(String.format("producer is error {}" 70. , e.getMessage(),e)); 71. throw new Exception(e); 72. } 73. return producer; 74. } 75. }
4.新建消费者
1. package cn.baocl.rocketmq.consumer; 2. 3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor; 4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; 5. import com.alibaba.rocketmq.client.exception.MQClientException; 6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; 7. import org.slf4j.Logger; 8. import org.slf4j.LoggerFactory; 9. import org.springframework.beans.factory.annotation.Autowired; 10. import org.springframework.beans.factory.annotation.Value; 11. import org.springframework.boot.SpringBootConfiguration; 12. import org.springframework.context.annotation.Bean; 13. import org.springframework.util.StringUtils; 14. 15. 16. @SpringBootConfiguration 17. public class MQConsumerConfiguration { 18. 19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); 20. @Value("${rocketmq.consumer.namesrvAddr}") 21. private String namesrvAddr; 22. @Value("${rocketmq.consumer.groupName}") 23. private String groupName; 24. @Value("${rocketmq.consumer.consumeThreadMin}") 25. private int consumeThreadMin; 26. @Value("${rocketmq.consumer.consumeThreadMax}") 27. private int consumeThreadMax; 28. @Value("${rocketmq.consumer.topics}") 29. private String topics; 30. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") 31. private int consumeMessageBatchMaxSize; 32. @Autowired 33. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; 34. 35. @Bean 36. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception { 37. if (StringUtils.isEmpty(groupName)){ 38. throw new Exception("groupName is null !!!"); 39. } 40. if (StringUtils.isEmpty(namesrvAddr)){ 41. throw new Exception("namesrvAddr is null !!!"); 42. } 43. if(StringUtils.isEmpty(topics)){ 44. throw new Exception("topics is null !!!"); 45. } 46. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); 47. consumer.setNamesrvAddr(namesrvAddr); 48. consumer.setConsumeThreadMin(consumeThreadMin); 49. consumer.setConsumeThreadMax(consumeThreadMax); 50. consumer.registerMessageListener(mqMessageListenerProcessor); 51. 52. /** 53. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 54. * 如果非第一次启动,那么按照上次消费的位置继续消费 55. */ 56. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 57. /** 58. * 设置消费模型,集群还是广播,默认为集群 59. */ 60. //consumer.setMessageModel(MessageModel.CLUSTERING); 61. /** 62. * 设置一次消费消息的条数,默认为1条 63. */ 64. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); 65. try { 66. /** 67. * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3 68. */ 69. String[] topicTagsArr = topics.split(";"); 70. for (String topicTags : topicTagsArr) { 71. String[] topicTag = topicTags.split("~"); 72. consumer.subscribe(topicTag[0],topicTag[1]); 73. } 74. consumer.start(); 75. LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); 76. }catch (MQClientException e){ 77. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); 78. throw new Exception(e); 79. } 80. return consumer; 81. } 82. }
5.新建处理类
1. package cn.baocl.rocketmq.processor; 2. 3. 4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; 7. import com.alibaba.rocketmq.common.message.MessageExt; 8. import org.slf4j.Logger; 9. import org.slf4j.LoggerFactory; 10. import org.springframework.stereotype.Component; 11. import org.springframework.util.CollectionUtils; 12. 13. import java.util.List; 14. 15. @Component 16. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { 17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); 18. /** 19. * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/> 20. * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS 21. */ 22. @Override 23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 24. if(CollectionUtils.isEmpty(msgs)){ 25. logger.info("接受到的消息为空,不处理,直接返回成功"); 26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 27. } 28. MessageExt messageExt = msgs.get(0); 29. logger.info("接受到的消息为:"+messageExt.toString()); 30. if(messageExt.getTopic().equals("你的Topic")){ 31. if(messageExt.getTags().equals("你的Tag")){ 32. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重) 33. //TODO 获取该消息重试次数 34. int reconsume = messageExt.getReconsumeTimes(); 35. if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功 36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 37. } 38. //TODO 处理对应的业务逻辑 39. } 40. } 41. // 如果没有return success ,consumer会重新消费该消息,直到return success 42. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 43. } 44. }
6.接口调用
1. package cn.baocl.rocketmq.controllor; 2. 3. import cn.baocl.rocketmq.entity.TestVo; 4. import com.alibaba.rocketmq.client.exception.MQBrokerException; 5. import com.alibaba.rocketmq.client.exception.MQClientException; 6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer; 7. import com.alibaba.rocketmq.client.producer.SendCallback; 8. import com.alibaba.rocketmq.client.producer.SendResult; 9. import com.alibaba.rocketmq.common.message.Message; 10. import com.alibaba.rocketmq.remoting.exception.RemotingException; 11. import org.slf4j.Logger; 12. import org.slf4j.LoggerFactory; 13. import org.springframework.beans.factory.annotation.Autowired; 14. import org.springframework.web.bind.annotation.RequestMapping; 15. import org.springframework.web.bind.annotation.RestController; 16. 17. 18. @RestController 19. @RequestMapping("/test") 20. public class TestControllor { 21. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 22. 23. /** 24. * 使用RocketMq的生产者 25. */ 26. @Autowired 27. private DefaultMQProducer defaultMQProducer; 28. 29. @RequestMapping("/send") 30. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 31. String msg = "demo msg test";; 32. logger.info("开始发送消息:" + msg); 33. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes()); 34. //默认3秒超时 35. SendResult sendResult = defaultMQProducer.send(sendMsg); 36. logger.info("消息发送响应信息:" + sendResult.toString()); 37. } 38. }
如果出现org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.99.1:10909> fail 在生产者中加入
producer.setVipChannelEnabled(false);