1 select
select本质上是通过设置或检查存放fd标志位的数据结构进行下一步处理。
这带来缺点:
单个进程可监视的fd数量被限制,即能监听端口的数量有限
单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是3232,同理64位机器上FD_SETSIZE为3264),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试
一般该数和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认1024个,64位默认2048。
对socket是线性扫描,即轮询,效率较低:
仅知道有I/O事件发生,却不知是哪几个流,只会无差异轮询所有流,找出能读数据或写数据的流进行操作。同时处理的流越多,无差别轮询时间越长 - O(n)。
当socket较多时,每次select都要通过遍历FD_SETSIZE
个socket,不管是否活跃,这会浪费很多CPU时间。如果能给 socket 注册某个回调函数,当他们活跃时,自动完成相关操作,即可避免轮询,这就是epoll与kqueue。
1.1 调用过程
asmlinkage long sys_poll(struct pollfd * ufds, unsigned int nfds, long timeout) { int i, j, fdcount, err; struct pollfd **fds; struct poll_wqueues table, *wait; int nchunks, nleft; /* Do a sanity check on nfds ... */ if (nfds > NR_OPEN) return -EINVAL; if (timeout) { /* Careful about overflow in the intermediate values */ if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ) timeout = (unsigned long)(timeout*HZ+999)/1000+1; else /* Negative or overflow */ timeout = MAX_SCHEDULE_TIMEOUT; } // 2. 注册回调函数__pollwait poll_initwait(&table); wait = &table; if (!timeout) wait = NULL; err = -ENOMEM; fds = NULL; if (nfds != 0) { fds = (struct pollfd **)kmalloc( (1 + (nfds - 1) / POLLFD_PER_PAGE) * sizeof(struct pollfd *), GFP_KERNEL); if (fds == NULL) goto out; } nchunks = 0; nleft = nfds; while (nleft > POLLFD_PER_PAGE) { /* allocate complete PAGE_SIZE chunks */ fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL); if (fds[nchunks] == NULL) goto out_fds; nchunks++; nleft -= POLLFD_PER_PAGE; } if (nleft) { /* allocate last PAGE_SIZE chunk, only nleft elements used */ fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL); if (fds[nchunks] == NULL) goto out_fds; } err = -EFAULT; for (i=0; i < nchunks; i++) // if (copy_from_user(fds[i], ufds + i*POLLFD_PER_PAGE, PAGE_SIZE)) goto out_fds1; if (nleft) { if (copy_from_user(fds[nchunks], ufds + nchunks*POLLFD_PER_PAGE, nleft * sizeof(struct pollfd))) goto out_fds1; } fdcount = do_poll(nfds, nchunks, nleft, fds, wait, timeout); /* OK, now copy the revents fields back to user space. */ for(i=0; i < nchunks; i++) for (j=0; j < POLLFD_PER_PAGE; j++, ufds++) __put_user((fds[i] + j)->revents, &ufds->revents); if (nleft) for (j=0; j < nleft; j++, ufds++) __put_user((fds[nchunks] + j)->revents, &ufds->revents); err = fdcount; if (!fdcount && signal_pending(current)) err = -EINTR; out_fds1: if (nleft) free_page((unsigned long)(fds[nchunks])); out_fds: for (i=0; i < nchunks; i++) free_page((unsigned long)(fds[i])); if (nfds != 0) kfree(fds); out: poll_freewait(&table); return err; }
static int do_poll(unsigned int nfds, unsigned int nchunks, unsigned int nleft, struct pollfd *fds[], struct poll_wqueues *wait, long timeout) { int count; poll_table* pt = &wait->pt; for (;;) { unsigned int i; set_current_state(TASK_INTERRUPTIBLE); count = 0; for (i=0; i < nchunks; i++) do_pollfd(POLLFD_PER_PAGE, fds[i], &pt, &count); if (nleft) do_pollfd(nleft, fds[nchunks], &pt, &count); pt = NULL; if (count || !timeout || signal_pending(current)) break; count = wait->error; if (count) break; timeout = schedule_timeout(timeout); } current->state = TASK_RUNNING; return count; }
- 使用copy_from_user从用户空间拷贝fd_set到内核空间
- 注册回调函数__pollwait
- 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或datagram_poll)
- 以tcp_poll为例,核心实现就是
__pollwait
,即上面注册的回调函数 - __pollwait,就是把current(当前进程)挂到设备的等待队列,不同设备有不同等待队列,如tcp_poll的等待队列是sk->sk_sleep(把进程挂到等待队列中并不代表进程已睡眠)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒。
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p) { struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt); struct poll_table_page *table = p->table; if (!table || POLL_TABLE_FULL(table)) { struct poll_table_page *new_table; new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL); if (!new_table) { p->error = -ENOMEM; __set_current_state(TASK_RUNNING); return; } new_table->entry = new_table->entries; new_table->next = table; p->table = new_table; table = new_table; } /* 添加新节点 */ { struct poll_table_entry * entry = table->entry; table->entry = entry+1; get_file(filp); entry->filp = filp; entry->wait_address = wait_address; init_waitqueue_entry(&entry->wait, current); add_wait_queue(wait_address,&entry->wait); } }
static void do_pollfd(unsigned int num, struct pollfd * fdpage, poll_table ** pwait, int *count) { int i; for (i = 0; i < num; i++) { int fd; unsigned int mask; struct pollfd *fdp; mask = 0; fdp = fdpage+i; fd = fdp->fd; if (fd >= 0) { struct file * file = fget(fd); mask = POLLNVAL; if (file != NULL) { mask = DEFAULT_POLLMASK; if (file->f_op && file->f_op->poll) mask = file->f_op->poll(file, *pwait); mask &= fdp->events | POLLERR | POLLHUP; fput(file); } if (mask) { *pwait = NULL; (*count)++; } } fdp->revents = mask; } }
- poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值
- 若遍历完所有fd,还没返回一个可读写的mask掩码,则调schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。若超过一定超时时间(schedule_timeout指定),还没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有无就绪的fd
- 把fd_set从内核空间拷贝到用户空间