Spring Cloud Stream 消息发送

简介: Spring Cloud Stream 消息发送

Spring Cloud Stream 消息发送

image-20211008132856943

业务发送消息

source.output().send(message);来发送消息

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT = "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}
@FunctionalInterface
public interface MessageChannel {

   long INDEFINITE_TIMEOUT = -1;


   default boolean send(Message<?> message) {
      return send(message, INDEFINITE_TIMEOUT);
   }

  
   boolean send(Message<?> message, long timeout);

}

AbstractMessageChannel是消息通道的基本实现,提供发送消息和接收消息的公共方法。

AbstractSubscribableChannel类的doSend()方法

消息发送到AbstractSubscribableChannel类的doSend()方法如下:

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
        implements SubscribableChannel, SubscribableChannelManagement {

    
    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        try {
            return getRequiredDispatcher().dispatch(message);
        }
        catch (MessageDispatchingException e) {
            String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, e);
        }
    }

    private MessageDispatcher getRequiredDispatcher() {
        MessageDispatcher dispatcher = getDispatcher();
        Assert.state(dispatcher != null, "'dispatcher' must not be null");
        return dispatcher;
    }

    protected abstract MessageDispatcher getDispatcher();

}

调用getDispatcher方法从DirectChannel中得到消息分发类MessageDispatcher的实现类UnicastingDispatcher,调用dispatch方法把消息分发给各个MessageHandler

UnicastingDispatcher的doDispatch()方法

UnicastingDispatcher的doDispatch方法:

private boolean doDispatch(Message<?> message) {
   if (tryOptimizedDispatch(message)) {
      return true;
   }
   boolean success = false;
   Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
   if (!handlerIterator.hasNext()) {
      throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
   }
   List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
   while (!success && handlerIterator.hasNext()) {
      MessageHandler handler = handlerIterator.next();
      try {
         handler.handleMessage(message);
         success = true; // we have a winner.
      }
      catch (Exception e) {
         RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
               () -> "Dispatcher failed to deliver Message", e);
         exceptions.add(runtimeException);
         this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
      }
   }
   return success;
}

遍历所有的MessageHandler,调用handleMessage()处理消息,那么MessageHandler是从哪来的呢?

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()方法添加到handlers列表。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring容器加载所有Bean并完成初始化之后完成。

RocketMQMessageChannelBinder集成消息发送

AbstractMessageChannelBinder类提供创建MessageHandler规范,createProducerMessageHandler()方法在初始化Binder的时候会加载。

RocketMQMessageChannelBinder继承AbstractMessageChannelBinder,完成RocketMQMessageHandler的创建和初始化,RocketMQMessageHandler的消息处理器MessageHandler的具体实现,RocketMQMessageHandler在RocketMQBinder中的作用就是转化消息格式并发送消息。

RocketMQMessageChannelBinder的createProducerMessageHandler方法:

这个方法就是创建MessageHandler的

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
      ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
      MessageChannel channel, MessageChannel errorChannel) throws Exception {
   if (producerProperties.getExtension().getEnabled()) {

      // if producerGroup is empty, using destination
      String extendedProducerGroup = producerProperties.getExtension().getGroup();
      String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
            ? destination.getName()
            : extendedProducerGroup;

      RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
            .mergeProperties(rocketBinderConfigurationProperties,
                  rocketMQProperties);

      RocketMQTemplate rocketMQTemplate;
      if (producerProperties.getExtension().getTransactional()) {
         Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
               .getBeansOfType(RocketMQTemplate.class);
         if (rocketMQTemplates.size() == 0) {
            throw new IllegalStateException(
                  "there is no RocketMQTemplate in Spring BeanFactory");
         }
         else if (rocketMQTemplates.size() > 1) {
            throw new IllegalStateException(
                  "there is more than 1 RocketMQTemplates in Spring BeanFactory");
         }
         rocketMQTemplate = rocketMQTemplates.values().iterator().next();
      }
      else {
         rocketMQTemplate = new RocketMQTemplate();
         rocketMQTemplate.setObjectMapper(this.getApplicationContext()
               .getBeansOfType(ObjectMapper.class).values().iterator().next());
          //初始化DefaultMQProducer
         DefaultMQProducer producer;
         String ak = mergedProperties.getAccessKey();
         String sk = mergedProperties.getSecretKey();
         if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            RPCHook rpcHook = new AclClientRPCHook(
                  new SessionCredentials(ak, sk));
            producer = new DefaultMQProducer(producerGroup, rpcHook,
                  mergedProperties.isEnableMsgTrace(),
                  mergedProperties.getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
            producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
                  destination.getName() + "|" + UtilAll.getPid()));
         }
         else {
            producer = new DefaultMQProducer(producerGroup);
            producer.setVipChannelEnabled(
                  producerProperties.getExtension().getVipChannelEnabled());
         }
         producer.setNamesrvAddr(mergedProperties.getNameServer());
         producer.setSendMsgTimeout(
               producerProperties.getExtension().getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(
               producerProperties.getExtension().getRetryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(producerProperties
               .getExtension().getRetryTimesWhenSendAsyncFailed());
         producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
               .getCompressMessageBodyThreshold());
         producer.setRetryAnotherBrokerWhenNotStoreOK(
               producerProperties.getExtension().isRetryNextServer());
         producer.setMaxMessageSize(
               producerProperties.getExtension().getMaxMessageSize());
         rocketMQTemplate.setProducer(producer);
         if (producerProperties.isPartitioned()) {
            rocketMQTemplate
                  .setMessageQueueSelector(new PartitionMessageQueueSelector());
         }
      }

      RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
            rocketMQTemplate, destination.getName(), producerGroup,
            producerProperties.getExtension().getTransactional(),
            instrumentationManager, producerProperties,
            ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
                  .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
                  .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
                  .findFirst().orElse(null));
      messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
      messageHandler.setSync(producerProperties.getExtension().getSync());
      messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
      if (errorChannel != null) {
         messageHandler.setSendFailureChannel(errorChannel);
      }
      return messageHandler;
   }
   else {
      throw new RuntimeException("Binding for channel " + destination.getName()
            + " has been disabled, message can't be delivered");
   }
}

RocketMQMessageHandler中持有RocketMQTemplate对象,RocketMQTemplate是对RocketMQ客户端API的封装

DefaultMQProducer由RocketMQ客户端提供的API,发送消息到RocketMQ消息服务器都是由它来完成。

RocketMQMessageHandler是消息发送的处理逻辑,解析Message对象头中的参数,调用RocketMQTemplate中不同的发送消息接口。

RocketMQMessageHandler的handleMessageInternal()方法

RocketMQMessageHandler用来处理消息

RocketMQMessageHandler的handleMessageInternal方法:

protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
      throws Exception {
   try {
      // issue 737 fix
      Map<String, String> jsonHeaders = headerMapper
            .fromHeaders(message.getHeaders());
      message = org.springframework.messaging.support.MessageBuilder
            .fromMessage(message).copyHeaders(jsonHeaders).build();

      final StringBuilder topicWithTags = new StringBuilder(destination);
      String tags = Optional
            .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
            .toString();
      if (!StringUtils.isEmpty(tags)) {
         topicWithTags.append(":").append(tags);
      }

      SendResult sendRes = null;
       //发送事务消息
      if (transactional) {
         sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
               topicWithTags.toString(), message, message.getHeaders()
                     .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
         log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
      }
      else {
          //设置定时消息参数
         int delayLevel = 0;
         try {
            Object delayLevelObj = message.getHeaders()
                  .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
            if (delayLevelObj instanceof Number) {
               delayLevel = ((Number) delayLevelObj).intValue();
            }
            else if (delayLevelObj instanceof String) {
               delayLevel = Integer.parseInt((String) delayLevelObj);
            }
         }
         catch (Exception e) {
            // ignore
         }
         boolean needSelectQueue = message.getHeaders()
               .containsKey(BinderHeaders.PARTITION_HEADER);
          //同步发送
         if (sync) {
             //顺序消息
            if (needSelectQueue) {
               sendRes = rocketMQTemplate.syncSendOrderly(
                     topicWithTags.toString(), message, "",
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
             //普通消息
            else {
               sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                     message,
                     rocketMQTemplate.getProducer().getSendMsgTimeout(),
                     delayLevel);
            }
            log.debug("sync send to topic " + topicWithTags + " " + sendRes);
         }
          //异步消息
         else {
            Message<?> finalMessage = message;
            SendCallback sendCallback = new SendCallback() {
               @Override
               public void onSuccess(SendResult sendResult) {
                  log.debug("async send to topic " + topicWithTags + " "
                        + sendResult);
               }

               @Override
               public void onException(Throwable e) {
                  log.error("RocketMQ Message hasn't been sent. Caused by "
                        + e.getMessage());
                  if (getSendFailureChannel() != null) {
                     getSendFailureChannel().send(
                           RocketMQMessageHandler.this.errorMessageStrategy
                                 .buildErrorMessage(new MessagingException(
                                       finalMessage, e), null));
                  }
               }
            };
            if (needSelectQueue) {
               rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
                     message, "", sendCallback,
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
            else {
               rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                     sendCallback);
            }
         }
      }
      if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
         if (getSendFailureChannel() != null) {
            this.getSendFailureChannel().send(message);
         }
         else {
            throw new MessagingException(message,
                  new MQClientException("message hasn't been sent", null));
         }
      }
   }
   catch (Exception e) {
      log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
      if (getSendFailureChannel() != null) {
         getSendFailureChannel().send(this.errorMessageStrategy
               .buildErrorMessage(new MessagingException(message, e), null));
      }
      else {
         throw new MessagingException(message, e);
      }
   }

}

代码有点长,但整体还是很好理解的

  1. 获取消息的目的地,也就是代码中的tags变量
  2. 判断是否为事务消息,如果是的话就发送事务消息
  3. 如果不是事务消息,先设置定时消息的参数,判断是否为同步同步消息,如果是的话再判断是顺序消息还是普通消息,顺序消息,同样异步消息还是分为异步顺序消息和异步的普通消息
  4. 根据发送结果,如果发送消息失败的话就把消息发送到失败队列中。

发送普通消息、事务消息、定时消息还是顺序消息,由Message对象的消息头Header中的属性决定,在业务代码创建Message对象时设置。

总结

这篇文章我们讲了Spring Cloud Stream 消息发送的基本流程,先是业务发送消息,经过AbstractSubscribableChannel类的doSend()方法,方法中调用UnicastingDispatcher的doDispatch()方法进行分发遍历所有的MessageHandler进行处理消息,RocketMQMessageHandler是其中之一,它根据消息头的header信息判断是什么类型的消息,然后发送对应的消息,发送失败的消息进行失败的队列中。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
3天前
|
负载均衡 监控 Java
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
|
3天前
|
负载均衡 Java 应用服务中间件
Spring Cloud 负载平衡的意义什么?
负载平衡是指将网络流量在多个服务器之间分布,以达到提高系统性能、增强可靠性和提供更好用户体验的目的。在负载平衡的架构中,多个服务器被组织成一个集群,共同处理用户的请求。
27 4
|
5天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
18 0
|
5天前
|
消息中间件 负载均衡 Java
【Spring Cloud 初探幽】
【Spring Cloud 初探幽】
16 1
|
5天前
|
安全 Java Docker
|
5天前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
38 6
|
5天前
|
JSON Java Apache
Spring Cloud Feign 使用Apache的HTTP Client替换Feign原生httpclient
Spring Cloud Feign 使用Apache的HTTP Client替换Feign原生httpclient
|
5天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
5天前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
30 1

热门文章

最新文章