本文及程序不是介绍WCF怎么用,而是研究如何在WCF通信时的通道两端自动进行数据压缩和解压缩,从而增加分布式数据传输速度。
而且,这个过程是完全透明的,用户及编程人员根本不需要知道它的存在,相当于HOOK在两端的一个组件。可以使用中网络带宽较小
的网络环境中。当WCF在两个实体间通讯的时候,便自动创建一个信息通道转接通讯,这个消息包含数据请求和相应。WCF使用特殊的
编码器将请求和响应数据转换成一系列的字节。
我所带的项目里遇到大文件分布式传输问题,经过分析考虑采用WCF通道压缩技术来解决此问题。执行这样的编码是需要传输大文件(XML格式)由一台机器到另一台机器传输,而连接有速度限制。经过查看了一些国外英文网站,发现我不用写一个特殊的函数边压缩和边解压,而是配置传输通道可以做到这一点,这种方式压缩可重复使用的任何契约。我发现自己编写的消息编码器是最简单的方式来实现功能,真正的问题是如何编写信息编码器,在MSDN上没有找到任何关于此应用的实例。消息契约编码器的想法是Hook连接两端发送和接收信息的渠道。程序是采用Microsoft Visual Studio 2008 WCF设计。
图1 WCF消息通道编码过程时序图
发送方:代码中加入方法,该方法及其参数的序列化成SOAP消息,消息编码序列化的信息将成为一个字节数组,字节数组发送传输层。
接收方:传输层接收字节数组,消息编码器并行化字节数组到一条消息,该方法及其参数并行化到一个SOAP消息,方法是被监听的。 当加入压缩信息编码器,该方法要求有一点改变:发送方:代码中加入方法,该方法及其参数的序列化成SOAP消息,消息契约编码让其内在的信息编码序列的信息成为一个字节数组,消息契约编码压缩的字节数组第二个字节数组,字节数组发送传输层。
接收方:传输层接收字节数组,消息契约编码的字节数组解压到第二字节数组,消息契约编码让其内在的信息编码化的第二个字节数组消息,该方法及其参并行化到SOAP消息,方法是被监听的。
这个消息契约编码分为几个类:
CompactMessageEncoder //这个类提供了信息编码实施。
CompactMessageEncoderFactory //这个类是负责提供契约信息编码实例。
CompactMessageEncodingBindingElement //这个类负责通道的协议约束规范。
CompactMessageEncodingElement //这个类使信息编码通过增加应用程序配置文件。
图2 消息通道编码器静态类图
压缩方法:契约消息编码器是使用gzip压缩的NET Framework范围内执行的,是调用System.IO.Compression.GZipStream名字空间类中。
加入引用CompactMessageEncoder.dll,修改app.config文件引用,应用程序必须要在客户端和服务器端。
压缩缓冲代码:
private
static
ArraySegment
<
byte
>
CompressBuffer(ArraySegment
<
byte
>
buffer, BufferManager bufferManager,
int
messageOffset)
{
// Create a memory stream for the final message
MemoryStream memoryStream = new MemoryStream();
// Copy the bytes that should not be compressed into the stream
memoryStream.Write(buffer.Array, 0 , messageOffset);
// Compress the message into the stream
using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Compress, true ))
{
gzStream.Write(buffer.Array, messageOffset, buffer.Count);
}
// Convert the stream into a bytes array
byte [] compressedBytes = memoryStream.ToArray();
// Allocate a new buffer to hold the new bytes array
byte [] bufferedBytes = bufferManager.TakeBuffer(compressedBytes.Length);
// Copy the compressed data into the allocated buffer
Array.Copy(compressedBytes, 0 , bufferedBytes, 0 , compressedBytes.Length);
// Release the original buffer we got as an argument
bufferManager.ReturnBuffer(buffer.Array);
// Create a new ArraySegment that points to the new message buffer
ArraySegment < byte > byteArray = new ArraySegment < byte > (bufferedBytes, messageOffset, compressedBytes.Length - messageOffset);
return byteArray;
}
{
// Create a memory stream for the final message
MemoryStream memoryStream = new MemoryStream();
// Copy the bytes that should not be compressed into the stream
memoryStream.Write(buffer.Array, 0 , messageOffset);
// Compress the message into the stream
using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Compress, true ))
{
gzStream.Write(buffer.Array, messageOffset, buffer.Count);
}
// Convert the stream into a bytes array
byte [] compressedBytes = memoryStream.ToArray();
// Allocate a new buffer to hold the new bytes array
byte [] bufferedBytes = bufferManager.TakeBuffer(compressedBytes.Length);
// Copy the compressed data into the allocated buffer
Array.Copy(compressedBytes, 0 , bufferedBytes, 0 , compressedBytes.Length);
// Release the original buffer we got as an argument
bufferManager.ReturnBuffer(buffer.Array);
// Create a new ArraySegment that points to the new message buffer
ArraySegment < byte > byteArray = new ArraySegment < byte > (bufferedBytes, messageOffset, compressedBytes.Length - messageOffset);
return byteArray;
}
解压缓冲代码:
Code
private static ArraySegment < byte > DecompressBuffer(ArraySegment < byte > buffer, BufferManager bufferManager)
{
// Create a new memory stream, and copy into it the buffer to decompress
MemoryStream memoryStream = new MemoryStream(buffer.Array, buffer.Offset, buffer.Count);
// Create a memory stream to store the decompressed data
MemoryStream decompressedStream = new MemoryStream();
// The totalRead stores the number of decompressed bytes
int totalRead = 0 ;
int blockSize = 1024 ;
// Allocate a temporary buffer to use with the decompression
byte [] tempBuffer = bufferManager.TakeBuffer(blockSize);
// Uncompress the compressed data
using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Decompress))
{
while ( true )
{
// Read from the compressed data stream
int bytesRead = gzStream.Read(tempBuffer, 0 , blockSize);
if (bytesRead == 0 )
break ;
// Write to the decompressed data stream
decompressedStream.Write(tempBuffer, 0 , bytesRead);
totalRead += bytesRead;
}
}
// Release the temporary buffer
bufferManager.ReturnBuffer(tempBuffer);
// Convert the decompressed data stream into bytes array
byte [] decompressedBytes = decompressedStream.ToArray();
// Allocate a new buffer to store the message
byte [] bufferManagerBuffer = bufferManager.TakeBuffer(decompressedBytes.Length + buffer.Offset);
// Copy the bytes that comes before the compressed message in the buffer argument
Array.Copy(buffer.Array, 0 , bufferManagerBuffer, 0 , buffer.Offset);
// Copy the decompressed data
Array.Copy(decompressedBytes, 0 , bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);
// Create a new ArraySegment that points to the new message buffer
ArraySegment < byte > byteArray = new ArraySegment < byte > (bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);
// Release the original message buffer
bufferManager.ReturnBuffer(buffer.Array);
return byteArray;
}
private static ArraySegment < byte > DecompressBuffer(ArraySegment < byte > buffer, BufferManager bufferManager)
{
// Create a new memory stream, and copy into it the buffer to decompress
MemoryStream memoryStream = new MemoryStream(buffer.Array, buffer.Offset, buffer.Count);
// Create a memory stream to store the decompressed data
MemoryStream decompressedStream = new MemoryStream();
// The totalRead stores the number of decompressed bytes
int totalRead = 0 ;
int blockSize = 1024 ;
// Allocate a temporary buffer to use with the decompression
byte [] tempBuffer = bufferManager.TakeBuffer(blockSize);
// Uncompress the compressed data
using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Decompress))
{
while ( true )
{
// Read from the compressed data stream
int bytesRead = gzStream.Read(tempBuffer, 0 , blockSize);
if (bytesRead == 0 )
break ;
// Write to the decompressed data stream
decompressedStream.Write(tempBuffer, 0 , bytesRead);
totalRead += bytesRead;
}
}
// Release the temporary buffer
bufferManager.ReturnBuffer(tempBuffer);
// Convert the decompressed data stream into bytes array
byte [] decompressedBytes = decompressedStream.ToArray();
// Allocate a new buffer to store the message
byte [] bufferManagerBuffer = bufferManager.TakeBuffer(decompressedBytes.Length + buffer.Offset);
// Copy the bytes that comes before the compressed message in the buffer argument
Array.Copy(buffer.Array, 0 , bufferManagerBuffer, 0 , buffer.Offset);
// Copy the decompressed data
Array.Copy(decompressedBytes, 0 , bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);
// Create a new ArraySegment that points to the new message buffer
ArraySegment < byte > byteArray = new ArraySegment < byte > (bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);
// Release the original message buffer
bufferManager.ReturnBuffer(buffer.Array);
return byteArray;
}
改变服务端配置
加入消息契约编码器之前
app.config
的实例:
<?
xml version="1.0" encoding="utf-8"
?>
< configuration >
< system.serviceModel >
< services >
< service name ="Server.MyService" >
< endpoint
address ="net.tcp://localhost:1234/MyService"
binding ="netTcpBinding"
contract ="Server.IMyService" />
</ service >
</ services >
</ system.serviceModel >
</ configuration >
< configuration >
< system.serviceModel >
< services >
< service name ="Server.MyService" >
< endpoint
address ="net.tcp://localhost:1234/MyService"
binding ="netTcpBinding"
contract ="Server.IMyService" />
</ service >
</ services >
</ system.serviceModel >
</ configuration >
加入消息契约编码器后
app.config
的例子:
Code
<? xml version="1.0" encoding="utf-8" ?>
< configuration >
< system.serviceModel >
< services >
< service name ="Server.MyService" >
<!-- Set the binding of the endpoint to customBinding -->
< endpoint
address ="net.tcp://localhost:1234/MyService"
binding ="customBinding"
contract ="Server.IMyService" />
</ service >
</ services >
<!-- Defines a new customBinding that contains the compactMessageEncoding -->
< bindings >
< customBinding >
< binding name ="compactBinding" >
< compactMessageEncoding >
<!-- Defines the inner message encoder as binary encoder -->
< binaryMessageEncoding />
</ compactMessageEncoding >
< tcpTransport />
</ binding >
</ customBinding >
</ bindings >
<!-- Adds the extension dll so the WCF can find the compactMessageEncoding -->
< extensions >
< bindingElementExtensions >
< add name ="compactMessageEncoding" type ="Amib.WCF.CompactMessageEncodingElement, CompactMessageEncoder, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" />
</ bindingElementExtensions >
</ extensions >
</ system.serviceModel >
</ configuration >
<? xml version="1.0" encoding="utf-8" ?>
< configuration >
< system.serviceModel >
< services >
< service name ="Server.MyService" >
<!-- Set the binding of the endpoint to customBinding -->
< endpoint
address ="net.tcp://localhost:1234/MyService"
binding ="customBinding"
contract ="Server.IMyService" />
</ service >
</ services >
<!-- Defines a new customBinding that contains the compactMessageEncoding -->
< bindings >
< customBinding >
< binding name ="compactBinding" >
< compactMessageEncoding >
<!-- Defines the inner message encoder as binary encoder -->
< binaryMessageEncoding />
</ compactMessageEncoding >
< tcpTransport />
</ binding >
</ customBinding >
</ bindings >
<!-- Adds the extension dll so the WCF can find the compactMessageEncoding -->
< extensions >
< bindingElementExtensions >
< add name ="compactMessageEncoding" type ="Amib.WCF.CompactMessageEncodingElement, CompactMessageEncoder, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" />
</ bindingElementExtensions >
</ extensions >
</ system.serviceModel >
</ configuration >
客户端配置变化
加入消息契约编码器之前
app.config
的实例:
Code
加入消息契约编码器后
app.config
的例子:
Code
这种压缩方法,消息堵塞的几率很小。
使用
CompactMessageEncoder
在同一台机器运行客户端和服务器上可能会降低效率。