Spring Cloud Stream 消息发送

简介: Spring Cloud Stream 消息发送

Spring Cloud Stream 消息发送

image-20211008132856943

业务发送消息

source.output().send(message);来发送消息

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT = "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}
@FunctionalInterface
public interface MessageChannel {

   long INDEFINITE_TIMEOUT = -1;


   default boolean send(Message<?> message) {
      return send(message, INDEFINITE_TIMEOUT);
   }

  
   boolean send(Message<?> message, long timeout);

}

AbstractMessageChannel是消息通道的基本实现,提供发送消息和接收消息的公共方法。

AbstractSubscribableChannel类的doSend()方法

消息发送到AbstractSubscribableChannel类的doSend()方法如下:

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
        implements SubscribableChannel, SubscribableChannelManagement {

    
    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        try {
            return getRequiredDispatcher().dispatch(message);
        }
        catch (MessageDispatchingException e) {
            String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, e);
        }
    }

    private MessageDispatcher getRequiredDispatcher() {
        MessageDispatcher dispatcher = getDispatcher();
        Assert.state(dispatcher != null, "'dispatcher' must not be null");
        return dispatcher;
    }

    protected abstract MessageDispatcher getDispatcher();

}

调用getDispatcher方法从DirectChannel中得到消息分发类MessageDispatcher的实现类UnicastingDispatcher,调用dispatch方法把消息分发给各个MessageHandler

UnicastingDispatcher的doDispatch()方法

UnicastingDispatcher的doDispatch方法:

private boolean doDispatch(Message<?> message) {
   if (tryOptimizedDispatch(message)) {
      return true;
   }
   boolean success = false;
   Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
   if (!handlerIterator.hasNext()) {
      throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
   }
   List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
   while (!success && handlerIterator.hasNext()) {
      MessageHandler handler = handlerIterator.next();
      try {
         handler.handleMessage(message);
         success = true; // we have a winner.
      }
      catch (Exception e) {
         RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
               () -> "Dispatcher failed to deliver Message", e);
         exceptions.add(runtimeException);
         this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
      }
   }
   return success;
}

遍历所有的MessageHandler,调用handleMessage()处理消息,那么MessageHandler是从哪来的呢?

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()方法添加到handlers列表。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring容器加载所有Bean并完成初始化之后完成。

RocketMQMessageChannelBinder集成消息发送

AbstractMessageChannelBinder类提供创建MessageHandler规范,createProducerMessageHandler()方法在初始化Binder的时候会加载。

RocketMQMessageChannelBinder继承AbstractMessageChannelBinder,完成RocketMQMessageHandler的创建和初始化,RocketMQMessageHandler的消息处理器MessageHandler的具体实现,RocketMQMessageHandler在RocketMQBinder中的作用就是转化消息格式并发送消息。

RocketMQMessageChannelBinder的createProducerMessageHandler方法:

这个方法就是创建MessageHandler的

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
      ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
      MessageChannel channel, MessageChannel errorChannel) throws Exception {
   if (producerProperties.getExtension().getEnabled()) {

      // if producerGroup is empty, using destination
      String extendedProducerGroup = producerProperties.getExtension().getGroup();
      String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
            ? destination.getName()
            : extendedProducerGroup;

      RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
            .mergeProperties(rocketBinderConfigurationProperties,
                  rocketMQProperties);

      RocketMQTemplate rocketMQTemplate;
      if (producerProperties.getExtension().getTransactional()) {
         Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
               .getBeansOfType(RocketMQTemplate.class);
         if (rocketMQTemplates.size() == 0) {
            throw new IllegalStateException(
                  "there is no RocketMQTemplate in Spring BeanFactory");
         }
         else if (rocketMQTemplates.size() > 1) {
            throw new IllegalStateException(
                  "there is more than 1 RocketMQTemplates in Spring BeanFactory");
         }
         rocketMQTemplate = rocketMQTemplates.values().iterator().next();
      }
      else {
         rocketMQTemplate = new RocketMQTemplate();
         rocketMQTemplate.setObjectMapper(this.getApplicationContext()
               .getBeansOfType(ObjectMapper.class).values().iterator().next());
          //初始化DefaultMQProducer
         DefaultMQProducer producer;
         String ak = mergedProperties.getAccessKey();
         String sk = mergedProperties.getSecretKey();
         if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            RPCHook rpcHook = new AclClientRPCHook(
                  new SessionCredentials(ak, sk));
            producer = new DefaultMQProducer(producerGroup, rpcHook,
                  mergedProperties.isEnableMsgTrace(),
                  mergedProperties.getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
            producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
                  destination.getName() + "|" + UtilAll.getPid()));
         }
         else {
            producer = new DefaultMQProducer(producerGroup);
            producer.setVipChannelEnabled(
                  producerProperties.getExtension().getVipChannelEnabled());
         }
         producer.setNamesrvAddr(mergedProperties.getNameServer());
         producer.setSendMsgTimeout(
               producerProperties.getExtension().getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(
               producerProperties.getExtension().getRetryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(producerProperties
               .getExtension().getRetryTimesWhenSendAsyncFailed());
         producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
               .getCompressMessageBodyThreshold());
         producer.setRetryAnotherBrokerWhenNotStoreOK(
               producerProperties.getExtension().isRetryNextServer());
         producer.setMaxMessageSize(
               producerProperties.getExtension().getMaxMessageSize());
         rocketMQTemplate.setProducer(producer);
         if (producerProperties.isPartitioned()) {
            rocketMQTemplate
                  .setMessageQueueSelector(new PartitionMessageQueueSelector());
         }
      }

      RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
            rocketMQTemplate, destination.getName(), producerGroup,
            producerProperties.getExtension().getTransactional(),
            instrumentationManager, producerProperties,
            ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
                  .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
                  .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
                  .findFirst().orElse(null));
      messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
      messageHandler.setSync(producerProperties.getExtension().getSync());
      messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
      if (errorChannel != null) {
         messageHandler.setSendFailureChannel(errorChannel);
      }
      return messageHandler;
   }
   else {
      throw new RuntimeException("Binding for channel " + destination.getName()
            + " has been disabled, message can't be delivered");
   }
}

RocketMQMessageHandler中持有RocketMQTemplate对象,RocketMQTemplate是对RocketMQ客户端API的封装

DefaultMQProducer由RocketMQ客户端提供的API,发送消息到RocketMQ消息服务器都是由它来完成。

RocketMQMessageHandler是消息发送的处理逻辑,解析Message对象头中的参数,调用RocketMQTemplate中不同的发送消息接口。

RocketMQMessageHandler的handleMessageInternal()方法

RocketMQMessageHandler用来处理消息

RocketMQMessageHandler的handleMessageInternal方法:

protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
      throws Exception {
   try {
      // issue 737 fix
      Map<String, String> jsonHeaders = headerMapper
            .fromHeaders(message.getHeaders());
      message = org.springframework.messaging.support.MessageBuilder
            .fromMessage(message).copyHeaders(jsonHeaders).build();

      final StringBuilder topicWithTags = new StringBuilder(destination);
      String tags = Optional
            .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
            .toString();
      if (!StringUtils.isEmpty(tags)) {
         topicWithTags.append(":").append(tags);
      }

      SendResult sendRes = null;
       //发送事务消息
      if (transactional) {
         sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
               topicWithTags.toString(), message, message.getHeaders()
                     .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
         log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
      }
      else {
          //设置定时消息参数
         int delayLevel = 0;
         try {
            Object delayLevelObj = message.getHeaders()
                  .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
            if (delayLevelObj instanceof Number) {
               delayLevel = ((Number) delayLevelObj).intValue();
            }
            else if (delayLevelObj instanceof String) {
               delayLevel = Integer.parseInt((String) delayLevelObj);
            }
         }
         catch (Exception e) {
            // ignore
         }
         boolean needSelectQueue = message.getHeaders()
               .containsKey(BinderHeaders.PARTITION_HEADER);
          //同步发送
         if (sync) {
             //顺序消息
            if (needSelectQueue) {
               sendRes = rocketMQTemplate.syncSendOrderly(
                     topicWithTags.toString(), message, "",
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
             //普通消息
            else {
               sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                     message,
                     rocketMQTemplate.getProducer().getSendMsgTimeout(),
                     delayLevel);
            }
            log.debug("sync send to topic " + topicWithTags + " " + sendRes);
         }
          //异步消息
         else {
            Message<?> finalMessage = message;
            SendCallback sendCallback = new SendCallback() {
               @Override
               public void onSuccess(SendResult sendResult) {
                  log.debug("async send to topic " + topicWithTags + " "
                        + sendResult);
               }

               @Override
               public void onException(Throwable e) {
                  log.error("RocketMQ Message hasn't been sent. Caused by "
                        + e.getMessage());
                  if (getSendFailureChannel() != null) {
                     getSendFailureChannel().send(
                           RocketMQMessageHandler.this.errorMessageStrategy
                                 .buildErrorMessage(new MessagingException(
                                       finalMessage, e), null));
                  }
               }
            };
            if (needSelectQueue) {
               rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
                     message, "", sendCallback,
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
            else {
               rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                     sendCallback);
            }
         }
      }
      if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
         if (getSendFailureChannel() != null) {
            this.getSendFailureChannel().send(message);
         }
         else {
            throw new MessagingException(message,
                  new MQClientException("message hasn't been sent", null));
         }
      }
   }
   catch (Exception e) {
      log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
      if (getSendFailureChannel() != null) {
         getSendFailureChannel().send(this.errorMessageStrategy
               .buildErrorMessage(new MessagingException(message, e), null));
      }
      else {
         throw new MessagingException(message, e);
      }
   }

}

代码有点长,但整体还是很好理解的

  1. 获取消息的目的地,也就是代码中的tags变量
  2. 判断是否为事务消息,如果是的话就发送事务消息
  3. 如果不是事务消息,先设置定时消息的参数,判断是否为同步同步消息,如果是的话再判断是顺序消息还是普通消息,顺序消息,同样异步消息还是分为异步顺序消息和异步的普通消息
  4. 根据发送结果,如果发送消息失败的话就把消息发送到失败队列中。

发送普通消息、事务消息、定时消息还是顺序消息,由Message对象的消息头Header中的属性决定,在业务代码创建Message对象时设置。

总结

这篇文章我们讲了Spring Cloud Stream 消息发送的基本流程,先是业务发送消息,经过AbstractSubscribableChannel类的doSend()方法,方法中调用UnicastingDispatcher的doDispatch()方法进行分发遍历所有的MessageHandler进行处理消息,RocketMQMessageHandler是其中之一,它根据消息头的header信息判断是什么类型的消息,然后发送对应的消息,发送失败的消息进行失败的队列中。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
监控 负载均衡 Java
深入理解Spring Cloud中的服务网关
深入理解Spring Cloud中的服务网关
|
4月前
|
Java 开发工具 git
实现基于Spring Cloud的配置中心
实现基于Spring Cloud的配置中心
|
4月前
|
设计模式 监控 Java
解析Spring Cloud中的断路器模式原理
解析Spring Cloud中的断路器模式原理
|
4月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14942 29
|
4月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
498 15
|
3月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
4月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
111 3
|
4月前
|
消息中间件 Java 开发者
Spring Cloud微服务框架:构建高可用、分布式系统的现代架构
Spring Cloud是一个开源的微服务框架,旨在帮助开发者快速构建在分布式系统环境中运行的服务。它提供了一系列工具,用于在分布式系统中配置、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态等领域的支持。
183 5
|
4月前
|
Java API 开发工具
Spring Boot与Spring Cloud Config的集成
Spring Boot与Spring Cloud Config的集成