Muduo 网络编程示例之二:Boost.Asio 的聊天服务器

简介:

陈硕 (giantchen_AT_gmail)

Blog.csdn.net/Solstice

这是《Muduo 网络编程示例》系列的第二篇文章。

Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx

本文讲介绍一个与 Boost.Asio 的示例代码中的聊天服务器功能类似的网络服务程序,包括客户端与服务端的 muduo 实现。这个例子的主要目的是介绍如何处理分包,并初步涉及 Muduo 的多线程功能。Muduo 的下载地址: http://muduo.googlecode.com/files/muduo-0.1.7-alpha.tar.gz ,SHA1 873567e43b3c2cae592101ea809b30ba730f2ee6,本文的完整代码可在线阅读

http://code.google.com/p/muduo/source/browse/trunk/examples/asio/chat/ 。

TCP 分包

前面一篇《五个简单 TCP 协议》中处理的协议没有涉及分包,在 TCP 这种字节流协议上做应用层分包是网络编程的基本需求。分包指的是在发生一个消息(message)或一帧(frame)数据时,通过一定的处理,让接收方能从字节流中识别并截取(还原)出一个个消息。“粘包问题”是个伪问题。

对于短连接的 TCP 服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕,接收方 read() 返回 0,从而知道消息的结尾。例如前一篇文章里的 daytime 和 time 协议。

对于长连接的 TCP 服务,分包有四种方法:

  1. 消息长度固定,比如 muduo 的 roundtrip 示例就采用了固定的 16 字节消息;
  2. 使用特殊的字符或字符串作为消息的边界,例如 HTTP 协议的 headers 以 "\r\n" 为字段的分隔符;
  3. 在每条消息的头部加一个长度字段,这恐怕是最常见的做法,本文的聊天协议也采用这一办法;
  4. 利用消息本身的格式来分包,例如 XML 格式的消息中 <root>...</root> 的配对,或者 JSON 格式中的 { ... } 的配对。解析这种消息格式通常会用到状态机。

在后文的代码讲解中还会仔细讨论用长度字段分包的常见陷阱。

聊天服务

本文实现的聊天服务非常简单,由服务端程序和客户端程序组成,协议如下:

  • 服务端程序中某个端口侦听 (listen) 新的连接;
  • 客户端向服务端发起连接;
  • 连接建立之后,客户端随时准备接收服务端的消息并在屏幕上显示出来;
  • 客户端接受键盘输入,以回车为界,把消息发送给服务端;
  • 服务端接收到消息之后,依次发送给每个连接到它的客户端;原来发送消息的客户端进程也会收到这条消息;
  • 一个服务端进程可以同时服务多个客户端进程,当有消息到达服务端后,每个客户端进程都会收到同一条消息,服务端广播发送消息的顺序是任意的,不一定哪个客户端会先收到这条消息。
  • (可选)如果消息 A 先于消息 B 到达服务端,那么每个客户端都会先收到 A 再收到 B。

这实际上是一个简单的基于 TCP 的应用层广播协议,由服务端负责把消息发送给每个连接到它的客户端。参与“聊天”的既可以是人,也可以是程序。在以后的文章中,我将介绍一个稍微复杂的一点的例子 hub,它有“聊天室”的功能,客户端可以注册特定的 topic(s),并往某个 topic 发送消息,这样代码更有意思。

消息格式

本聊天服务的消息格式非常简单,“消息”本身是一个字符串,每条消息的有一个 4 字节的头部,以网络序存放字符串的长度。消息之间没有间隙,字符串也不一定以 '\0' 结尾。比方说有两条消息 "hello" 和 "chenshuo",那么打包后的字节流是:

0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'

共 21 字节。

打包的代码

这段代码把 const string& message 打包为 muduo::net::Buffer,并通过 conn 发送。

   1: void send(muduo::net::TcpConnection* conn, const string& message)
   2: {
   3:   muduo::net::Buffer buf;
   4:   buf.append(message.data(), message.size());
   5:   int32_t len = muduo::net::sockets::hostToNetwork32(static_cast<int32_t>(message.size()));
   6:   buf.prepend(&len, sizeof len);
   7:   conn->send(&buf);
   8: }

muduo::Buffer 有一个很好的功能,它在头部预留了 8 个字节的空间,这样第 6 行的 prepend() 操作就不需要移动已有的数据,效率较高。

分包的代码

解析数据往往比生成数据复杂,分包打包也不例外。

   1: void onMessage(const muduo::net::TcpConnectionPtr& conn,
   2:                muduo::net::Buffer* buf,
   3:                muduo::Timestamp receiveTime)
   4: {
   5:   while (buf->readableBytes() >= kHeaderLen)
   6:   {
   7:     const void* data = buf->peek();
   8:     int32_t tmp = *static_cast<const int32_t*>(data);
   9:     int32_t len = muduo::net::sockets::networkToHost32(tmp);
  10:     if (len > 65536 || len < 0)
  11:     {
  12:       LOG_ERROR << "Invalid length " << len;
  13:       conn->shutdown();
  14:     }
  15:     else if (buf->readableBytes() >= len + kHeaderLen)
  16:     {
  17:       buf->retrieve(kHeaderLen);
  18:       muduo::string message(buf->peek(), len);
  19:       buf->retrieve(len);
  20:       messageCallback_(conn, message, receiveTime);  // 收到完整的消息,通知用户
  21:     }
  22:     else
  23:     {
  24:       break;
  25:     }
  26:   }
  27: }

上面这段代码第 7 行用了 while 循环来反复读取数据,直到 Buffer 中的数据不够一条完整的消息。请读者思考,如果换成 if (buf->readableBytes() >= kHeaderLen) 会有什么后果。

以前面提到的两条消息的字节流为例:

0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'

假设数据最终都全部到达,onMessage() 至少要能正确处理以下各种数据到达的次序,每种情况下 messageCallback_ 都应该被调用两次:

  1. 每次收到一个字节的数据,onMessage() 被调用 21 次;
  2. 数据分两次到达,第一次收到 2 个字节,不足消息的长度字段;
  3. 数据分两次到达,第一次收到 4 个字节,刚好够长度字段,但是没有 body;
  4. 数据分两次到达,第一次收到 8 个字节,长度完整,但 body 不完整;
  5. 数据分两次到达,第一次收到 9 个字节,长度完整,body 也完整;
  6. 数据分两次到达,第一次收到 10 个字节,第一条消息的长度完整、body 也完整,第二条消息长度不完整;
  7. 请自行移动分割点,验证各种情况;
  8. 数据一次就全部到达,这时必须用 while 循环来读出两条消息,否则消息会堆积。

请读者验证 onMessage() 是否做到了以上几点。这个例子充分说明了 non-blocking read 必须和 input buffer 一起使用。

编解码器 LengthHeaderCodec

有人评论 Muduo 的接收缓冲区不能设置回调函数的触发条件,确实如此。每当 socket 可读,Muduo 的 TcpConnection 会读取数据并存入 Input Buffer,然后回调用户的函数。不过,一个简单的间接层就能解决问题,让用户代码只关心“消息到达”而不是“数据到达”,如本例中的 LengthHeaderCodec 所展示的那一样。

   1: #ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
   2: #define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
   3:  
   4: #include <muduo/base/Logging.h>
   5: #include <muduo/net/Buffer.h>
   6: #include <muduo/net/SocketsOps.h>
   7: #include <muduo/net/TcpConnection.h>
   8:  
   9: #include <boost/function.hpp>
  10: #include <boost/noncopyable.hpp>
  11:  
  12: using muduo::Logger;
  13:  
  14: class LengthHeaderCodec : boost::noncopyable
  15: {
  16:  public:
  17:   typedef boost::function<void (const muduo::net::TcpConnectionPtr&,
  18:                                 const muduo::string& message,
  19:                                 muduo::Timestamp)> StringMessageCallback;
  20:  
  21:   explicit LengthHeaderCodec(const StringMessageCallback& cb)
  22:     : messageCallback_(cb)
  23:   {
  24:   }
  25:  
  26:   void onMessage(const muduo::net::TcpConnectionPtr& conn,
  27:                  muduo::net::Buffer* buf,
  28:                  muduo::Timestamp receiveTime)
  29:   { 同上 }
  30:  
  31:   void send(muduo::net::TcpConnection* conn, const muduo::string& message)
  32:   { 同上 }
  33:  
  34:  private:
  35:   StringMessageCallback messageCallback_;
  36:   const static size_t kHeaderLen = sizeof(int32_t);
  37: };
  38:  
  39: #endif  // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H

这段代码把以 Buffer* 为参数的 MessageCallback 转换成了以 const string& 为参数的 StringMessageCallback,让用户代码不必关心分包操作。客户端和服务端都能从中受益。

服务端的实现

聊天服务器的服务端代码小于 100 行,不到 asio 的一半。

请先阅读第 68 行起的数据成员的定义。除了经常见到的 EventLoop 和 TcpServer,ChatServer 还定义了 codec_ 和 std::set<TcpConnectionPtr> connections_ 作为成员,connections_ 是目前已建立的客户连接,在收到消息之后,服务器会遍历整个容器,把消息广播给其中每一个 TCP 连接。

 

首先,在构造函数里注册回调:

   1: #include "codec.h"
   2:  
   3: #include <muduo/base/Logging.h>
   4: #include <muduo/base/Mutex.h>
   5: #include <muduo/net/EventLoop.h>
   6: #include <muduo/net/SocketsOps.h>
   7: #include <muduo/net/TcpServer.h>
   8:  
   9: #include <boost/bind.hpp>
  10:  
  11: #include <set>
  12: #include <stdio.h>
  13:  
  14: using namespace muduo;
  15: using namespace muduo::net;
  16:  
  17: class ChatServer : boost::noncopyable
  18: {
  19:  public:
  20:   ChatServer(EventLoop* loop,
  21:              const InetAddress& listenAddr)
  22:   : loop_(loop),
  23:     server_(loop, listenAddr, "ChatServer"),
  24:     codec_(boost::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
  25:   {
  26:     server_.setConnectionCallback(
  27:         boost::bind(&ChatServer::onConnection, this, _1));
  28:     server_.setMessageCallback(
  29:         boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  30:   }
  31:  
  32:   void start()
  33:   {
  34:     server_.start();
  35:   }
  36:  
这里有几点值得注意,在以往的代码里是直接把本 class 的 onMessage() 注册给 server_;这里我们把 LengthHeaderCodec::onMessage() 注册给 server_,然后向 codec_ 注册了 ChatServer::onStringMessage(),等于说让 codec_ 负责解析消息,然后把完整的消息回调给 ChatServer。这正是我前面提到的“一个简单的间接层”,在不增加 Muduo 库的复杂度的前提下,提供了足够的灵活性让我们在用户代码里完成需要的工作。
另外,server_.start() 绝对不能在构造函数里调用,这么做将来会有线程安全的问题,见我在《当析构函数遇到多线程 ── C++ 中线程安全的对象回调》一文中的论述
以下是处理连接的建立和断开的代码,注意它把新建的连接加入到 connections_ 容器中,把已断开的连接从容器中删除。这么做是为了避免内存和资源泄漏,TcpConnectionPtr 是 boost::shared_ptr<TcpConnection>,是 muduo 里唯一一个默认采用 shared_ptr 来管理生命期的对象。以后我们会谈到这么做的原因。
  37:  private:
  38:   void onConnection(const TcpConnectionPtr& conn)
  39:   {
  40:     LOG_INFO << conn->localAddress().toHostPort() << " -> "
  41:         << conn->peerAddress().toHostPort() << " is "
  42:         << (conn->connected() ? "UP" : "DOWN");
  43:  
  44:     MutexLockGuard lock(mutex_);
  45:     if (conn->connected())
  46:     {
  47:       connections_.insert(conn);
  48:     }
  49:     else
  50:     {
  51:       connections_.erase(conn);
  52:     }
  53:   }
  54:  
以下是服务端处理消息的代码,它遍历整个 connections_ 容器,把消息打包发送给各个客户连接。
  55:   void onStringMessage(const TcpConnectionPtr&,
  56:                        const string& message,
  57:                        Timestamp)
  58:   {
  59:     MutexLockGuard lock(mutex_);
  60:     for (ConnectionList::iterator it = connections_.begin();
  61:         it != connections_.end();
  62:         ++it)
  63:     {
  64:       codec_.send(get_pointer(*it), message);
  65:     }
  66:   }
  67:  
数据成员:
  68:   typedef std::set<TcpConnectionPtr> ConnectionList;
  69:   EventLoop* loop_;
  70:   TcpServer server_;
  71:   LengthHeaderCodec codec_;
  72:   MutexLock mutex_;
  73:   ConnectionList connections_;
  74: };
  75:  
main() 函数里边是例行公事的代码:
  76: int main(int argc, char* argv[])
  77: {
  78:   LOG_INFO << "pid = " << getpid();
  79:   if (argc > 1)
  80:   {
  81:     EventLoop loop;
  82:     uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
  83:     InetAddress serverAddr(port);
  84:     ChatServer server(&loop, serverAddr);
  85:     server.start();
  86:     loop.loop();
  87:   }
  88:   else
  89:   {
  90:     printf("Usage: %s port\n", argv[0]);
  91:   }
  92: }

如果你读过 asio 的对应代码,会不会觉得 Reactor 往往比 Proactor 容易使用?

 

客户端的实现

我有时觉得服务端的程序常常比客户端的更容易写,聊天服务器再次验证了我的看法。客户端的复杂性来自于它要读取键盘输入,而 EventLoop 是独占线程的,所以我用了两个线程,main() 函数所在的线程负责读键盘,另外用一个 EventLoopThread 来处理网络 IO。我暂时没有把标准输入输出融入 Reactor 的想法,因为服务器程序的 stdin 和 stdout 往往是重定向了的。

来看代码,首先,在构造函数里注册回调,并使用了跟前面一样的 LengthHeaderCodec 作为中间层,负责打包分包。

   1: #include "codec.h"
   2:  
   3: #include <muduo/base/Logging.h>
   4: #include <muduo/base/Mutex.h>
   5: #include <muduo/net/EventLoopThread.h>
   6: #include <muduo/net/TcpClient.h>
   7:  
   8: #include <boost/bind.hpp>
   9: #include <boost/noncopyable.hpp>
  10:  
  11: #include <iostream>
  12: #include <stdio.h>
  13:  
  14: using namespace muduo;
  15: using namespace muduo::net;
  16:  
  17: class ChatClient : boost::noncopyable
  18: {
  19:  public:
  20:   ChatClient(EventLoop* loop, const InetAddress& listenAddr)
  21:     : loop_(loop),
  22:       client_(loop, listenAddr, "ChatClient"),
  23:       codec_(boost::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
  24:   {
  25:     client_.setConnectionCallback(
  26:         boost::bind(&ChatClient::onConnection, this, _1));
  27:     client_.setMessageCallback(
  28:         boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  29:     client_.enableRetry();
  30:   }
  31:  
  32:   void connect()
  33:   {
  34:     client_.connect();
  35:   }
  36:  
disconnect() 目前为空,客户端的连接由操作系统在进程终止时关闭。
  37:   void disconnect()
  38:   {
  39:     // client_.disconnect();
  40:   }
  41:  
write() 会由 main 线程调用,所以要加锁,这个锁不是为了保护 TcpConnection,而是保护 shared_ptr。
  42:   void write(const string& message)
  43:   {
  44:     MutexLockGuard lock(mutex_);
  45:     if (connection_)
  46:     {
  47:       codec_.send(get_pointer(connection_), message);
  48:     }
  49:   }
  50:  
onConnection() 会由 EventLoop 线程调用,所以要加锁以保护 shared_ptr。
  51:  private:
  52:   void onConnection(const TcpConnectionPtr& conn)
  53:   {
  54:     LOG_INFO << conn->localAddress().toHostPort() << " -> "
  55:         << conn->peerAddress().toHostPort() << " is "
  56:         << (conn->connected() ? "UP" : "DOWN");
  57:  
  58:     MutexLockGuard lock(mutex_);
  59:     if (conn->connected())
  60:     {
  61:       connection_ = conn;
  62:     }
  63:     else
  64:     {
  65:       connection_.reset();
  66:     }
  67:   }
  68:  
把收到的消息打印到屏幕,这个函数由 EventLoop 线程调用,但是不用加锁,因为 printf() 是线程安全的。
注意这里不能用 cout,它不是线程安全的。
  69:   void onStringMessage(const TcpConnectionPtr&,
  70:                        const string& message,
  71:                        Timestamp)
  72:   {
  73:     printf("<<< %s\n", message.c_str());
  74:   }
  75:  
 
数据成员:
  76:   EventLoop* loop_;
  77:   TcpClient client_;
  78:   LengthHeaderCodec codec_;
  79:   MutexLock mutex_;
  80:   TcpConnectionPtr connection_;
  81: };
  82:  
main() 函数里除了例行公事,还要启动 EventLoop 线程和读取键盘输入。
  83: int main(int argc, char* argv[])
  84: {
  85:   LOG_INFO << "pid = " << getpid();
  86:   if (argc > 2)
  87:   {
  88:     EventLoopThread loopThread;
  89:     uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
  90:     InetAddress serverAddr(argv[1], port);
  91:  
  92:     ChatClient client(loopThread.startLoop(), serverAddr); // 注册到 EventLoopThread 的 EventLoop 上。
  93:     client.connect();
  94:     std::string line;
  95:     while (std::getline(std::cin, line))
  96:     {
  97:       string message(line.c_str()); // 这里似乎多此一举,可直接发送 line。这里是
  98:       client.write(message);
  99:     }
 100:     client.disconnect();
 101:   }
 102:   else
 103:   {
 104:     printf("Usage: %s host_ip port\n", argv[0]);
 105:   }
 106: }
 107:  

 

简单测试

开三个命令行窗口,在第一个运行

$ ./asio_chat_server 3000

 

第二个运行

$ ./asio_chat_client 127.0.0.1 3000

 

第三个运行同样的命令

$ ./asio_chat_client 127.0.0.1 3000

 

这样就有两个客户端进程参与聊天。在第二个窗口里输入一些字符并回车,字符会出现在本窗口和第三个窗口中。

 

 

下一篇文章我会介绍 Muduo 中的定时器,并实现 Boost.Asio 教程中的 timer2~5 示例,以及带流量统计功能的 discard 和 echo 服务器(来自 Java Netty)。流量等于单位时间内发送或接受的字节数,这要用到定时器功能。

(待续)



    本文转自 陈硕  博客园博客,原文链接:http://www.cnblogs.com/Solstice/archive/2011/02/04/1949106.html,如需转载请自行联系原作者



相关文章
|
3月前
|
数据安全/隐私保护
Haskell网络编程:代理服务器的高级使用技巧
Haskell网络编程:代理服务器的高级使用技巧
|
4月前
|
安全 开发者 数据安全/隐私保护
Xamarin 的安全性考虑与最佳实践:从数据加密到网络防护,全面解析构建安全移动应用的六大核心技术要点与实战代码示例
【8月更文挑战第31天】Xamarin 的安全性考虑与最佳实践对于构建安全可靠的跨平台移动应用至关重要。本文探讨了 Xamarin 开发中的关键安全因素,如数据加密、网络通信安全、权限管理等,并提供了 AES 加密算法的代码示例。
62 0
|
4月前
|
运维 网络架构 Python
利用Python查询H3C网络设备示例,运维用了它,都称赞!
利用Python查询H3C网络设备示例,运维用了它,都称赞!
|
5月前
|
网络协议 网络架构
【网络编程入门】TCP与UDP通信实战:从零构建服务器与客户端对话(附简易源码,新手友好!)
在了解他们之前我们首先要知道网络模型,它分为两种,一种是OSI,一种是TCP/IP,当然他们的模型图是不同的,如下
202 1
|
4月前
|
机器学习/深度学习 自然语言处理 TensorFlow
|
6月前
|
Java Android开发
Java Socket编程示例:服务器开启在8080端口监听,接收客户端连接并打印消息。
【6月更文挑战第23天】 Java Socket编程示例:服务器开启在8080端口监听,接收客户端连接并打印消息。客户端连接服务器,发送&quot;Hello, Server!&quot;后关闭。注意Android中需避免主线程进行网络操作。
105 4
|
7月前
|
机器学习/深度学习 JSON PyTorch
图神经网络入门示例:使用PyTorch Geometric 进行节点分类
本文介绍了如何使用PyTorch处理同构图数据进行节点分类。首先,数据集来自Facebook Large Page-Page Network,包含22,470个页面,分为四类,具有不同大小的特征向量。为训练神经网络,需创建PyTorch Data对象,涉及读取CSV和JSON文件,处理不一致的特征向量大小并进行归一化。接着,加载边数据以构建图。通过`Data`对象创建同构图,之后数据被分为70%训练集和30%测试集。训练了两种模型:MLP和GCN。GCN在测试集上实现了80%的准确率,优于MLP的46%,展示了利用图信息的优势。
98 1
|
5月前
|
网络协议 安全 Python
我们将使用Python的内置库`http.server`来创建一个简单的Web服务器。虽然这个示例相对简单,但我们可以围绕它展开许多讨论,包括HTTP协议、网络编程、异常处理、多线程等。
我们将使用Python的内置库`http.server`来创建一个简单的Web服务器。虽然这个示例相对简单,但我们可以围绕它展开许多讨论,包括HTTP协议、网络编程、异常处理、多线程等。
|
5月前
|
数据采集 Perl
错误处理在网络爬虫开发中的重要性:Perl示例 引言
错误处理在网络爬虫开发中的重要性:Perl示例 引言
|
6月前
|
网络协议 Java
Java Socket编程 - 基于TCP方式的客户服务器聊天程序
Java Socket编程 - 基于TCP方式的客户服务器聊天程序
59 0