开发者社区> 问答> 正文

发送事务消息


目前支持的域包括公网、华东1、华北2、华东2、华南1。
MQ 事务消息交互流程如下。

发送事务消息包含两个步骤:
1.发送半消息(Half Message)及执行本地事务

  1. #include "ONSFactory.h"
  2. #include "ONSClientException.h"
  3. using namespace ons;
  4.     class MyLocalTransactionExecuter : LocalTransactionExecuter
  5.     {
  6.         MyLocalTransactionExecuter()
  7.         {
  8.         }
  9.         ~MyLocalTransactionExecuter()
  10.         {
  11.         }
  12.         virtual TransactionStatus execute(Message &value)
  13.         {
  14.                 // 消息ID(有可能消息体一样,但消息id不一样, 当前消息ID在console控制不可能查询)
  15.                 string msgId = value.getMsgID();
  16.                 // 消息体内容进行crc32, 也可以使用其它的如MD5
  17.                 // 消息ID和crc32id主要是用来防止消息重复
  18.                 // 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
  19.                 // 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息.
  20.                 TransactionStatus transactionStatus = Unknow;
  21.                 try {
  22.                     boolean isCommit = 本地事务执行结果;
  23.                     if (isCommit) {
  24.                         // 本地事务成功、提交消息
  25.                         transactionStatus = CommitTransaction;
  26.                     } else {
  27.                         // 本地事务失败、回滚消息
  28.                         transactionStatus = RollbackTransaction;
  29.                     }
  30.                 } catch (...) {
  31.                     //exception handle
  32.                 }
  33.                 return transactionStatus;
  34.         }
  35.     }
  36.     int main(int argc, char* argv[])
  37.     {
  38.         ONSFactoryProperty factoryInfo;
  39.         factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");//您在控制台创建的Producer ID
  40.         factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//输入您在控制台创建的Topic
  41.         factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//msg content
  42.         factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx");//阿里云身份验证,在阿里云服务器管理控制台创建
  43.         factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx" );//阿里云身份验证,在阿里云服务器管理控制台创建
  44.         //创建producer,MQ不负责pChecker的释放,需要业务方自行释放资源
  45.         MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
  46.         g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
  47.         //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可;
  48.         pProducer->start();
  49.         Message msg(
  50.             //Message Topic
  51.             factoryInfo.getPublishTopics(),
  52.             //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤      
  53.             "TagA",
  54.             //Message Body,不能为空,MQ不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
  55.             factoryInfo.getMessageContent()
  56.         );
  57.         // 设置代表消息的业务关键属性,请尽可能全局唯一。
  58.         // 以方便您在无法正常收到消息情况下,可通过MQ Console查询消息并补发。
  59.         // 注意:不设置也不会影响消息正常收发
  60.         msg.setKey("ORDERID_100");
  61.         //发送消息,只要不抛出异常,就代表发送成功    
  62.         try
  63.         {
  64.             //MQ不负责pExecuter的释放,需要业务方自行释放资源
  65.             MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
  66.             SendResultONS sendResult = pProducer->send(msg,pExecuter);
  67.         }
  68.         catch(ONSClientException & e)
  69.         {
  70.             //自定义处理exception的细节
  71.         }
  72.         // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题
  73.         pProducer->shutdown();
  74.         return 0;
  75.     }

2.提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
  • 执行本地事务完成后提交;
  • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

事务状态有以下三种:
  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;
  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;

  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
    1. class MyLocalTransactionChecker : LocalTransactionChecker
    2. {
    3.      MyLocalTransactionChecker()
    4.      {
    5.      }
    6.      ~MyLocalTransactionChecker()
    7.      {
    8.      }
    9.      virtual TransactionStatus check(Message &value)
    10.      {
    11.          // 消息ID(有可能消息体一样,但消息id不一样, 当前消息ID在console控制不可能查询)
    12.          string msgId = value.getMsgID();
    13.          // 消息体内容进行crc32, 也可以使用其它的如MD5
    14.          // 消息ID和crc32id主要是用来防止消息重复
    15.          // 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
    16.          // 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息.
    17.          TransactionStatus transactionStatus = Unknow;
    18.          try {
    19.              boolean isCommit = 本地事务执行结果;
    20.              if (isCommit) {
    21.                  // 本地事务成功、提交消息
    22.                  transactionStatus = CommitTransaction;
    23.              } else {
    24.                  // 本地事务失败、回滚消息
    25.                  transactionStatus = RollbackTransaction;
    26.              }
    27.          } catch(...) {
    28.              //exception error
    29.          }
    30.          return transactionStatus;
    31.      }
    32. }

事务回查机制说明
1、发送事务消息为什么必须要实现 check 机制?
当步骤1发送半消息完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条半状态的消息的状态是未知的,因此 MQ Broker 会定期要求发送方能 check 该半状态消息,并上报其最终状态。
2、Check 被回调时,业务逻辑都需要做些什么?
MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker主动发起的本地事务状态回查请求;因此在事务消息的 check 方法中,需要完成两件事情:
(1) 检查该半消息对应的本地事务的状态(commited or rollback);
(2) 向 MQ Broker 提交该半消息本地事务的状态。
3、本地事务的不同状态对Half消息的影响?

  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。

  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。

  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
    具体代码详见 MyLocalTransactionChecker 的实现。

展开
收起
猫饭先生 2017-10-26 14:29:59 1832 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载