什么是事务消息?
事务消息可以认为是两阶段提交消息的实现,用来确保分布式系统中的最终一致性。事务消息保证执行本地事务的执行和消息发送的原子性。
使用约束
- 消息事务不支持定时和批量。
- 为了避免一个消息被多次检查,导致半数队列消息堆积,我们限制单个消息的默认检查次数为15次,但用户可以改变这个限制通过修改broker的配置文件中的 transactionCheckMax参数。如果一个消息检查次数超过transactionCheckMax,默认情况下,broker将会丢弃这个消息并同时打印错误日志。用户可以改变这种行为通过覆盖 AbstractTransactionCheckListener 类。
- 由broker的配置文件中参数 transactionTimeou t决定的特点时间段之后检查事务消息。当发送事务消息时,通过设置用户配置CHECK_IMMUNITY_TIME_IN_SECONDS,用户也可以改变这个限制。这个参数优先于 transactionMsgTimeout 参数。
- 一个事务消息可能被检查或消费多次。
- 提交过的消息重新放到用户目标主题可能会失败。目前,它依赖日志记录。通过RocketMQ自身高可用机制确保高可用。如果你想确保事务消息不丢失并且保证事务完整性,建议使用同步双写机制。
- 事务消息的生产者ID不能与其他类型消息的生产者ID共享。不像其他类型消息,事务消息允许回查。MQ server通过生产者ID查询客户端。
应用
事务状态
三种事务消息状态:
- TransactionStatus.CommitTransaction:提交事务,允许消费者消费这个消息。
- TransactionStatus.RollbackTransaction:回滚事务,消息将会被删除或不再允许消费。
- TransactionStatus.Unknown:中间状态,MQ需要重新检查来确定状态。
发送事务消息
创建事务生产者
使用TransactionMQProducer类创建producer客户端,指定唯一producerGroup,你可以设置一个自定义线程池来处理检查请求。执行本地事务后,你需要根据执行结果恢复MQ,并回复上面描述的状态。
实现事务监听器接口
当发送半消息成功时,使用 executeLocalTransaction 方法执行本地事务。它返回三种事务状态的一种。
使用 checkLocalTransaction 方法检查本地事务状态和响应MQ检查请求。它同样返回三种事务状态的一种。
代码
@RequestMapping(value = "/sendTransaction")
public String sendTransaction(HttpServletRequest request)throws Exception{
TransactionMQProducer producer=new TransactionMQProducer("sendTransaction_producer_group");
try{
String mesage=request.getParameter("message");
producer.setNamesrvAddr(NAMESERVER_ADDR);
//设置线程池
producer.setExecutorService(new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue(2000),new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread=new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}));
//事务监听器
producer.setTransactionListener(new TransactionListener() {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value=transactionIndex.getAndIncrement();
Integer status=value%3;
localTrans.put(msg.getTransactionId(),status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status =localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
//case 0:
//return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.UNKNOW;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for(int i=0;i<10;i++){
Message msg=new Message("Test_filter",tags[i % tags.length],"KEY" + i,(mesage+i).getBytes());
msg.putUserProperty("a",String.valueOf(i));
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
logger.error("返回结果:"+sendResult);
Thread.sleep(10);
}
return "发送成功";
}catch (Exception e){
logger.error(e.getMessage());
return "发送失败";
}finally {
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}