开发者社区> 问答> 正文

.NET SDK发送事务消息如何实现?

目前支持的域包括公网、华东1、华北2、华东2、华南1。

发送事务消息包含以下两个步骤:


  1. 发送半消息(Half Message)及执行本地事务 using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Runtime.InteropServices;
  6. using ons;
  7. namespace ons
  8. {
  9. public class MyLocalTransactionExecuter : LocalTransactionExecuter
  10. {
  11.      public MyLocalTransactionExecuter()
  12.      {
  13.      }
  14.      ~MyLocalTransactionExecuter()
  15.      {
  16.      }
  17.      public override TransactionStatus execute(Message value)
  18.      {
  19.              Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
  20.              value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
  21.              // 消息ID(有可能消息体一样,但消息ID不一样, 当前消息ID在控制台控制不可能查询)
  22.              string msgId = value.getMsgID();
  23.              // 消息体内容进行crc32, 也可以使用其它的如MD5
  24.              // 消息ID和crc32id主要是用来防止消息重复
  25.              // 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
  26.              // 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息.
  27.              TransactionStatus transactionStatus = TransactionStatus.Unknow;
  28.              try {
  29.                  boolean isCommit = 本地事务执行结果;
  30.                  if (isCommit) {
  31.                      // 本地事务成功、提交消息
  32.                      transactionStatus = TransactionStatus.CommitTransaction;
  33.                  } else {
  34.                      // 本地事务失败、回滚消息
  35.                      transactionStatus = TransactionStatus.RollbackTransaction;
  36.                  }
  37.              } catch (Exception e) {
  38.                  //exception handle
  39.              }
  40.              return transactionStatus;
  41.      }
  42. }
  43. class onscsharp
  44. {
  45.      static void Main(string[] args)
  46.      {
  47.          ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
  48.          factoryInfo.setFactoryProperty(factoryInfo.ProducerId, "");//您在MQ控制台申请的Producer ID
  49.          factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "");// 您在MQ控制台申请的Topic
  50.          factoryInfo.setFactoryProperty(factoryInfo.MsgContent, "");//message body
  51.          factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  52.          factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  53.          //create transaction producer      
  54.          ONSFactory onsfactory = new ONSFactory();
  55.          LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
  56.          TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
  57.          //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可,启动之后可以多线程并发发送消息
  58.          pProducer.start();
  59.              Message msg = new Message(
  60.              //Message Topic
  61.              factoryInfo.getPublishTopics(),
  62.              //Message Tag
  63.              "TagA",
  64.              //Message Body
  65.              factoryInfo.getMessageContent()
  66.          );
  67.          // 设置代表消息的业务关键属性,请尽可能全局唯一。
  68.          // 以方便您在无法正常收到消息情况下,可通过MQ K控制台查询消息并补发。
  69.          // 注意:不设置也不会影响消息正常收发
  70.          msg.setKey("ORDERID_100");
  71.          //发送消息,只要不抛出异常,就代表发送成功
  72.          try
  73.          {
  74.              LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
  75.              SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
  76.          }
  77.          catch(ONSClientException e)
  78.          {
  79.              Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
  80.          }
  81.          // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题;
  82.          // shutdown之后不能重新start此producer
  83.          pProducer.shutdown();
  84.      }
  85. }
  86. }

提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
  • 执行本地事务完成后提交;
  • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

事务状态有以下三种:
  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;
  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;
  • TransactionStatus.Unknow 无法判断状态,期待MQ Broker向发送方再次询问该消息对应的本地事务的状态。
  1.     public class MyLocalTransactionChecker : LocalTransactionChecker
  2.     {
  3.         public MyLocalTransactionChecker()
  4.         {
  5.         }
  6.         ~MyLocalTransactionChecker()
  7.         {
  8.         }
  9.         public override TransactionStatus check(Message value)
  10.         {
  11.                 Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
  12.                 value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
  13.                 // 消息ID(有可能消息体一样,但消息ID不一样, 当前消息ID在控制台控制不可能查询)
  14.                 string msgId = value.getMsgID();
  15.                 // 消息体内容进行crc32, 也可以使用其它的如MD5
  16.                 // 消息ID和crc32id主要是用来防止消息重复
  17.                 // 如果业务本身是幂等的, 可以忽略, 否则需要利用msgId或crc32Id来做幂等
  18.                 // 如果要求消息绝对不重复, 推荐做法是对消息体body使用crc32或md5来防止重复消息.
  19.                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
  20.                 try {
  21.                     boolean isCommit = 本地事务执行结果;
  22.                     if (isCommit) {
  23.                         // 本地事务成功、提交消息
  24.                         transactionStatus = TransactionStatus.CommitTransaction;
  25.                     } else {
  26.                         // 本地事务失败、回滚消息
  27.                         transactionStatus = TransactionStatus.RollbackTransaction;
  28.                     }
  29.                 } catch (Exception e) {
  30.                     //exception handle
  31.                 }
  32.                 return transactionStatus;
  33.         }
  34.         }

事务回查机制说明
1、发送事务消息为什么必须要实现 check 机制?
当步骤 1 半消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 MQ Broker 的角度看,这条半状态的消息的状态是未知的,因此 MQ Broker 会定期要求发送方能 check 该半状态消息,并上报其最终状态。
2、Check 被回调时,业务逻辑都需要做些什么?
MQ 事务消息的 check 方法里面,应该写一些检查事务一致性的逻辑。MQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 MQ Broker 主动发起的本地事务状态回查请求;因此在事务消息的 check 方法中,需要完成两件事情:
(1)检查该半消息对应的本地事务的状态(commited or rollback);
(2)向 MQ Broker 提交该半消息本地事务的状态。
3、本地事务的不同状态对半消息的影响?

  • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息;

  • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费;

  • TransactionStatus.Unknow 无法判断状态,期待 MQ Broker 向发送方再次询问该消息对应的本地事务的状态。
    具体代码详见 MyLocalTransactionChecker 的实现。

展开
收起
猫饭先生 2017-10-27 10:08:13 1742 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
从 SDK 到编解码:视频直播架构解析 立即下载
跨平台的云服务SDK需要什么 立即下载
一个跨平台的云服务SDK需要什么 立即下载