pgpool-II中是如何实现进程池的

简介:
看了pgpool-II的代码后,对其扒皮抽筋,大体了解了其思路:

首先有一般网络通信中使用的 scoket/bind/listen函数设置:

复制代码
/*                    
* create inet domain socket                    
*/                    
static int create_inet_domain_socket(const char *hostname, const int port)
{                    
    struct sockaddr_in addr;                
    int fd;                
    int status;                
    int one = 1;                
    int len;                
    int backlog;                
                    
    fd = socket(AF_INET, SOCK_STREAM, 0);                
    ……                
    status = bind(fd, (struct sockaddr *)&addr, len);                
    ……                
    status = listen(fd, backlog);                
    if (status < 0)                
    {                
        pool_error("listen() failed. reason: %s", strerror(errno));            
        myexit(1);            
    }                
    return fd;                
}                    
复制代码
父进程调用上述函数,把 fd 传递给各个子进程,并且获得了各个子进程的进程ID:

复制代码
/*                    
* pgpool main program                    
*/                    
int main(int argc, char **argv)                    
{                    
    ……                
    /* create unix domain socket */                
    unix_fd = create_unix_domain_socket(un_addr);                
                    
    /* create inet domain socket if any */                
    if (pool_config->listen_addresses[0])                
    {                
        inet_fd = create_inet_domain_socket
          (pool_config->listen_addresses, pool_config->port);            
    }                
                    
    ……                
    /*                
     * We need to block signal here. Otherwise child might send some               
     * signals, for example SIGUSR1(fail over).  Children will inherit             
     * signal blocking but they do unblock signals at the very beginning           
     * of process.  So this is harmless.                
     */                
    POOL_SETMASK(&BlockSig);                
                    
    /* fork the children */                
    for (i=0;i<pool_config->num_init_children;i++){                
        process_info[i].pid = fork_a_child(unix_fd, inet_fd, i);            
        process_info[i].start_time = time(NULL);            
    }                
                    
    /* set up signal handlers */                               
    pool_signal(SIGTERM, exit_handler);                
    pool_signal(SIGINT, exit_handler);                
    pool_signal(SIGQUIT, exit_handler);                
    pool_signal(SIGCHLD, reap_handler);                
    pool_signal(SIGUSR1, failover_handler);                
    pool_signal(SIGUSR2, wakeup_handler);                
    pool_signal(SIGHUP, reload_config_handler);                
                    
    /* create pipe for delivering event */                
    if (pipe(pipe_fds) < 0){                
        pool_error("failed to create pipe");            
        myexit(1);            
    }                
             
    pool_log("%s successfully started. version %s (%s)", 
               PACKAGE, VERSION, PGPOOLVERSION);                
                    
    …… ////main loop is here                
            
    pool_shmem_exit(0);                
}                    
复制代码
而下面的声称子进程的函数里面,子进程一生成,就开始调用 do_child:

复制代码
/*                    
* fork a child                    
*/                    
pid_t fork_a_child(int unix_fd, int inet_fd, int id)                    
{                    
    pid_t pid;                
                    
    pid = fork();                
                    
    if (pid == 0)                
    {                
        /* Before we unconditionally closed pipe_fds[0] and pipe_fds[1]            
         * here, which is apparently wrong since in the start up of            
         * pgpool, pipe(2) is not called yet and it mistakenly closes            
         * fd 0. Now we check the fd > 0 before close(), expecting            
         * pipe returns fds greater than 0.  Note that we cannot            
         * unconditionally remove close(2) calls since fork_a_child()            
         * may be called *after* pgpool starting up.            
         */            
        if (pipe_fds[0] > 0)            
        {            
            close(pipe_fds[0]);        
            close(pipe_fds[1]);        
        }            
                    
        myargv = save_ps_display_args(myargc, myargv);            
                    
        /* call child main */            
        POOL_SETMASK(&UnBlockSig);            
        reload_config_request = 0;            
        my_proc_id = id;            
        run_as_pcp_child = false;            
        do_child(unix_fd, inet_fd);            
    }else if (pid == -1){                
        pool_error("fork() failed. reason: %s", strerror(errno));            
        myexit(1);            
    }                
    return pid;                
}
复制代码
再来看 do_child的逻辑:

复制代码
/*                    
* child main loop                    
*/                    
void do_child(int unix_fd, int inet_fd)                    
{                    
    ……                
    /* set up signal handlers */                
    signal(SIGALRM, SIG_DFL);                
    signal(SIGTERM, die);                
    signal(SIGINT, die);                
    signal(SIGHUP, reload_config_handler);                
    signal(SIGQUIT, die);                
    signal(SIGCHLD, SIG_DFL);                
    signal(SIGUSR1, close_idle_connection);                
    signal(SIGUSR2, wakeup_handler);                
    signal(SIGPIPE, SIG_IGN);                
                    
    ……                
    for (;;){                       
        ……            
        accepted = 0;            
                    
        /* perform accept() */            
        frontend = do_accept(unix_fd, inet_fd, &timeout);            
                    
        pool_log("I am %d", getpid());            
                    
        if (frontend == NULL)  /* connection request from frontend timed out */
        {            
            ……        
            continue;        
        }            
        ……            
                    
        /*            
         * Ok, negotiaton with frontend has been done. Let's go to the            
         * next step.  Connect to backend if there's no existing            
         * connection which can be reused by this frontend.            
         * Authentication is also done in this step.            
         */            
        ……                    
        /*            
         * if there's no connection associated with user and database,            
         * we need to connect to the backend and send the startup packet.          
         */            
                    
        /* look for existing connection */            
        found = 0;            
        backend = pool_get_cp(sp->user, sp->database, sp->major, 1);            
                    
        if (backend != NULL){            
            ……        
        }            
                    
        if (backend == NULL){            
            /* create a new connection to backend */        
            if ((backend = connect_backend(sp, frontend)) == NULL){        
                connection_count_down();    
                continue;    
            }        
        }else{            
            /* reuse existing connection */        
            if (!connect_using_existing_connection(frontend, backend, sp))        
                continue;    
        }                    
        ……            
        /*            
         * Initialize per session context            
         */            
        pool_init_session_context(frontend, backend);            
                    
        /* Mark this connection pool is conncted from frontend */            
        pool_coninfo_set_frontend_connected(
           pool_get_process_context()->proc_id, pool_pool_index());            
                    
        /* query process loop */            
        for (;;){            
            ……        
        }            
                    
        /* Destroy session context */            
        pool_session_context_destroy();            
                    
        /* Mark this connection pool is not conncted from frontend */            
        pool_coninfo_unset_frontend_connected(
             pool_get_process_context()->proc_id, pool_pool_index());            
        ……            
    }                
                    
    child_exit(0);                
}                    
复制代码
do_child中要调 do_accept,看do_accept的逻辑,高度概括,去掉无关代码后,大致是这样:

复制代码
/*                    
* perform accept() and return new fd                    
*/                    
static POOL_CONNECTION *do_accept(int unix_fd, int inet_fd, struct timeval *timeout){                    
    ……                
    fds = select(Max(unix_fd, inet_fd)+1, &readmask, NULL, NULL, timeoutval);      
    ……                
}
复制代码
结合上述代码,如果站在子进程的角度来看,就是:
它进行了 socket/bind/listen(由父进程代劳), 又进行了 select 操作,并在select出阻塞或超时。

这就是pgpool-II的 进程池实现的方式。我觉得可以借鉴之,用在自己的程序上面。


本文转自健哥的数据花园博客园博客,原文链接:http://www.cnblogs.com/gaojian/archive/2012/08/05/2623778.html,如需转载请自行联系原作者
目录
相关文章
|
3月前
|
Python
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
|
Java
线程池的核心参数及执行原理你知道嘛?
线程池是一种管理和复用线程的机制,它可以提高线程的利用率和系统的性能。
368 0
|
10月前
|
存储 前端开发 rax
协程切换的三种底层实现方式
协程切换的三种底层实现方式
104 0
|
12月前
|
缓存 移动开发 网络协议
TCP编写服务器,客户端以及遇到的两个问题,Socket,ServerScket 类,flush(),方法。以及多线程解决,及改进的线程池写法,IO多路复用的思想,C10K,C10M的阐述。万字超细
TCP编写服务器,客户端以及遇到的两个问题,Socket,ServerScket 类,flush(),方法。以及多线程解决,及改进的线程池写法,IO多路复用的思想,C10K,C10M的阐述。万字超细
|
设计模式 并行计算 容器
多线程设计模式 : Master-Worker模式
多线程设计模式 : Master-Worker模式
236 0
PHP进程池的数量是如何配置的?底层原理是什么?
PHP进程池的数量是如何配置的?底层原理是什么?
170 0
|
Java
40. 说一下线程池内部工作原理
40. 说一下线程池内部工作原理
88 0
40. 说一下线程池内部工作原理
|
Java
多线程相关面试题:并行和并发的区别、线程和进程、线程的创建方式、运行状态
多线程相关面试题:并行和并发的区别、线程和进程、线程的创建方式、运行状态
122 0
|
数据可视化 Java 应用服务中间件
眼见为实:被误导的Tomcat工作原理系列之poller线程是做socket读写的线程吗?
眼见为实:用页面可视化的方式带你看到Tomcat工作原理,它里面有哪些线程,在每次请求过来的时候做了什么工作? 纠正部分网友对于Tomcat工作原理理解的误区,明确Tomcat里到底是哪个线程在做socket读写?
222 0
眼见为实:被误导的Tomcat工作原理系列之poller线程是做socket读写的线程吗?
|
Java
【多线程:Monitor 概念】
【多线程:Monitor 概念】
139 0