DIOCP开源项目-详解编码器和解码器和如何在传输中加入压缩和解压功能

简介: >>>>>>DIOCP讨论群:320641073 >>>>>>SVN源码和DEMO下载:https://code.google.com/p/diocp/ 网络带宽有限,对数据进行压缩再进行传送可以有效的利用带宽和提高传输的效率。

>>>>>>DIOCP讨论群:320641073

>>>>>>SVN源码和DEMO下载:https://code.google.com/p/diocp/

网络带宽有限,对数据进行压缩再进行传送可以有效的利用带宽和提高传输的效率。

DIOCP中对数据进行压缩传送,需要修改编码器和解码器,先说说这两个东西的的用法和功能。

      

          举个例子:我们要把一台电脑快递回老家给正在上学的小弟使用,那么老家就是服务端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。

在这个过程中,发送一个对象(电脑)用到客户端的编码器,接收对象(电脑)用到服务端的解码器。

         

          在之前编写的DIOCP例子中都使用了JSonStream对象进行传输,这个TJSonStream类,主要有两个部分的数据,第一部分包含JSon字符串数据,第二部分包含Stream流数据。在三层数据保存例子中我们把客户端的请求放在JSon中。服务端接收数据后通过服务端的解码器还原成JSonStream对象。然后进行逻辑的处理,服务端回写对象时,同过服务端的编码器对回写的JSonStream进行编码发送,客户端通过客户端的解码器接收并还原成JSonStream对象。在服务端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,如果对Stream对象进行压缩,在做压缩中调试程序时发现一个70K的数据包进行一下压缩,数据包可以变成7K了,对文本压缩效果还是很不错的。

        *当然我们允许自己定义协议和编写编码和解码器,我们可以定义自己的TStrStream类或者TXMLStream等等,然后编写相应的编码器和解码器就行了。

        下面分析下代码

客户端代码:

var
  lvJSonStream, lvRecvObject:TJsonStream;
  lvStream:TStream;
  lvData:AnsiString;
  l, j, x:Integer;
begin
  lvJSonStream := TJsonStream.Create;
  try
    lvJSonStream.JSon := SO();
    lvJSonStream.JSon.I['cmdIndex'] := 1001;   //打开一个SQL脚本,获取数据
    lvJSonStream.Json.S['sql'] := mmoSQL.Lines.Text;

    FClientSocket.sendObject(lvJSonStream);
  finally
    lvJSonStream.Free;
  end;

首先是建立一个TJSonStream对象,然后设定信息,因为是发生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,

procedure TD10ClientSocket.sendObject(pvObject:TObject);
begin
  if FCoder = nil then raise Exception.Create('没有注册对象编码和解码器(registerCoder)!');

  if not Active then Exit;

  FCoder.Encode(Self, pvObject);
end;

可以看出这里调用注册的编码器,调用Encode函数

 

客户端编码器的Encode函数如下

procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
    TObject);
var
  lvJSonStream:TJsonStream;
  lvJSonLength:Integer;
  lvStreamLength:Integer;
  sData, lvTemp:String;
  lvStream:TStream;
  lvTempBuf:PAnsiChar;

  lvBytes, lvTempBytes:TBytes;
  
  l:Integer;
  lvBufBytes:array[0..1023] of byte;
begin
  if pvObject = nil then exit;
  lvJSonStream := TJsonStream(pvObject);
  
  //是否压缩流
  if (lvJSonStream.Stream <> nil) then
  begin
    if lvJSonStream.Json.O['config.stream.zip'] <> nil then
    begin
      if lvJSonStream.Json.B['config.stream.zip'] then
      begin
        //压缩流
        TZipTools.compressStreamEx(lvJSonStream.Stream);
      end;
    end else if lvJSonStream.Stream.Size > 0 then
    begin
      //压缩流
      TZipTools.compressStreamEx(lvJSonStream.Stream);
      lvJSonStream.Json.B['config.stream.zip'] := true;
    end;
  end;

  sData := lvJSonStream.JSon.AsJSon(True);

  lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);

  lvJSonLength := Length(lvBytes);
  lvStream := lvJSonStream.Stream;

  lvJSonLength := TNetworkTools.htonl(lvJSonLength);

  if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;


  if lvStream <> nil then
  begin
    lvStreamLength := lvStream.Size;
  end else
  begin
    lvStreamLength := 0;
  end;

  lvStreamLength := TNetworkTools.htonl(lvStreamLength);
  if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;

  //json bytes
  if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;

  if lvStream.Size > 0 then
  begin
    lvStream.Position := 0;
    repeat
      l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
      if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
    until (l = 0);
  end;
end;

该部分完成的功能有

1.判断Stream数据是否需要压缩。

2.发送Json数据长度和Stream数据长度

3.发送Json数据

4.发送Stream数据

 

说明:

lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);

lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

这三行代码需要说明下,是为了兼容java,netty做服务端方便解码,当然我们也可以不进行转换。直接发送也是可以的。只要配合服务端就行了。协议是自己定义的。

 

接下来是服务端IOCP队列中会收到接收数据的信号。

function TIOCPObject.processIOQueued: Integer;
var
  BytesTransferred:Cardinal;
  lvResultStatus:BOOL;
  lvRet:Integer;
  lvIOData:POVERLAPPEDEx;

  lvDataObject:TObject;

  lvClientContext:TIOCPClientContext;
begin
  Result := IOCP_RESULT_OK;

  //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止
  lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
 
  .......
    if lvIOData.IO_TYPE = IO_TYPE_Accept then  //连接请求
    begin
      TIODataMemPool.instance.giveBackIOData(lvIOData);
      PostWSARecv(lvClientContext);
    end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
    begin
      //加入到套接字对应的缓存中,处理逻辑
      lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
        lvIOData.Overlapped.InternalHigh);

      TIODataMemPool.instance.giveBackIOData(lvIOData);

      //继续投递接收请求
      PostWSARecv(lvClientContext);
    end;    
  .........
end;

 

//加入到套接字对应的缓存中,处理逻辑
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
  lvIOData.Overlapped.InternalHigh);

//这里会调用解码器尝试进行解码

procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
var
  lvObject:TObject;
begin
  FCS.Enter;
  try
    //加入到套接字对应的缓存
    FBuffers.AddBuffer(buf, len);

    //调用注册的解码器<进行解码>
    lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
    if lvObject <> nil then
    try
      try
        //解码成功,调用业务逻辑的处理方法
        dataReceived(lvObject);
      except
        on E:Exception do
        begin
          TIOCPFileLogger.logErrMessage('截获处理逻辑异常!' + e.Message);
        end;
      end; 
      //清理掉这一次分配的内存<如果没有可用的内存块>清理
      if FBuffers.validCount = 0 then
      begin
        FBuffers.clearBuffer;
      end;
    finally
      lvObject.Free;
    end;
  finally
    FCS.Leave;
  end;
end;

 

我们在之前的Demo中使用的是TIOCPJSonStreamDecoder解码器

 

function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
var
  lvJSonLength, lvStreamLength:Integer;
  lvData:String;
  lvBuffer:array of Char;
  lvBufData:PAnsiChar;
  lvStream:TMemoryStream;
  lvJsonStream:TJsonStream;
  lvBytes:TBytes;
  lvValidCount:Integer;
begin
  Result := nil;

  //如果缓存中的数据长度不够包头长度,解码失败<json字符串长度,流长度>
  lvValidCount := inBuf.validCount;
  if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
  begin
    Exit;
  end;

  //记录读取位置
  inBuf.markReaderIndex;
  inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
  inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));

  lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
  lvStreamLength := TNetworkTools.ntohl(lvStreamLength);

  //如果缓存中的数据不够json的长度和流长度<说明数据还没有收取完毕>解码失败
  lvValidCount := inBuf.validCount;
  if lvValidCount < (lvJSonLength + lvStreamLength) then
  begin
    //返回buf的读取位置
    inBuf.restoreReaderIndex;
    exit;
  end else if (lvJSonLength + lvStreamLength) = 0 then
  begin
    //两个都为0<两个0>客户端可以用来作为自动重连使用
    TIOCPFileLogger.logDebugMessage('接收到一次[00]数据!');
    Exit;
  end;



  //解码成功
  lvJsonStream := TJsonStream.Create;
  Result := lvJsonStream;

  //读取json字符串
  if lvJSonLength > 0 then
  begin
    SetLength(lvBytes, lvJSonLength);
    ZeroMemory(@lvBytes[0], lvJSonLength);
    inBuf.readBuffer(@lvBytes[0], lvJSonLength);

    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

    lvJsonStream.Json := SO(lvData);
  end else
  begin
    TFileLogger.instance.logMessage('接收到一次JSon为空的一次数据请求!', 'IOCP_ALERT_');
  end;


  //读取流数据 
  if lvStreamLength > 0 then
  begin
    GetMem(lvBufData, lvStreamLength);
    try
      inBuf.readBuffer(lvBufData, lvStreamLength);
      lvJsonStream.Stream.Size := 0;
      lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);

      //解压流
      if lvJsonStream.Json.B['config.stream.zip'] then
      begin
        //解压
        TZipTools.unCompressStreamEX(lvJsonStream.Stream);
      end;
    finally
      FreeMem(lvBufData, lvStreamLength);
    end;
  end;
end;

//服务端解码器中有三行代码来配合客户端的编码流


lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);   
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

/////

服务端解码器主要完成的功能有

0.判断接收到的数据是否可以进行解码,如果不可以退出,解码不成功。

1.接收json长度,流数据长度

2.接收json数据,接收流数据存入JsonStream.json中,

3.根据json中config.stream.zip进行判断流数据是否需要解压.放入JsonStream.stream中
4.解码成功返回JsonStream对象。

 

解码完成后可以看到

lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);

if lvObject <> nil then

try //解码成功,调用业务逻辑的处理方法

    dataReceived(lvObject);

………

解码成功调用dataReceived,进行逻辑的处理。

 

总结:

   服务端的解码器配套客户端的编码器,服务端的编码器配套客户端的解码器。

目录
相关文章
|
2月前
|
存储 编解码 算法
视频为什么可以被压缩?帧内压缩与帧间压缩有何区别?视频编码中的CBR、VBR、CRF...是什么?
视频压缩基于冗余,包括空间冗余、时间冗余、视觉冗余和编码冗余。帧内压缩利用空间相关性,帧间压缩利用时间相关性。视频编码中的码率控制方法有CBR(固定码率)、VBR(动态码率)、CRF(固定码率系数)、CQP(固定质量参数)、CVBR(约束可变码率)和ABR(平均码率),各有优缺点,适用于不同的场景。
|
7月前
|
编解码 数据可视化 搜索推荐
ffdshow源代码分析:解码、编码与多媒体处理的深度探索
ffdshow是知名的DirectShow解码器,集成多种视频音频解码器如libavcodec、libmpeg2等,支持格式丰富。它提供滤镜处理(如锐化、亮度调节)和可视化效果,允许用户个性化设置。此外,ffdshow处理音频,支持AC3、MP3等格式,可外挂DSP插件增强音效。通过对源代码的分析,能深入了解其解码、处理机制,预示着ffdshow将持续改进以提升多媒体体验。
|
8月前
|
机器学习/深度学习 存储 算法
6 种 卷积神经网络压缩方法
6 种 卷积神经网络压缩方法
100 0
|
机器学习/深度学习 存储 人工智能
解码Transformer:自注意力机制与编解码器机制详述与代码实现
解码Transformer:自注意力机制与编解码器机制详述与代码实现
238 0
|
机器学习/深度学习 存储 传感器
【语音压缩】基于adpcm实现语音信号压缩与解压缩附Matlab实现
【语音压缩】基于adpcm实现语音信号压缩与解压缩附Matlab实现
|
存储 编解码 算法
视音频压缩编码
视音频压缩编码
|
机器学习/深度学习 编解码 计算机视觉
即插即用模块 | CompConv卷积让模型不丢精度还可以提速(附论文下载)
即插即用模块 | CompConv卷积让模型不丢精度还可以提速(附论文下载)
369 0
|
机器学习/深度学习 存储 编解码
Opus从入门到精通(八)Opus编码基础之压缩编码
莫尔斯码就是大家熟悉的电报码,它的发明为人类做出了巨大的贡献.该码采用"."和"-"来表示26个英文字母,这实质上还是二进制码(点为"0",而杠为"1"),但是它没有采用固定字长的编码方式,而是采用了常用字母用短码表示(如E用"."表示,T用"-"表示),不常用字母用长码表示(如Z用"--.."表示,j用"-..-"表示)的变长编码方式.通过对英文单词进行大量统计,找出各字母的概率,最后确定有12个字母出现概率最低,用4bit数字表示,有8个字母出现概率较低,用3bit数字表示;有4个字母出现概率较高,用2bit数字表示;有两个字母出现概率最高,用1bit表示,共26个字母.
641 0
|
存储 缓存 安全
为什么你的Opus编码出来的数据有杂音(解决Android平台架构问题)
Gradle插件分为脚本插件和对象插件,脚本插件就是在普通的gradle中写一系列task,然后在别的gradle构建脚本中通过 apply from: 'xx.gradle' 引用这个脚本插件,下面主要介绍一下对象插件对象插件是指实现了org.gradle.api.Plugin接口的类。并且需要实现void apply(T target)这个方法,该方法中的泛型指的是此Plugin可以应用到的对象,而我们通常是将其应用到Project对象上。 编写对象插件常见创建方式
558 0
|
机器学习/深度学习 存储 算法
卷积神经网络压缩方法总结
卷积神经网络压缩方法总结
164 0