muduo源码剖析之TcpClient客户端类

简介: muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。

简介

muduo用TcpClient发起连接,TcpClient有一个Connector连接器,TCPClient使用Conneccor发起连接, 连接建立成功后, 用socket创建TcpConnection来管理连接, 每个TcpClient class只管理一个TcpConnecction,连接建立成功后设置相应的回调函数。很显然,TcpClient用来管理客户端连接,真正连接交给Connector。

主要成员及属性解析

主要接口

回调setters

这些回调函数会在新连接建立时,通过newConnection内部实现方法传递给TcpConnction对象

核心实现:newConnection

在构造时将这个函数作为回调注册给connector_对象
在Connector中的Channel执行本回调后,创建一个新的TcpConnection对象

connect

调用Connector的start接口

stop

调用Connector的stop接口

主要成员

loop

所属workloop

connector

TcpClient所维护的一个连接器

retry_

重连标志

TcpConnection connection_

TcpClient所维护的一个TCP连接对象
关于连接中回调的传递,参考下面的简图:

eb602a687d5a49408a8209145c038271

源码剖析

代码已编写完整注释,

TcpClient.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H

#include "muduo/base/Mutex.h"
#include "muduo/net/TcpConnection.h"

namespace muduo
{
   
   
namespace net
{
   
   

class Connector;
typedef std::shared_ptr<Connector> ConnectorPtr;

class TcpClient : noncopyable
{
   
   
 public:
  // TcpClient(EventLoop* loop);
  // TcpClient(EventLoop* loop, const string& host, uint16_t port);
  TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& nameArg);
  ~TcpClient();  // force out-line dtor, for std::unique_ptr members.

  void connect();//请求连接
  void disconnect();//断开连接
  void stop();//停止连接

  TcpConnectionPtr connection() const
  {
   
   
    MutexLockGuard lock(mutex_);
    return connection_;
  }

  EventLoop* getLoop() const {
   
    return loop_; }
  bool retry() const {
   
    return retry_; }
  void enableRetry() {
   
    retry_ = true; }

  const string& name() const
  {
   
    return name_; }

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(ConnectionCallback cb)
  {
   
    connectionCallback_ = std::move(cb); }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(MessageCallback cb)
  {
   
    messageCallback_ = std::move(cb); }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(WriteCompleteCallback cb)
  {
   
    writeCompleteCallback_ = std::move(cb); }

 private:
  /// Not thread safe, but in loop
  //新连接建立后的回调函数,将新连接封装为TcpConnection交给TcpClient来管理
  void newConnection(int sockfd);
  /// Not thread safe, but in loop
  ////释放连接
  void removeConnection(const TcpConnectionPtr& conn);

  //所属loop
  EventLoop* loop_;
  //Connector,用来处理连接阶段,
  ConnectorPtr connector_; // avoid revealing Connector
  const string name_;
  ConnectionCallback connectionCallback_;//连接回调
  MessageCallback messageCallback_;//消息回调
  WriteCompleteCallback writeCompleteCallback_;//数据发送完成回调
  //是否重连
  bool retry_;   // atomic
  //是否连接
  bool connect_; // atomic
  // always in loop thread
  int nextConnId_;
  mutable MutexLock mutex_;
  //管理连接的TcpConnection
  TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};

}  // namespace net
}  // namespace muduo

#endif  // MUDUO_NET_TCPCLIENT_H

TcpClient.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//

#include "muduo/net/TcpClient.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Connector.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/SocketsOps.h"

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

// TcpClient::TcpClient(EventLoop* loop)
//   : loop_(loop)
// {
   
   
// }

// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
//   : loop_(CHECK_NOTNULL(loop)),
//     serverAddr_(host, port)
// {
   
   
// }

namespace muduo
{
   
   
namespace net
{
   
   
namespace detail
{
   
   

//断开连接
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
   
   
  loop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

void removeConnector(const ConnectorPtr& connector)
{
   
   
  //connector->
}

}  // namespace detail
}  // namespace net
}  // namespace muduo

TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
   
   
  //设置Connector新连接建立的回调函数
  connector_->setNewConnectionCallback(
      std::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}

TcpClient::~TcpClient()
{
   
   
  LOG_INFO << "TcpClient::~TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
  TcpConnectionPtr conn;
  bool unique = false;
  {
   
   
    MutexLockGuard lock(mutex_);
    //检查所管理对象是否仅由当前 shared_ptr 的实例管理
    unique = connection_.unique();
    conn = connection_;
  }
  if (conn)
  {
   
   
    assert(loop_ == conn->getLoop());
    // FIXME: not 100% safe, if we are in different thread
    //执行断开连接操作
    CloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);
    loop_->runInLoop(
        std::bind(&TcpConnection::setCloseCallback, conn, cb));

    //如果TcpConnection只有一份,那就强行关闭连接
    if (unique)
    {
   
   
      conn->forceClose();
    }
  }
  else
  {
   
   

    connector_->stop();
    // FIXME: HACK
    loop_->runAfter(1, std::bind(&detail::removeConnector, connector_));
  }
}

//请求连接服务器
void TcpClient::connect()
{
   
   
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();
}

//断开连接
void TcpClient::disconnect()
{
   
   
  connect_ = false;

  {
   
   
    MutexLockGuard lock(mutex_);
    if (connection_)
    {
   
   
      connection_->shutdown();
    }
  }
}


void TcpClient::stop()
{
   
   
  connect_ = false;
  connector_->stop();
}

//新连接建立后的回调函数,将新连接封装为TcpConnection交给TcpClient来管理
void TcpClient::newConnection(int sockfd)
{
   
   
  loop_->assertInLoopThread();
  //获取服务器地址并打印
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  //获取client地址
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  //创建一个TcpConnection,并设置相关回调
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
   
   
    MutexLockGuard lock(mutex_);
    connection_ = conn;
  }
  //调用连接建立函数
  conn->connectEstablished();
}

//释放连接
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
   
   
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
   
   
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    //释放TcpConnection
    connection_.reset();
  }

  //在所属loop中执行连接断开释放操作
  loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
  //如果设置了重连并且连接标志为true,那就重新连接
  if (retry_ && connect_)
  {
   
   
    LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
             << connector_->serverAddress().toIpPort();
    connector_->restart();//重新启动
  }
}
目录
相关文章
|
7月前
muduo源码剖析之Acceptor监听类
Acceptor类用于创建套接字,设置套接字选项,调用socket()->bind()->listen()->accept()函数,接受连接,然后调用TcpServer设置的connect事件的回调。listen()//在TcpServer::start中调用封装了一个listen fd相关的操作,用于mainLoop接受器封装,实质上就是对Channel的多一层封装监听连接 当新连接进入时,调用Socket::accept创建套接字,触发TcpServer的回调TcpServer通过该接口设置回调,
57 0
|
7月前
|
API
muduo源码剖析之SocketOps类
对socket设置API的封装。比较简单,已经编写注释。
45 0
|
7月前
muduo源码剖析之Socket类
封装了一个sockfd相关的设置。比较简单,已经编写注释。
62 0
|
7月前
muduo源码剖析之Connector客户端连接类
Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。
61 0
|
7月前
|
安全 API
muduo源码剖析之EventLoop事件循环类
EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。
88 0
|
7月前
muduo源码剖析之channel通道类
channel是muduo中的事件分发器,它只属于一个EventLoop,Channel类中保存着IO事件的类型以及对应的回调函数,每个channel只负责一个文件描述符,但它并不拥有这个文件描述符。channel是在epoll和TcpConnection之间起沟通作用,故也叫做通道,其它类通过调用channel的setCallbcak来和建立channel沟通关系。
112 0
|
7月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
81 0
|
7月前
Muduo类详解之EventLoop
Muduo类详解之EventLoop
|
7月前
|
安全 C++ 容器
muduo源码剖析之TimerQueue类
通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。
55 0
muduo源码剖析之TimerQueue类
|
7月前
|
网络协议 算法
muduo源码剖析之TcpConnection连接管理
TcpCon用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调 connectionCallback。TcpConnection 构造时接收参数有 TCP 连接的 sockfd,服务端地址 localAddr,客户端地址 peerAddr,并通过 Socket 封装 sockfd。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后 Channel::handleEvent() 执行相应事件的回调。
86 0
muduo源码剖析之TcpConnection连接管理
下一篇
DataWorks