muduo源码剖析之Connector客户端连接类

简介: Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。

简介

Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。

主要成员及属性解析

主要接口

setNewConnectionCallback

设置TcpClient交给的回调函数

start

最后通过loop的runInLoop调用
调用connect内部实现

stop

最终通过loop的queueInLoop调用
回收Channel控制的套接字(如果有的话)
设置connect_标记为false

retry

若connect_标记为true,则重连

核心实现:connect

调用Socket::connect方法连接服务端
连接成功后,创建一个Channel
将自身的handleWrite回调注册到Channel上,并激活可写事件关注

核心实现:handleWrite

根据getSockError的情况决定调用创建连接回调,或是错误回调,或retry操作
其中包含了TcpClient注册的回调newConnection

主要成员

loop

所属的workloop

channel

unique_ptr指针,仅在连接建立时动态创建Channel对象
当channel触发可写事件时,执行handleWrite
并在handleWrite中执行TcpClient的newConnection

serverAddr

服务端的inetaddr地址信息

connect_

非常重要的一个标记,决定了是否retry

源码剖析

代码已编写完整注释,

Connector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_CONNECTOR_H
#define MUDUO_NET_CONNECTOR_H

#include "muduo/base/noncopyable.h"
#include "muduo/net/InetAddress.h"

#include <functional>
#include <memory>

namespace muduo
{
   
namespace net
{
   

class Channel;
class EventLoop;

//负责主动发起连接,不负责创建socket,只负责连接的建立
class Connector : noncopyable,
                  public std::enable_shared_from_this<Connector>
{
   
 public:
  typedef std::function<void (int sockfd)> NewConnectionCallback;

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  //设置新连接到来时的回调函数
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  {
    newConnectionCallback_ = cb; }


  void start();  // can be called in any thread
  void restart();  // must be called in loop thread
  void stop();  // can be called in any thread

  //返回服务器的地址信息
  const InetAddress& serverAddress() const {
    return serverAddr_; }

 private:
  enum States {
    kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;
  static const int kInitRetryDelayMs = 500;

  //设置连接状态
  void setState(States s) {
    state_ = s; }

  //在loop中执行的start启动操作
  void startInLoop();

  //在loop中执行的stop暂停操作
  void stopInLoop();

  //请求连接服务器
  void connect();

  //连接服务器成功后设置channel事件
  void connecting(int sockfd);

  //写事件回调
  void handleWrite();

  //错误发生回调
  void handleError();

  //重连
  void retry(int sockfd);

  //移除并释放channel
  int removeAndResetChannel();

  //释放channel
  void resetChannel();



  //所属的EventLoop
  EventLoop* loop_;
  //要连接的server地址
  InetAddress serverAddr_;
  //是否连接服务器标志
  bool connect_; // atomic
  //连接状态
  States state_;  // FIXME: use atomic variable
  //对应的channel
  std::unique_ptr<Channel> channel_;
  //连接成功时的回调函数
  NewConnectionCallback newConnectionCallback_;
  //重连间隔时间
  int retryDelayMs_;
};

}  // namespace net
}  // namespace muduo

#endif  // MUDUO_NET_CONNECTOR_H

Connector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//

#include "muduo/net/Connector.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/SocketsOps.h"

#include <errno.h>

using namespace muduo;
using namespace muduo::net;

const int Connector::kMaxRetryDelayMs;

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false),
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)
{
   
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::~Connector()
{
   
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!channel_);
}

void Connector::start()
{
   
  connect_ = true;
  //在所属IO线程Lloop中调用该函数,连接服务器
  loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

void Connector::startInLoop()
{
   
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  {
   
    connect();//请求连接
  }
  else
  {
   
    LOG_DEBUG << "do not connect";
  }
}

void Connector::stop()
{
   
  connect_ = false;
  loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}

void Connector::stopInLoop()
{
   
  loop_->assertInLoopThread();
  if (state_ == kConnecting)
  {
   
    //将状态设置为断开连接
    setState(kDisconnected);
    //移除并释放channel
    int sockfd = removeAndResetChannel();
    retry(sockfd);
  }
}

//连接服务器
void Connector::connect()
{
   
  //创建一个非阻塞的socket fd
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  //请求连接服务器
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
   
    case 0:
    case EINPROGRESS:
    case EINTR:
    case EISCONN:
      connecting(sockfd);
      break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);//重连
      break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}


void Connector::restart()
{
   
  loop_->assertInLoopThread();
  //设置标志为断开连接
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;
  connect_ = true;
  //启动
  startInLoop();
}

//连接服务器成功后设置channel事件
void Connector::connecting(int sockfd)
{
   
  setState(kConnecting);//设置状态为正在连接
  assert(!channel_);
  //创建一个channel

  channel_.reset(new Channel(loop_, sockfd));
  //设置读事件回调
  channel_->setWriteCallback(
      std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  //设置错误事件回调
  channel_->setErrorCallback(
      std::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
  //注册可写事件
  channel_->enableWriting();
}

//移除并释放channel
int Connector::removeAndResetChannel()
{
   
  //取消所有事件的监听
  channel_->disableAll();
  //将channel从loop中移除
  channel_->remove();

  //释放channel
  int sockfd = channel_->fd();
  // Can't reset channel_ here, because we are inside Channel::handleEvent
  loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
  return sockfd;
}

void Connector::resetChannel()
{
   
  //释放unique_ptr所指向的资源,也就是channel
  channel_.reset();
}

//可写事件的回调函数
void Connector::handleWrite()
{
   
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ == kConnecting)
  {
   
    //移除channel(Connector的channel只管理建立连接的阶段),成功建立连接后
    //交给TcpClient的TcpConnection来管理
    int sockfd = removeAndResetChannel();

    //可写并不一定连接建立成功
    //如果连接发生错误,socket会是可读可写的
    //所以还需要调用getsockopt检查是否出错
    int err = sockets::getSocketError(sockfd);
    if (err)
    {
   
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd);//出错重连
    }
    else if (sockets::isSelfConnect(sockfd))//是否是自连接
    {
   
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);
    }
    else
    {
   
      //连接成功建立,更改状态
      //调用TcpClient设置的回调函数,创建TcpConnection对象    
      setState(kConnected);//设置状态为已经成功连接
      if (connect_)
      {
   
        newConnectionCallback_(sockfd);
      }
      else
      {
   
        sockets::close(sockfd);
      }
    }
  }
  else
  {
   
    // what happened?
    assert(state_ == kDisconnected);
  }
}

void Connector::handleError()
{
   
  LOG_ERROR << "Connector::handleError state=" << state_;
  if (state_ == kConnecting)
  {
   
    int sockfd = removeAndResetChannel();
    int err = sockets::getSocketError(sockfd);
    LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
    retry(sockfd);
  }
}

//重连
void Connector::retry(int sockfd)
{
   
  //socket是一次性的,失败后需要关闭重新创建
  sockets::close(sockfd);
  //将状态设置为断开连接
  setState(kDisconnected);
  if (connect_)
  {
   
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";

    //隔一段时间后重连,重新执行startInLoop
    loop_->runAfter(retryDelayMs_/1000.0,
                    std::bind(&Connector::startInLoop, shared_from_this()));

    //间隔时间翻倍
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else
  {
   
    LOG_DEBUG << "do not connect";
  }
}
目录
相关文章
|
7月前
muduo源码剖析之Acceptor监听类
Acceptor类用于创建套接字,设置套接字选项,调用socket()->bind()->listen()->accept()函数,接受连接,然后调用TcpServer设置的connect事件的回调。listen()//在TcpServer::start中调用封装了一个listen fd相关的操作,用于mainLoop接受器封装,实质上就是对Channel的多一层封装监听连接 当新连接进入时,调用Socket::accept创建套接字,触发TcpServer的回调TcpServer通过该接口设置回调,
57 0
|
7月前
muduo源码剖析之Socket类
封装了一个sockfd相关的设置。比较简单,已经编写注释。
62 0
|
7月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
81 0
|
7月前
|
网络协议
muduo源码剖析之TcpClient客户端类
muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。
95 0
muduo源码剖析之TcpClient客户端类
|
网络协议
netty编程实战02-创建一个带有连接重试的tcp客户端程序
netty编程实战02-创建一个带有连接重试的tcp客户端程序
218 0
|
7月前
|
网络协议 算法
muduo源码剖析之TcpConnection连接管理
TcpCon用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调 connectionCallback。TcpConnection 构造时接收参数有 TCP 连接的 sockfd,服务端地址 localAddr,客户端地址 peerAddr,并通过 Socket 封装 sockfd。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后 Channel::handleEvent() 执行相应事件的回调。
86 0
muduo源码剖析之TcpConnection连接管理
|
7月前
|
Java 容器
muduo源码剖析之TcpServer服务端
TcpServer拥有Acceptor类,新连接到达时new TcpConnection后续客户端和TcpConnection类交互。TcpServer管理连接和启动线程池,用Acceptor接受连接。服务端封装 - muduo的server端维护了多个tcpconnection注意TcpServer本身不带Channel,而是使用Acceptor的Channel。
79 0
muduo源码剖析之TcpServer服务端
|
缓存 移动开发 网络协议
TCP编写服务器,客户端以及遇到的两个问题,Socket,ServerScket 类,flush(),方法。以及多线程解决,及改进的线程池写法,IO多路复用的思想,C10K,C10M的阐述。万字超细
TCP编写服务器,客户端以及遇到的两个问题,Socket,ServerScket 类,flush(),方法。以及多线程解决,及改进的线程池写法,IO多路复用的思想,C10K,C10M的阐述。万字超细
|
网络协议 数据安全/隐私保护 网络架构
Netty实战(十五)UDP广播事件(一)UDP简介和示例程序
用户数据报协议(UDP)上,它通常用在性能至关重要并且能够容忍一定的数据包丢失的情况下使用
498 0
|
网络协议 Java
netty编程实战01-创建一个tcp服务端程序
netty编程实战01-创建一个tcp服务端程序
263 0