WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)

简介:

对于一般的多线程操作,比如异步地进行基于文件系统的IO操作;异步地调用Web Service;或者是异步地进行数据库访问等等,是和具体的线程无关的。也就是说,对于这些操作,任意创建一个新的线程来执行都是等效的。但是有些情况下,有些操作却只能在固定的线程下执行。比如,在GUI应用下,对控件的访问就需要在创建该控件的线程下执行;或者我们在某个固定的线程中通过TLS(Thread Local Storage)设置了一些Context信息,供具体的操作使用,我们把操作和某个固定的线程的依赖称为线程关联性(Thread Affinity)。在这种情况下,我们的异步操作就需要被Marshal到固定的线程执行。在WCF并发或者Callback的情况下也具有这样的基于线程关联性的问题。

一、从基于Windows Application客户端的WCF回调失败谈起

在"我的WCF之旅"系列文章中,有一篇(WinForm Application中调用Duplex Service出现TimeoutException的原因和解决方案)专门介绍在一个Windows Application客户端应用, 通过WCF 的Duplex通信方式进行回调失败的文章.我们今天以此作为出发点介绍WCF在Thread Affinity下的表现和解决方案.

我们来创建一个WCF的应用来模拟该场景: 客户端是一个基于Windows Form应用, 完成一个计算器的功能, 用户输入操作数,点击"计算"按钮, 后台通过调用WCF service, 并传递一个用于显示计算结果的Callback对象; service进行相应的计算得到最后的运算结果,调用该Callback对象将运算结果显示到客户端界面.这是我们的WCF四层结构:

image

1、Contract:ICalculate & ICalculateCallback

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

这是Service Contract,下面是Callback Contract,用于显示运算结果:

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:    public interface ICalculateCallback
   4:     {
   5:         [OperationContract]
   6:         void DisplayResult(double result);
   7:     }
   8: } 

2、Service:CalculateService

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }         
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: }


由于需要进行callback, 我们把ConcurrencyMode 设为Reentrant。当得到运算的结果后,通过OperationContext.Current.GetCallbackChannel得到callback对象,并调用之。还有一点需要提的是,该service是通过一个Windows Form application进行host的。并且有一个ListBox列出所有service执行的结果,就像这样:

image

3、Hosting

Hosting的代码写在Form的Load事件中:

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:     CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 

我们注意到了CalculateService使用到的用于显示所有预算结果的ListBox就是在这了通过static property传递的。

这么配置文件

   1: <configuration>
   2:     <system.serviceModel>
   3:         <services>
   4:             <service name="Artech.ThreadAffinity.Services.CalculateService">
   5:                 <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />
   6:                 <host>
   7:                     <baseAddresses>
   8:                         <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />
   9:                     </baseAddresses>
  10:                 </host>
  11:             </service>
  12:         </services>
  13:     </system.serviceModel>
  14: </configuration> 

4、Client

Client的界面很简单:输入两个操作数,点击“=”按钮,将运算结果显示出来。

image

先来看看client端对callback contract的实现:

   1: namespace Clients
   2: {
   3:     public class CalculateCallback : ICalculateCallback
   4:     {
   5:         public static TextBox ResultPanel;               
   6:  
   7:         #region ICalculateCallback Members 
   8:  
   9:         public void DisplayResult(double result)
  10:         {
  11:             ResultPanel.Text = result.ToString();
  12:         }         
  13:  
  14:         #endregion
  15:     }
  16: } 

这是配置:

   1: <configuration>
   2:     <system.serviceModel>
   3:         <client>
   4:             <endpoint address="net.tcp://127.0.0.1:8888/calculateservice"
   5:                 binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"
   6:                 name="calculateservice" />
   7:         </client>
   8:     </system.serviceModel>
   9: </configuration>
然后是我们“=”按钮的单击事件对运算的实现:
   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   5:     ICalculate calculator = channelFactory.CreateChannel();
   6:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   7: } 

CalculateCallback 用于显示运算结果的TextBox通过statis property实现传递。这个实现很简单,貌似没有什么问题,但是我们运行程序,在客户端就会抛出这样的exception。可以看出是一个TimeoutException。

image

二、是什么导致TimeoutException?

我们现在来分析是什么导致了TimeoutException的抛出。原因很简单:由于我们对service的调用的是在UI 线程调用的,所以在开始调用到最终得到结果,这个UI Thread会被锁住;但是当service进行了相应的运算的到运算的结果后,需要调用callback对象对client进行回调,默认的情况下,Callback的执行是在UI线程执行的。当Callback试图执行的时候,发现UI 线程被锁,只能等待。这样形成一个死锁,UI线程需要等待CalculateService执行返回后才能解锁,而CalculateService需要Callback执行完成;而Callback需要等到UI线程解锁才能执行。

基于上门的原因,我们有两种解决方案:

  • CalculateService不必等到Callback执行完成就返回,我们可以通过异步调用Callback。或者让Client异步方式调用CalculateService,以便及时释放UI线程,我们可以通过One-way的方式来进行service的调用。
  • 让Callback的执行不必绑定到UI线程

三、解决方案一:通过异步调用或者One-way回调

为了简单起见,我们通过ThreadPool实现了异步回调:

   1: public void Add(double op1, double op2)
   2: {
   3:     double result = op1 + op2;
   4:     ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();  
   5:  
   6:     ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);
   7: } 

这是一种方案,另一种是将Add操作设成One-way的:

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract(IsOneWay = true)]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

这两种方案都可以解决问题。

四、方案二、通过解除Callback操作和UI线程的关联性

现在我们才进入我们今天讨论的主题:WCF并发操作的线程关联性问题。在这之前,我们需要了解一个重要的对象:SynchonizationContext(System.Threading.SynchronizationContext)。SynchonizationContext就是为了解决这种线程关联性问题而设计的。SynchonizationContext提供了两个主要的API将操作和对应的Thread关联:Post和Send。

   1: public virtual void Post(SendOrPostCallback d, object state) 
   2: public virtual void Send(SendOrPostCallback d, object state) 

Send和Post分别以同步和异步的方式将以Delegate表示的具体的操作和SynchonizationContext对象对应的Thread关联,而SendOrPostCallback delegate对象代表你需要的线程关联操作,state代表传入delegate的参数:

public delegate void SendOrPostCallback(object state);

对于某些具有线程关联的应用,比如Windows Form application,在程序启动的时候,会设置当前的SynchonizationContext对象(Windows Form application使用的是继承了SynchonizationContext的WindowsFormsSynchronizationContext :System.Windows.Forms.WindowsFormsSynchronizationContext)。当前SynchonizationContext被成功初始化后,你就可以通过SynchonizationContext的静态属性Current得到它。在你自己的应用中,如何有需要,你也可以自定义SynchonizationContext,并通过静态方法SetSynchronizationContext将其设置为current SynchronizationContext。

对应WCF来说,无论是host一个service,还是在调用service时制定callback,在默认的情况下,service和callback的操作将自动和当前的SynchonizationContext进行关联(如何有的话)。也就是说,如过我们的service被host到Windows Form application下,那么service的操作将在UI 线程下执行;同理,如何我们在一个Windows Forms UI线程下调用duplex service并制定callback,那么callback的最终执行将在UI线程。

关于WCF对线程关联性的控制,可以通过ServiceBehavior或者CallbackBehavior的UseSynchronizationContext属性进行设定,该属性默认为true,这正式WCF默认具有线程关联性的原因。

现在我们来实现我们的第二套方案:让Callback的执行不必绑定到UI线程。为此我们只需要加上如何的CallbackBehavior attribute就可以了。

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel; 
   7:  
   8:         #region ICalculateCallback Members 
   9:  
  10:         public void DisplayResult(double result)
  11:         {
  12:             ResultPanel.Text = result.ToString();
  13:  
  14:         } 
  15:  
  16:         #endregion
  17:     }
  18: }
  19:  

但是现在我们运行我们的程序,将会出现如下的InvalidOperation异常:

image

原因很简单,由于我们将callbaclk的UseSynchronizationContext 设置成false,那么callback的操作将不会再UI线程下执行。但是我们需要运算的结果输入到UI的TextBox上,对UI上控件的操作需要在UI线程上执行,显然会抛出异常了。

为了我们引入SynchonizationContext到CalculateCallback中:将SynchonizationContext定义成一个static属性,通过Post方法异步地实现对运算结果的显示。

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:        public static SynchronizationContext SynchronizationContext; 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:              SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);           
  14:         }        
  15:  
  16:         #endregion
  17:     }
  18: } 

SynchonizationContext在调用service的时候指定:

   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     CalculateCallback.SynchronizationContext = SynchronizationContext.Current; 
   5:  
   6:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   7:     ICalculate calculator = channelFactory.CreateChannel();
   8:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   9: } 

现在我们程序能够正常运行了。

五、另一种可选方案:通过ISynchronizeInvoke的Invoke/BeginInvoke

熟悉Windows Form编程的读者应该都知道,WinForm空间的基类Control(System.Windows.Forms.Control)都实现了System.ComponentModel.ISynchronizeInvoke接口,而Control对ISynchronizeInvoke的实现就是为了解决Control的操作必须在创建Control线程的问题,ISynchronizeInvoke定义Invoke和BeginInvoke方法方面我们以同步或者异步的方式操作Control:

   1: public interface ISynchronizeInvoke
   2: {
   3:     // Methods
   4:     [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]
   5:     IAsyncResult BeginInvoke(Delegate method, object[] args);
   6:     object EndInvoke(IAsyncResult result);
   7:     object Invoke(Delegate method, object[] args); 
   8:  
   9:     // Properties
  10:     bool InvokeRequired { get; }
  11: } 
  12:  

如何我们放弃基于SynchonizationContext的解决方案,我们也可以通过基于ISynchronizeInvoke的方式来解决这个问题。为此我们这样定义CalculateCallback:

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:         public delegate void DisplayResultDelegate(TextBox resultPanel, double result); 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:             DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);
  14:            ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });                   
  15:         } 
  16:  
  17:         private void DisplayResult(TextBox resultPanel, double result)
  18:         {
  19:             resultPanel.Text = result.ToString();
  20:         } 
  21:  
  22:         #endregion
  23:     }
  24: } 
  25:  

由于BeginInvoke方式只能接受一个具体的delegate对象(不能使用匿名方法),所以需要定义一个具体的Delegate(DisplayResultDelegate)和对应的方法(DisplayResult),参数通过一个object[]传入。

从本质上将,这两种方式的实现完全是一样的,如何你查看System.Windows.Forms.WindowsFormsSynchronizationContext的代码,你会发现其Send和Post方方法就是通过调用Invoke和BeginInvoke方式实现的。

六、Service Hosting的线程关联性

我们花了很多的精力介绍了WCF Duplex通信中Callback操作的线程关联性问题,实际上我们使用到更多的还是service操作的线程关联性问题。就以我们上面的程序为例,我们通过一个Windows Form application来host我们的service,并且要求service的运算结束后将结果输出到server端的Window form的ListBox中,对ListBox的操作肯定需要的Host程序的UI线程中执行。

按照我们一般的想法,我们的Service面向若干client,肯定是并发的接收client端的请求,以多线程的方式执行service的操作,那么操作中UI 控件的操作肯定会出现错误。

我们的程序依然可以正常运行,其根本原因是WCF的service操作默认实现了对Host service的当前线程的SynchonizationContext实现了关联。与Callback操作的线程关联性通过CallbackBehavior的UseSynchronizationContext 进行控制一样,service的线程关联性通过ServiceBehavir的UseSynchronizationContext 进行设定。UseSynchronizationContext 的默认值为true。

如何我们将CalculateService的UseSynchronizationContext 设为false:

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }       
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16:            DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: } 
  24:  

有control被不是创建它的线程操作,肯定会抛出一个InvalidOperationException,就像这样:

image

我们一样可以通过SynchonizationContext或者ISynchronizeInvoke的方式来解决这样的问题,我们只讨论前面一种,为此我们改变了CalculateService的定义:通过SynchonizationContext的Post方法实现对ListBox的访问。

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; } 
   8:  
   9:         public static SynchronizationContext SynchronizationContext
  10:         { get; set; } 
  11:  
  12:         #region ICalculate Members 
  13:  
  14:         public void Add(double op1, double op2)
  15:         {
  16:             double result = op1 + op2;
  17:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
  18:            SynchronizationContext.Post(delegate
  19:             {
  20:                 DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  21:             }, null); 
  22:  
  23:             callback.DisplayResult(result);            
  24:         } 
  25:  
  26:         #endregion
  27:     }
  28: } 
  29:  

通过static属性定义的SynchonizationContext在host的时候指定:

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:    CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 
  13:  

这样我们的程序又可以正常运行了。

WCF后续之旅:
WCF后续之旅(1): WCF是如何通过Binding进行通信的
WCF后续之旅(2): 如何对Channel Layer进行扩展——创建自定义Channel
WCF后续之旅(3): WCF Service Mode Layer 的中枢—Dispatcher
WCF后续之旅(4):WCF Extension Point 概览
WCF后续之旅(5): 通过WCF Extension实现Localization
WCF后续之旅(6): 通过WCF Extension实现Context信息的传递
WCF后续之旅(7):通过WCF Extension实现和Enterprise Library Unity Container的集成
WCF后续之旅(8):通过WCF Extension 实现与MS Enterprise Library Policy Injection Application Block 的集成
WCF后续之旅(9):通过WCF的双向通信实现Session管理[Part I]
WCF后续之旅(9): 通过WCF双向通信实现Session管理[Part II]
WCF后续之旅(10): 通过WCF Extension实现以对象池的方式创建Service Instance
WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)
WCF后续之旅(12): 线程关联性(Thread Affinity)对WCF并发访问的影响
WCF后续之旅(13): 创建一个简单的WCF SOAP Message拦截、转发工具[上篇]
WCF后续之旅(13):创建一个简单的SOAP Message拦截、转发工具[下篇]
WCF后续之旅(14):TCP端口共享
WCF后续之旅(15): 逻辑地址和物理地址
WCF后续之旅(16): 消息是如何分发到Endpoint的--消息筛选(Message Filter)
WCF后续之旅(17):通过tcpTracer进行消息的路由


作者:蒋金楠
微信公众账号:大内老A
微博: www.weibo.com/artech
如果你想及时得到个人撰写文章以及著作的消息推送,或者想看看个人推荐的技术资料,可以扫描左边二维码(或者长按识别二维码)关注个人公众号(原来公众帐号 蒋金楠的自媒体将会停用)。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
2月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
246 0
|
1月前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
209 59
|
5天前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
5天前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程
|
1月前
|
Java C# Python
线程等待(Thread Sleep)
线程等待是多线程编程中的一种同步机制,通过暂停当前线程的执行,让出CPU时间给其他线程。常用于需要程序暂停或等待其他线程完成操作的场景。不同语言中实现方式各异,如Java的`Thread.sleep(1000)`、C#的`Thread.Sleep(1000)`和Python的`time.sleep(1)`。使用时需注意避免死锁,并考虑其对程序响应性的影响。
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
43 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
61 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
36 3