Spring Cloud Stream 消息发送
业务发送消息
source.output().send(message);来发送消息
public interface Source {
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
* @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();
}
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
AbstractMessageChannel是消息通道的基本实现,提供发送消息和接收消息的公共方法。
AbstractSubscribableChannel类的doSend()方法
消息发送到AbstractSubscribableChannel类的doSend()方法如下:
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
implements SubscribableChannel, SubscribableChannelManagement {
@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
return getRequiredDispatcher().dispatch(message);
}
catch (MessageDispatchingException e) {
String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
throw new MessageDeliveryException(message, description, e);
}
}
private MessageDispatcher getRequiredDispatcher() {
MessageDispatcher dispatcher = getDispatcher();
Assert.state(dispatcher != null, "'dispatcher' must not be null");
return dispatcher;
}
protected abstract MessageDispatcher getDispatcher();
}
调用getDispatcher方法从DirectChannel中得到消息分发类MessageDispatcher的实现类UnicastingDispatcher,调用dispatch方法把消息分发给各个MessageHandler
UnicastingDispatcher的doDispatch()方法
UnicastingDispatcher的doDispatch方法:
private boolean doDispatch(Message<?> message) {
if (tryOptimizedDispatch(message)) {
return true;
}
boolean success = false;
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
if (!handlerIterator.hasNext()) {
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
}
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
while (!success && handlerIterator.hasNext()) {
MessageHandler handler = handlerIterator.next();
try {
handler.handleMessage(message);
success = true; // we have a winner.
}
catch (Exception e) {
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
exceptions.add(runtimeException);
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
}
}
return success;
}
遍历所有的MessageHandler,调用handleMessage()处理消息,那么MessageHandler是从哪来的呢?
AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()方法添加到handlers列表。
AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring容器加载所有Bean并完成初始化之后完成。
RocketMQMessageChannelBinder集成消息发送
AbstractMessageChannelBinder类提供创建MessageHandler规范,createProducerMessageHandler()方法在初始化Binder的时候会加载。
RocketMQMessageChannelBinder继承AbstractMessageChannelBinder,完成RocketMQMessageHandler的创建和初始化,RocketMQMessageHandler的消息处理器MessageHandler的具体实现,RocketMQMessageHandler在RocketMQBinder中的作用就是转化消息格式并发送消息。
RocketMQMessageChannelBinder的createProducerMessageHandler方法:
这个方法就是创建MessageHandler的
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
// if producerGroup is empty, using destination
String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName()
: extendedProducerGroup;
RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
.mergeProperties(rocketBinderConfigurationProperties,
rocketMQProperties);
RocketMQTemplate rocketMQTemplate;
if (producerProperties.getExtension().getTransactional()) {
Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
.getBeansOfType(RocketMQTemplate.class);
if (rocketMQTemplates.size() == 0) {
throw new IllegalStateException(
"there is no RocketMQTemplate in Spring BeanFactory");
}
else if (rocketMQTemplates.size() > 1) {
throw new IllegalStateException(
"there is more than 1 RocketMQTemplates in Spring BeanFactory");
}
rocketMQTemplate = rocketMQTemplates.values().iterator().next();
}
else {
rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setObjectMapper(this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next());
//初始化DefaultMQProducer
DefaultMQProducer producer;
String ak = mergedProperties.getAccessKey();
String sk = mergedProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ak, sk));
producer = new DefaultMQProducer(producerGroup, rpcHook,
mergedProperties.isEnableMsgTrace(),
mergedProperties.getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
destination.getName() + "|" + UtilAll.getPid()));
}
else {
producer = new DefaultMQProducer(producerGroup);
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
}
producer.setNamesrvAddr(mergedProperties.getNameServer());
producer.setSendMsgTimeout(
producerProperties.getExtension().getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(
producerProperties.getExtension().getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerProperties
.getExtension().getRetryTimesWhenSendAsyncFailed());
producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(
producerProperties.getExtension().isRetryNextServer());
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
if (producerProperties.isPartitioned()) {
rocketMQTemplate
.setMessageQueueSelector(new PartitionMessageQueueSelector());
}
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getTransactional(),
instrumentationManager, producerProperties,
((AbstractMessageChannel) channel).getChannelInterceptors().stream()
.filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
.map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
.findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel);
}
return messageHandler;
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
+ " has been disabled, message can't be delivered");
}
}
RocketMQMessageHandler中持有RocketMQTemplate对象,RocketMQTemplate是对RocketMQ客户端API的封装
DefaultMQProducer由RocketMQ客户端提供的API,发送消息到RocketMQ消息服务器都是由它来完成。
RocketMQMessageHandler是消息发送的处理逻辑,解析Message对象头中的参数,调用RocketMQTemplate中不同的发送消息接口。
RocketMQMessageHandler的handleMessageInternal()方法
RocketMQMessageHandler用来处理消息
RocketMQMessageHandler的handleMessageInternal方法:
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception {
try {
// issue 737 fix
Map<String, String> jsonHeaders = headerMapper
.fromHeaders(message.getHeaders());
message = org.springframework.messaging.support.MessageBuilder
.fromMessage(message).copyHeaders(jsonHeaders).build();
final StringBuilder topicWithTags = new StringBuilder(destination);
String tags = Optional
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
.toString();
if (!StringUtils.isEmpty(tags)) {
topicWithTags.append(":").append(tags);
}
SendResult sendRes = null;
//发送事务消息
if (transactional) {
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
topicWithTags.toString(), message, message.getHeaders()
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
}
else {
//设置定时消息参数
int delayLevel = 0;
try {
Object delayLevelObj = message.getHeaders()
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
if (delayLevelObj instanceof Number) {
delayLevel = ((Number) delayLevelObj).intValue();
}
else if (delayLevelObj instanceof String) {
delayLevel = Integer.parseInt((String) delayLevelObj);
}
}
catch (Exception e) {
// ignore
}
boolean needSelectQueue = message.getHeaders()
.containsKey(BinderHeaders.PARTITION_HEADER);
//同步发送
if (sync) {
//顺序消息
if (needSelectQueue) {
sendRes = rocketMQTemplate.syncSendOrderly(
topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
//普通消息
else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
}
//异步消息
else {
Message<?> finalMessage = message;
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
@Override
public void onException(Throwable e) {
log.error("RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(new MessagingException(
finalMessage, e), null));
}
}
};
if (needSelectQueue) {
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
sendCallback);
}
}
}
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message);
}
else {
throw new MessagingException(message,
new MQClientException("message hasn't been sent", null));
}
}
}
catch (Exception e) {
log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy
.buildErrorMessage(new MessagingException(message, e), null));
}
else {
throw new MessagingException(message, e);
}
}
}
代码有点长,但整体还是很好理解的
- 获取消息的目的地,也就是代码中的tags变量
- 判断是否为事务消息,如果是的话就发送事务消息
- 如果不是事务消息,先设置定时消息的参数,判断是否为同步同步消息,如果是的话再判断是顺序消息还是普通消息,顺序消息,同样异步消息还是分为异步顺序消息和异步的普通消息
- 根据发送结果,如果发送消息失败的话就把消息发送到失败队列中。
发送普通消息、事务消息、定时消息还是顺序消息,由Message对象的消息头Header中的属性决定,在业务代码创建Message对象时设置。
总结
这篇文章我们讲了Spring Cloud Stream 消息发送的基本流程,先是业务发送消息,经过AbstractSubscribableChannel类的doSend()方法,方法中调用UnicastingDispatcher的doDispatch()方法进行分发遍历所有的MessageHandler进行处理消息,RocketMQMessageHandler是其中之一,它根据消息头的header信息判断是什么类型的消息,然后发送对应的消息,发送失败的消息进行失败的队列中。