>>>>>>DIOCP讨论群:320641073
>>>>>>SVN源码和DEMO下载:https://code.google.com/p/diocp/
网络带宽有限,对数据进行压缩再进行传送可以有效的利用带宽和提高传输的效率。
DIOCP中对数据进行压缩传送,需要修改编码器和解码器,先说说这两个东西的的用法和功能。
举个例子:我们要把一台电脑快递回老家给正在上学的小弟使用,那么老家就是服务端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。
在这个过程中,发送一个对象(电脑)用到客户端的编码器,接收对象(电脑)用到服务端的解码器。
在之前编写的DIOCP例子中都使用了JSonStream对象进行传输,这个TJSonStream类,主要有两个部分的数据,第一部分包含JSon字符串数据,第二部分包含Stream流数据。在三层数据保存例子中我们把客户端的请求放在JSon中。服务端接收数据后通过服务端的解码器还原成JSonStream对象。然后进行逻辑的处理,服务端回写对象时,同过服务端的编码器对回写的JSonStream进行编码发送,客户端通过客户端的解码器接收并还原成JSonStream对象。在服务端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,如果对Stream对象进行压缩,在做压缩中调试程序时发现一个70K的数据包进行一下压缩,数据包可以变成7K了,对文本压缩效果还是很不错的。
*当然我们允许自己定义协议和编写编码和解码器,我们可以定义自己的TStrStream类或者TXMLStream等等,然后编写相应的编码器和解码器就行了。
下面分析下代码
客户端代码:
var lvJSonStream, lvRecvObject:TJsonStream; lvStream:TStream; lvData:AnsiString; l, j, x:Integer; begin lvJSonStream := TJsonStream.Create; try lvJSonStream.JSon := SO(); lvJSonStream.JSon.I['cmdIndex'] := 1001; //打开一个SQL脚本,获取数据 lvJSonStream.Json.S['sql'] := mmoSQL.Lines.Text; FClientSocket.sendObject(lvJSonStream); finally lvJSonStream.Free; end;
首先是建立一个TJSonStream对象,然后设定信息,因为是发生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,
procedure TD10ClientSocket.sendObject(pvObject:TObject); begin if FCoder = nil then raise Exception.Create('没有注册对象编码和解码器(registerCoder)!'); if not Active then Exit; FCoder.Encode(Self, pvObject); end;
可以看出这里调用注册的编码器,调用Encode函数
客户端编码器的Encode函数如下
procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject: TObject); var lvJSonStream:TJsonStream; lvJSonLength:Integer; lvStreamLength:Integer; sData, lvTemp:String; lvStream:TStream; lvTempBuf:PAnsiChar; lvBytes, lvTempBytes:TBytes; l:Integer; lvBufBytes:array[0..1023] of byte; begin if pvObject = nil then exit; lvJSonStream := TJsonStream(pvObject); //是否压缩流 if (lvJSonStream.Stream <> nil) then begin if lvJSonStream.Json.O['config.stream.zip'] <> nil then begin if lvJSonStream.Json.B['config.stream.zip'] then begin //压缩流 TZipTools.compressStreamEx(lvJSonStream.Stream); end; end else if lvJSonStream.Stream.Size > 0 then begin //压缩流 TZipTools.compressStreamEx(lvJSonStream.Stream); lvJSonStream.Json.B['config.stream.zip'] := true; end; end; sData := lvJSonStream.JSon.AsJSon(True); lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData); lvJSonLength := Length(lvBytes); lvStream := lvJSonStream.Stream; lvJSonLength := TNetworkTools.htonl(lvJSonLength); if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit; if lvStream <> nil then begin lvStreamLength := lvStream.Size; end else begin lvStreamLength := 0; end; lvStreamLength := TNetworkTools.htonl(lvStreamLength); if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit; //json bytes if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit; if lvStream.Size > 0 then begin lvStream.Position := 0; repeat l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes)); if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit; until (l = 0); end; end;
该部分完成的功能有
1.判断Stream数据是否需要压缩。
2.发送Json数据长度和Stream数据长度
3.发送Json数据
4.发送Stream数据
说明:
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
这三行代码需要说明下,是为了兼容java,netty做服务端方便解码,当然我们也可以不进行转换。直接发送也是可以的。只要配合服务端就行了。协议是自己定义的。
接下来是服务端IOCP队列中会收到接收数据的信号。
function TIOCPObject.processIOQueued: Integer; var BytesTransferred:Cardinal; lvResultStatus:BOOL; lvRet:Integer; lvIOData:POVERLAPPEDEx; lvDataObject:TObject; lvClientContext:TIOCPClientContext; begin Result := IOCP_RESULT_OK; //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止 lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle, ....... if lvIOData.IO_TYPE = IO_TYPE_Accept then //连接请求 begin TIODataMemPool.instance.giveBackIOData(lvIOData); PostWSARecv(lvClientContext); end else if lvIOData.IO_TYPE = IO_TYPE_Recv then begin //加入到套接字对应的缓存中,处理逻辑 lvClientContext.RecvBuffer(lvIOData.DataBuf.buf, lvIOData.Overlapped.InternalHigh); TIODataMemPool.instance.giveBackIOData(lvIOData); //继续投递接收请求 PostWSARecv(lvClientContext); end; ......... end;
//加入到套接字对应的缓存中,处理逻辑
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
lvIOData.Overlapped.InternalHigh);
//这里会调用解码器尝试进行解码
procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal); var lvObject:TObject; begin FCS.Enter; try //加入到套接字对应的缓存 FBuffers.AddBuffer(buf, len); //调用注册的解码器<进行解码> lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers); if lvObject <> nil then try try //解码成功,调用业务逻辑的处理方法 dataReceived(lvObject); except on E:Exception do begin TIOCPFileLogger.logErrMessage('截获处理逻辑异常!' + e.Message); end; end; //清理掉这一次分配的内存<如果没有可用的内存块>清理 if FBuffers.validCount = 0 then begin FBuffers.clearBuffer; end; finally lvObject.Free; end; finally FCS.Leave; end; end;
我们在之前的Demo中使用的是TIOCPJSonStreamDecoder解码器
function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject; var lvJSonLength, lvStreamLength:Integer; lvData:String; lvBuffer:array of Char; lvBufData:PAnsiChar; lvStream:TMemoryStream; lvJsonStream:TJsonStream; lvBytes:TBytes; lvValidCount:Integer; begin Result := nil; //如果缓存中的数据长度不够包头长度,解码失败<json字符串长度,流长度> lvValidCount := inBuf.validCount; if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then begin Exit; end; //记录读取位置 inBuf.markReaderIndex; inBuf.readBuffer(@lvJSonLength, SizeOf(Integer)); inBuf.readBuffer(@lvStreamLength, SizeOf(Integer)); lvJSonLength := TNetworkTools.ntohl(lvJSonLength); lvStreamLength := TNetworkTools.ntohl(lvStreamLength); //如果缓存中的数据不够json的长度和流长度<说明数据还没有收取完毕>解码失败 lvValidCount := inBuf.validCount; if lvValidCount < (lvJSonLength + lvStreamLength) then begin //返回buf的读取位置 inBuf.restoreReaderIndex; exit; end else if (lvJSonLength + lvStreamLength) = 0 then begin //两个都为0<两个0>客户端可以用来作为自动重连使用 TIOCPFileLogger.logDebugMessage('接收到一次[00]数据!'); Exit; end; //解码成功 lvJsonStream := TJsonStream.Create; Result := lvJsonStream; //读取json字符串 if lvJSonLength > 0 then begin SetLength(lvBytes, lvJSonLength); ZeroMemory(@lvBytes[0], lvJSonLength); inBuf.readBuffer(@lvBytes[0], lvJSonLength); lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes); lvJsonStream.Json := SO(lvData); end else begin TFileLogger.instance.logMessage('接收到一次JSon为空的一次数据请求!', 'IOCP_ALERT_'); end; //读取流数据 if lvStreamLength > 0 then begin GetMem(lvBufData, lvStreamLength); try inBuf.readBuffer(lvBufData, lvStreamLength); lvJsonStream.Stream.Size := 0; lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength); //解压流 if lvJsonStream.Json.B['config.stream.zip'] then begin //解压 TZipTools.unCompressStreamEX(lvJsonStream.Stream); end; finally FreeMem(lvBufData, lvStreamLength); end; end; end;
//服务端解码器中有三行代码来配合客户端的编码流
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
/////
服务端解码器主要完成的功能有
0.判断接收到的数据是否可以进行解码,如果不可以退出,解码不成功。
1.接收json长度,流数据长度
2.接收json数据,接收流数据存入JsonStream.json中,
3.根据json中config.stream.zip进行判断流数据是否需要解压.放入JsonStream.stream中
4.解码成功返回JsonStream对象。
解码完成后可以看到
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
if lvObject <> nil then
try //解码成功,调用业务逻辑的处理方法
dataReceived(lvObject);
………
解码成功调用dataReceived,进行逻辑的处理。
总结:
服务端的解码器配套客户端的编码器,服务端的编码器配套客户端的解码器。