Thrift线程和状态机分析

简介: Thrift线程和状态机分析.pdf 目录 目录 1 1. 工作线程和IO线程 1 2. TNonblockingServer::TConnection::transition() 2 3.

img_e25d4fb2f8de1caf41a735ec53088516.pngThrift线程和状态机分析.pdf

目录

目录 1

1. 工作线程和IO线程 1

2. TNonblockingServer::TConnection::transition() 2

3. RPC函数被调用过程 3

4. 管道和任务队列 4

5. 对象间关系 5

6. 相关代码摘要 6

 

1. 工作线程和IO线程

启动Thrift时,可启动两类线程,一是TNonblockingIOThread,另一是Worker

 

 

TNonblockingIOThread负责接受连接,和收发数据;而Worker负责回调服务端的用户函数。

 

TNonblockingIOThread::registerEvents主要做了两件事:

1) 注册TNonblockingIOThread::listenHandler(),这个是用来接受连接请求的;

2) 注册TNonblockingIOThread::notifyHandler(),这个是用来监听管道的。

 

TNonblockingIOThreadWorker两类线程间通过队列进行通讯,队列类型为std::queue<:shared_ptr> >

class ThreadManager::Task: public Runnable

{

public:

    void run() 

    {

        // runnable_实际为TNonblockingServer::TConnection::Task

        runnable_->run();

    }

    

private:

    // 这里的Runnable实际为TNonblockingServer::TConnection::Task

    // 在TNonblockingServer::TConnection::transition()中被push进来

    boost::shared_ptrRunnable> runnable_;

};

 

2. TNonblockingServer::TConnection::transition()

transition()为状态切换函数,状态有两种:一是socket的状态,另一是rpc会话的状态。APP开头的是rpc会话的状态,SOCKET开头的是socket的状态。

 

 

APP_READ_REQUEST状态发生在IO线程中,addTask()会将任务转交给或工作线程,然后由工作线程回调服务端的函数。

TNonblockingServer::TConnection::Task

{

public:

    void run()

    {        

        // 回调

        processor_->process(input_, output_, connectionContext_);

 

        // 回调完后通知,

        // 从工作线程重回到IO线程

        connection_->notifyIOThread(); // ioThread_->notify(this);

        // 这个将触发TNonblockingIOThread::notifyHandler()

    }

};

 

TNonblockingIOThread::notifyHandler()

{

    // 从管道中取出connection的指针地址

    TNonblockingServer::TConnection* connection = NULL;

    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);

    connection->transition(); // 进入状态转换函数

}

3. RPC函数被调用过程

IO线程收到完整的RPC请求包时,以任务方式转给工作线程,然后由工作线程回调用户写的RPC函数。

 

 

完成的调用过程如下图所示:

 

 

任务从IO线程进入工作线程:

 

4. 管道和任务队列

IO线程以Task方式将连接交给工作线程,而工作线程在回调完后,以管道方式还回给IO线程。连接从IO线程进入到或工作线程后,会从libevent中删除,返回后再进入libevent

 

5. 对象间关系

class TNonblockingServer: public TServer

{

public:

    void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法

    {

        // 创建socket监听

        // 创建TNonblockingIOThread

        // 通过Thread启动TNonblockingIOThread

    }

};

 

class TServer: public concurrency::Runnable

{

public:

    virtual void serve() = 0;

    virtual void run() // 用户也可以直接调用run()

    {

        serve();

    }

};

 

 

6. 相关代码摘要

// 线程

// thrift支持原生posix线程和boost线程

 

void PthreadThread::start()

{

    // PthreadThread是一个Posix线程类

    pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);

}

 

static void* PthreadThread::threadMain(void* arg)

{

    thread->runnable()->run();

}

 

// 以下为IO线程

 

/// Three states for sockets: recv frame size, recv data, and send mode

enum TSocketState

{

  SOCKET_RECV_FRAMING,

  SOCKET_RECV,

  SOCKET_SEND

};

 

/**

 * Five states for the nonblocking server:

 *  1) initialize

 *  2) read 4 byte frame size

 *  3) read frame of data

 *  4) send back data (if any)

 *  5) force immediate connection close

 */

enum TAppState

{

  APP_INIT,            // 初始化

  APP_READ_FRAME_SIZE, // 接收包大小

  APP_READ_REQUEST,    // 接收包数据

  APP_WAIT_TASK,       

  APP_SEND_RESULT,     // 发送数据

  APP_CLOSE_CONNECTION // 关闭连接

};

 

// 启动监听和IO线程

void TNonblockingServer::serve()

{

    createAndListenOnSocket();

 

    for (uint32_t id = 0; id 

    {

        // TNonblockingIOThread是一个Runnable

        // 以委托方式被运行在PthreadThread中

        thread = new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_);

        ioThreads_.push_back(thread);

    }

    for (uint32_t i = 1; i 

    {

        // PthreadThread thread;

        thread->start();

    }

    

    ioThreads_[0]->run(); // 这将阻塞调用线程

    for (uint32_t i = 0; i 

    {

        ioThreads_[i]->join();

    }

}

 

void TNonblockingIOThread::run()

{

    eventBase_ = event_base_new();

    

    // IO线程在启动时会调用registerEvents()

    // 在registerEvents()中完成两个回调函数的注册:listenHandler和notifyHandler

    // listenHandler回调负责接受请求,并创建连接对象

    registerEvents();

    event_base_loop(eventBase_, 0); // libevent

}

 

void TNonblockingIOThread::registerEvents()

{

    // listenHandler和socket关联

    event_set(&serverEvent_, listenSocket_, EV_READ | EV_PERSIST,

        TNonblockingIOThread::listenHandler, server_);

 

    // notifyHandler和pipe关联

    event_set(?ificationEvent_, getNotificationRecvFD(), EV_READ | EV_PERSIST,

        TNonblockingIOThread::notifyHandler, this);

}

 

static void listenHandler(evutil_socket_t fd, short which, void* v)

{

    ((TNonblockingServer*)v)->handleEvent(fd, which);

}

 

void TNonblockingServer::handleEvent(int fd, short which)

{

    accept();

    createConnection();

}

 

TNonblockingServer::TConnection* TNonblockingServer::createConnection()

{

    // 会将自己绑定到一个线程

    // 采用轮询的方式选择线程

    // int selectedThreadIdx = nextIOThread_;

    // nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();

 

    // std::stack connectionStack_;

    // 使用了内存池connectionStack_    

 

    // App状态:APP_INIT

    // Socket状态:SOCKET_RECV_FRAMING

}

 

static void eventHandler(evutil_socket_t fd, short /* which */, void* v)

{

    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());

    ((TConnection*)v)->workSocket();

}

 

void TNonblockingServer::TConnection::setFlags(short eventFlags)

{

    event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);

}

 

void TNonblockingServer::TConnection::setRead()

{

    setFlags(EV_READ | EV_PERSIST);

}

 

void TNonblockingServer::TConnection::setWrite()

{

    setFlags(EV_WRITE | EV_PERSIST);

}

 

void TNonblockingServer::TConnection::setIdle()

{

    setFlags(0);

}

 

void TNonblockingServer::TConnection::workSocket()

{

    case SOCKET_RECV_FRAMING:

        TSocket::read(); // 接收包大小

        transition();

    case SOCKET_RECV:

        TSocket::read(); // 接收包数据

        transition();

    case SOCKET_SEND:

        TSocket::write_partial(); // 发送数据(非阻塞的)

        transition();

}

 

void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v)

{

    recv();

    connection->transition();

}

 

// transition()为状态迁移函数

void TNonblockingServer::TConnection::transition()

{

    case APP_INIT:

        setRead();

    case APP_WAIT_TASK:

        setWrite();

    case APP_READ_REQUEST:

        setIdle();

}

 

TNonblockingServer::TConnection::Task

{

public:

    void run()

    {        

        // 回调

        processor_->process(input_, output_, connectionContext_);

        // 回调完后通知,

        // 从工作线程重回到IO线程

        // connection_的指针地址将通过管道传给工作线程

        connection_->notifyIOThread(); // ioThread_->notify(this);

    }

};

 

TNonblockingIOThread::notifyHandler()

{

    // 从管道中取出connection的指针地址

    TNonblockingServer::TConnection* connection = NULL;

    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);

    connection->transition(); // 进入状态转换函数

}

 

// 以下为工作线程

class ThreadManager::Impl : public ThreadManager;

class SimpleThreadManager : public ThreadManager::Impl;

class ThreadManager::Worker: public Runnable;

class ThreadManager::Task : public Runnable;

 

void SimpleThreadManager::start()

{

    // workerCount_为工作线程数

    addWorker(workerCount_);

}

 

void ThreadManager::Impl::addWorker(size_t value)

{

    for (size_t ix = 0; ix 

    {

        worker = new ThreadManager::Worker(this);

        

        // thread为PthreadThread

        // 调用了worker->run();

        thread->start();

    }

}

 

void ThreadManager::Worker::run() 

{

    ThreadManager::Task task;

    task->run();

}

 

class ThreadManager::Task: public Runnable

{

public:

    void run() 

    {

        // runnable_实际为TNonblockingServer::TConnection::Task

        runnable_->run();

    }

    

private:

    // 这里的Runnable实际为TNonblockingServer::TConnection::Task

    // 在TNonblockingServer::TConnection::transition()中被push进来

    boost::shared_ptr runnable_;

};

 

void ThreadManager::Impl::add(shared_ptr value)

{

    // std::queue > tasks_;

    task = new ThreadManager::Task(value, expiration);

    tasks_.push(task);

}

 

// 两者关系

class TNonblockingServer: public TServer

{

public:

    TNonblockingServer(const boost::shared_ptr& threadManager);

 

private:

    // TNonblockingServer关联了ThreadManager

    boost::shared_ptr threadManager_;

};

 

// 工作线程将回调TNonblockingServer::TConnection::Task

class TNonblockingServer::TConnection::Task: public Runnable

{

};

 

// task为TNonblockingServer::TConnection::Task

void TNonblockingServer::addTask(boost::shared_ptr task)

{

    // 将任务交给工作线程

    // threadManager_为SimpleThreadManager

    threadManager_->add(task, 0LL, taskExpireTime_);

}

 

void TNonblockingServer::TConnection::transition()

{

    case APP_READ_REQUEST:

        if (server_->isThreadPoolProcessing())

        {

            boost::shared_ptr task =

                new TNonblockingServer::TConnection::Task(

                    processor_, inputProtocol_, outputProtocol_, this);

 

            // server_为TNonblockingServer

            // 回调交给工作线程,IO线程不做这个工作

            server_->addTask(task); // server_为TNonblockingServer

        }

        else

        {

            // 调用TNonblockingServer的构造函数时,

            // 如果没有指定参数ThreadManager,则会走这条分支

            // 这种情况下,isThreadPoolProcessing()返回false

            processor_->process(inputProtocol_, outputProtocol_, connectionContext_);

        }

}

 

void TNonblockingServer::TConnection::Task::run()

{

    // 回调

    processor_->process(input_, output_, connectionContext_);

}

 

内嵌关系:

1) TNonblockingServer内嵌了类TConnection,而TConnection又内嵌了类Task

2) ThreadManager内嵌了类Impl、类Worker和类Task(注意区分于TConnection内嵌的Task),而Impl又是ThreadManager的子类,而Task是对Runnable的实现

 

class TNonblockingServer: public TServer

{

public:

    void serve() // 用户可以直接调用server(),但直接调用run()是更好的做法

    {

        // 创建socket监听

        // 创建TNonblockingIOThread

        // 通过Thread启动TNonblockingIOThread

    }

};

 

class TServer: public concurrency::Runnable

{

public:

    virtual void serve() = 0;

    virtual void run() // 用户可以直接调用run()

    {

        serve();

    }

};

 

 

相关文章
|
1月前
|
Linux
一个进程最多可以创建多少个线程基本分析
一个进程最多可以创建多少个线程基本分析
42 1
|
3月前
|
监控 Linux 编译器
多线程死锁检测的分析与实现(linux c)-有向图的应用
在日常的软件开发中,多线程是不可避免的,使用多线程中的一大问题就是线程对锁的不合理使用造成的死锁,死锁一旦发生,将导致多线程程序响应时间长,吞吐量下降甚至宕机崩溃,那么如何检测出一个多线程程序中是否存在死锁呢?在提出解决方案之前,先对死锁产生的原因以及产生的现象做一个分析。最后在用有向环来检测多线程中是否存在死锁的问题。
56 0
|
1天前
|
SQL Dubbo Java
案例分析|线程池相关故障梳理&总结
本文作者梳理和分享了线程池类的故障,分别从故障视角和技术视角两个角度来分析总结,故障视角可以看到现象和教训,而技术视角可以透过现象看到本质更进一步可以看看如何避免。
|
30天前
|
存储 算法 Linux
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
25 0
|
2月前
|
数据处理 UED 开发者
Python并发编程之协程与多线程对比分析
本文将从Python并发编程的角度出发,对比分析协程与多线程两种并发处理方式的优缺点及适用场景,帮助读者更好地选择适合自己项目的并发方案。
|
2月前
|
程序员 测试技术 数据处理
Python中的装饰器应用与实现Python并发编程之协程与多线程对比分析
在Python编程中,装饰器是一种强大的工具,能够简洁而优雅地扩展函数或类的功能。本文将深入探讨Python中装饰器的原理、应用场景以及实现方法,帮助读者更好地理解和运用这一重要的编程概念。 本文将从Python并发编程的角度出发,对比分析协程与多线程两种并发处理方式的优缺点及适用场景,帮助读者更好地选择适合自己项目的并发方案。
|
3月前
|
运维 监控 Java
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
学习JVM需要一定的编程经验和计算机基础知识,适用于从事Java开发、系统架构设计、性能优化、研究学习等领域的专业人士和技术爱好者。
55 5
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
|
7月前
|
负载均衡 Dubbo Java
dubbo源码v2.7分析:结构、container入口及线程模型
Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架,提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现,高度可扩展能力,运行期流量调度,可视化的服务治理与运维。
65 0
|
8月前
|
数据采集 缓存 数据挖掘
GATK4标准分析流程 丨如何选择合适的线程和内存大小?数据预处理方法与注意事项
GATK4标准分析流程 丨如何选择合适的线程和内存大小?数据预处理方法与注意事项