艾伟:Azure Services Bus(服务总线)中的工作流(workflow)

简介: 在Azure Services Platform上对于工作流服务的支持,一直是我很感兴趣的内容。当然也是疑问比较多的领域。鉴于这方面的资料太少,所以今天就从AzureServicesKit中的一个DEMO出发,来大概了解一下这方面相关内容。

在Azure Services Platform上对于工作流服务的支持,一直是我很感兴趣的内容。当然也是疑问比较多的领域。鉴于这方面的资料太少,所以今天就从AzureServicesKit中的一个DEMO出发,来大概了解一下这方面相关内容。

注:今天的示例位于AzureServicesKit安装目录\Labs\Ex02-RoutingWithXPath\end文件夹。

(编辑注:是AzureServicesKit\Labs\IntroWorkflowService\Ex02-RoutingWithXPath\end文件夹)

该示例场景展示的是一个定单(order)流程,如下图:

注:图中的两个服务可能布置在1台或N台机器上。

在上图中,我们看出在当前场景中存在两个服务,即:

BillService(即定单生成)。 ShipOrderService(定单处理:包括处理定单相关信息等)

其中的BillService的代码如下:

[ServiceContract(Name = "Billing",
    Namespace = "http://Microsoft.ServicePlatformLabs")]
public class BillingService
{
    [OperationContract(Name = "Invoice", IsOneWay = true)]
    public void Invoice(string orderId, string total)
    {
        Console.WriteLine("Invoice for Order {0} ({1}) generated",
            orderId, total);
    }
}

注:上面的ServiceContract属性Name="Billing"和OperationContract属性Name = "Invoice"会以链接方式绑定到工作流CloudServiceBusSend活动(activity)的Action属性上,即:

http://Microsoft.ServicePlatformLabs/Billing/Invoice

ShippingService的代码如下:

[ServiceContract(Name = "Shipping",
    Namespace = "http://Microsoft.ServicePlatformLabs")]
public class ShippingService
{
    [OperationContract(Name = "ProcessOrder", IsOneWay = true)]
    public void ProcessOrder(string orderId)
    {
        Console.WriteLine("Processing Shipping information for Order {0}",
            orderId);
    }
}

同理,ShippingService会以链接方式绑定到工作流CloudServiceBusSend活动(activity)的Action属性上,即:

http://Microsoft.ServicePlatformLabs/Shipping/ProcessOrder

而这两个服务都会被暴露到ServiceBus中以便让用户进行访问操作,从而完成一个客户下订单的完整流程(CreateOrder)

而如何对这两个服务进行安排组装,就是通过WorkFlow的进行的。如果有开发过工作流经验的开发者应该会很容易理解这个概念。不过这里还是要解释一下,就是在云中运行的工作流与我们平时所了解的工作流(主要是在Activities方面)还是有些差异的,当然这并不意味着云中的工作流要更难于理解,恰恰相反,就目前而言,还是很容易的,下面是其在VS中的设计器中的截图:

有关这几个新添加的工作流activity,可能就要几个篇幅来介绍和说明,因为今天的目的不在于此,所以就先略过这块内容了。

另外就是目前在云中只支持CloudSequentialWorkFlow,如下图:

目前还不支持状态机工作流,但并不确保将来就不会出现。当然将来会不会出现workflow4中的flowchart型工作流就更不好说了。

假设我们最终要实现的工作流程如下:

1.接受客户端post过来的请求,并获取其中指定的节点信息,本DEMO中节点路径为:

<root>
  <order>
    <total>节点值</total>
  </order>
</root>

2.通过对该节点值进行比较判断,当值大于1000时则将工作流程转入一个CloudDelay活动中,以进行延时操作(设置CloudDelay活动的TimeOut属性)。当值小于或等于1000时则顺序执行上面所介绍的两个服务(BillingService,ShippingService)

将上面的流程中工作流进行表示(创建)的结果如下图:

好了,现在我们有了工作流和服务,接下来就是通过编码将服务运行起来以及将工作流发布到Azure上了。下面是服务的创建运行代码:

internal static void Main()
{
    var billingServiceHost = new ServiceHost(typeof(BillingService));
    Console.WriteLine("BillingService hosted at:");
    Console.WriteLine("\t" + billingServiceHost.Description.Endpoints[0].Address.Uri);
    billingServiceHost.Open();
    var shippingServiceHost = new ServiceHost(typeof(ShippingService));
    Console.WriteLine("ShippingService hosted at:");
    Console.WriteLine("\t" + shippingServiceHost.Description.Endpoints[0].Address.Uri);
    shippingServiceHost.Open();
    Console.WriteLine();
    Console.WriteLine("Press [Enter] to exit");
    Console.ReadLine();
    billingServiceHost.Close();
    shippingServiceHost.Close();
}

上面代码中的Address.Uri属性是在app.config中进行配置的:

代码在这里展开
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <bindings>
      <netEventRelayBinding>
        <binding name="default" />
      </netEventRelayBinding>
    </bindings>
    <services>
      <service name="Services.ShippingService">
        <host>
          <baseAddresses>
            <add baseAddress="sb://servicebus.windows.net/services/[ENTER YOUR SERVICEBUS USERNAME HERE]"/>
          </baseAddresses>
        </host>
        <endpoint address="Shipping"
                  contract="Services.ShippingService"
                  binding="netEventRelayBinding"
                  behaviorConfiguration="transportClientBehavior0"
                  bindingConfiguration="default" />
      </service>
      <service name="Services.BillingService">
        <host>
          <baseAddresses>
            <add baseAddress="sb://servicebus.windows.net/services/[ENTER YOUR SERVICEBUS USERNAME HERE]"/>
          </baseAddresses>
        </host>
        <endpoint address="Billing"
                  contract="Services.BillingService"
                  binding="netEventRelayBinding"
                  behaviorConfiguration="transportClientBehavior0"
                  bindingConfiguration="default" />
      </service>
    </services>
    <behaviors>
      <endpointBehaviors>
        <behavior name="transportClientBehavior0">
          <!--
            Specify
              - CardSpace (default)
              - UserNamePassword
              - X509Certificate
              - AutomaticRenewal (tgt)
              - FederationViaCardSpace
            for credentialType.
          -->
          <transportClientEndpointBehavior credentialType="UserNamePassword">
            <clientCredentials>
              <userNamePassword userName="[ENTER YOUR SERVICEBUS USERNAME HERE]"
                                password="[ENTER YOUR SERVICEBUS PASSWORD HERE]" />
            </clientCredentials>
          </transportClientEndpointBehavior>
        </behavior>
      </endpointBehaviors>
    </behaviors>
  </system.serviceModel>
</configuration>

注:config文件中的[ENTER YOUR SERVICEBUS USERNAME HERE]内容就是我们在AzureService平台上创建的solution名称(这部分内容参见这篇文章),这里我们继续使用上一篇文章中创建的那个项目名称MSF_DataSyncExample,而PASSWORD就是我们在创建MSF_DataSyncExample之后所设置的口令。

上面仅是完成了服务的本地化配置。而要把工作流(文件内容)发布到AzureService上我们还需要在工作流文件的可视模式下的空白区域单击鼠标右键,从弹出菜单中选择“Deploy WordFlow”,这里系统会弹出一个窗口,如下:

我们只需要将刚才在config文件中输入的solution名称和口令在这里敲进去并单击Deploy按钮即可。

这样系统就会在AzureService上为我们创建这样一个发布了的工作流(注:您也可以通过下面的地址http://workflow.ex.azure.microsoft.com/login.aspx?name=[solutionname]来访问AzureService并在系统向导的提示下完成工作流的发布)。

工作流发布之后,我们还需要为当前的工作流创建一个实例,以便于让客户端访问该工作流服务时持有它,进而完成工作流程。而创建工作流实例的工作是在Azure平台上完成的,请在IE地址栏中敲入地址:https://workflow.ex.azure.microsoft.com/WorkflowManagement.aspx,如下图

我们在上图中选择了刚才发布(Deploy)的工作流,然后点击“Manage Instances”按钮后,系统显示如下:

我们点击:“CreateInstance”按钮后,如下图:

我们看到,系统以 “name_date/time stamp”方式为我们命名了一个“实例名称”,这主要是为了避免出现同名的情况,因为实例在系统中必须唯一。

接着我们点击"next"按钮,系统接下来会提示我们进行身份验证,如下图:

这样系统就会创建该实例并返回到当前工作流的实例管理列表:

当我们把工作流实例创建完成后,就把将当前工作流运行(通过选中相应的实例并单击页面下方的“Start”按钮,另外可以点击Details按钮来获得实例ID,下面会介绍)起来。

这样在Azure平台上,我们的工作流就算是配置运行起来了,那么接下来,我们来看看到如何让客户端访问发布的工作流的。

首先是客户端的配置文件

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="UserName" value="[ENTER YOUR WORKFLOW USERNAME HERE]"/>
    <add key="Password" value="[ENTER YOUR WORKFLOW PASSWORD HERE]"/>
    <add key="InstanceId" value="[ENTER THE WORKFLOW INSTANCE ID]"/>
  </appSettings>
</configuration>

从上面节点信息可以看出,我们要将配置在service端的相应节点信息(包括solution名称和口令)配置到当前config节点上。当然这里还多了一个节点就是“InstanceId”,该实例ID就是我们刚才在Azure平台上创建的工作流实例的ID,我们可以通过单击相应的实例来获得该内容,如下:

这样就完成了在client端的config配置,下面就是客户端访问的工作流的代码了。

代码在这里展开
internal class Program
{
    private const string workflowTypeName = "CloudOrderWorkflow2";
    private const string activityName = "ReceiveOrder";
    internal static void Main()
    {
        Console.Title = "Client";
        var userName = ConfigurationManager.AppSettings["UserName"].ToString();
        var password = ConfigurationManager.AppSettings["Password"].ToString();
        var instanceId = ConfigurationManager.AppSettings["InstanceId"].ToString();
        Console.WriteLine("Press [enter] to send the order to the cloud workflow");
        Console.ReadLine();
        var request = CreateMessage();
        try
        {
            CallReceiveActivity(userName, password, workflowTypeName, instanceId, activityName, request);
        }
        catch (WebException ex)
        {
            Console.WriteLine("Error handled:");
            using (var sr = new StreamReader(ex.Response.GetResponseStream()))
            {
                Console.WriteLine(sr.ReadToEnd());
            }
        }
        Console.WriteLine("Press [enter] to exit");
        Console.ReadLine();
    }
    private static string CreateMessage()
    {
        return "<root><order><total>1400</total></order></root>";
    }
    internal static void CallReceiveActivity
        (string userName, string password, string workflowName,
        string instanceId, string activityName, string message)
    {
        var url = String.Format(CultureInfo.InvariantCulture,
                   "http://{0}/workflowsHttp/{1}/workflows/{2}/instances/{3}/{4}",
                   WorkflowClientConfig.WorkflowHostName, userName, workflowName, instanceId, activityName);
        var token = GetWorkflowGrantingToken(userName, password);
        using (var httpRequest = new WebClient())
        {
            httpRequest.Headers.Add(HttpRequestHeader.ContentType, "text/xml");
            httpRequest.Headers.Add("X-MS-Identity-Token", token);
            var response = httpRequest.UploadString(url, message);
            Console.WriteLine("Response by the server: {0}", response);
        }
    }
    internal static string GetWorkflowGrantingToken(string userName, string password)
    {
        string tokenUrl = string.Format(CultureInfo.InvariantCulture,
                                        "https://{0}/issuetoken.aspx?u={1}&p={2}",
                                        WorkflowClientConfig.StsHostName,
                                        userName,
                                        password);
        Console.WriteLine("Requesting authentication token...");
        HttpWebRequest tokenRequest = (HttpWebRequest)WebRequest.Create(tokenUrl);
        HttpWebResponse tokenResponse = (HttpWebResponse)tokenRequest.GetResponse();
        byte[] tokenBody = new byte[500];
        int tokenBodyLength = tokenResponse.GetResponseStream().Read(tokenBody, 0, 500);
        var authenticationToken = Encoding.UTF8.GetString(tokenBody, 0, tokenBodyLength);
        Console.WriteLine("Authentication Token: {0}", authenticationToken);
        return authenticationToken;
    }
}

现在我们在本地分别打开两个VS2008,一个将service项目做为启动项目中,一个以Client作为启动项目,分别在相应的项目上击鼠标右键,在弹出菜单上选“调试”--》“启动新实例”。先运行service端如下:

然后是按上面所说方式启动Client端:

这里,我们在切换回服务端运行界面,如下

到这里,我们只是测试了工作流中的ELSE分支(代码中Total为800),还未测试当Total>1000的分支。下面是修改Total值之后的测试流程:

1.首先重复上面在AZURE平台上创建工作流实例的步骤,以便生成新的实例ID供客户端config文件更新节点配置。

2.将client的运行代码从800改为1400,然后顺序启动上面的service,client应用,这时我们还是会看到之前运行的service端和client端的运行界面,只是这次服务端在客户端提提交请求之后,并未马上返回打印结果,而是运行了cloudDelay活动,这时如果我们访问azure平台上的相关工作流页面时,会看到下面的结果 azure_run_flow.gif 即当前工作流实例正在运行,我们需要等待1分钟之后,再刷新该页面时,会看到当前实例已运行完成: azure_workflow_complate.gif

好了,到现在为止,整个开发和运行测试流程就介绍的差不多了。

因为手头的资料不多,而我还有的一些疑问还是没有最终得到解答。下面我将它们罗列出来,如果大家感兴趣或有这方面的经验,不妨与我联系或在回复中进行讨论。

1.例子中的SERVICE端如果能够被布署在多台服务器上,当client端post请求时,当前会通过哪台机器上的服务来处理?我猜测的一种可能应该是service bus会通过一些网络路由算法(如最短路径算法)来分配相应的服务机来处理相应请求。但如果是这样,是否还应该包括负载均衡方面的考虑呢? 必定每台机器的处理能力有限,任务多了就要排队,如果一味还是”最短路径“的话,会让service bus中的某一个服务结点不堪负。

2.目前的测试可以让azure平台上的某一工作流在某一条件下被触发执行(上传中的total<=1000).但工作流的持久化就在azure上吗?这样的话,如果azure平台出现问题,正在运行的实例可能会将执行在一半的工作流“回滚”,以免出现数据被脏读的情况吗?另外如果企业想将被执行失败的工作流按“自己的方式”进行处理又应该如何去做呢?

3.在biztalk中其扮演的角色是ESB(企业服务总线),而Azure Service Bus目前而言应该是一个ISB(Internet 服务总线)。如果与ISG是ESB的一种实现方式的话,那在ESB中的消息类型与ISB中的消息类型又是否为一脉相承呢,在SDK中我看到了这样一段代码(出现在了AzureServicesKit\Labs\IntroServiceBus\Ex04-RESTSample中):

Message response = StreamMessageHelper.CreateMessage(OperationContext.Current.IncomingMessageVersion, "GETRESPONSE", this.WriteImage);

而该Message类型为abstract类型(位置System.ServiceModel.Channels名空间),这个类型又与在BIZTALK Server上配置于数据库中的message表中的消息数据是怎样一个关系呢。以前曾在网上看到有篇文章说azure会将shartpoint逐步在云中加以实现,如果是这样的话那biztalk是否将来会与AZURE平台产生某种更深层次的关联吗?

疑问之后还是疑问,看来在研究Azure平台的过程中还有很长的路要走,走一步看一步吧!!!

目录
相关文章
|
3月前
【Azure 服务总线】Azure门户获取ARM模板,修改Service Bus的TLS版本
【Azure 服务总线】Azure门户获取ARM模板,修改Service Bus的TLS版本
|
3月前
|
XML 数据格式 Windows
【Azure 云服务】Azure Cloud Service (Extended Support) 云服务开启诊断日志插件 WAD Extension (Windows Azure Diagnostic) 无法正常工作的原因
【Azure 云服务】Azure Cloud Service (Extended Support) 云服务开启诊断日志插件 WAD Extension (Windows Azure Diagnostic) 无法正常工作的原因
|
3月前
【Azure Standard Logic App】Workflow积压非常严重的情况下, 执行实例居然不能自动缩放的原因?
【Azure Standard Logic App】Workflow积压非常严重的情况下, 执行实例居然不能自动缩放的原因?
EMQ
|
SQL 存储 数据可视化
EMQX Enterprise 5.2 发布:Flow 设计器,Amazon Kinesis,Azure Event Hubs
EMQX Enterprise 5.2.0 增加了可拖拽的可视化 Flow 设计器,可以快速部署数据集成。同时,新版本新增了对 Amazon Kinesis 和 Azure Event Hubs 的支持。
EMQ
766 5
EMQX Enterprise 5.2 发布:Flow 设计器,Amazon Kinesis,Azure Event Hubs
|
SQL 开发工具 开发者
艾伟:[漫步云端,Azure Services Platform]第一回:认识Azure Services Platform
1 引言 今天是TechEd 2008北京站的第一天,作为技术盛会自然少不了很多抢眼的新技术面向广大的技术开发者。虚拟化、SQL Server、NUI、云计算,还有很多很多,每个人都充满了惊叹和期待,只有在那一刻,我们才感觉到技术带给世界的诸多魅力,也同时深味作为技术开发者的自豪。
1015 0