事务消息
1. 概念
分布式事务:一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败,分布式事务于普通事务一样,就是为了保证操作结果的一致性
事务消息:RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态:Producer 回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查:重新查询本地事务的执行状态
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。
XA模式中有三个重要组件:TC、TM、RM。
TC(Transaction Coordinator):事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。RocketMQ中Broker充当着TC
TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。RocketMQ中事务消息的Producer充当着TM
RM(Resource Manager):资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。RocketMQ中事务消息的Producer及Broker均是RM
XA模式是一个典型的2PC,其执行原理如下:
TM向TC发起指令,开启一个全局事务。
根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
各个RM在接收到指令后会在进行本地事务预执行。
RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应,则向TC发送Global Commit指令。 只要有结果是失败响应,则向TC发送Global Rollback指令。
TC在接收到指令后再次向RM发送确认指令。
注意:
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的 情况)
2. 生产者业务接口
public interface TransactionMessageService {
/**
* 发送事务消息
* @param id
* @param message
*/
void sendTransactionMessage(String id, String message);
}
3. 生产者业务接口实现类
@Service
public class TransactionMessageServiceImpl implements TransactionMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger logger = LoggerFactory.getLogger(TransactionMessageServiceImpl.class);
@Override
public void sendTransactionMessage(String id, String message) {
Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-message-topic:transaction-tags", strMessage, id);
if (result.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送事务消息成功!消息ID为:{}", result.getMsgId());
}
}
}
4. 生产事务监听器类
@RocketMQTransactionListener
public class TransactionListener implements RocketMQLocalTransactionListener {
private static final Logger logger = LoggerFactory.getLogger(TransactionListener.class);
@Autowired
private SysUserInfoService userInfoService;
@Autowired
private SysLogInfoService logInfoService;
/**
* 执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
logger.info("事务消息key为:{}", rocketmqKeys);
String payload = new String((byte[]) msg.getPayload());
logger.info("事务消息为:{}", payload);
SysUserInfo userInfo = JSONObject.parseObject(payload, SysUserInfo.class);
userInfoService.saveUserInfoAndLogInfo(userInfo, rocketmqKeys);
} catch (Exception e) {
logger.error("发送事务消息异常!异常信息为:{}", e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 校验本地事务
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String rocketmqKeys = msg.getHeaders().get("rocketmq_KEYS").toString();
logger.info("事务消息key为:{}", rocketmqKeys);
SysLogInfo logInfo = logInfoService.selectLogInfoByKey(rocketmqKeys);
logger.info("查询日志信息为:{}", JSON.toJSONString(logInfo));
if (null == logInfo) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
5. 消费者类
@Component
@RocketMQMessageListener(topic = "transaction-message-topic", consumerGroup = "transaction-consumer-group")
public class TransactionMessageListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(TransactionMessageListener.class);
@Override
public void onMessage(String message) {
logger.info("接收到事务消息:{}", message);
}
}
6. 测试
@Test
void transactionMessage() {
String uuid = UUID.randomUUID().toString().replace("-", "");
SysUserInfo userInfo = new SysUserInfo();
userInfo.setUserId(1001L);
userInfo.setAddr("重庆渝北");
userInfo.setAge(18);
userInfo.setUserName("xiaofeng");
userInfo.setPhone("13509877890");
userInfo.setSex(1);
userInfo.setStatus(1);
transactionMessageService.sendTransactionMessage(uuid, JSON.toJSONString(userInfo));
}