大家好,我是君哥。
事务消息是分布式事务的一种解决方案,RocketMQ 有成熟的事务消息模型,今天就来聊一聊 RocketMQ 事务消息实现机制。
假如有一个电商场景,用户下单后,账户服务从用户账户上扣减金额,然后通知库存服务给用户发货,这两个服务需要在一个分布式事务内完成。
这时,账户服务作为 Producer,库存服务作为 Consumer,见下面消息流程:
- 账户服务作为 Producer 向 Broker 发送一条 half 消息;
- half 消息发送成功后,执行本地事务,执行成功则向 Broker 发送 commit 请求,否则发送 rollback 请求;
- 如果 Broker 收到的是 rollback 请求,则删除保存的 half 消息;
- 如果 Broker 收到的是 commit 请求,则保存扣减库存消息(这里的处理是把消息从 half 队列投递到真实的队列),然后删除保存的 half 消息;
- 如果 Broker 没有收到请求,则会发送请求到 Producer 查询本地事务状态,然后根据 Producer 返回的本地状态做 commit/rollback 相关处理。
1 half 消息
上面电商的案例中,RocketMQ 解决分布式事务的第一步是账户服务发送 half 消息。
首先看官网一个发送事务消息的示例:
public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); }
上面的代码中 Producer 有一个 TransactionListener 属性,这个由开发者通过实现这个接口来自己定义。这个接口有两个方法:
- 提交本地事务 executeLocalTransaction
- 检查本地事务状态 checkLocalTransaction
下面代码是发送事务消息的方法:
//类 DefaultMQProducerImpl public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); //省略验证逻辑 SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; //省略发送结果处理 try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { } TransactionSendResult transactionSendResult = new TransactionSendResult(); //省略封装属性 return transactionSendResult; }
从这段代码中看到,在发送消息前,给消息封装了一个属性PROPERTY_TRANSACTION_PREPARED,通过这个属性可以找到 Broker 端的处理。
Broker 保存 half 消息时,把消息 topic 改为 RMQ_SYS_TRANS_HALF_TOPIC,然后把消息投递到 queueId 等于 0 的队列。投递成功后给 Producer 返回 PutMessageStatus.PUT_OK。代码如下:
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) { return store.asyncPutMessage(parseHalfMessageInner(messageInner)); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
2 执行本地事务
上一节讲到,Producer 发送事务消息时,会给一个 transactionListener,发送 half 消息成功后,会通过 transactionListener 的 executeLocalTransactionBranch 提交本地事务,代码如下:
//类 DefaultMQProducerImpl public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { //省略部分代码 SendResult sendResult = null; //省略部分代码 try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { //省略部分代码 if (null != localTransactionExecuter) { //这个分支已经废弃 localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { //执行本地事务 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } //省略部分代码 } catch (Throwable e) { } } break; //省略其他 case } try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } //省略部分代码 return transactionSendResult; }
从上面代码中可以看到,本地事务执行结束后,会调用一个 endTransaction 方法,这个就是向 Broker 发送 commit/rollback,也可能发送 UNKNOW,封装到 requestHeader 的 commitOrRollback 的属性中。这个请求的请求码是 END_TRANSACTION。
3 commit/rollback 处理
根据请求码 END_TRANSACTION 可以找到 Broker 端对事务消息的处理。代码如下:
//EndTransactionProcessor 类 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { //省略部分逻辑 OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { //查找出 half 消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { //检查 groupId 和消息偏移量是否合法 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); //删除PROPERTY_TRANSACTION_PREPARED属性 MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { //删除 half 消息 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } } response.setCode(result.getResponseCode()); response.setRemark(result.getResponseRemark()); return response; }
这段代码逻辑很清晰,首先查找出 half 消息,然后对查找出的消息进行检查(groupId 和消息偏移量是否合法),如果是 commit,则去除事务消息准备阶段属性,重新把消息投递到原始队列,然后删除 half 消息。如果是 rollback,则直接删除 half 消息。
注意:对于 UNKNOW 的类型,这里直接返回 null,上面代码没有贴出来。
4 check 事务状态
Broker 初始化的时候,会初始化一个 TransactionalMessageServiceImpl 线程,这个线程会定时检查过期的消息,通过向 Producer 发送 check 消息来获取事务状态。代码如下:
//TransactionalMessageCheckService protected void onWaitEnd() { //超时时间,默认 6s long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); //最大检查次数,默认 15 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
这里有两个参数需要注意:
- 事务消息超时时间,超时后会向 Producer 发送 check 消息检查本地事务状态,默认 6s;
- 最大检查次数,Broker 每次向 Producer 发送 check 消息后检查次数加 1,超过最大检查次数后 half 消息被丢弃,默认最大检查次数是 15;注意:这里的丢弃是把消息写入了一个新的队列,Topic 为 TRANS_CHECK_MAX_TIME_TOPIC,queueId 为 0。
- 文件保存时间,默认72 小时。
检查事务消息的流程如下:
Producer 收到 check 消息后,最终调用 TransactionListener 中定义的 checkLocalTransaction 方法,查询本地事务执行状态,然后发送给 Broker。
需要注意的是,check 消息发送给 Broker 时,会在请求 Header 中给 fromTransactionCheck 属性赋值为 true,以标记是 check 消息。
Broker 收到 check 响应消息后,处理逻辑跟第 3 节的处理逻辑一样,唯一不同的是,这里针对 check 消息和非 check 消息打印了不同的日志。
5 总结
从上面代码的分析可以看到,RocketMQ 的事务消息实现机制非常简洁。使用事务消息时自己定义 TransactionListener,实现执行本地事务 executeLocalTransaction 和检查本地事务状态 checkLocalTransaction 这两个方法,然后使用 TransactionMQProducer 进行发送。
最后,附一张 Producer 端的 UML 类图: