1、业务场景
在电商场景里面,成功付款后,会发放优惠券。
上面的场景:在电商系统中,会出现,付款成功后,准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。
但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。
对于这种场景的解决方案:引入消息中间件MQ来解耦。
但是上述这种情景,存在MQ不可用,宕机的情况。会产生付款成功,发优惠券失败的情况。
针对这种情况,需要引入分布式事务。
2、事务消息
分布式事务是一种抽象的概念。
那具体的实现呢?
是有很多种实现的。
在这里,主要介绍:RocketMQ的事务消息。
事务消息的流程图
流程步骤:
- 1、生产者发送half消息
- 2、MQ回复ACK确认消息
- 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
- 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
- 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
- 6、消费者对消息进行消费,下发优惠券
3、如何使用
上面,大概知道了事务消息的流程。
接下来,要知道如何使用。
还是以付款下发优惠券为例。
3.1 发送half消息-MQ回复ACK确认消息
@Override public void finishedOrder(String orderNo, String phoneNumber) { try { // 退房事务消息,topic:完成订单 Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8)); // 发送half消息 TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null); } catch (MQClientException e) { } }
3.2 执行本地事务:付款
@Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 修改订单的状态 orderService.payOrder(); // 成功 提交prepare消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { // 执行本地事务失败 回滚prepare消息 return LocalTransactionState.ROLLBACK_MESSAGE; } }
3.3 MQ定时器回调查询half消息状态
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { try { //查询订单状态 Integer orderStatus = orderService.getOrderStatus(); if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) { //返回commit消息 return LocalTransactionState.COMMIT_MESSAGE; } else { //返回rollback消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 查询订单状态失败 return LocalTransactionState.ROLLBACK_MESSAGE; } }
3.4 消费者进行消费,下发优惠券
@Bean(value = "orderFinishedConsumer") public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup); consumer.setNamesrvAddr(namesrvAddress); //topic:完成订单 consumer.subscribe(orderFinishedTopic, "*"); consumer.setMessageListener(orderFinishedMessageListener); consumer.start(); return consumer; }
监听器:OrderFinishedMessageListener
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //下发优惠券 couponService.distributeCoupon(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
4、知其然知其所以然
你看完上面,已经知道如何使用事务消息。
接下来,你需要了解其底层原理:看看源码(面试常问)
step1:首先看发送half消息的代码:
step2:进入代码里面:
step3:其实就是默认调用了
DefaultMQProducer#sendMessageInTransaction。
public TransactionSendResult sendMessageInTransaction(final Message msg, ...省略一堆代码 SendResult sendResult = null; // 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //发送消息--重点0 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { //消息发送成功 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); //执行本地事务,executeLocalTransaction需要子类去具体实现 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { // 最后,给broker发送提交或者回滚事务的RPC请求 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } // 组装结果返回 TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:
- 简单的数据校验
- 给消息添加属性,表明这个事务消息
- 发送消息,且返回消息的结果--重点0
- 根据消息不同结果,进行不同的处理
- 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果--重点1
- 最后,根据本地事务的结果,给broker发送Commit或rollback的消息--重点2
上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。
接下来,我们深入了解一些细节:
我们先研究一下重点0:sendResult = this.send(msg); 我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。
step4:接着到SendMessageProcessor#sendMessage
step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage-->TransactionalMessageBridge#putHalfMessage-->TransactionalMessageBridge#parseHalfMessageInner
step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);
public interface TransactionListener { /** * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. * * @param msg Half(prepare) message * @param arg Custom business parameter * @return Transaction state */ LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg); }
你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。
比如:执行本地事务:付款
step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);
public void endTransaction( // 省略一堆代码 //事务id String transactionId = sendResult.getTransactionId(); // broker地址 final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); // 根据事务消息和本地事务的执行结果,发送不同的结果给broker switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; //发送给broker this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
到这个时候,我们已经把消息从生产者发送到了broker里面。
那接下来,我们就需要了解broker是如何处理事务消息的。
step8: 事务消息如何回查
直接看代码注解即可
TransactionalMessageCheckService#onWaitEnd
@Override protected void onWaitEnd() { //timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查 long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); //checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); //回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
step9:进入check方法:TransactionalMessageServiceImpl#check。
直接看注解即可
@Override public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { //RMQ_SYS_TRANS_HALF_TOPIC主题 String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; //获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列 Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); //数据校验 if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues); //遍历队列 for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); //根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue //RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。 //RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下 MessageQueue opQueue = getOpQueue(messageQueue); //messageQueue队列的偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); //opQueue队列的偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); //如果其中一个队列的偏移量小于0,就跳过 if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; } //doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口 List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; } // single thread //空消息的次数 int getMessageNullCount = 1; //RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量 long newOffset = halfOffset; //RMQ_SYS_TRANS_HALF_TOPIC的偏移量 long i = halfOffset; while (true) { //限制每次最多处理的时间是60s if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } //removeMap包含当前信息,则跳过,处理下一条信息 //removeMap的信息填充是在上面的fillOpRemoveMap //fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条, //如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时 //会添加到removeMap中,表示已处理过 if (removeMap.containsKey(i)) { log.info("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { //根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); //如果消息为空 if (msgExt == null) { //则根据允许重复次数进行操作,默认重试一次 MAX_RETRY_COUNT_WHEN_HALF_NULL=1 //如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查 if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } //如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。 if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); //其他原因,则将偏移量i设置为:getResult.getPullResult().getNextBeginOffset(),重新拉取 i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } //判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过) //needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数, // 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法, // 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。 //needSkip依据:如果事务消息超过文件的过期时间, // 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。 if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } //消息的存储时间大于开始时间,中断while循环 if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } //该消息已存储的时间=系统当前时间-消息存储的时间戳 long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); //checkImmunityTime:检测事务的时间 //transactionTimeout:事务消息的超时时间 long checkImmunityTime = transactionTimeout; //用户设定的checkImmunityTimeStr String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { //checkImmunityTime=Long.valueOf(checkImmunityTimeStr) checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { //最近进度=当前消息进度+1 newOffset = i + 1; i++; continue; } } } else {//如果当前时间小于事务超时时间,则结束while循环 if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); //是否需要回查,判断依据如下: //消息已存储的时间大于事务超时时间 boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { if (!putBackHalfMsgQueue(msgExt, i)) {//11 continue; } //重点:进行事务回查(异步) listener.resolveHalfMsg(msgExt); } else { //加载已处理的消息进行筛选 pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } //保存half消息队列的回查进度 if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); //保存处理队列opQueue的处理今夕 if (newOpOffset != opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } } } catch (Throwable e) { log.error("Check error", e); } }
step10:继续深入研究一下:resolveHalfMsg
public void resolveHalfMsg(final MessageExt msgExt) { executorService.execute(new Runnable() { @Override public void run() { try { //针对每个待反查的half消息,进行回查本地事务结果 sendCheckMessage(msgExt); } catch (Exception e) { LOGGER.error("Send check message error!", e); } } }); }
step11:继续追进sendCheckMessage(msgExt)方法
/** * 发送回查消息 * @param msgExt * @throws Exception */ public void sendCheckMessage(MessageExt msgExt) throws Exception { CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader(); checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId()); checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); //原主题 msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); //原队列id msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); if (channel != null) { //回调查询本地事务状态 brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId); } }
到这里,基本上把事务消息的流程和实现细节走了一遍。
还有什么问题的话,在留言区或者私信大头菜
5、问题:分布式事务还有其他实现
上面的事务消息是分布式事务的一种实现。
事务消息被称为二段提交。
问题:分布式事务,还有哪些具体的实现方式?
欢迎留言
6、后续文章
- RocketMQ-入门(已更新)
- RocketMQ-架构和角色(已更新)
- RocketMQ-发送消息(已更新)
- RocketMQ-消费信息
- RocketMQ-集群模式和广播模式(已更新)
- RocketMQ-顺序消息(已更新)
- RocketMQ-延迟消息(已更新)
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息(已更新)
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列
...
欢迎各位入(guan)股(zhu),后续文章干货多多。
—本文转载自「大头菜技术」公众号