rocketmq分为push与pull
MQ中Pull和Push的两种消费方式
对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
(1)Pull方式
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;
(2)Push方式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
代码实现pull模式
调用方法
1. @RestController 2. @RequestMapping("/test") 3. public class TestControllor { 4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class); 5. 6. /** 7. * 使用RocketMq的生产者 8. */ 9. @Autowired 10. private DefaultMQProducer defaultMQProducer; 11. 12. @RequestMapping("/send") 13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 14. Date day = new Date(); 15. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 16. System.out.println(df.format(day)); 17. //定义tags 18. String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; 19. for (int i = 0; i < 100; i++) { 20. final int index = i; 21. String msg = "demo msg test" + ",这是第" + i + "条" + df.format(day); 22. logger.info("开始发送消息:" + i); 23. //指定每条消息发送到某个tags 24. Message sendMsg = new Message("NewMessage", tags[i % tags.length], "KEY" + i + "时间为" + df.format(day), 25. ("现在排号到:" + i).getBytes()); 26. //默认3秒超时 27. SendResult sendResult = defaultMQProducer.send(sendMsg); 28. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次"); 29. } 30. }
生产者
1. @SpringBootConfiguration 2. public class MQProducerConfiguration { 3. 4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class); 5. /** 6. * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 7. */ 8. @Value("${rocketmq.producer.groupName}") 9. private String groupName; 10. @Value("${rocketmq.producer.namesrvAddr}") 11. private String namesrvAddr; 12. /** 13. * 消息最大大小,默认4M 14. */ 15. @Value("${rocketmq.producer.maxMessageSize}") 16. private Integer maxMessageSize ; 17. /** 18. * 消息发送超时时间,默认3秒 19. */ 20. @Value("${rocketmq.producer.sendMsgTimeout}") 21. private Integer sendMsgTimeout; 22. /** 23. * 消息发送失败重试次数,默认2次 24. */ 25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}") 26. private Integer retryTimesWhenSendFailed; 27. 28. @Bean 29. public DefaultMQProducer getRocketMQProducer() throws Exception { 30. if (StringUtils.isEmpty(this.groupName)) { 31. throw new Exception("groupName is blank"); 32. } 33. if (StringUtils.isEmpty(this.namesrvAddr)) { 34. throw new Exception("nameServerAddr is blank"); 35. } 36. DefaultMQProducer producer; 37. producer = new DefaultMQProducer(this.groupName); 38. producer.setNamesrvAddr(this.namesrvAddr); 39. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName 40. //producer.setInstanceName(instanceName); 41. producer.setVipChannelEnabled(false); 42. if(this.maxMessageSize!=null){ 43. producer.setMaxMessageSize(this.maxMessageSize); 44. } 45. if(this.sendMsgTimeout!=null){ 46. producer.setSendMsgTimeout(this.sendMsgTimeout); 47. } 48. //如果发送消息失败,设置重试次数,默认为2次 49. if(this.retryTimesWhenSendFailed!=null){ 50. producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); 51. } 52. 53. try { 54. producer.start(); 55. LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]" 56. , this.groupName, this.namesrvAddr)); 57. } catch (MQClientException e) { 58. LOGGER.error(String.format("producer is error {}" 59. , e.getMessage(),e)); 60. throw new Exception(e); 61. } 62. return producer; 63. } 64. }
消费者
以负载均衡的方式拉取消息 不能指定具体队列 --MQPullConsumerScheduleService 可以指定轮询时间
1. @SpringBootConfiguration 2. public class MQConsumerConfiguration { 3. 4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); 5. @Value("${rocketmq.consumer.namesrvAddr}") 6. private String namesrvAddr; 7. @Value("${rocketmq.consumer.groupName}") 8. private String groupName; 9. @Value("${rocketmq.consumer.consumeThreadMin}") 10. private int consumeThreadMin; 11. @Value("${rocketmq.consumer.consumeThreadMax}") 12. private int consumeThreadMax; 13. @Value("${rocketmq.consumer.topics}") 14. private String topics; 15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") 16. private int consumeMessageBatchMaxSize; 17. @Autowired 18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; 19. 20. @Bean 21. public MQPullConsumerScheduleService testRocketMQConsumer() throws Exception { 22. // 1. 实例化对象 23. final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("rocketmq"); 24. 25. // 2. 设置NameServer 26. scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876"); 27. // 3. 设置消费组为集群模式 28. scheduleService.setMessageModel(MessageModel.CLUSTERING); 29. 30. // 4. 注册拉取回调函数 31. scheduleService.registerPullTaskCallback("NewMessage", new PullTaskCallback() { 32. @Override 33. public void doPullTask(MessageQueue mq, PullTaskContext context) { 34. // 5.从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。 35. MQPullConsumer consumer = context.getPullConsumer(); 36. try { 37. // 6.获取该消费组的该队列的消费进度 38. long offset = consumer.fetchConsumeOffset(mq, false); 39. if (offset < 0) { 40. offset = 0; 41. } 42. // 7.拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍 43. PullResult pullResult = consumer.pull(mq, "*", offset, 2); 44. System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); 45. switch (pullResult.getPullStatus()) { 46. case FOUND: 47. //打印消息 48. List<MessageExt> messageExtList = pullResult 49. .getMsgFoundList(); 50. for (MessageExt m : messageExtList) { 51. System.out.println(m.toString()); 52. } 53. break; 54. case NO_MATCHED_MSG: 55. break; 56. case NO_NEW_MSG: 57. case OFFSET_ILLEGAL: 58. break; 59. default: 60. break; 61. } 62. // 8.更新消费组该队列消费进度 63. consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); 64. // 9.设置下次拉取消息时间间隔,单位毫秒 65. context.setPullNextDelayTimeMillis(10000); 66. } catch (Exception e) { 67. e.printStackTrace(); 68. } 69. } 70. }); 71. 72. scheduleService.start(); 73. return scheduleService; 74. } 75. }
其中返回的结果为
PullStatus.FOUND:成功拉取消息
PullStatus.NO_NEW_MSG:没有新的消息可被拉取
PullStatus.NO_MATCHED_MSG:过滤结果不匹配
PullStatus.OFFSET_ILLEGAL:offset非法
同时在我们的控制有pull的体现
参考:https://www.cnblogs.com/zhyg/p/10451518.html
https://blog.csdn.net/zhaohongfei_358/article/details/101457563