本节目录:
接收者:通道侦听器
和它们的名字暗示的一样,通道侦听器就是为了创建通道并侦听传入的消息。这个模型借鉴了伯克利Socket编程API。在WCF里,这个模型可以在Windows Socket(Winsock) API里看到。在.NET Framework编程里,这个模型存在于System.Net.Sockets命名空间里。在这个模型里,TcpListener
或Socket会绑定一个地址,然后被动侦听连接传入的消息。当连接建立以后(例如,客户端链接到侦听器),会有一个以Accept开头的方法返回一个TcpListener
或Socket的实例,程序可以使用这个对象来接受数据。
在WCF里,通道侦听器做着相同的工作。它们会绑定一个URI,然后等待传入的消息,当连接建立以后一个以Accept开头的方法会返回一个通道的实例。然后程序可以使用这个通道对象来接收消息。尽管所有的通道都定义了以Accept开头的方法,但是只有侦听器类型的侦听器才会侦听消息。举例说明一下,考虑一下通道侦听器的堆栈。通道侦听器处于堆栈的最底层。传输通道侦听器是唯一的一个绑定地址并且开始侦听连接消息的通道。堆栈处于上面通道侦听器会依次调用下一个通道侦听器的Accept方法,直到调用最底层的传输通道侦听器。
图7-1:传输通道堆栈
但是也不是所有的传输通道侦听器都是这样工作的。因为各自传输层固有的差别,导致了他们之间存在很大的不同。例如,面向连接的传输通道侦听器(例如TCP/IP和named pipe)在接收了一个消息以后会返回一个通道。面向非连接的传输通道侦听器(例如MSMQ)会立即返回一个通道,因为不需要等待请求消息。
IChannelListener
接口
所有的通道都实现了System.ServiceModel.Channels.IChannelListener接口。这个接口会强制所有的通道实现通道状态机,而且包含一些基本的通道侦听器成员。IChannelListener的定义如下:
public interface IChannelListener : ICommunicationObject {
IAsyncResult BeginWaitForChannel(TimeSpan timeout,
AsyncCallback callback,
Object
state);
Bool
ean EndWaitForChannel(IAsyncResult result);
Bool
ean WaitForChannel(TimeSpan timeout);
T GetProperty<T>() where T: class;
// the listening address
Uri
Uri { get; }
}
WaitForChannel
方法(包含异步方法)可以返回一个Boolean值,来表示一个通道是否可用。Uri可以访问侦听地址。GetProperty<T>可以被实现的IChannel的类使用。IChannelListener接口没有实现IChannel,因为IChannel是区别API里通道的标识。例如,对于类型和IChannel接口,许多泛型参数是受约束的。目的就是为了约束拥有特定形状通道的参数。如果IChannelListener实现了Channel,通道侦听器就可以在很多地方使用,而不是只能被通道调用,并且堆栈和堆栈里的通道侦听器都必须支持查询功能。
所有的通道同样也实现IChannelListener<TChannel>接口。这里我们会第一次看到伯克利Socket API的
Accept范式,如下所示:
public interface IChannelListener<TChannel> : IChannelListener,
where TChannel: class,
IChannel {
TChannel AcceptChannel();
TChannel AcceptChannel(TimeSpan timeout);
IAsyncResult BeginAcceptChannel(AsyncCallback callback,
Object state);
IAsyncResult BeginAcceptChannel(TimeSpan timeout,
AsyncCallback callback,
Object
state);
TChannel EndAcceptChannel(IAsyncResult result);
}
注意,接口定义把泛型类型指定为一个具体的IChannel。在WCF API里,实现特定形状的通道必须遵守这个规则。整体来看,这意味着通道侦听器必须引用一个通道形状。这与通道使用通道形状的方式不同。通道是实现通道形状,而通道侦听器引用一个通道形状并且使用这个引用来创建特定的通道。
实现IChannelListener<TChannel>接口的类型必须通过AcceptChannel方法(包含异步实现)返回一个通道的实例。与通道层的其它成员一样,AcceptChannel也包含一个重载的方法,它可以接受一个TimeSpan参数。因为消息接收程序可以长时间等待接受消息,所以,参数经常是TimeSpan.MaxValue。
所有的通道侦听器都继承自抽象类型System.ServiceModel.Channels.ChannelListenerBase。它的定义如下:
public abstract class ChannelListenerBase : ChannelManagerBase,
IChannelListener,
ICommunicationObject {
protected ChannelListenerBase();
protected ChannelListenerBase(IDefaultCommunicationTimeouts timeouts);
// IChannelListener implementation
public IAsyncResult BeginWaitForChannel(TimeSpan timeout,
AsyncCallback callback,
Object
state);
public bool EndWaitForChannel(IAsyncResult result);
public bool WaitForChannel(TimeSpan timeout);
// Extensibility points for IChannelListener members
protected abstract IAsyncResult OnBeginWaitForChannel(TimeSpan timeout,
AsyncCallback callback,
Object state);
protected abstract bool OnEndWaitForChannel(IAsyncResult result);
protected abstract bool OnWaitForChannel(TimeSpan timeout);
public abstract
Uri Uri { get; }
// Query mechanism
public virtual T GetProperty<T>() where T: class;
// CommunicationObject timeouts
protected override TimeSpan DefaultCloseTimeout { get; }
protected override TimeSpan DefaultOpenTimeout { get; }
// ChannelManagerBase timeouts
protected override TimeSpan DefaultReceiveTimeout { get; }
protected override TimeSpan DefaultSendTimeout { get; }
}
这里非常有趣的一点就是,有一个构造函数接受一个TimeSpan参数。由于ChannelListenerBase的类型层次的原因,它定义了4个
protected的TimeSpan属性。WCF类型系统对每个超时属性默认设置的值都是1分钟。如果这个值不能满足通道(包括子通道)的需求,你可以通过ChannelListenerBase构造函数传递一个IDefaultCommunicationTimeouts参数。在这个构造函数里,超时的值会设置到每个相关的属性上。你会在第8章“绑定”里看到Binding实现了IDefaultCommunicationTimeouts接口,而且这个也是用户控制通道超时属性的一种方式。
通道侦听器也继承自抽象类型System.ServiceModel.Channels.ChannelListenerBase<TChannel>。这个类型是ChannelListenerBase的子类型,而且实现了IChannelListener<TChannel>接口,如下所示:
public abstract class ChannelListenerBase<TChannel> : ChannelListenerBase,
IChannelListener<TChannel>, where TChannel: class, IChannel {
protected ChannelListenerBase();
protected ChannelListenerBase(IDefaultCommunicationTimeouts timeouts);
// IChannelListener<TChannel> implementation
public IAsyncResult BeginAcceptChannel(AsyncCallback callback,
Object state);
public IAsyncResult BeginAcceptChannel(TimeSpan timeout,
AsyncCallback callback, Object state);
public TChannel EndAcceptChannel(IAsyncResult result);
public TChannel AcceptChannel();
public TChannel AcceptChannel(TimeSpan timeout);
// extensibility points for IChannelListener<TChannel>
protected abstract TChannel OnAcceptChannel(TimeSpan timeout);
protected abstract IAsyncResult OnBeginAcceptChannel(TimeSpan timeout,
AsyncCallback callback, Object state);
protected abstract TChannel OnEndAcceptChannel(IAsyncResult result);
}
既然已经学习完了通道侦听器里使用的类型,现在我们自己来创建一个通道侦听器。前面一章里,我们学习了如何构建不同的DelegatorChannel通道。这一节里,我们会学习如何在消息接受程序端创建一个通道侦听器,而且这些通道侦听器能够创建DelegatorChannel通道。当然这些代码直到第8章才能全部给出。
当创建一个通道侦听器的时候,我们首先考虑的就是要支持的通道形状。因为我们的DelegatorChannel例子要支持任何一种通道形状,所以我们的通道侦听器必须能够创建所有已知的DelegatorChannel通道。在第6章里,我们使用泛型参数,这给我们的编码带来了很大的灵活性,这里我们也会这么做。
我们就从熟悉的地方开始。这里最简单的方法就是继承ChannelListenerBase<TChannel>类型。我们必须使得我们的通道侦听器类型支持泛型和任何可能的通道形状。定义如下:
internal sealed class DelegatorChannelListener<TShape> :
ChannelListenerBase<TShape> where TShape : class, IChannel {
// implementation omitted for clarity
}
注意这里修改了DelegatorChannelListener<TShape>类型的访问控制器。和第6章里的通道定义一样,这里的通道侦听器不需要外部调用者访问。我们会在第8章里介绍通过Binding
和BindingElement对象访问此对象。现在我们已经定义完了通道侦听器的基类型,现在就来实现它。下面是DelegatorChannelListener<TShape>的具体实现:
internal sealed class DelegatorChannelListener<TShape> :
ChannelListenerBase<TShape> where TShape : class, IChannel {
// field referencing the next channel listener
IChannelListener<TShape> _innerListener;
// String to output to console
String _consolePrefix = "LISTENER: DelegatorChannelListener";
// builds the next channel listener, then assigns it to
// the _innerListener field
public DelegatorChannelListener(BindingContext context) {
PrintHelper.Print(_consolePrefix, "ctor");
this._innerListener = context.BuildInnerChannelListener<TShape>();
}
// Creates a DelegatorChannel of the correct shape and returns it
private TShape WrapChannel(TShape innerChannel) {
if(innerChannel == null) {
throw new ArgumentNullException("innerChannel cannot be null", "innerChannel");
}
if(typeof(TShape) == typeof(IInputChannel)) {
return (TShape)(Object)new DelegatorInputChannel<IInputChannel>(this,
(IInputChannel)innerChannel, "RECEIVE");
}
if(typeof(TShape) == typeof(IReplyChannel)) {
return (TShape)(object)new DelegatorReplyChannel(this, (IReplyChannel)innerChannel,
"RECEIVE");
}
if(typeof(TShape) == typeof(IDuplexChannel)) {
return (TShape)(object)new DelegatorDuplexChannel(this, (IDuplexChannel)innerChannel,
"RECEIVE");
}
if(typeof(TShape) == typeof(IInputSessionChannel)) {
return (TShape)(object)new DelegatorInputSessionChannel(this,
(IInputSessionChannel)innerChannel, "RECEIVE");
}
if(typeof(TShape) == typeof(IReplySessionChannel)) {
return (TShape)(object)new DelegatorReplySessionChannel(this,
(IReplySessionChannel)innerChannel, "RECEIVE");
}
if(typeof(TShape) == typeof(IDuplexSessionChannel)) {
return (TShape)(object)new DelegatorDuplexSessionChannel(this,
(IDuplexSessionChannel)innerChannel, "RECEIVE");
}
// cannot wrap this channel
throw new ArgumentException(String.Format("invalid channel shape passed:{0}",
innerChannel.GetType()));
}
// IChannelListener<TChannel> members
protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback
callback, object state) {
PrintHelper.Print(_consolePrefix, "OnBeginAcceptChannel");
return this._innerListener.BeginAcceptChannel(timeout, callback, state);
}
protected override TShape OnEndAcceptChannel(IAsyncResult result) {
// create and return the channel
PrintHelper.Print(_consolePrefix, "OnEndAcceptChannel");
TShape innerChannel = _innerListener.EndAcceptChannel(result);
// when closing, _inner.EndAcceptChannel returns null, nothing to wrap
if (innerChannel != null) {
return WrapChannel(innerChannel);
}
return null;
}
protected override TShape OnAcceptChannel(TimeSpan timeout){
// delegate to next channel, wrap it, and return it
PrintHelper.Print(_consolePrefix, "OnAcceptChannel");
TShape innerChannel = _innerListener.AcceptChannel(timeout);
// when closing, _inner.AcceptChannel returns null, nothing to wrap
if (innerChannel != null) {
return WrapChannel(innerChannel);
}
return null;
}
// IChannelListener members
protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback
callback, object state) {
PrintHelper.Print(_consolePrefix, "OnBeginWaitForChannel");
return this._innerListener.BeginWaitForChannel(timeout, callback, state);
}
protected override bool OnEndWaitForChannel(IAsyncResult result) {
PrintHelper.Print(_consolePrefix, "OnEndWaitForChannel");
return this._innerListener.EndWaitForChannel(result);
}
protected override bool OnWaitForChannel(TimeSpan timeout) {
PrintHelper.Print(_consolePrefix, "OnWaitForChannel");
return this._innerListener.WaitForChannel(timeout);
}
public override Uri Uri {
get {
PrintHelper.Print(_consolePrefix, "Uri");
return this._innerListener.Uri;
}
}
public override T GetProperty<T>() {
PrintHelper.Print(_consolePrefix, "GetProperty<" + typeof(T) + ">");
return this._innerListener.GetProperty<T>();
}
// CommunicationObject members
protected override void OnAbort() {
PrintHelper.Print(_consolePrefix, "OnAbort");
this._innerListener.Abort();
}
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback,
object state) {
PrintHelper.Print(_consolePrefix, "OnBeginClose");
return this._innerListener.BeginClose(timeout, callback, state);
}
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback,
object state) {
PrintHelper.Print(_consolePrefix, "OnBeginOpen");
return this._innerListener.BeginOpen(timeout, callback, state);
}
protected override void OnClose(TimeSpan timeout) {
PrintHelper.Print(_consolePrefix, "OnClose");
this._innerListener.Close(timeout);
}
protected override void OnEndClose(IAsyncResult result) {
PrintHelper.Print(_consolePrefix, "OnEndClose");
this._innerListener.EndClose(result);
}
protected override void OnEndOpen(IAsyncResult result) {
PrintHelper.Print(_consolePrefix, "OnEndOpen");
this._innerListener.EndOpen(result);
}
protected override void OnOpen(TimeSpan timeout) {
PrintHelper.Print(_consolePrefix, "OnOpen");
this._innerListener.Open(timeout);
}
}
这里要补充一下。从构造函数开始。和前面几章里DelegatorChannel的定义一样,DelegatorChannelListener<TShape>对象可以与其它通道侦听器在堆栈里共存。虽然有好几种建立通道侦听器堆栈的方法,但是最终都是传输通道侦听器位于堆栈的底部。DelegatorChannelListener<TShape>类型定义了几个IChannelListener<TShape>类型的成员,而且可以通过构造函数赋值。在第8章里会介绍,Binding创建通道侦听器堆栈的主要方式就是使用BindingContext对象。另外一个方法就是改变构造函数的参数类型为IChannelListener<TShape>。这时调用者负责使用BindingContext对象。我个人认为,这个差别问题不大。
DelegatorChannelListener<TShape>
里的大部分方法都和DelegatorChannel里的方法很相似,它们都是简单地调用堆栈里的下一个方法。有趣的是,DelegatorChannelListener<TShape>的私有方法WrapChannel。这个方法的作用就是返回拥有TShape
泛型参数的DelegatorChannel的实例。innerChannel参数传递给DelegatorChannel的构造函数,这样就可以正常地创建通道堆栈。OnAcceptChannel
和OnEndAcceptChannel是仅有的调用WrapChannel的方法。在调用之前,它们必须调用成员变量innerListener的方法(分别调用AcceptChannel
和EndAcceptChannel)并且把通道侦听器传递给WrapChannel方法。
当通道侦听器堆栈关闭的时候,DelegatorChannelListener<TShape>类型会循环调用下一个通道侦听器的关闭方法(比如,Close
、OnClose、Abort和OnAbort)。如果在关闭方法以前调用了BeginAcceptChannel
或AcceptChannel ,委托调用就会返回null。这种情况下,OnEndAcceptChannel或AcceptChannel方法也会返回null。
本文转自 frankxulei 51CTO博客,原文链接:
http://blog.51cto.com/frankxulei/318550
,如需转载请自行联系原作者