1需求
RocketMQ一个优点是有事务特性,可以保证事务的最终一致性。举一个简单的例子,以电商为例,创建订单ID9527被创建后要保存到数据库,通过该订单通过MQ投递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致性,要怎么做?
第一种情况,先保存数据库,在发送MQ
public void register(UserInfo userInfo){ //保存用户信息 userDao.insert(userInfo); //发送MQ消息 SendResult res = mqProvider.sendMessage(userInfo.getId); if(res.getCode().equals("SEND_OK")){ connection.commit(); }else{ connection.rollback(); } }
如果生产者发送消息时,因为网络原因导致10秒消息才返回SendResult结果,这就意味着10秒内数据库事务无法提交,高并发情况下,数据库连接池资源会被打满,后果是比较严重的。
第二种情况,先发送MQ,在保存数据库
public void register(UserInfo userInfo){ userDao.insert(userInfo); //发送MQ消息 SendResult res = mqProvider.sendMessage(userInfo.getId); if(res.getCode().equals("SEND_OK")){ //保存用户信息 userDao.insert(userInfo); //此时系统宕机 connection.commit(); }else{ connection.rollback(); } }
如果消息发送成功,但是执行保存的时候数据库出现宕机,则MQ中却有了消息,但是数据库却没有该条订单信息,则MQ的下游消费方需要从业务逻辑上做额外处理。
问题很清晰了,我就是想要数据库保存和MQ发送同时成功或者同时失败,保证事务的最终一致性。下面分析RocketMQ是怎么实现的
2代码
首先我会发送消息内容为9527到broker,此时消息的状态是半消息状态(properties中一个标志位,即对消费不可见),并且存储到Commitlog中,把偏移量返回给producer,producer收到存储消息成功后,会调用TransactionExecuterimpl的executeLocalTransactionBranch方法,在该方法中执行保存数据库的操作,如果保存数据库成功,返回一个LocalTransactionState.COMMIT_MESSAGE,反之返回LocalTransactionState.ROLLBACK_MESSAGE,并且把返回值发送给broker,broker根据LocalTransactionState的状态确定是消息是否标记丢弃和标记成功,从而实现对用户是否可见。
如果在TransactionExecuterimpl的executeLocalTransactionBranch中数据库执行commit后系统宕机,此时数据库是有数据而borker的消息为半消息状态,这种情况下数据还是不一致,那么这时就用到事务回查器,broker会通过线程池扫描半消息的任务队列去发送请求给broker到TransactionCheckListenerImpl方法,比如查看数据库是否存在返回LocalTransactionState状态。
2.1生产者
package transaction; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * @author chaird * @create 2021-10-25 11:53 */ public class SyncProducer { public static void main(String[] args) throws Exception { // 事务回查器 TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl(); // 事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); // NameServer地址 producer.setNamesrvAddr("localhost:9876"); producer.setSendMsgTimeout(999999999); // 注册事务回查监听 producer.setTransactionCheckListener(checkListener); // 本地事务执行器 TransactionExecuterimpl executerimpl = new TransactionExecuterimpl(); producer.start(); // Create a message instance, specifying topic, tag and message body. Message msg = new Message( "TopicTest" /* Topic */, "TagA" /* Tag */, ("" + 9527).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); // Call send message to deliver message to one of brokers. SendResult sendResult = producer.sendMessageInTransaction(msg, executerimpl, null); System.out.printf("%s%n", sendResult); // Shut down once the producer instance is not longer in use. producer.shutdown(); } }
2.2事务回查器
broker会查看服务器中某个半消息主题中的消息,通过定时任务去看这个消息是是否发送成功,后面会介绍。
package transaction; /** * @author chaird * @create 2022-01-08 20:24 */ import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("服务器端回查事务消息: "+messageExt.toString()); //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常? //可以查询数据库看这条数据是否已经处理 return LocalTransactionState.COMMIT_MESSAGE; } }
2.3注册事务回查监听器
package transaction; /** * @author chaird * @create 2022-01-08 20:24 */ import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.message.Message; import java.util.Date; public class TransactionExecuterimpl implements LocalTransactionExecuter { @Override public LocalTransactionState executeLocalTransactionBranch( final Message message, final Object o) { try { String msg = new String(message.getBody(), "UTF-8"); System.out.println("TransactionExecuterimpl-------" + msg); // db.save() // db.save() // db.save() // connect.commit() System.out.println(new Date() + "===> 本地事务执行成功,发送确认消息"); } catch (Exception e) { System.out.println(new Date() + "===> 本地事务执行失败,回滚!!!"); // connect.rollback() return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } }
3部分源码分析
3.1客户端发送消息源码分析
客户端的生产者类型为TransactionMQProducer类型,设置了一个执行器和回查器,如下面代码所示
// 事务回查器 TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl(); // 事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); // 注册事务回查监听 producer.setTransactionCheckListener(checkListener); // 本地事务执行器 TransactionExecuterimpl executerimpl = new TransactionExecuterimpl(); //创建消息 Message msg = new Message( "TopicTest" /* Topic */, "TagA" /* Tag */, ("" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); // 发送消息. SendResult sendResult = producer.sendMessageInTransaction(msg, executerimpl, null);
调用producer的sendMessageInTransaction方法,在该方法里会给消息添加一个如下代码的属性,标记该消息为事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
此时消息会发送到broker。
总结:消息在producer会被拦截并且添加属性
MessageConst.PROPERTY_TRANSACTION_PREPARED标记消息为事务消息
3.2broker收到消息源码分析
直接从SendMessageProcessor的asyncSendMessage方法看吧,该方法里会根据MessageConst.PROPERTY_TRANSACTION_PREPARED确定走哪个分支,如下图的transFlag(此时为“true”)属性
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { //省略 } putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }
然后就跳转到TransactionalMessageBridge的asyncPutHalfMessage方法,如下面代码所示,
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) { return store.asyncPutMessage(parseHalfMessageInner(messageInner)); }
很明显会parseHalfMessageInner方法对messageInner进行修改配置,这里重点是把主题改为一个默认的半消息主题
//public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
构建完毕后调用DefaultMessageStore的asyncPutMessage方法把数据方法CommitLog里,此时把偏移量返回给producer
总结:消息在broker会被换主题,发到一个特定的主题里,并且把消息在CommitLog的offset发送给proudcer
3.3客户端发送消息后收到broker消息处理本地事务并且反馈结果给broker
DefaultMQProducerImpl调用sendMessageInTransaction后发送消息并且拿到响应结果,如图是放入CommitLog成功,则会执行LocalTransactionExecuter(看上面的代码,逻辑为运行MySQL,并且跟进成功与否返回一个状态localTransactionState),然后执行DefaultMQProducerImpl的endTransaction方法,把localTransactionState状态发送给broker
//执行自定义的LocalTransactionExecuter,即上面的TransactionExecuterimpl localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); this.endTransaction(msg, sendResult, localTransactionState, localException);
总结:客户端确定消息(半消息状态)已经被放在CommitLog中,执行自定义的LocalTransactionExecuter(逻辑为根据数据库是否成功提交返回localTransactionState状态,即确定消息是否为丢弃或者修改为提交状态)
3.4broker收到producer的本地事务状态修改消息的状态
直接从EndTransactionProcessor的processRequest方法开始说吧
如果producer中返回的MessageSysFlag.TRANSACTION_COMMIT_TYPE,即本地事务执行成功。
首先会根据客户端发送的偏移量去CommitLog中取到消息,然后创建一个新的消息对象放入到CommitLog中
//EndTransactionProcessor###processRequest if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { //根据prdducer中传过来的偏移量获取消息内容 //根据prdducer中传过来的偏移量获取消息内容 //根据prdducer中传过来的偏移量获取消息内容 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { //把取出来的消息内容重新copy一个新对象 //把取出来的消息内容重新copy一个新对象 //把取出来的消息内容重新copy一个新对象 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); // 把新的数据在重新放在CommitLog中,生成ConsumeQueue // 把新的数据在重新放在CommitLog中,生成ConsumeQueue // 把新的数据在重新放在CommitLog中,生成ConsumeQueue RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; }
总结:把原来的消息(半消息主题)重新复制一份(更正为正确的主题)插入到CommitLog,构建CounsumeQueue。