C++ FFLIB之FFRPC:多线程&多进程的scalability探索

简介: 摘要: 近来在完成通用的数据分析系统ffcount时,使用了ffrpc完成了事件源和service的通信。顺便对ffrpc进行了优化和精简,接口也更易用一些。在跟一个朋友讨论多线程和多进程的问题时,引发了如何才能是系统更加scalability的思考。

摘要:

近来在完成通用的数据分析系统ffcount时,使用了ffrpc完成了事件源和service的通信。顺便对ffrpc进行了优化和精简,接口也更易用一些。在跟一个朋友讨论多线程和多进程的问题时,引发了如何才能是系统更加scalability的思考。把自己的一些想法用ffrpc写了一个demo。无论是使用多线程还是多进程,并发都是为了使系统在吞吐量或响应延迟等特性上达到更佳的效果。那么什么样的设计能够尽量保证scalability呢?

  • 如何更好的使用多线程,或者说使用多线程应该遵循什么样的原则才能避免麻烦。
  • 如果线程的资源不足以满足要求,那么如何利用多进程的资源但却不至于大范围的修改系统实现。

关于多线程&多进程:

对应服务器开发人员来说,多线程编程是最重要的开发技术之一,但是随着实际开发中接触的多线程场景越多,反而越来越尽量少的使用多线程。多线程往往是看起来甜美,用起来苦涩。许多人过度的注意到了多线程的优点,而极大的忽视了其缺点。下面是一些多线程的技巧:

  • 尽量不要使用多线程,如果有明确数据显示单线程无法达到要求,再考虑行之。
  • 如果使用了多线程,多线程之间不要共享数据,哪怕是一点点。多线程之间的通讯使用任务队列或者消息队列之类的完成。
  • 一般而言普通的cpu计算不需要多线程,若有io操作,并且业务可以并行,可以考虑使用异步加回调的方式使用多线程。
  • 使用多线程,切勿因为多线程而多线程,如果业务是不可并行切分的,那么强行拆分则会得不偿失,甚至系统的正确和稳定都难以确保,更不要说后续的扩展和维护。

使用多进程可以避免以上的尴尬问题,多进程本身数据不共享,通讯只能使用消息通讯,这些硬性限制反而确保了多进程更加理想。但问题是管理多个进程往往让人不够情愿,尤其是只是用一台机器的时候。能不能有权衡二者的scalability方案?

FFRPC实现scalability:

在设计ffrpc的时候,首先其适合类似于网游多进程架构的场景。幸运的是,ffrpc封装的是节点与节点之间的通讯,并不限制节点是否在同一个进程。这样在单进程内使用ffrpc开启多个服务实例,从而利用多线程。若实例开启在多个进程中,则又适配多进程环境。其demo设计为如下系统:

  • client 请求logic_service,调用其test接口,根据uid的不同,调用不同的logic_service实例,从而实现logic_service并发。
  • test_msg_t::in_t in;
    in.uid = i;
    test_msg_t::out_t out;
    ffrpc.call("logic_service", 1 + in.uid % ffrpc.service_num("logic_service"), in, out);

     

  • logic_service的test接口被调用后,调用db_service接口,根据uid’不同调用的db_service实例也不同,从而实现db_service的并发。
  • struct lambda_t
    {
        static void async_callback(update_msg_t::out_t& msg_)
        {
            sleep(2);
            printf("logic_service_t 接收db_service的返回值 ret_bool=[%d]\n", msg_.value);
        }
    };
    update_msg_t::in_t in;
    in.uid = in_msg_.uid;
    ffrpc->async_call("db_service", 1 + in_msg_.uid % ffrpc->service_num("db_service"), in, &lambda_t::async_callback);

     

  • db_service的update被调用后,回调对应的logic_service的回调函数返回结果。
  • int update(update_msg_t::in_t& in_msg_, rpc_callcack_t<update_msg_t::out_t>& cb_)
        {
            sleep(2);
            printf("in db_service_t::update[index=%d], 被logic_service调用uid[%ld]\n", m_index, in_msg_.uid);
            update_msg_t::out_t out;
            out.value = true;
            cb_(out);
            return 0;
        }

     

 示例源码:

View Code
#include <stdio.h>

#include "count/ffcount.h"
#include "rpc/broker_application.h"
#include "base/daemon_tool.h"
#include "base/arg_helper.h"
#include "base/strtool.h"

using namespace ff;
bool g_run = false;

struct test_msg_t
{
    struct in_t: public ffmsg_t<test_msg_t::in_t>
    {
        virtual string encode()
        {
            return (init_encoder() << uid).get_buff() ;
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> uid;
        }
        long    uid;
    };
    typedef ffmsg_bool_t out_t;
};
struct update_msg_t
{
    struct in_t: public ffmsg_t<update_msg_t::in_t>
    {
        virtual string encode()
        {
            return (init_encoder() << uid).get_buff() ;
        }
        virtual void decode(const string& src_buff_)
        {
            init_decoder(src_buff_) >> uid;
        }
        long    uid;
    };
    typedef ffmsg_bool_t out_t;
};

class logic_service_t
{
public:
    logic_service_t(ffrpc_t* p, int i):ffrpc(p), m_index(i){}
    int test(test_msg_t::in_t& in_msg_, rpc_callcack_t<test_msg_t::out_t>& cb_)
    {
        sleep(2);
        printf("in logic_service_t::test[index=%d], 被client调用 uid[%ld]\n", m_index, in_msg_.uid);

        test_msg_t::out_t out;
        out.value = true;
        cb_(out);
        
        struct lambda_t
        {
            static void async_callback(update_msg_t::out_t& msg_)
            {
                sleep(2);
                printf("logic_service_t 接收db_service的返回值 ret_bool=[%d]\n", msg_.value);
            }
        };
        update_msg_t::in_t in;
        in.uid = in_msg_.uid;
        ffrpc->async_call("db_service", 1 + in_msg_.uid % ffrpc->service_num("db_service"), in, &lambda_t::async_callback);
        return 0;
    }
    
    ffrpc_t* ffrpc;
    int      m_index;
};

class db_service_t
{
public:
    db_service_t(ffrpc_t* p, int i):ffrpc(p), m_index(i){}
    int update(update_msg_t::in_t& in_msg_, rpc_callcack_t<update_msg_t::out_t>& cb_)
    {
        sleep(2);
        printf("in db_service_t::update[index=%d], 被logic_service调用uid[%ld]\n", m_index, in_msg_.uid);
        update_msg_t::out_t out;
        out.value = true;
        cb_(out);
        return 0;
    }
    
    ffrpc_t* ffrpc;
    int      m_index;
};

int start_logic_service(ffrpc_t& ffrpc, logic_service_t& service, arg_helper_t* arg_helper_, int index_)
{
    //printf("start_logic_service index[%d] begin\n", index_);
    assert(0 == ffrpc.open(arg_helper_->get_option_value("-l")) && "can't connnect to broker");

    ffrpc.create_service("logic_service", index_)
            .bind_service(&service)
            .reg(&logic_service_t::test);
    //printf("start_logic_service index[%d] end\n", index_);
    return 0;
}
int start_db_service(ffrpc_t& ffrpc, db_service_t& service, arg_helper_t* arg_helper_, int index_)
{
    //printf("start_db_service index[%d] begin\n", index_);
    assert(0 == ffrpc.open(arg_helper_->get_option_value("-l")) && "can't connnect to broker");

    ffrpc.create_service("db_service", index_)
            .bind_service(&service)
            .reg(&db_service_t::update);
    //printf("start_db_service index[%d] end\n", index_);
    return 0;
}
int main(int argc, char* argv[])
{
    if (argc == 1)
    {
        printf("usage: app -broker -client -l tcp://127.0.0.1:10241 -service db_service@1-4,logic_service@1-4\n");
        return 1;
    }
    arg_helper_t arg_helper(argc, argv);
    if (arg_helper.is_enable_option("-broker"))
    {
        broker_application_t::run(argc, argv);
    }

    if (arg_helper.is_enable_option("-d"))
    {
        daemon_tool_t::daemon();
    }
    
    vector<string> all_service_name;
    strtool_t::split(arg_helper.get_option_value("-service"), all_service_name, ",");
    
    vector<ffrpc_t*>            vt_rpc;
    vector<db_service_t*>       vt_db_service;
    vector<logic_service_t*>    vt_logic_service;
    for (size_t i = 0; i < all_service_name.size(); ++i)
    {
        vector<string> opts;
        strtool_t::split(all_service_name[i], opts, "@");
        int index_begin = 0;
        int index_end   = 0;
        if (opts.size() > 1)
        {
            vector<string> vt_index;
            strtool_t::split(opts[1], vt_index, "-");
            if (vt_index.empty() == false)
            {
                index_begin = ::atoi(vt_index[0].c_str());
                if (vt_index.size() > 1)
                {
                    index_end = ::atoi(vt_index[1].c_str());
                }
            }
        }
        if (index_end < index_begin) index_end = index_begin;
        printf("service includes<%s:%d-%d>\n", opts[0].c_str(), index_begin, index_end);
        
        for (int i = index_begin; i <= index_end; ++i)
        {
            ffrpc_t* ffrpc = new ffrpc_t();
            vt_rpc.push_back(ffrpc);
            if (opts[0] == "db_service")
            {
                db_service_t* service = new db_service_t(ffrpc, i);
                start_db_service(*ffrpc, *service, &arg_helper, i);
                vt_db_service.push_back(service);
            }
            else if (opts[0] == "logic_service")
            {
                logic_service_t* service = new logic_service_t(ffrpc, i);
                start_logic_service(*ffrpc, *service, &arg_helper, i);
                vt_logic_service.push_back(service);
            }
        }
    }
    
    if (arg_helper.is_enable_option("-client"))
    {
        ffrpc_t ffrpc;
        for (int i = 1; i < 100000; ++i)
        {
            sleep(1);
            printf("client 准备调用logic_service[index=%d]\n", i);

            assert(0 == ffrpc.open(arg_helper.get_option_value("-l")) && "can't connnect to broker");

            test_msg_t::in_t in;
            in.uid = i;

            test_msg_t::out_t out;
            ffrpc.call("logic_service", 1 + in.uid % ffrpc.service_num("logic_service"), in, out);
            sleep(8);
            printf("logic_service[index=%d] 调用返回=%d\n", i, out.value);
        }
        ffrpc.close();
    }
    signal_helper_t::wait();
    for (size_t i = 0; i < vt_rpc.size(); ++i)
    {
        vt_rpc[i]->close();
        delete vt_rpc[i];
    }
    for (size_t i = 0; i < vt_db_service.size(); ++i)
    {
        delete vt_db_service[i];
    }
    for (size_t i = 0; i < vt_logic_service.size(); ++i)
    {
        delete vt_logic_service[i];
    }
    return 0;
}

 

运行命令:

git clone https://github.com/fanchy/fflib

cd  cd fflib/example/book/rpc/

make

#运行4个db_service实例和4个logic_service实例

./app_rpc -client -broker -l tcp://127.0.0.1:10241 -service db_service@1-4,logic_service@1-4

 

总结:

本例中logic_service和db_service集成到了一个程序中,使用ffrpc可以通过多线程实现并发,各个服务使用异步回调和消息通信,通过配置实例的个数,从而实现多线程的scalability。如果要把服务分别部署到其他机器上,只需启动多个app进程,例如:

启动4个logic_service实例:

./app_rpc -broker -l tcp://127.0.0.1:10241 -service logic_service@1-4

启动4个db_service实例:

./app_rpc  -l tcp://127.0.0.1:10241 -service db_service@1-4 -client

 

目录
相关文章
|
1月前
|
调度 开发者 Python
深入浅出操作系统:进程与线程的奥秘
在数字世界的底层,操作系统扮演着不可或缺的角色。它如同一位高效的管家,协调和控制着计算机硬件与软件资源。本文将拨开迷雾,深入探索操作系统中两个核心概念——进程与线程。我们将从它们的诞生谈起,逐步剖析它们的本质、区别以及如何影响我们日常使用的应用程序性能。通过简单的比喻,我们将理解这些看似抽象的概念,并学会如何在编程实践中高效利用进程与线程。准备好跟随我一起,揭开操作系统的神秘面纱,让我们的代码运行得更加流畅吧!
|
2天前
|
消息中间件 调度
如何区分进程、线程和协程?看这篇就够了!
本课程主要探讨操作系统中的进程、线程和协程的区别。进程是资源分配的基本单位,具有独立性和隔离性;线程是CPU调度的基本单位,轻量且共享资源,适合并发执行;协程更轻量,由程序自身调度,适合I/O密集型任务。通过学习这些概念,可以更好地理解和应用它们,以实现最优的性能和资源利用。
30 11
|
1天前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
16 6
|
1月前
|
消息中间件 Unix Linux
【C语言】进程和线程详解
在现代操作系统中,进程和线程是实现并发执行的两种主要方式。理解它们的区别和各自的应用场景对于编写高效的并发程序至关重要。
65 6
|
1月前
|
调度 开发者
深入理解:进程与线程的本质差异
在操作系统和计算机编程领域,进程和线程是两个核心概念。它们在程序执行和资源管理中扮演着至关重要的角色。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
65 5
|
1月前
|
算法 调度 开发者
深入理解操作系统:进程与线程的管理
在数字世界的复杂编织中,操作系统如同一位精明的指挥家,协调着每一个音符的奏响。本篇文章将带领读者穿越操作系统的幕后,探索进程与线程管理的奥秘。从进程的诞生到线程的舞蹈,我们将一起见证这场微观世界的华丽变奏。通过深入浅出的解释和生动的比喻,本文旨在揭示操作系统如何高效地处理多任务,确保系统的稳定性和效率。让我们一起跟随代码的步伐,走进操作系统的内心世界。
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
62 4
|
2月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
2月前
|
Linux 调度 C语言
深入理解操作系统:进程和线程的管理
【10月更文挑战第32天】本文旨在通过浅显易懂的语言和实际代码示例,带领读者探索操作系统中进程与线程的奥秘。我们将从基础知识出发,逐步深入到它们在操作系统中的实现和管理机制,最终通过实践加深对这一核心概念的理解。无论你是编程新手还是希望复习相关知识的资深开发者,这篇文章都将为你提供有价值的见解。
|
2月前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
34 1

热门文章

最新文章

相关实验场景

更多