为什么要用统一事件源
信号是一种异步事件:信号处理函数和程序的主循环是两条不同的执行路径。即当进程收到信号时,操作系统会中断进程当前的正常流程,转而进入信号处理函数去处理信号,完成后再返回中断的地方继续执行。
很显然,信号处理函数需要尽可能地快速执行完毕,以确保该信号不会被屏蔽太久。因为为了避免信号竞态条件的发生,信号处理期间系统不会再次触发它。
解决方法:
一种典型的解决方法是:把信号的主要处理逻辑放到程序的主循环中,当信号处理函数被触发时,它只是简单的通知主循环程序接收到信号,并把信号值传递给主循环,主循环再根据接收到的信号执行目标信号对应的逻辑代码。
信号处理函数通常使用管道来将信号“传递”给主循环的:信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值。那么主循环怎么知道管道上何时有数据可读呢?这很简单,我们只需要使用I/O复用系统调用来监听管道的读端文件描述符上的可读事件。如此一来,信号事件就能和其他I/O事件一样被处理了。
统一事件源的概念
统一事件源,即将信号和其他事件一样进行处理。
具体就是信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。
很多优秀的I/O框架库和后台服务器程序都统一处理信号和I/O事件,比如Libevent I/O 框架库和 xinetd 超级服务。以下是一个统一事件源的简单实现。
统一事件源的应用
信号处理函数:
//信号处理函数 void sig_handler(int sig) { //保留原来的errno,在函数最后恢复,以保证函数的可重入性 int save_errno = errno; int msg = sig; send(pipefd[1], (char* )&msg, 1, 0); errno = save_errno; }
这里信号处理的方式只是通过管道向主循环发生信号,将信号交给主喜欢出现进行处理,以达到统一事件源的目的。
添加信号到信号处理函数:
void addsig(int sig) { //创建sigaction结构体变量 struct sigaction sa; memset(&sa, 0, sizeof(sa)); //信号处理函数中仅仅发送信号值,不做对应逻辑处理 sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; //将所有信号添加到信号集中 sigfillset(&sa.sa_mask); //执行sigaction函数 assert(sigaction(sig, &sa, nullptr) != -1); }
这里使用了 sigaction 系统调用来设置信号处理函数,相比于 signal 系统调用具有更强的健壮性。
信号通知逻辑:
- 使用socketpair创建管道,其中写端由信号处理函数写入信号值,主循环程序通过 epoll I/O复用监视读事件就绪时从管道的读端读取信号。
- 这里主要设置了对SIGTERM(kill -15)和SIGTINT(ctrl + C) 信号的处理,通过struct sigaction结构体和sigaction系统调用注册对信号的捕捉和处理。
- 利用epoll 监测管道读端的读事件就绪。
- 当主循环收到这两个信号时安全的退出程序。
具体逻辑代码如下:
epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); // 使用 socketpair 创建管道,注册 pipefd[0] 上的可读事件 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setNonBlock(pipefd[1]); addfd(epollfd, pipefd[0]); //设置一些信号的处理函数 addsig(SIGHUP); addsig(SIGCHLD); addsig(SIGTERM); addsig(SIGINT); bool stop_server = false; while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if(number < 0 && errno != EINTR) { printf("epoll wait failure!\n"); break; } for(int i = 0; i < number; ++i) { int sockfd = events[i].data.fd; //如果就绪的文件描述符是listenfd, 则处理新连接 if(sockfd == listenfd) { sockaddr_in peer; socklen_t len = sizeof(peer); int connfd = accept(listenfd, (sockaddr*)&peer, &len); addfd(epollfd, connfd); } //如果就绪的文件描述符是pipefd[0],就处理信号 else if(sockfd == pipefd[0] && (events[i].events & EPOLLIN)) { int sig; char signals[1024]; ssize_t s = recv(pipefd[0], signals, sizeof(signals), 0); if(s == -1) { continue; } else if(s == 0) { continue; } else { //每个信号占一个字节,所以按照字节来逐个接收信号。此处以SIGTERM为例,来安全的终止服务器程序: for(int i = 0; i < s; ++i) { switch (signals[i]) { case SIGCHLD: case SIGHUP: { continue; } case SIGTERM: case SIGINT: { stop_server = true; } default: break; } } } } else { //其他I/O就绪 } } } printf("close fds\n"); close(listenfd); close(pipefd[0]); close(pipefd[1]);
运行结果:
运行程序,然后启动另一个终端使用pidof
查看该程序的PID,然后向该程序发送15号信号,即SIGTERM
,可以发现程序安全的