在上一篇中,我们主要讨论了Windows下关于完成端口的一些知识。对应于完成端口,Linux下面在2.5.44内核中有了epoll,这个是为处理大批量句柄而引进的。
先来看看为什么要引进epoll以及它带来的好处。
在Linux内核中,原有的select所用到的FD_SET是有限的,在内核中的参数_FD_SETSIZE来设置的。如果想要同时检测1025个句柄的可读(或可写)状态,则select无法满足。而且,而且select是采用轮询方法进行检测的,也就是说每次检测都要遍历所有FD_SET中的句柄。显然,当随着FD_SET中的句柄数的增多,select的效率会不断的下降。如今的服务器,都是要满足上万甚至更多的连接的,显然想要更高效的实现这一要求,必须采用新的方法。于是,不断的修改后,终于形成了稳定的epoll。
epoll优点:(1)支持大数量的socket描述符(FD)。举个例子来说,在1GB内存的机器上大约可以打开10万个左右的socket,这个数字足以满足一般的服务器需求。(2)epoll的IO效率不会随着FD数量的增加而线性下降(多少肯定会下降的)。至于原理,可以查阅epoll的实现原理;(3)使用mmap加速内核与用户空间的消息传递,无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。当然epoll还有其他一些优点,这里就不一一列举了。
下面来重点说说epoll的使用,这也是大家最关心的部分。在2.6内核中epoll变的简洁而强大。
先来弄清楚一个概念,即epoll的2种工作方式:LT和ET。
LT(level triggered)是缺省的工作方式,同时支持block和non-block。其实这个有点像电路里面的电平触发方式。在这种模式下,内核会告诉你一个文件描述符fd就绪了,然后你就可以对这个fd进行IO操作。如果你不做任何操作,内核会继续通知你。所以,假如你读取数据没有读取完时,内核会继续通知你。其实传统的select/poll就是这种模式。
ET(edge-triggered)是告诉工作方式,只支持non-block。其实这个有点像电路里面的边沿触发方式。这个是高效服务器必选的方式。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll通知你。然后对这个fd只通知你一次,因为之后一直为就绪态,没有了状态的变化,直到你做了某些操作导致了那个fd不再为就绪态。但是注意,如果一直不对这个fd进行IO操作,内核不会发送更多的通知。
在弄清楚了上述两种模式之后,接下来就可以使用epoll了。
主要用到三个函数epoll_create,epoll_ctl,epoll_wait。
int epoll_create(int size);
创建一个
epoll的句柄,
size用来告诉内核这个监听的数目一共有多大。当创建好
epoll句柄后,它就是会占用一个
fd值,所以在使用完
epoll后,必须调用
close()关闭,否则可能导致
fd被耗尽。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll
的事件注册函数第一个参数是
epoll_create()
的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的
fd
,第四个参数是告诉内核需要监听什么事,
struct epoll_event
结构如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events
可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带
外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水
平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要
继续监听这个socket的话,需要再次把这个socket加入
到EPOLL队列里
int epoll_wait(int epfd, struct epoll_event * events,
int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
下面给出epoll编写服务器的模型:
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MAXSIZE 64000
#define MAXEPS 256
//#define EVENTS 100
#define LISTENQ 32
#define SERV_PORT 8000
int setnonblock(int sock)
{
int flags = fcntl(sock, F_GETFL, 0);
if(-1 == flags) {
perror("fcntl(sock, F_GETFL)");
return -1;
}
flags |= O_NONBLOCK;
if(-1 == fcntl(sock, F_SETFL, flags)) {
perror("fcntl(sock, F_SETFT, flags)");
return -2;
}
return 0;
}
int main(int argc, char *argv[])
{
int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
ssize_t n;
char buf[MAXSIZE];
socklen_t clilen;
struct epoll_event ev, events[20];
epfd = epoll_create(MAXEPS);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
setnonblock(listenfd);
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
/*
char *local_addr = "10.0.2.15";
inet_aton(local_addr, &(serveraddr.sin_addr));
*/
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(SERV_PORT);
if(bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) != 0) {
perror("bind failed");
return -1;
}
if(listen(listenfd, LISTENQ) != 0) {
perror("listen failed");
return -2;
}
maxi = 0;
printf("began to accept...\n");
for(;;) {
nfds = epoll_wait(epfd, events, 32, 10000);
if(-1 == m_nfds) {
if(EINTR == errno) {
continue;
}
return -1;
}
for(i=0; i<nfds; ++i) {
if(events[i].data.fd == listenfd) {
connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen);
if(connfd < 0) {
perror("accept failed");
return -3;
}
printf("accepted..\n");
setnonblock(connfd);
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
}
else if(events[i].events & EPOLLIN) {
if((sockfd = events[i].data.fd) < 0)
continue;
if((n = read(sockfd, buf, MAXSIZE)) < 0) {
if(errno == ECONNRESET) {
close(sockfd);
events[i].data.fd = -1;
} else {
perror("read failed");
}
}
else if(0 == n) {
close(sockfd);
events[i].data.fd = -1;
}
printf("Read the buf: %s\n", buf);
ev.data.fd = sockfd;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
else if(events[i].events & EPOLLOUT) {
sockfd = events[i].data.fd;
char *sndbuf = "I get your message!";
write(sockfd,sndbuf, 10);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
}
}
return 0;
}
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MAXSIZE 64000
#define MAXEPS 256
//#define EVENTS 100
#define LISTENQ 32
#define SERV_PORT 8000
int setnonblock(int sock)
{
int flags = fcntl(sock, F_GETFL, 0);
if(-1 == flags) {
perror("fcntl(sock, F_GETFL)");
return -1;
}
flags |= O_NONBLOCK;
if(-1 == fcntl(sock, F_SETFL, flags)) {
perror("fcntl(sock, F_SETFT, flags)");
return -2;
}
return 0;
}
int main(int argc, char *argv[])
{
int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
ssize_t n;
char buf[MAXSIZE];
socklen_t clilen;
struct epoll_event ev, events[20];
epfd = epoll_create(MAXEPS);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
setnonblock(listenfd);
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
/*
char *local_addr = "10.0.2.15";
inet_aton(local_addr, &(serveraddr.sin_addr));
*/
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(SERV_PORT);
if(bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) != 0) {
perror("bind failed");
return -1;
}
if(listen(listenfd, LISTENQ) != 0) {
perror("listen failed");
return -2;
}
maxi = 0;
printf("began to accept...\n");
for(;;) {
nfds = epoll_wait(epfd, events, 32, 10000);
if(-1 == m_nfds) {
if(EINTR == errno) {
continue;
}
return -1;
}
for(i=0; i<nfds; ++i) {
if(events[i].data.fd == listenfd) {
connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen);
if(connfd < 0) {
perror("accept failed");
return -3;
}
printf("accepted..\n");
setnonblock(connfd);
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
}
else if(events[i].events & EPOLLIN) {
if((sockfd = events[i].data.fd) < 0)
continue;
if((n = read(sockfd, buf, MAXSIZE)) < 0) {
if(errno == ECONNRESET) {
close(sockfd);
events[i].data.fd = -1;
} else {
perror("read failed");
}
}
else if(0 == n) {
close(sockfd);
events[i].data.fd = -1;
}
printf("Read the buf: %s\n", buf);
ev.data.fd = sockfd;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
else if(events[i].events & EPOLLOUT) {
sockfd = events[i].data.fd;
char *sndbuf = "I get your message!";
write(sockfd,sndbuf, 10);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
}
}
return 0;
}
个人总结:
一般epoll服务器模型基本就是上面那个。有人你加入线程池来进行处理,即在epoll-wait和收发处理分开到两个线程中,这样在处理收发数据时,仍然不影响epoll进行监听。也有人建议不使用线程池,因为线程切换等带来的开销不亚于数据处理。这里本人没有进行测试,不便下结论。
主要先想说的,很多在上述模型之外,还需要注意的就是如何读写的问题。因为上述模型只是小量的数据收发,比较简单。
由于epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。那么如何保证读取数据或者是发送数据都已经结束了呢?(即全部读完或全部发送)。
读数据的时候需要考虑的是当read()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取,直到返回的大小小于请求的大小。也可以在一个while(true)循环当中不断的read,直到返回EAGAIN位置。建议使用前一种方法。因为很长时间没有给对方回复后,对方可能会认为此次数据包丢失,接着再次发送同样的数据包,服务器此时可以控制退出此次读取,重新读取。
同理,如果发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send()函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误,同时不理会这次请求发送的数据。这时就需要你对send进行一些改动,等待片刻后继续发送,并检查看发送的数据量是否正确。
本文转自jazka 51CTO博客,原文链接:http://blog.51cto.com/jazka/252620,如需转载请自行联系原作者