简介
TcpServer拥有Acceptor类,新连接到达时new TcpConnection后续客户端和TcpConnection类交互。TcpServer管理连接和启动线程池,用Acceptor接受连接。
服务端封装 - muduo的server端维护了多个tcpconnection
注意TcpServer本身不带Channel,而是使用Acceptor的Channel
成员及属性解析
主要接口
回调setters
这些回调函数会在新连接建立时传递给TcpConnction对象
start
启动threadPool线程池
在runInLoop中执行acceptor的listen
这里专门设置了一个started标记,防止多次运行start
核心实现:newConnection
从accept回调中获得的fd动态创建新的TcpConnection对象
为连接对象注册各类回调函数
将连接对象存入connectionMap_映射表里
主要成员
loop
这个loop也是acceptor的loop
acceptor
threadPool
一个EventLoopThreadPool,用来存放io线程的EventLoopThread
socket
服务端用来监听的socket
ConnectionMap
一个连接名和实例(TcpConnectionPtr)的映射容器
TcpServer中回调的传递示意简图:
源码剖析
源码已经编写注释
TcpServer.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H
#include "muduo/base/Atomic.h"
#include "muduo/base/Types.h"
#include "muduo/net/TcpConnection.h"
#include <map>
namespace muduo
{
namespace net
{
class Acceptor;
class EventLoop;
class EventLoopThreadPool;
///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
enum Option
{
kNoReusePort,//不设置端口复用
kReusePort,//设置端口复用
};
//TcpServer(EventLoop* loop, const InetAddress& listenAddr);
TcpServer(EventLoop* loop,//mainloop
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
~TcpServer(); // force out-line dtor, for std::unique_ptr members.
const string& ipPort() const {
return ipPort_; }
const string& name() const {
return name_; }
EventLoop* getLoop() const {
return loop_; }
/// Set the number of threads for handling input.
///
/// Always accepts new connection in loop's thread.
/// Must be called before @c start
/// @param numThreads
/// - 0 means all I/O in loop's thread, no thread will created.
/// this is the default value.
/// - 1 means all I/O in another thread.
/// - N means a thread pool with N threads, new connections
/// are assigned on a round-robin basis.
void setThreadNum(int numThreads);//设置subloop数量
void setThreadInitCallback(const ThreadInitCallback& cb)//设置subloop线程初始化时调用的函数
{
threadInitCallback_ = cb; }
/// valid after calling start()
std::shared_ptr<EventLoopThreadPool> threadPool()//返回EventLoopThread线程池
{
return threadPool_; }
/// Starts the server if it's not listening.
///
/// It's harmless to call it multiple times.
/// Thread safe.
void start();
/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb)
{
connectionCallback_ = cb; }
/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback& cb)
{
messageCallback_ = cb; }
/// Set write complete callback.
/// Not thread safe.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{
writeCompleteCallback_ = cb; }
private:
/// Not thread safe, but in loop
void newConnection(int sockfd, const InetAddress& peerAddr);
/// Thread safe.
void removeConnection(const TcpConnectionPtr& conn);
/// Not thread safe, but in loop
void removeConnectionInLoop(const TcpConnectionPtr& conn);
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // the acceptor loop
const string ipPort_;
const string name_;
std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
std::shared_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
AtomicInt32 started_;
// always in loop thread
int nextConnId_;
ConnectionMap connections_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_TCPSERVER_H
TcpServer.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/net/TcpServer.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Acceptor.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThreadPool.h"
#include "muduo/net/SocketsOps.h"
#include <stdio.h> // snprintf
using namespace muduo;
using namespace muduo::net;
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)), //TcpServer所在的主线程下运行的事件驱动循环
ipPort_(listenAddr.toIpPort()),/* 服务器负责监听的本地ip和端口 */
name_(nameArg),/* 服务器名字,创建时传入 */
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),/* Acceptor对象,负责监听客户端连接请求,运行在主线程的EventLoop中 */
threadPool_(new EventLoopThreadPool(loop, name_)),/* 事件驱动线程池,池中每个线程运行一个EventLoop */
connectionCallback_(defaultConnectionCallback),/* 用户传入,有tcp连接到达或tcp连接关闭时调用,传给TcpConnection */
messageCallback_(defaultMessageCallback),/* 用户传入,对端发来消息时调用,传给TcpConnection */
nextConnId_(1) /* TcpConnection特有id,每增加一个TcpConnection,nextConnId_加一 */
{
/*
* 设置回调函数,当有客户端请求时,Acceptor接收客户端请求,然后调用这里设置的回调函数
* 回调函数用于创建TcpConnection连接
*/
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
}
void TcpServer::setThreadNum(int numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);//启动线程池,threadInitCallback_创建好所有线程后调用的回调函数
assert(!acceptor_->listenning());
loop_->runInLoop( //直接调用linsten函数
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
/*
* Acceptor接收客户端请求后调用的回调函数
* @param sockfd: 已经接收完成(三次握手完成)后的客户端套接字
* @param peerAddr: 客户端地址
*
* Acceptor只负责接收客户端请求
* TcpServer需要生成一个TcpConnection用于管理tcp连接
*
* 1.TcpServer内有一个EventLoopThreadPool,即事件循环线程池,池子中每个线程都是一个EventLoop
* 2.每个EventLoop包含一个Poller用于监听注册到这个EventLoop上的所有Channel
* 3.当建立起一个新的TcpConnection时,这个连接会放到线程池中的某个EventLoop中
* 4.TcpServer中的baseLoop只用来检测客户端的连接
*
* 从libevent的角度看就是
* 1.EventLoopThreadPool是一个struct event_base的池子,池子中全是struct event_base
* 2.TcpServer独占一个event_base,这个event_base不在池子中
* 3.TcpConnection会扔到这个池子中的某个event_base中
*/
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();//从事件驱动线程池中取出一个线程给TcpConnection
/* 为TcpConnection生成独一无二的名字 */
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
/*
* 根据sockfd获取tcp连接在本地的<地址,端口>
* getsockname(int fd, struct sockaddr*, int *size);
*/
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
/* 创建一个新的TcpConnection代表一个Tcp连接 */
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
/* 添加到所有tcp 连接的map中,键是tcp连接独特的名字(服务器名+客户端<地址,端口>) */
connections_[connName] = conn;
/* 为tcp连接设置回调函数(由用户提供) */
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
/*
* 关闭回调函数,由TcpServer设置,作用是将这个关闭的TcpConnection从map中删除
* 当poll返回后,发现被激活的原因是EPOLLHUP,此时需要关闭tcp连接
* 调用Channel的CloseCallback,进而调用TcpConnection的handleClose,进而调用removeConnection
*/
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
/*
* 连接建立后,调用TcpConnection连接建立成功的函数
* 1.新建的TcpConnection所在事件循环是在事件循环线程池中的某个线程
* 2.所以TcpConnection也就属于它所在的事件驱动循环所在的那个线程
* 3.调用TcpConnection的函数时也就应该在自己所在线程调用
* 4.所以需要调用runInLoop在自己的那个事件驱动循环所在线程调用这个函数
* 5.当前线程是TcpServer的主线程,不是TcpConnection的线程,如果在这个线程直接调用会阻塞监听客户端请求
* 6.其实这里不是因为线程不安全,即使在这个线程调用也不会出现线程不安全,因为TcpConnection本就是由这个线程创建的
*/
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
//关闭连接,把fd从epoll中del掉,要释放connector(包括描述符)和channel
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}