RocketMQ Binder集成消息订阅

简介: RocketMQ Binder集成消息订阅

RocketMQ Binder集成消息订阅

AbstractMessageChannelBinder类中提供了创建MessageProducer的协议,在初始化Binder的时候加载createConsumerEndpoint方法

RocketMQMessageChannelBinder完成RocketMQInboundChannelAdapter的创建和初始化

RocketMQMessageChannelBinder的createConsumerEndpoint方法:

RocketMQInboundChannelAdapter是适配器,需要适配Spring Framework的重试和回调机制,用来订阅消息和转化消息格式。RocketMQListenerBindingContainer是对RocketMQ客户端API的封装,适配器中持有它的对象。

RocketMQ提供两种消费模式:顺序消费和并发消费。RocketMQ客户端API中顺序消费的默认监听器是DefaultMessageListenerOrderly,并发消费的默认监听器是DefaultMessageListenerConcurrently类,无论哪个消费模式,监听器收到的消息都会回调RocketMQListener

RocketMQInboundChannelAdapter中创建和初始化RocketMQListener的实现类

RocketMQInboundChannelAdapter

DefaultMessageListenerOrderly收到RocketMQ消息后,先回调BindingRocketMQListener的onMessage方法,再调用RocketMQInboundChannelAdapter父类的sendMessage方法将消息发送到DirectChannel

Spring Cloud Stream的接收消息和发送消息的消息模型是一致的,Binder中接收的消息先发送到MessageChannel,由订阅的MessageChannel通过Dispatcher转发到对应的MessageHandler进行处理。

image-20211008204111598

RocketMQInboundChannelAdapter的父类MessageProducerSupport的getOutputChannel()得到的MessageChannel是在初始化RocketMQ Binder时传入的DirectChannel

MessageProducerSupport的getOutputChannel方法:

MessagingTemplate继承GenericMessagingTemplate类,实际执行doSend()方法发送消息

MessageChannel的实例是DirectChannel对象,复用前面消息发送流程,通过消息分发类MessageDispatcher把消息分发给MessageHandler

DirectChannel对应的消息处理器是StreamListenerMessageHandler

InvocableHandlerMethod使用java反射机制完成回调,StreamListenerMessageHandler与@

StreamListenerAnnotationBeanPostProcessor的afterSingletonsInstantiated方法:

在Spring容器管理的所有单例对象初始化完成之后,遍历StreamListenerHandlerMethodMapping,进行InvocableHandlerMethod和StreamListenerMessageHandler的创建和初始化

StreamListenerHandlerMethodMapping保存了StreamListener和InvocableHandlerMethod的映射关系,映射关系的创建是在StreamListenerAnnotationBeanPostProcessor的postProcessAfterInitialization()方法

@Override
public final Object postProcessAfterInitialization(Object bean, final String beanName)
      throws BeansException {
   Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
         : bean.getClass();
   Method[] uniqueDeclaredMethods = ReflectionUtils
         .getUniqueDeclaredMethods(targetClass);
   for (Method method : uniqueDeclaredMethods) {
      StreamListener streamListener = AnnotatedElementUtils
            .findMergedAnnotation(method, StreamListener.class);
      if (streamListener != null && !method.isBridge()) {
         this.streamListenerCallbacks.add(() -> {
            Assert.isTrue(method.getAnnotation(Input.class) == null,
                  StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
            this.doPostProcess(streamListener, method, bean);
         });
      }
   }
   return bean;
}
private void doPostProcess(StreamListener streamListener, Method method,
            Object bean) {
        streamListener = postProcessAnnotation(streamListener, method);
        Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
        orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
                .filter(t -> t.supports(method)).findFirst();
        Assert.isTrue(orchestratorOptional.isPresent(),
                "A matching StreamListenerSetupMethodOrchestrator must be present");
        StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
                .get();
        streamListenerSetupMethodOrchestrator
                .orchestrateStreamListenerSetupMethod(streamListener, method, bean);
    }

@Override
        public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
                Method method, Object bean) {
            String methodAnnotatedInboundName = streamListener.value();

            String methodAnnotatedOutboundName = StreamListenerMethodUtils
                    .getOutboundBindingTargetName(method);
            int inputAnnotationCount = StreamListenerMethodUtils
                    .inputAnnotationCount(method);
            int outputAnnotationCount = StreamListenerMethodUtils
                    .outputAnnotationCount(method);
            boolean isDeclarative = checkDeclarativeMethod(method,
                    methodAnnotatedInboundName, methodAnnotatedOutboundName);
            StreamListenerMethodUtils.validateStreamListenerMethod(method,
                    inputAnnotationCount, outputAnnotationCount,
                    methodAnnotatedInboundName, methodAnnotatedOutboundName,
                    isDeclarative, streamListener.condition());
            if (isDeclarative) {
                StreamListenerParameterAdapter[] toSlpaArray;
                toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
                        .size()];
                Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
                        method, methodAnnotatedInboundName, this.applicationContext,
                        this.streamListenerParameterAdapters.toArray(toSlpaArray));
                invokeStreamListenerResultAdapter(method, bean,
                        methodAnnotatedOutboundName, adaptedInboundArguments);
            }
            else {
                registerHandlerMethodOnListenedChannel(method, streamListener, bean);
            }
        }

private void registerHandlerMethodOnListenedChannel(Method method,
                StreamListener streamListener, Object bean) {
            Assert.hasText(streamListener.value(), "The binding name cannot be null");
            if (!StringUtils.hasText(streamListener.value())) {
                throw new BeanInitializationException(
                        "A bound component name must be specified");
            }
            final String defaultOutputChannel = StreamListenerMethodUtils
                    .getOutboundBindingTargetName(method);
            if (Void.TYPE.equals(method.getReturnType())) {
                Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
                        "An output channel cannot be specified for a method that does not return a value");
            }
            else {
                Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
                        "An output channel must be specified for a method that can return a value");
            }
            StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
            StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
                    streamListener.value(),
                    new StreamListenerHandlerMethodMapping(bean, method,
                            streamListener.condition(), defaultOutputChannel,
                            streamListener.copyHeaders()));
        }

StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add来创建并保存StreamListenerHandlerMethodMapping

这是使用Spring Cloud Stream的消息模型来使用RocketMQ,也可以使用SpringBoot集成的RocketMQ组件。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
39 3
|
2月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
20 1
|
4月前
|
消息中间件 运维 监控
ApsaraMQ Copilot for RocketMQ:消息数据集成链路的健康管家
阿里云消息队列 ApsaraMQ 始终围绕“高弹性低成本、更稳定更安全、智能化免运维”三大核心方向进行演进和拓展。在智能化免运维方面,通过 ApsaraMQ Copilot,为企业提供消息数据集成链路的健康管家,让消息服务走进智能化免运维的新时代。
71816 67
|
3月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
189 1
|
3月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
3月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
3月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
4月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
4月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
5月前
|
消息中间件 数据采集 Serverless
云消息队列 RocketMQ 版-消息集成-概述
消息集成是助力企业数字化转型的全栈式消息与数据集成平台,简化流程,支持云上云下、跨区域集成。它提供低代码的事件流服务,具备数据源集成、数据清洗、Serverless自定义处理等功能,支持丰富的数据源和跨端连接。然而,使用时存在如单个任务数据限制、任务名称长度等约束。消息流入(Source)负责从各种数据源获取数据,消息流出(Sink)将数据分发到目标,数据处理(Transform)允许数据转换和分析,而任务(Task)则结合这些组件执行实际的集成操作。
236 3