在 muduo 中实现 protobuf 编解码器与消息分发器

本文涉及的产品
云解析DNS,个人版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:

陈硕 (giantchen_AT_gmail)

Blog.csdn.net/Solstice  t.sina.com.cn/giantchen

Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx

本文是《一种自动反射消息类型的 Google Protobuf 网络传输方案》的延续,介绍如何将前文介绍的打包方案与 muduo::net::Buffer 结合,实现了 protobuf codec 和 dispatcher。

Muduo 的下载地址: http://muduo.googlecode.com/files/muduo-0.1.9-alpha.tar.gz ,SHA1 dc0bb5f7becdfc0277fb35f6dfaafee8209213bc ,本文的完整代码可在线阅读 http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/ 。

考虑到不是每个人都安装了 Google Protobuf,muduo 中的 protobuf 相关示例默认是不 build 的,如果你的机器上安装了 protobuf 2.3.0 或 2.4.0a,那么可以用 ./build.sh protobuf_all 来构建 protobuf 相关的 examples。

在介绍 codec 和 dispatcher 之前,先讲讲前文的一个未决问题。

为什么 Protobuf 的默认序列化格式没有包含消息的长度与类型?

Protobuf 是经过深思熟虑的消息打包方案,它的默认序列化格式没有包含消息的长度与类型,自然有其道理。哪些情况下不需要在 protobuf 序列化得到的字节流中包含消息的长度和(或)类型?我能想到的答案有:

  • 如果把消息写入文件,一个文件存一个消息,那么序列化结果中不需要包含长度和类型,因为从文件名和文件长度中可以得知消息的类型与长度。
  • 如果把消息写入文件,一个文件存多个消息,那么序列化结果中不需要包含类型,因为文件名就代表了消息的类型。
  • 如果把消息存入数据库(或者 NoSQL),以 VARBINARY 字段保存,那么序列化结果中不需要包含长度和类型,因为从字段名和字段长度中可以得知消息的类型与长度。
  • 如果把消息以 UDP 方式发生给对方,而且对方一个 UDP port 只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从 port 和 UDP packet 长度中可以得知消息的类型与长度。
  • 如果把消息以 TCP 短连接方式发给对方,而且对方一个 TCP port 只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从 port 和 TCP 字节流长度中可以得知消息的类型与长度。
  • 如果把消息以 TCP 长连接方式发给对方,但是对方一个 TCP port 只接收一种消息类型,那么序列化结果中不需要包含类型,因为 port 代表了消息的类型。
  • 如果采用 RPC 方式通信,那么只需要告诉对方 method name,对方自然能推断出 Request 和 Response 的消息类型,这些可以由 protoc 生成的 RPC stubs 自动搞定。

对于最后一点,比方说 sudoku.proto 定义为:

service SudokuService {
  rpc Solve (SudokuRequest) returns (SudokuResponse);
}

那么 RPC method Sudoku.Solve 对应的请求和响应分别是 SudokuRequest 和 SudokuResponse。在发送 RPC 请求的时候,不需要包含 SudokuRequest 的类型,只需要发送 method name Sudoku.Solve,对方自知道应该按照 SudokuRequest 来解析(parse)请求。这个例子来自我的半成品项目 evproto,见 http://blog.csdn.net/Solstice/archive/2010/04/17/5497699.aspx 。

对于上述这些情况,如果 protobuf 无条件地把长度和类型放到序列化的字节串中,只会浪费网络带宽和存储。可见 protobuf 默认不发送长度和类型是正确的决定。Protobuf 为消息格式的设计树立了典范,哪些该自己搞定,哪些留给外部系统去解决,这些都考虑得很清楚。

只有在使用 TCP 长连接,且在一个连接上传递不止一种消息的情况下(比方同时发 Heartbeat 和 Request/Response),才需要我前文提到的那种打包方案。(为什么要在一个连接上同时发 Heartbeat 和业务消息?请见陈硕《分布式系统的工程化开发方法》 p.51 心跳协议的设计。)这时候我们需要一个分发器 dispatcher,把不同类型的消息分给各个消息处理函数,这正是本文的主题之一。

以下均只考虑 TCP 长连接这一应用场景。

先谈谈编解码器。

什么是编解码器 codec?

Codec 是 encoder 和 decoder 的缩写,这是一个到软硬件都在使用的术语,这里我借指“把网络数据和业务消息之间互相转换”的代码。

在最简单的网络编程中,没有消息 message 只有字节流数据,这时候是用不到 codec 的。比如我们前面讲过的 echo server,它只需要把收到的数据原封不动地发送回去,它不必关心消息的边界(也没有“消息”的概念),收多少就发多少,这种情况下它干脆直接使用 muduo::net::Buffer,取到数据再交给 TcpConnection 发送回去,见下图。

codec_echo

non-trivial 的网络服务程序通常会以消息为单位来通信,每条消息有明确的长度与界限。程序每次收到一个完整的消息的时候才开始处理,发送的时候也是把一个完整的消息交给网络库。比如我们前面讲过的 asio chat 服务,它的一条聊天记录就是一条消息,我们设计了一个简单的消息格式,即在聊天记录前面加上 4 字节的 length header,LengthHeaderCodec 代码及解说见《Muduo 网络编程示例之二:Boost.Asio 的聊天服务器》一文。

codec 的基本功能之一是做 TCP 分包:确定每条消息的长度,为消息划分界限。在 non-blocking 网络编程中,codec 几乎是必不可少的。如果只收到了半条消息,那么不会触发消息回调,数据会停留在 Buffer 里(数据已经读到 Buffer 中了),等待收到一个完整的消息再通知处理函数。既然这个任务太常见,我们干脆做一个 utility class,避免服务端和客户端程序都要自己处理分包,这就有了 LengthHeaderCodec。这个 codec 的使用有点奇怪,不需要继承,它也没有基类,只要把它当成普通 data member 来用,把 TcpConnection 的数据喂给它,然后向它注册 onXXXMessage() 回调,代码见 asio chat 示例。muduo 里的 codec 都是这样的风格,通过 boost::function 粘合到一起。

codec 是一层间接性,它位于 TcpConnection 和 ChatServer 之间,拦截处理收到的数据,在收到完整的消息之后再调用 CharServer 对应的处理函数,注意 CharServer::onStringMessage() 的参数是 std::string,不再是 muduo::net::Buffer,也就是说 LengthHeaderCodec 把 Buffer 解码成了 string。另外,在发送消息的时候,ChatServer 通过 LengthHeaderCodec::send() 来发送 string,LengthHeaderCodec 负责把它编码成 Buffer。这正是“编解码器”名字的由来。

codec_chat

Protobuf codec 与此非常类似,只不过消息类型从 std::string 变成了 protobuf::Message。对于只接收处理 Query 消息的 QueryServer 来说,用 ProtobufCodec 非常方便,收到 protobuf::Message 之后 down cast 成 Query 来用就行。如果要接收处理不止一种消息,ProtobufCodec 恐怕还不能单独完成工作,请继续阅读下文。

codec_protobuf

实现 ProtobufCodec

Protobuf 的打包方案我已经在《一种自动反射消息类型的 Google Protobuf 网络传输方案》中讲过,并以 string 为载体演示了 encode 和 decode 操作。在 muduo 里,我们有专门的 Buffer class,编码更轻松。

编码算法很直截了当,按照前文定义的消息格式一路打包下来,最后更新一下首部的长度即可。

解码算法有几个要点:

  • protobuf::Message 是 new 出来的对象,它的生命期如何管理?muduo 采用 shared_ptr<Message> 来自动管理对象生命期,这与其他地方的做法是一致的。
  • 出错如何处理?比方说长度超出范围、check sum 不正确、message type name 不能识别、message parse 出错等等。ProtobufCodec 定义了 ErrorCallback,用户代码可以注册这个回调。如果不注册,默认的处理是断开连接,让客户重连重试。codec 的单元测试里模拟了各种出错情况。
  • 如何处理一次收到半条消息、一条消息、一条半消息、两条消息等等情况?这是每个 non-blocking 网络程序中的 codec 都要面对的问题。

ProtobufCodec 在实际使用中有明显的不足:它只负责把 muduo::net::Buffer 转换为具体类型的 protobuf::Message,应用程序拿到 Message 之后还有再根据其具体类型做一次分发。我们可以考虑做一个简单通用的分发器 dispatcher,以简化客户代码。

此外,目前 ProtobufCodec 的实现非常初级,它没有充分利用 ZeroCopyInputStream 和 ZeroCopyOutputStream,而是把收到的数据作为 byte array 交给 protobuf Message 去解析,这给性能优化留下了空间。protobuf Message 不要求数据连续(像 vector 那样),只要求数据分段连续(像 deque 那样),这给 buffer 管理带来性能上的好处(避免重新分配内存,减少内存碎片),当然也使得代码变复杂。muduo::net::Buffer 非常简单,它内部是 vector<char>,我目前不想让 protobuf 影响 muduo 本身的设计,毕竟 muduo 是个通用的网络库,不是为实现 protobuf RPC 而特制的。

消息分发器 dispatcher 有什么用?

前面提到,在使用 TCP 长连接,且在一个连接上传递不止一种 protobuf 消息的情况下,客户代码需要对收到的消息按类型做分发。比方说,收到 Logon 消息就交给 QueryServer::onLogon() 去处理,收到 Query 消息就交给 QueryServer::onQuery() 去处理。这个消息分派机制可以做得稍微有点通用性,让所有 muduo+protobuf 程序收益,而且不增加复杂性。

换句话说,又是一层间接性,ProtobufCodec 拦截了 TcpConnection 的数据,把它转换为 Message,ProtobufDispatcher 拦截了 ProtobufCodec 的 callback,按消息具体类型把它分派给多个 callbacks。

codec_dispatcher

ProtobufCodec 与 ProtobufDispatcher 的综合运用

我写了两个示例代码,client 和 server,把 ProtobufCodec 和 ProtobufDispatcher 串联起来使用。server 响应 Query 消息,发生回 Answer 消息,如果收到未知消息类型,则断开连接。client 可以选择发送 Query 或 Empty 消息,由命令行控制。这样可以测试 unknown message callback。

为节省篇幅,这里就不列出代码了,请移步阅读

http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/client.cc 

http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/server.cc

在构造函数中,通过注册回调函数把四方 (TcpConnection、codec、dispatcher、QueryServer) 结合起来。

ProtobufDispatcher 的两种实现

要完成消息分发,那么就是对消息做 type-switch,这似乎是一个 bad smell,但是 protobuf Message 的 Descriptor 没有留下定制点(比如暴露一个 boost::any 成员),我们只好硬来了。

先定义

typedef boost::function<void (Message*)> ProtobufMessageCallback;

注意,本节出现的不是 muduo dispatcher 真实的代码,仅为示意,突出重点,便于画图。

ProtobufDispatcherLite 的结构非常简单,它有一个 map<Descriptor*, ProtobufMessageCallback> 成员,客户代码可以以 Descriptor* 为 key 注册回调(recall: 每个具体消息类型都有一个全局的 Descriptor 对象,其地址是不变的,可以用来当 key)。在收到 protobuf Message 之后,在 map 中找到对应的 ProtobufMessageCallback,然后调用之。如果找不到,就调用 defaultCallback。

codec_dispatcher_lite

当然,它的设计也有小小的缺陷,那就是 ProtobufMessageCallback 限制了客户代码只能接受基类 Message,客户代码需要自己做向下转型,比如:

codec_query_server1

如果我希望 QueryServer 这么设计:不想每个消息处理函数自己做 down casting,而是交给 dispatcher 去处理,客户代码拿到的就已经是想要的具体类型。如下:

codec_query_server2

那么该该如何实现 ProtobufDispatcher 呢?它如何与多个未知的消息类型合作?做 down cast 需要知道目标类型,难道我们要用一长串模板类型参数吗?

有一个办法,把多态与模板结合,利用 templated derived class 来提供类型上的灵活性。设计如下。

codec_dispatcher_class

ProtobufDispatcher 有一个模板成员函数,可以接受注册任意消息类型 T 的回调,然后它创建一个模板化的派生类 CallbackT<T>,这样消息的类新信息就保存在了 CallbackT<T> 中,做 down casting 就简单了。

比方说,我们有两个具体消息类型 Query 和 Answer。

codec_query

然后我们这样注册回调:

dispatcher_.registerMessageCallback<muduo::Query>(
    boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
dispatcher_.registerMessageCallback<muduo::Answer>(
    boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));

这样会具现化 (instantiation) 出两个 CallbackT 实体,如下:

codec_query_callback

以上设计参考了 shared_ptr 的 deleter,Scott Meyers 也谈到过

ProtobufCodec 和 ProtobufDispatcher 有何意义?

ProtobufCodec 和 ProtobufDispatcher 把每个直接收发 protobuf Message 的网络程序都会用到的功能提炼出来做成了公用的 utility,这样以后新写 protobuf 网络程序就不必为打包分包和消息分发劳神了。它俩以库的形式存在,是两个可以拿来就当 data member 用的 class,它们没有基类,也没有用到虚函数或者别的什么面向对象特征,不侵入 muduo::net 或者你的代码。如果不这么做,那将来每个 protobuf 网络程序都要自己重新实现类似的功能,徒增负担。

下一篇文章讲《分布式程序的自动回归测试》会介绍利用 protobuf 的跨语言特性,采用 Java 为 C++ 服务程序编写 test harness。

posted on 2011-04-13 07:48 陈硕 阅读(3996) 评论(9编辑 收藏

评论

#1楼 2011-04-13 09:30 Paul Wong  

好文章,近断时间正在了解这方面的知识。
  

#2楼 2011-04-13 09:40 大石头  

非常好的东西,顶一个!

我们也做了一个消息模型,用自己的二进制序列化器。当时也考虑过Protobuf,那是个好东西,只是在控制方面还不够灵活,不能满足要求。

我们的消息开头就是编号,用7位压缩编码写的,接收者就是根据这个编号来识别是哪一种消息类型,然后根据该类型进行反序列化。

因为指定了类型,所以数据里面不需要指定长度。
如果遇到需要使用动态长度数据的消息,消息实现本身要加一个字段表明长度,然后扩展反序列化,告诉Reader,后面这个家伙的长度是多少。

为了减少传输数据的长度,我们的二进制序列化器,不写长度,不写类型,不写成员名称,整数一律采用7位编码存储。
说到底,就跟C++里面一个对象的内存模型一样,这边把对象的内存数据拿走,接收者把数据放入内存,成为一个对象
  

#3楼 2011-04-13 14:44 msnweb  

....ls 的。用7位压缩编码写的,接收者就是根据这个编号来识别是哪一种消息类型,然后根据该类型进行反序列化。拿到序列号就能解析出消息长度吗?不能,除非消息是定长的。否则你的解析器需要做到非常智能,比如解析http协议那样的解析器。否则如果一段的数据发生了错误,你的解析器能立刻发现问题吗?不行,因为你缺少一些边界校验。

至于lz 这篇文章。只是用map<Descriptor*, ProtobufMessageCallback> 成员 代替了 switch case 的做法。用 mether string 代替了手工的编号技术。lz的组包方式,已经拍过砖了 就不多说了。

关于lz 自己一直宣扬自己的 vector<char> 做buff, 我只能保留自己的意见。 一个buff 居然需要使用 vector 您不嫌太重了吗?lz 曾经写过一篇批判ACE的文章,老夫挺有同感,现在看来 lz 在ACE 的道路上继续前行!
  

#4楼 2012-09-16 22:24 hailong  

我想请问下关于关于google probuf的问题,如果我拥有的字段是可变的,比如TLV编码,中L(length)是可变的,可以是一个字节或者两个字节,用C++ ungigned表示,当一个字节时,长度范围是(0-127),所以连个字节就是>127,如果自己写代码就判断下第一个字节是否为1来判断;但是我想如果自己写解码比较麻烦,还不知道probuf中能否有自定义解码规则,您知道吗,谢谢啦!
  

#5楼[楼主2012-09-17 07:44 陈硕  

@ hailong
既然使用了protobuf,为什么你还要关心它如何编码呢?
字段直接定义成 int 类型就可以,protobuf 对整数采用变长编码。
  

#6楼 2014-05-05 18:25 ChironChan  

请教一下。复杂的message结构,许多个optional。。对性能影响大吗?optional项可能每次内容不一样。。
  

#7楼[楼主2014-05-06 01:06 陈硕  

@ ChironChan
你做一下 benchmark 不就知道了。
  

#8楼 2014-05-06 09:29 ChironChan  

好吧。确实有点偷懒了。谢谢博主回复。
  

#9楼 2014-07-13 20:36 卿言放纵  

@ ChironChan
引用请教一下。复杂的message结构,许多个optional。。对性能影响大吗?optional项可能每次内容不一样。。
基本上没影响,这个是按照id做的一个KV队,如果optional很多,就是KV大一点而已,只要别搞个几十万条,其他的无所谓

ProtoBuf使用,主要注意两点:
1, bytes和string的数据长度别超过 ( 508 * 1024 - 49 ) ,否则序列号时会有一个数量级的性能降低;因为内存管理方式上的不同,直接把数据打包入PB中,比使用带外数据要快一些;不过如果考虑通用性,大数据最好都走带外数据;
2, PB不具有边界性,一定要注意保持他的完整性



  
    本文转自 陈硕  博客园博客,原文链接:http://www.cnblogs.com/Solstice/archive/2011/04/13/2014362.html ,如需转载请自行联系原作者





相关文章
|
8月前
19.6 Boost Asio 文本压缩传输
Base64是一种二进制到文本的编码方案,用于将二进制数据转换为`ASCII`字符串格式。它通过将二进制数据流转换为一系列`64`个字符来工作,这些字符都可以安全地传输到设计用于处理文本数据的系统中。如下代码中我们使用Boost中提供的`base64_from_binary`头文件实现两个函数,其中`Base64Decode`函数接收一个字符串并对其进行解压缩操作输出解密后的原始字符串内容,其次`Base64Encode`函数用于将一个原始数据包压缩处理,有了这两个函数的支持,我们只需要在调用发送函数之前对数据进行压缩,在接收数据后在使用对等的函数对其进行解压缩即可,如下是该案例的完整实现。
38 0
19.6 Boost Asio 文本压缩传输
|
8月前
|
存储
19.7 Boost Asio 传输序列化数据
序列化和反序列化是指将数据结构或对象转换为一组字节,以便在需要时可以将其存储在磁盘上或通过网络传输,并且可以在需要时重新创建原始对象或数据结构。 序列化是将内存中的对象转换为字节的过程。在序列化期间,对象的状态被编码为一组字节,并可以保存或传输到另一个位置。序列化后的字节可以在之后进行反序列化,以将对象重建为在序列化之前的状态。反序列化则是将字节序列重新转换为对象或数据结构的过程。在反序列化期间,字节被反转回原始对象的状态,以便它可以被使用或操作。
38 0
19.7 Boost Asio 传输序列化数据
|
2月前
|
设计模式 JSON 编解码
Netty使用篇:编解码器
Netty使用篇:编解码器
|
编解码 JSON 安全
IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理
本篇将从Base64再到Base128编码,带你一起从底层来理解Protobuf的数据编码原理。 本文结构总体与 Protobuf 官方文档相似,不少内容也来自官方文档,并在官方文档的基础上添加作者理解的内容(确保不那么枯燥),如有出入请以官方文档为准。
336 0
IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理
|
XML 存储 JSON
IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理
本篇文章我们不讨论IM系统中的那些高端技术话题,我们回归到通讯的本质——也就是数据在网络中交互时的编解码原理,并由浅入深从底层理解Protobuf的编解码技术实现。
153 0
IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理
|
XML JSON Java
Netty 使用 Protobuf 序列化,太强大了!
我们来使用Protobuf进行序列化,它和XML,json一样都有自己的语法,xml的后缀是.xml,json文件的后缀是.json,自然Protobuf文件的后缀就是.proto(哈哈,当然不是全称)。
518 0
Netty 使用 Protobuf 序列化,太强大了!
|
网络协议 Java 编译器
【Android Protobuf 序列化】Protobuf 服务器与客户端通信 ( TCP 通信中使用 Protobuf )
【Android Protobuf 序列化】Protobuf 服务器与客户端通信 ( TCP 通信中使用 Protobuf )
400 0
|
编解码 缓存 API
|
SQL 关系型数据库 数据库
Protobuf协议精品应用
  Protobuf应用广泛,尤其作为网络通讯协议最为普遍。本文将详细描述几个让人眼前一亮的protobuf协议设计,对准备应用或已经应用protobuf的开发者会有所启发,甚至可以直接拿过去用。 这里描述的协议设计被用于生产环境的即时通讯、埋点数据采集、消息推送、redis和mysql数据代理。
1208 0

热门文章

最新文章