简介
Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。
主要成员及属性解析
主要接口
setNewConnectionCallback
设置TcpClient交给的回调函数
start
最后通过loop的runInLoop调用
调用connect内部实现
stop
最终通过loop的queueInLoop调用
回收Channel控制的套接字(如果有的话)
设置connect_标记为false
retry
若connect_标记为true,则重连
核心实现:connect
调用Socket::connect方法连接服务端
连接成功后,创建一个Channel
将自身的handleWrite回调注册到Channel上,并激活可写事件关注
核心实现:handleWrite
根据getSockError的情况决定调用创建连接回调,或是错误回调,或retry操作
其中包含了TcpClient注册的回调newConnection
主要成员
loop
所属的workloop
channel
unique_ptr指针,仅在连接建立时动态创建Channel对象
当channel触发可写事件时,执行handleWrite
并在handleWrite中执行TcpClient的newConnection
serverAddr
服务端的inetaddr地址信息
connect_
非常重要的一个标记,决定了是否retry
源码剖析
代码已编写完整注释,
Connector.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_CONNECTOR_H
#define MUDUO_NET_CONNECTOR_H
#include "muduo/base/noncopyable.h"
#include "muduo/net/InetAddress.h"
#include <functional>
#include <memory>
namespace muduo
{
namespace net
{
class Channel;
class EventLoop;
//负责主动发起连接,不负责创建socket,只负责连接的建立
class Connector : noncopyable,
public std::enable_shared_from_this<Connector>
{
public:
typedef std::function<void (int sockfd)> NewConnectionCallback;
Connector(EventLoop* loop, const InetAddress& serverAddr);
~Connector();
//设置新连接到来时的回调函数
void setNewConnectionCallback(const NewConnectionCallback& cb)
{
newConnectionCallback_ = cb; }
void start(); // can be called in any thread
void restart(); // must be called in loop thread
void stop(); // can be called in any thread
//返回服务器的地址信息
const InetAddress& serverAddress() const {
return serverAddr_; }
private:
enum States {
kDisconnected, kConnecting, kConnected };
static const int kMaxRetryDelayMs = 30*1000;
static const int kInitRetryDelayMs = 500;
//设置连接状态
void setState(States s) {
state_ = s; }
//在loop中执行的start启动操作
void startInLoop();
//在loop中执行的stop暂停操作
void stopInLoop();
//请求连接服务器
void connect();
//连接服务器成功后设置channel事件
void connecting(int sockfd);
//写事件回调
void handleWrite();
//错误发生回调
void handleError();
//重连
void retry(int sockfd);
//移除并释放channel
int removeAndResetChannel();
//释放channel
void resetChannel();
//所属的EventLoop
EventLoop* loop_;
//要连接的server地址
InetAddress serverAddr_;
//是否连接服务器标志
bool connect_; // atomic
//连接状态
States state_; // FIXME: use atomic variable
//对应的channel
std::unique_ptr<Channel> channel_;
//连接成功时的回调函数
NewConnectionCallback newConnectionCallback_;
//重连间隔时间
int retryDelayMs_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_CONNECTOR_H
Connector.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
#include "muduo/net/Connector.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/SocketsOps.h"
#include <errno.h>
using namespace muduo;
using namespace muduo::net;
const int Connector::kMaxRetryDelayMs;
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false),
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
Connector::~Connector()
{
LOG_DEBUG << "dtor[" << this << "]";
assert(!channel_);
}
void Connector::start()
{
connect_ = true;
//在所属IO线程Lloop中调用该函数,连接服务器
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();//请求连接
}
else
{
LOG_DEBUG << "do not connect";
}
}
void Connector::stop()
{
connect_ = false;
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
// FIXME: cancel timer
}
void Connector::stopInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnecting)
{
//将状态设置为断开连接
setState(kDisconnected);
//移除并释放channel
int sockfd = removeAndResetChannel();
retry(sockfd);
}
}
//连接服务器
void Connector::connect()
{
//创建一个非阻塞的socket fd
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
//请求连接服务器
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS:
case EINTR:
case EISCONN:
connecting(sockfd);
break;
case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd);//重连
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
void Connector::restart()
{
loop_->assertInLoopThread();
//设置标志为断开连接
setState(kDisconnected);
retryDelayMs_ = kInitRetryDelayMs;
connect_ = true;
//启动
startInLoop();
}
//连接服务器成功后设置channel事件
void Connector::connecting(int sockfd)
{
setState(kConnecting);//设置状态为正在连接
assert(!channel_);
//创建一个channel
channel_.reset(new Channel(loop_, sockfd));
//设置读事件回调
channel_->setWriteCallback(
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
//设置错误事件回调
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
//注册可写事件
channel_->enableWriting();
}
//移除并释放channel
int Connector::removeAndResetChannel()
{
//取消所有事件的监听
channel_->disableAll();
//将channel从loop中移除
channel_->remove();
//释放channel
int sockfd = channel_->fd();
// Can't reset channel_ here, because we are inside Channel::handleEvent
loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
return sockfd;
}
void Connector::resetChannel()
{
//释放unique_ptr所指向的资源,也就是channel
channel_.reset();
}
//可写事件的回调函数
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
//移除channel(Connector的channel只管理建立连接的阶段),成功建立连接后
//交给TcpClient的TcpConnection来管理
int sockfd = removeAndResetChannel();
//可写并不一定连接建立成功
//如果连接发生错误,socket会是可读可写的
//所以还需要调用getsockopt检查是否出错
int err = sockets::getSocketError(sockfd);
if (err)
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd);//出错重连
}
else if (sockets::isSelfConnect(sockfd))//是否是自连接
{
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);
}
else
{
//连接成功建立,更改状态
//调用TcpClient设置的回调函数,创建TcpConnection对象
setState(kConnected);//设置状态为已经成功连接
if (connect_)
{
newConnectionCallback_(sockfd);
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
void Connector::handleError()
{
LOG_ERROR << "Connector::handleError state=" << state_;
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel();
int err = sockets::getSocketError(sockfd);
LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
retry(sockfd);
}
}
//重连
void Connector::retry(int sockfd)
{
//socket是一次性的,失败后需要关闭重新创建
sockets::close(sockfd);
//将状态设置为断开连接
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
//隔一段时间后重连,重新执行startInLoop
loop_->runAfter(retryDelayMs_/1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
//间隔时间翻倍
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}