RocketMQ整合Spring的一个项目在apache中可以看到是rocketmq-spring。其提供了整合spring的方便使用方式。
一、rocketmq-spring中推拉模式的配置
下面我们来看看拉取和监听两种方式的消费模式的相关配置。
在Rocketmq整合Spring的过程中,我之前配置过这样的配置:
rocketmq name-server 192.168.0.103 8080; 192.168.0.1028080 tag TAG_TEST consumer access-key test_uat secret-key123456 tag TAG_TEST topic TEST_TOPIC group GID_TEST_GROUP enable-msg-tracetrue access-channel CLOUD pull-consumer group GID_TEST_GROUP topic TEST_TOPIC
这个配置其实是配置了两种模式的配置,可以在消息诊断可以看到会存在多个消费者线程的情况,虽然诊断的结果是没有问题的,但是消费会出现消费不到,产生消息丢失的情况。为什么这么说呢?
二、监听下的DefaultMQPushConsumer
从RocketMQ-Spring中,我们可以看到我们如果使用监听的方式进行消费的话,其实其会有一个配置是支持我们去做消费的,那就是RocketMQMessageListener这个类中的配置:org.apache.rocketmq.spring.annotation.RocketMQMessageListener
StringNAME_SERVER_PLACEHOLDER="${rocketmq.name-server:}"; StringACCESS_KEY_PLACEHOLDER="${rocketmq.push-consumer.access-key:}"; StringSECRET_KEY_PLACEHOLDER="${rocketmq.push-consumer.secret-key:}"; StringTRACE_TOPIC_PLACEHOLDER="${rocketmq.push-consumer.customized-trace-topic:}"; StringACCESS_CHANNEL_PLACEHOLDER="${rocketmq.access-channel:}"; StringconsumerGroup(); Stringtopic(); SelectorTypeselectorType() defaultSelectorType.TAG; StringselectorExpression() default"*"; ConsumeModeconsumeMode() defaultConsumeMode.CONCURRENTLY; MessageModelmessageModel() defaultMessageModel.CLUSTERING; intconsumeThreadMax() default64; intconsumeThreadNumber() default20; intmaxReconsumeTimes() default-1; longconsumeTimeout() default15L; intreplyTimeout() default3000; StringaccessKey() defaultACCESS_KEY_PLACEHOLDER; StringsecretKey() defaultSECRET_KEY_PLACEHOLDER; booleanenableMsgTrace() defaultfalse; StringcustomizedTraceTopic() defaultTRACE_TOPIC_PLACEHOLDER; StringnameServer() defaultNAME_SERVER_PLACEHOLDER; StringaccessChannel() defaultACCESS_CHANNEL_PLACEHOLDER; StringtlsEnable() default"false"; Stringnamespace() default""; /*** Message consume retry strategy in concurrently mode.** -1,no retry,put into DLQ directly* 0,broker control retry frequency* >0,client control retry frequency*/intdelayLevelWhenNextConsume() default0; intsuspendCurrentQueueTimeMillis() default1000; intawaitTerminationMillisWhenShutdown() default1000; StringinstanceName() default"DEFAULT";
从这个配置中,我们可以看到其存在相关的配置:其中可以看到最大的消费线程数已经被废弃,取而代之的是consumeThreadNumber,默认是20个线程执行拉取消费操作。消费的模式采用并行消费:ConsumeMode.CONCURRENTLY。消息模式是采用的集群模式。同时根据占位符可以看到基本覆盖了我们配置的yml配置。
我们知道只有注册到Spring中的bean才能被Spring处理。因此我们可以看到:
publicvoidafterSingletonsInstantiated() { //拿到带RocketMQMessageListener的注解,同时将其进行注册Map<String, Object>beans=this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class) .entrySet().stream().filter(entry->!ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerContainer); }
注册到容器中:
privatevoidregisterContainer(StringbeanName, Objectbean) { Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean); //拿到监听,获取消费者组、主题RocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class); StringconsumerGroup=this.environment.resolvePlaceholders(annotation.consumerGroup()); Stringtopic=this.environment.resolvePlaceholders(annotation.topic()); //获取监听,是否开启了监听booleanlistenerEnabled= (boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); //容器bean名称StringcontainerBeanName=String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContextgenericApplicationContext= (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () ->createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainercontainer=genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { //启动容器container.start(); } catch (Exceptione) { log.error("Started container failed. {}", container, e); thrownewRuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); }
启动容器的本质是启动DefaultMQPushConsumer。
publicvoidstart() throwsMQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); //启动默认推模式的消费方式this.defaultMQPushConsumerImpl.start(); if (null!=traceDispatcher) { try { //链路追踪traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientExceptione) { log.warn("trace dispatcher start failed ", e); } } }
三、litepull方式的消费DefaultListPushConsumer
那我们再来看看拉取的方式,其默认的使用litePull的方式RocketMQAutoConfiguration:这个自动注入的条件是
CONSUMER_BEAN_NAME) (DefaultLitePullConsumer.class) (prefix="rocketmq", value= {"name-server", "pull-consumer.group", "pull-consumer.topic"}) (publicDefaultLitePullConsumerdefaultLitePullConsumer(RocketMQPropertiesrocketMQProperties) throwsMQClientException { RocketMQProperties.PullConsumerconsumerConfig=rocketMQProperties.getPullConsumer(); StringnameServer=rocketMQProperties.getNameServer(); StringgroupName=consumerConfig.getGroup(); StringtopicName=consumerConfig.getTopic(); Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null"); Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null"); StringaccessChannel=rocketMQProperties.getAccessChannel(); MessageModelmessageModel=MessageModel.valueOf(consumerConfig.getMessageModel()); SelectorTypeselectorType=SelectorType.valueOf(consumerConfig.getSelectorType()); StringselectorExpression=consumerConfig.getSelectorExpression(); Stringak=consumerConfig.getAccessKey(); Stringsk=consumerConfig.getSecretKey(); intpullBatchSize=consumerConfig.getPullBatchSize(); booleanuseTLS=consumerConfig.isTlsEnable(); DefaultLitePullConsumerlitePullConsumer=RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS); litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace()); litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic()); litePullConsumer.setNamespace(consumerConfig.getNamespace()); litePullConsumer.setInstanceName(consumerConfig.getInstanceName()); log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s", groupName, topicName,nameServer)); returnlitePullConsumer; }
从ISSUE中可以看到其是为了支持litepull消费消息的,[ISSUE #306] Support real LitePullMessage in RocketMQ-Spring (#307) 。因此,如果配置了rocketmq为前缀,同时采用pull-consumer的方式的话,此时就可以采用这种方式注入。同时其出现在ExtRocketMQConsumerConfiguration配置中。通常这种方式 适合于实现扩展类,比如下面这种:
topic="${demo.rocketmq.topic}", group="string_consumer", tlsEnable="${demo.ext.consumer.tlsEnable}") (publicclassExtRocketMQTemplateextendsRocketMQTemplate { }
因此,如果使用了pull,就不要使用push。否则就会产生冲突,在一个项目中。可以看到启动的是DefaultLitePullConsumer。
privatevoidregisterTemplate(StringbeanName, Objectbean) { Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean); // 获取扩展配置消费者的配置ExtRocketMQConsumerConfigurationannotation=clazz.getAnnotation(ExtRocketMQConsumerConfiguration.class); GenericApplicationContextgenericApplicationContext= (GenericApplicationContext) applicationContext; DefaultLitePullConsumerconsumer=null; try { //创建消费者consumer=createConsumer(annotation); //启动消费者DefaultLitePullConsumerconsumer.start(); } catch (Exceptione) { log.error("Failed to startup PullConsumer for RocketMQTemplate {}", beanName, e); } RocketMQTemplaterocketMQTemplate= (RocketMQTemplate) bean; rocketMQTemplate.setConsumer(consumer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); log.info("Set real consumer to :{} {}", beanName, annotation.value()); }
也即此时验证我们看到诊断信息出现的两个消费者的情况。通过 实践可以知道两者目前不可以共存。否则会出现竞争的情况。
同时可以看到基于RocketMQ模板方式的生产者和消费者分别为:
privateDefaultMQProducerproducer; privateDefaultLitePullConsumerconsumer;
也即模板中的生产者是基于默认MQ生产,而消费者则是采用默认的litepull消费。
四、两者有什么区别
litepull模式下只是通过poll方法批量拉取消息,缺乏重试机制,而push方式可以进行重试,并且失败后,会重试16次。那如果使用了litepull的方式,如果出现了失败,需要如何补偿呢?此时需要在业务系统中,获取offset,然后重置offset,进行重新消费,同时需要进行幂等处理。