开发者社区> 问答> 正文

Java SDK如何进行事务消息收发


TCP 接入点域名,请 前往查看。
发送事务消息包含以下两个步骤:


  1. 发送半消息及执行本地事务package com.alibaba.webx.TryHsf.app1;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  4. import com.aliyun.openservices.ons.api.SendResult;
  5. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
  6. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
  7. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
  8. import java.util.Properties;
  9. import java.util.concurrent.TimeUnit;
  10. public class TransactionProducerClient {
  11. private final static Logger log = ClientLogger.getLog(); // 用户需要设置自己的log, 记录日志便于排查问题
  12. public static void main(String[] args) throws InterruptedException {
  13.      final BusinessService businessService = new BusinessService(); // 本地业务Service
  14.      Properties properties = new Properties();
  15.      // 您在控制台创建的Producer ID。注意:事务消息的Producer ID不能与其他类型消息的Producer ID共用
  16.      properties.put(PropertyKeyConst.ProducerId, "");
  17.      // 阿里云身份验证,在阿里云服务器管理控制台创建
  18.      properties.put(PropertyKeyConst.AccessKey, "");
  19.      // 阿里云身份验证,在阿里云服务器管理控制台创建
  20.      properties.put(PropertyKeyConst.SecretKey, "");
  21.      // 设置 TCP 接入域名(此处以公共云生产环境为例)
  22.      properties.put(PropertyKeyConst.ONSAddr,
  23.        "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  24.      TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
  25.              new LocalTransactionCheckerImpl());
  26.      producer.start();
  27.      Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
  28.      // 输入您在控制台创建的Topic
  29.      SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
  30.          @Override
  31.          public TransactionStatus execute(Message msg, Object arg) {
  32.              // 消息ID(有可能消息体一样,但消息ID不一样, 当前消息ID在控制台无法查询)
  33.              String msgId = msg.getMsgID();
  34.              // 消息体内容进行crc32, 也可以使用其它的如MD5
  35.              long crc32Id = HashUtil.crc32Code(msg.getBody());
  36.              // 消息ID和crc32id主要是用来防止消息重复
  37.              // 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
  38.              // 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息
  39.              Object businessServiceArgs = new Object();
  40.              TransactionStatus transactionStatus = TransactionStatus.Unknow;
  41.              try {
  42.                  boolean isCommit =
  43.                          businessService.execbusinessService(businessServiceArgs);
  44.                  if (isCommit) {
  45.                      // 本地事务成功、提交消息
  46.                      transactionStatus = TransactionStatus.CommitTransaction;
  47.                  } else {
  48.                      // 本地事务失败、回滚消息
  49.                      transactionStatus = TransactionStatus.RollbackTransaction;
  50.                  }
  51.              } catch (Exception e) {
  52.                  log.error("Message Id:{}", msgId, e);
  53.              }
  54.              System.out.println(msg.getMsgID());
  55.              log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
  56.              return transactionStatus;
  57.          }
  58.      }, null);
  59.      // demo example 防止进程退出(实际使用不需要这样)
  60.      TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
  61. }
  62. }

提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
  • 执行本地事务完成后提交
  • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态

事务状态有以下三种:
  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
  1. public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
  2.     private final static Logger log = ClientLogger.getLog();
  3.     final  BusinessService businessService = new BusinessService();
  4.     @Override
  5.     public TransactionStatus check(Message msg) {
  6.         //消息ID(有可能消息体一样,但消息ID不一样, 当前消息属于Half 消息,所以消息ID在控制台无法查询)
  7.         String msgId = msg.getMsgID();
  8.         //消息体内容进行crc32, 也可以使用其它的方法如MD5
  9.         long crc32Id = HashUtil.crc32Code(msg.getBody());
  10.         //消息ID、消息本 crc32Id主要是用来防止消息重复
  11.         //如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
  12.         //如果要求消息绝对不重复, 推荐做法是对消息体使用crc32或md5来防止重复消息.
  13.         //业务自己的参数对象, 这里只是一个示例, 实际需要用户根据情况来处理
  14.         Object businessServiceArgs = new Object();
  15.         TransactionStatus transactionStatus = TransactionStatus.Unknow;
  16.         try {
  17.             boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
  18.             if (isCommit) {
  19.                 //本地事务已成功、提交消息
  20.                 transactionStatus = TransactionStatus.CommitTransaction;
  21.             } else {
  22.                 //本地事务已失败、回滚消息
  23.                 transactionStatus = TransactionStatus.RollbackTransaction;
  24.             }
  25.         } catch (Exception e) {
  26.             log.error("Message Id:{}", msgId, e);
  27.         }
  28.         log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
  29.         return transactionStatus;
  30.     }
  31. }

工具类
  1. import java.util.zip.CRC32;
  2. public class HashUtil {
  3.     public static long crc32Code(byte[] bytes) {
  4.         CRC32 crc32 = new CRC32();
  5.         crc32.update(bytes);
  6.         return crc32.getValue();
  7.     }
  8. }

事务回查机制说明

  1. 发送事务消息为什么必须要实现回查 Check 机制?
    当步骤(1)中 Half 消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 MQ Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  2. Check 被回调时,业务逻辑都需要做些什么?
    MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:
    (1) 检查该 Half 消息对应的本地事务的状态(commited or rollback);
    (2) 向 MQ Broker 提交该 Half 消息本地事务的状态。

订阅消息代码示例
事务消息订阅与普通消息订阅一致,可前往 订阅消息查看。

展开
收起
猫饭先生 2017-10-26 14:08:56 2095 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
JAVA开发手册1.5.0 立即下载