muduo源码剖析之TcpConnection连接管理

简介: TcpCon用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调 connectionCallback。TcpConnection 构造时接收参数有 TCP 连接的 sockfd,服务端地址 localAddr,客户端地址 peerAddr,并通过 Socket 封装 sockfd。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后 Channel::handleEvent() 执行相应事件的回调。

简介

TcpConnection

用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调 connectionCallback。

TcpConnection 有四个状态,简单的状态图:

img

成员及属性解析

主要接口

send

发送数据的主要接口,最终通过内部实现在runInLoop中发送数据

回调setter

connectionEstablished

当连接建立时,应当只执行一次
将自身的shared_from_this指针与Channel绑定
令Channel激活对可读IO事件的关注

connectionDestroyed

当连接断开时,应当只执行一次
将自己的Channel从所属EventLoop中移除

主要成员

loop

主要回调都通过EventLoop所在线程处理

Channel

通过Channel的回调调用自己的回调

Socket

连接所属的套接字fd

localaddr,peeraddr

本地和对端socketaddr

各种回调callback

inputbuffer,outputbuffer

应用层输入,输出缓冲区

源码剖析

源码已经编写注释

TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include "muduo/base/noncopyable.h"
#include "muduo/base/StringPiece.h"
#include "muduo/base/Types.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/Buffer.h"
#include "muduo/net/InetAddress.h"

#include <memory>

#include <boost/any.hpp>

// struct tcp_info is in <netinet/tcp.h>
struct tcp_info;

namespace muduo
{
   
   
namespace net
{
   
   

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.

/**
 * TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
 * =》 TcpConnection 设置回调 =》 Channel =》 Poller =》 Channel的回调操作
 * 
 */ 

class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection>
{
   
   
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,//存储自己的addr信息
                const InetAddress& peerAddr);//存储对方的addr信息
  ~TcpConnection();

  EventLoop* getLoop() const {
   
    return loop_; }
  const string& name() const {
   
    return name_; }
  const InetAddress& localAddress() const {
   
    return localAddr_; }
  const InetAddress& peerAddress() const {
   
    return peerAddr_; }
  bool connected() const {
   
    return state_ == kConnected; }
  bool disconnected() const {
   
    return state_ == kDisconnected; }
  // return true if success.
  bool getTcpInfo(struct tcp_info*) const;
  string getTcpInfoString() const;

  // void send(string&& message); // C++11
  void send(const void* message, int len);
  void send(const StringPiece& message);
  // void send(Buffer&& message); // C++11
  void send(Buffer* message);  // this one will swap data
  void shutdown(); // NOT thread safe, no simultaneous calling
  // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
  void forceClose();
  void forceCloseWithDelay(double seconds);
  void setTcpNoDelay(bool on);
  // reading or not
  void startRead();
  void stopRead();
  bool isReading() const {
   
    return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
  /* 设置TCP上下文 */ 
  void setContext(const boost::any& context)
  {
   
    context_ = context; }
  /* 获取TCP上下文 */
  const boost::any& getContext() const
  {
   
    return context_; }

  boost::any* getMutableContext()
  {
   
    return &context_; }

  void setConnectionCallback(const ConnectionCallback& cb)
  {
   
    connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  {
   
    messageCallback_ = cb; }

  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  {
   
    writeCompleteCallback_ = cb; }
  /* 设置高水位回调函数和高水位值,当缓冲区的size达到highWaterMark时触发此请求 */
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  {
   
    highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  /// Advanced interface
  Buffer* inputBuffer()
  {
   
    return &inputBuffer_; }

  Buffer* outputBuffer()
  {
   
    return &outputBuffer_; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  {
   
    closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  // 连接建立
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  // 连接建立
  void connectDestroyed();  // should be called only once

 private:
     /*
     kDisconnected    已经断开连接
    kConnecting        正在连接
    kConnected        已经连接
    kDisconnecting    正在断开连接
     */
  enum StateE {
   
    kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleWrite();
  void handleClose();
  void handleError();
  // void sendInLoop(string&& message);
  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();
  // void shutdownAndForceCloseInLoop(double seconds);
  void forceCloseInLoop();
  void setState(StateE s) {
   
    state_ = s; }
  const char* stateToString() const;
  void startReadInLoop();
  void stopReadInLoop();

  EventLoop* loop_;//subloop
  const string name_;
  StateE state_;  // FIXME: use atomic variable,连接状态
  bool reading_;//监听read标志
  // we don't expose those classes to client.
  //每一个client connect都有一个socket fd和这个fd的channel
  std::unique_ptr<Socket> socket_;
  std::unique_ptr<Channel> channel_;

  const InetAddress localAddr_;//当前主机的地址信息
  const InetAddress peerAddr_;//对端主机地址信息
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;
  HighWaterMarkCallback highWaterMarkCallback_;
  CloseCallback closeCallback_;
  size_t highWaterMark_;//存储高水位变量
  Buffer inputBuffer_;//接收数据的buffer
  Buffer outputBuffer_; //发送数据的buffer FIXME: use list<Buffer> as output buffer.
  boost::any context_;
  // FIXME: creationTime_, lastReceiveTime_
  //        bytesReceived_, bytesSent_
};

typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;

}  // namespace net
}  // namespace muduo

#endif  // MUDUO_NET_TCPCONNECTION_H

TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/TcpConnection.h"

#include "muduo/base/Logging.h"
#include "muduo/base/WeakCallback.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/Socket.h"
#include "muduo/net/SocketsOps.h"

#include <errno.h>

using namespace muduo;
using namespace muduo::net;
/* 默认的当连接建立和断开时的回调函数 */
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
   
   
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
  // do not call conn->forceClose(), because some users want to register message callback only.
}
/* 默认的收消息的回调函数 */
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
   
   
  buf->retrieveAll();//直接把消息全扔掉
}

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),//设置 正在连接 标志
    reading_(true),//开始监听读取消息事件
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)//高水位设置64mb
{
   
   
  channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);//设置保持这个fd活跃,类似心跳包,不过间隔是2h(很长)
}

TcpConnection::~TcpConnection()
{
   
   
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd()
            << " state=" << stateToString();
  assert(state_ == kDisconnected);
}

bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const//获取关于这个fd的相关tcp信息
{
   
   
  return socket_->getTcpInfo(tcpi);
}

string TcpConnection::getTcpInfoString() const//将fd的相关tcp信息格式化为字符串
{
   
   
  char buf[1024];
  buf[0] = '\0';
  socket_->getTcpInfoString(buf, sizeof buf);
  return buf;
}

void TcpConnection::send(const void* data, int len)
{
   
   
  send(StringPiece(static_cast<const char*>(data), len));
}

void TcpConnection::send(const StringPiece& message)
{
   
   
  if (state_ == kConnected)
  {
   
   
  //如果调用该函数的线程与subloop是同一个线程,则直接调用,否则调用runInLoop将事件加入subloop的处理队列
    if (loop_->isInLoopThread())
    {
   
   
      sendInLoop(message);
    }
    else
    {
   
   
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                    this,     // FIXME
                    message.as_string()));
                    //std::forward<string>(message)));
    }
  }
}

// FIXME efficiency!!!
void TcpConnection::send(Buffer* buf)
{
   
   
  if (state_ == kConnected)
  {
   
   
  //如果调用该函数的线程与subloop是同一个线程,则直接调用,否则调用runInLoop将事件加入subloop的处理队列
    if (loop_->isInLoopThread())
    {
   
   
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
   
   
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                    this,     // FIXME
                    buf->retrieveAllAsString()));
                    //std::forward<string>(message)));
    }
  }
}

void TcpConnection::sendInLoop(const StringPiece& message)
{
   
   
  sendInLoop(message.data(), message.size());
}

//发送数据
void TcpConnection::sendInLoop(const void* data, size_t len)
{
   
   
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)//如果连接已经关闭,则直接返回
  {
   
   
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  //如果没有监听发送数据事件,并且缓冲区无数据,那么直接调用write发送
  // 说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
  // 注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
  // 也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
   
   
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
   
   
      remaining = len - nwrote;//刷新剩余未发送的数据
      // 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了
      //如果数据全部发送完了,并且注册了发送完毕的回调函数,则调用
      if (remaining == 0 && writeCompleteCallback_)
      {
   
   
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0表示出错了
    {
   
   
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
   
   
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
   
   
          faultError = true;
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)//如果前面没有设置错误,并且还有数据没有发送
  {
   
   
    size_t oldLen = outputBuffer_.readableBytes();//获取缓冲区未发送数据的长度
    //如果需要发送的数据达到了高水位,则调用设置的回调函数
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
   
   
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);//将尚未发送的数据加入缓冲区
    if (!channel_->isWriting())
    {
   
   
      //监控写事件,当缓冲区可写时subloop会调用handleWrite()函数将剩下的写入,写完后会关闭可写事件的监控
      channel_->enableWriting();
    }
  }
}

void TcpConnection::shutdown()//关闭读写通道,并设置标志为断开连接
{
   
   
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
   
   
    setState(kDisconnecting);//将状态设置为正在关闭
    // FIXME: shared_from_this()?
    loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()//关闭socket fd写通道
{
   
   
  loop_->assertInLoopThread();
  if (!channel_->isWriting())//判断是否监听了写事件
  {
   
   
    // we are not writing
    socket_->shutdownWrite();//关闭写通道
  }
}

// void TcpConnection::shutdownAndForceCloseAfter(double seconds)
// {
   
   
//   // FIXME: use compare and swap
//   if (state_ == kConnected)
//   {
   
   
//     setState(kDisconnecting);
//     loop_->runInLoop(std::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
//   }
// }

// void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
// {
   
   
//   loop_->assertInLoopThread();
//   if (!channel_->isWriting())
//   {
   
   
//     // we are not writing
//     socket_->shutdownWrite();
//   }
//   loop_->runAfter(
//       seconds,
//       makeWeakCallback(shared_from_this(),
//                        &TcpConnection::forceCloseInLoop));
// }

void TcpConnection::forceClose()//强制关闭
{
   
   
  // FIXME: use compare and swap
  if (state_ == kConnected || state_ == kDisconnecting)
  {
   
   
    setState(kDisconnecting);
    loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
  }
}

void TcpConnection::forceCloseWithDelay(double seconds)//强制延迟seconds关闭
{
   
   
  if (state_ == kConnected || state_ == kDisconnecting)
  {
   
   
    setState(kDisconnecting);
    loop_->runAfter(
        seconds,
        makeWeakCallback(shared_from_this(),
                         &TcpConnection::forceClose));  // not forceCloseInLoop to avoid race condition
  }
}

void TcpConnection::forceCloseInLoop()//强制关闭
{
   
   
  loop_->assertInLoopThread();
  if (state_ == kConnected || state_ == kDisconnecting)
  {
   
   
    // as if we received 0 byte in handleRead();
    handleClose();
  }
}

const char* TcpConnection::stateToString() const//将连接状态转化为字符串
{
   
   
  switch (state_)
  {
   
   
    case kDisconnected:
      return "kDisconnected";
    case kConnecting:
      return "kConnecting";
    case kConnected:
      return "kConnected";
    case kDisconnecting:
      return "kDisconnecting";
    default:
      return "unknown state";
  }
}

void TcpConnection::setTcpNoDelay(bool on)//禁用nagle算法
{
   
   
  socket_->setTcpNoDelay(on);
}

void TcpConnection::startRead()//开始监听读事件
{
   
   
  loop_->runInLoop(std::bind(&TcpConnection::startReadInLoop, this));
}

void TcpConnection::startReadInLoop()//开始监听读事件
{
   
   
  loop_->assertInLoopThread();
  if (!reading_ || !channel_->isReading())
  {
   
   
    channel_->enableReading();
    reading_ = true;
  }
}

void TcpConnection::stopRead()//停止监听读事件
{
   
   
  loop_->runInLoop(std::bind(&TcpConnection::stopReadInLoop, this));
}

void TcpConnection::stopReadInLoop()//停止监听读事件
{
   
   
  loop_->assertInLoopThread();
  if (reading_ || channel_->isReading())
  {
   
   
    channel_->disableReading();
    reading_ = false;
  }
}

//在TcpServer->Acceptor接收到connect时候调用
void TcpConnection::connectEstablished()
{
   
   
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);//设置连接状态为完成连接
  channel_->tie(shared_from_this()); //将this给weak_ptr传给channel监控存活状态
  channel_->enableReading();//监听读事件

  connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed()
{
   
   
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
   
   
    setState(kDisconnected);//设置连接状态为完成关闭
    channel_->disableAll();//关闭所有事件的监听

    connectionCallback_(shared_from_this());
  }
  channel_->remove();//删除channel
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
   
   
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);//将内核read缓冲区的数据读到input缓冲区
  if (n > 0)
  {
   
   
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);//调用接收信息的回调函数
  }
  else if (n == 0)//表示对方已经断开socket connect
  {
   
   
    handleClose();
  }
  else
  {
   
   
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

void TcpConnection::handleWrite()
{
   
   
  loop_->assertInLoopThread();
  if (channel_->isWriting())//判断是否监听了写事件
  {
   
   
      //将outputBuffer的数据写入内核发送缓冲区
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
   
   
      outputBuffer_.retrieve(n);//调整buffer的下标
      if (outputBuffer_.readableBytes() == 0)//表示已经全部写完了,缓冲区没有数据可以发送了
      {
   
   
        channel_->disableWriting();//关闭写时间的监测
        if (writeCompleteCallback_)//调用写完毕的回调函数
        {
   
   
          loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
        }
        //如果之前设置了关闭连接的标志,那么现在可以关闭写端了
        //(如果之前在调用关闭连接时候,还在发送数据,就要等发送完毕了在关闭)
        if (state_ == kDisconnecting)
        {
   
   
          shutdownInLoop();
        }
      }
    }
    else
    {
   
   
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
   
   
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
   
   
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

// poller => channel::closeCallback => TcpConnection::handleClose
void TcpConnection::handleClose()
{
   
   
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();//不在关心任何事件

  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);// 执行连接关闭的回调
  // must be the last line
  closeCallback_(guardThis); // 关闭连接的回调  执行的是TcpServer::removeConnection回调方法
}

void TcpConnection::handleError()
{
   
   
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

总结

TcpConnection 构造时接收参数有 TCP 连接的 sockfd,服务端地址 localAddr,客户端地址 peerAddr,并通过 Socket 封装 sockfd。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后 Channel::handleEvent() 执行相应事件的回调。

  • TcpConnection::handleRead() :当连接对应的 sockfd 有可读事件发生时调用,主要是将数据读到 Buffer 中,执行消息回调函数 messageCallback_()。
  • _TcpConnection::handleWrite():当连接对应的 sockfd 有可写事件发生时调用,主要是将 Buffer 中的数据发送出去,如果一次性发送完毕,则执行用户指定的回调 writeCompleteCallback_(),若一次没有发送完, muduo 采用 LT 模式, 会反复触发可写事件,下次还有机会发送剩下的数据。

  • send 一系列函数是可以用户或者其他线程调用,用于发送信息。如果不是在IO线程,它会把实际工作转移到IO线程调用。首先检查 TcpConnection 对应的 Socket 是否注册了可写事件,若注册了可写事件表明输出缓冲区 outputBuffer中已经有数据等待发送,为了保证不乱序,这次的数据只要追加到输出缓冲区中,通过 Channel::handleEvent() -> TcpConnection::handleWrite() 来发送。如果Socket 没有注册可写事件,输出缓冲区没有数据,那么这次的消息可以直接通过 write 发送,如果没有一次性发送完毕,那么 message 剩余的数据仍然要 append 到 outputBuffer 中,并向 Poller 注册可写事件,当 socket 变得可写时,Channel 会调用 TcpConnection::handleWrite() 来发送 outputBuffer 中堆积的数据,发送完毕后立刻停止监听可写事件,避免 busy loop。无论是 sendInLoop() -> write() 还是 Channel::handleEvent() -> handleWrite(),只要确定发送完 message 或者 outputBuffer 中的数据,那么都要调用用户指定的回调 writeCompleteCallback()。

使用 epoll 的 LT 模式,当 socket 可写时,会不停的触发 socket 的可写事件,这个时候如何解决?

第一种方式:需要向 socket 写数据时,注册此 socket 的可写事件,接收到可写事件后,然后调用 write/send 写数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。

第二种,需要发送数据时,直接调用 write/send 写,如果没有发送完,那么开是监听此 socket 的 writable 事件,然后接收到可写事件后,调用 write/send 发送数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。 muduo 采用的 LT 模式,就是用的第二种解决这个问题。

此外 TcpConnection 还有几个小功能,比如 TcpConnection::setTcpNoDelay() 和 TcpConnection::setKeepAlive()。TCP No Delay 和 TCP keepalive 都是常用的 TCP 选项,前者的作用是禁止 Nagle 算法,避免连续发包出现延迟,这对编写低延迟网络服务很重要。后者的作用是定期探查 TCP 连接是否还存在,一般来说如果有应用层心跳的话,TCP keepalive 不是必须的。并且时间很长(默认2h)

leEvent() -> handleWrite(),只要确定发送完 message 或者 outputBuffer_ 中的数据,那么都要调用用户指定的回调 writeCompleteCallback()。

使用 epoll 的 LT 模式,当 socket 可写时,会不停的触发 socket 的可写事件,这个时候如何解决?

第一种方式:需要向 socket 写数据时,注册此 socket 的可写事件,接收到可写事件后,然后调用 write/send 写数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。

第二种,需要发送数据时,直接调用 write/send 写,如果没有发送完,那么开是监听此 socket 的 writable 事件,然后接收到可写事件后,调用 write/send 发送数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。 muduo 采用的 LT 模式,就是用的第二种解决这个问题。

此外 TcpConnection 还有几个小功能,比如 TcpConnection::setTcpNoDelay() 和 TcpConnection::setKeepAlive()。TCP No Delay 和 TCP keepalive 都是常用的 TCP 选项,前者的作用是禁止 Nagle 算法,避免连续发包出现延迟,这对编写低延迟网络服务很重要。后者的作用是定期探查 TCP 连接是否还存在,一般来说如果有应用层心跳的话,TCP keepalive 不是必须的。并且时间很长(默认2h)

目录
相关文章
|
7月前
muduo源码剖析之Acceptor监听类
Acceptor类用于创建套接字,设置套接字选项,调用socket()->bind()->listen()->accept()函数,接受连接,然后调用TcpServer设置的connect事件的回调。listen()//在TcpServer::start中调用封装了一个listen fd相关的操作,用于mainLoop接受器封装,实质上就是对Channel的多一层封装监听连接 当新连接进入时,调用Socket::accept创建套接字,触发TcpServer的回调TcpServer通过该接口设置回调,
57 0
|
7月前
muduo源码剖析之Socket类
封装了一个sockfd相关的设置。比较简单,已经编写注释。
62 0
|
7月前
muduo源码剖析之Connector客户端连接类
Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。
61 0
|
7月前
|
API
muduo源码剖析之SocketOps类
对socket设置API的封装。比较简单,已经编写注释。
45 0
|
7月前
muduo源码剖析之channel通道类
channel是muduo中的事件分发器,它只属于一个EventLoop,Channel类中保存着IO事件的类型以及对应的回调函数,每个channel只负责一个文件描述符,但它并不拥有这个文件描述符。channel是在epoll和TcpConnection之间起沟通作用,故也叫做通道,其它类通过调用channel的setCallbcak来和建立channel沟通关系。
112 0
|
3月前
|
网络协议 C语言
C语言 网络编程(十三)并发的TCP服务端-以进程完成功能
这段代码实现了一个基于TCP协议的多进程并发服务端和客户端程序。服务端通过创建子进程来处理多个客户端连接,解决了粘包问题,并支持不定长数据传输。客户端则循环发送数据并接收服务端回传的信息,同样处理了粘包问题。程序通过自定义的数据长度前缀确保了数据的完整性和准确性。
|
4月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
170 3
|
7月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
81 0
|
7月前
|
网络协议
muduo源码剖析之TcpClient客户端类
muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。
95 0
muduo源码剖析之TcpClient客户端类
|
7月前
|
安全 C++ 容器
muduo源码剖析之TimerQueue类
通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。
55 0
muduo源码剖析之TimerQueue类
下一篇
DataWorks