epoll多路复用非阻塞模型
epoll多路复用技术比select和poll更加高效:select和poll每次调用都要轮询全部n个fd,复杂度为O(n);而epoll每次只返回k个有事件发生的fd,复杂度为O(k),与被监视的fd总数n无关。epoll分为水平触发(LT)和边缘触发(ET)两种模式,这两种模式下对fd的读写方式很不一样,这也是epoll编程的难点。现在很多网络库都优先使用epoll作为多路复用机制,如libev/libevent/muduo/boost.asio,还有一些组件如beanstalkd/nginx等。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//epoll-nonblocking LT
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
/* Capacity of each connection's receive buffer; received bytes beyond it are dropped. */
#define READ_BUF_SIZE (20 * 1024 * 1024)
/* Capacity of each connection's writebuf (not referenced by the code in this file). */
#define WRITE_BUF_SIZE (20 * 1024 * 1024)
/* Chunk size used for the recv() scratch buffer and as the per-send() limit. */
#define CHUNCK_SIZE (2 * 1024 * 1024)
/* 0: close a connection once its reply has been sent; non-zero: keep it open. */
#define KEEP_ALIVE 0
/* Maximum number of events fetched by a single epoll_wait() call. */
#define MAX_EVENTS 2048

/*
 * Per-connection state kept by the epoll loop.
 * NOTE(review): the two inline buffers make this ~40 MiB per connection.
 */
struct epoll_fd_state {
    char readbuf[READ_BUF_SIZE];   /* bytes received from the client; echoed back */
    char writebuf[WRITE_BUF_SIZE]; /* currently unused */
    ssize_t readlen;               /* number of valid bytes in readbuf */
    ssize_t write_pos;             /* next readbuf offset to send */
    ssize_t write_upto;            /* send readbuf up to (not including) this offset */
    int writing;                   /* 1 while a reply is pending, otherwise 0 */
};
void run(void);

/*
 * Program entry point. Command-line arguments are ignored; all of the
 * server setup and the event loop live in run().
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    run();
    return EXIT_SUCCESS;
}
/*
 * Allocate a fresh per-connection state object.
 * readlen, write_pos, write_upto and writing start at 0; the buffer
 * contents are left uninitialized.
 * Returns NULL (after printing the reason) if the allocation fails.
 */
struct epoll_fd_state *alloc_epoll_fd_state()
{
    struct epoll_fd_state *state = malloc(sizeof *state);

    if (state == NULL) {
        perror("error alloc_epoll_fd_state");
        return NULL;
    }

    state->writing = 0;
    state->write_pos = 0;
    state->write_upto = 0;
    state->readlen = 0;
    return state;
}
/*
 * Release state obtained from alloc_epoll_fd_state(); NULL is a no-op.
 * Only the local copy of the pointer is seen here, so the caller must
 * reset its own pointer afterwards to avoid a dangling reference.
 */
void free_epoll_fd_state(struct epoll_fd_state *p)
{
    free(p);
}
/*
 * Handle a read event: drain a readable client socket.
 *
 * Calls recv() on ev.data.fd until it returns 0 (peer closed) or fails
 * (typically EAGAIN once a non-blocking socket is empty). recv() calls
 * interrupted by a signal (EINTR) are retried. Every received byte is
 * echoed to stdout and appended to state->readbuf; bytes that no longer fit
 * in readbuf are dropped. If any data is buffered afterwards, the
 * connection is switched to writing mode (writing = 1, write_upto = readlen).
 *
 * Returns 1 if the peer closed the connection, 0 if the socket would block,
 * and -1 on any other recv() error.
 */
int do_read(struct epoll_event ev, struct epoll_fd_state *state)
{
    /* static keeps this 2 MiB scratch buffer off the stack; it makes
     * do_read non-reentrant, which is fine for a single event loop. */
    static char buf[CHUNCK_SIZE];
    ssize_t result;
    int err = 0;

    for (;;) {
        result = recv(ev.data.fd, buf, sizeof(buf), 0);
        if (result < 0 && errno == EINTR)
            continue;
        if (result <= 0) {
            /* Save errno now: the stdio calls below may overwrite it. */
            err = errno;
            break;
        }

        /* Debug echo of everything received. */
        fwrite(buf, 1, (size_t)result, stdout);
        fflush(stdout);

        size_t room = sizeof(state->readbuf) - (size_t)state->readlen;
        size_t keep = (size_t)result < room ? (size_t)result : room;
        memcpy(state->readbuf + state->readlen, buf, keep);
        state->readlen += (ssize_t)keep;
        /* todo: parse the buffered data (e.g. an HTTP request) instead of
         * echoing it back verbatim. */
    }

    /* Only request a write when there is something to send back. */
    if (state->readlen > 0) {
        state->writing = 1;
        state->write_upto = state->readlen;
    }
    printf("readlen result:%d\n", (int)result);
    fflush(stdout);

    if (result == 0)
        return 1;
    if (err == EAGAIN || err == EWOULDBLOCK)
        return 0;
    return -1;
}
/*
 * Handle a write event: send buffered data back to a writable client socket.
 *
 * Sends state->readbuf[write_pos, write_upto) on ev.data.fd, at most
 * CHUNCK_SIZE bytes per send(), until everything is sent or the socket
 * would block. Interrupted calls (EINTR) are retried. Once everything has
 * been sent, the offsets and readlen are reset and writing is cleared.
 * After a partial send, writing is left unchanged so the caller keeps
 * waiting for EPOLLOUT.
 *
 * Returns 1 if send() returned 0, and -1 on a send() error other than
 * EAGAIN/EWOULDBLOCK (e.g. EPIPE when the peer has gone away). Returns 0
 * otherwise, including when nothing was pending.
 */
int do_write(struct epoll_event ev, struct epoll_fd_state *state)
{
    ssize_t result = 0; /* last send() result, logged below; 0 if nothing was pending */
    int status = 0;

    while (state->write_pos < state->write_upto) {
        size_t pending = (size_t)(state->write_upto - state->write_pos);
        size_t len = pending < (size_t)(CHUNCK_SIZE) ? pending : (size_t)(CHUNCK_SIZE);

        /* MSG_NOSIGNAL: get EPIPE instead of a process-killing SIGPIPE
         * when the client has already disconnected. */
        result = send(ev.data.fd, state->readbuf + state->write_pos, len, MSG_NOSIGNAL);
        if (result < 0) {
            if (errno == EINTR)
                continue;
            /* Classify errno immediately, before stdio can clobber it. */
            status = (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
            break;
        }
        if (result == 0) {
            status = 1;
            break;
        }
        state->write_pos += result;
    }

    if (state->write_pos == state->write_upto) {
        /* Reply complete: reset the buffer and leave writing mode. */
        state->write_pos = state->write_upto = state->readlen = 0;
        state->writing = 0;
    }
    printf("writelen result:%d\n", (int)result);
    fflush(stdout);
    return status;
}
/*
 * Open an IPv4 TCP socket with SO_REUSEADDR enabled, so the listening port
 * can be re-bound quickly after a restart.
 * Returns the new descriptor. If socket() fails, returns its negative result
 * after printing the reason. A setsockopt() failure is reported but not fatal.
 */
int create_server_socket()
{
    const int enable = 1;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if (sock < 0) {
        perror("error create socket");
        return sock;
    }

    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof enable) < 0)
        perror("error setsockopt");

    return sock;
}
/*
 * Put fd into non-blocking mode, preserving its other file status flags.
 * On failure, prints the reason and terminates the process.
 */
void set_socket_nonblocking(int fd)
{
    /* Read-modify-write so flags such as O_APPEND are kept. */
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("error set nonblocking");
        exit(EXIT_FAILURE);
    }
}
void do_epoll(int serverfd)
{
int epollfd = epoll_create(2048);
if (epollfd<0){
perror("error epoll_create");
exit(EXIT_FAILURE);
}
struct epoll_event ev;
ev.data.fd = serverfd;
ev.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, serverfd, &ev)<0){
perror("error epoll_ctl");
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
struct epoll_fd_state *fds_state[MAX_EVENTS];
int epoll_ret;
int clientfd;
//epoll loop
while(1){
epoll_ret = epoll_wait(epollfd, events, MAX_EVENTS, 0);
if (epoll_ret<0){
perror("error epoll_wait");
exit(EXIT_FAILURE);
}
int i,j;
//check writing state
for (j = 0; j < epoll_ret; ++j) {
if (events[j].data.fd!=serverfd && fds_state[j]&&fds_state[j]->writing==1){
/*printf("write ready:%d",events[j].data.fd);
fflush(stdout);*/
ev.data.fd = events[j].data.fd;
ev.events = EPOLLOUT;
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, events[j].data.fd, &ev)<0){
perror("error epoll_ctl add epollout");
exit(EXIT_FAILURE);
}
}
}
//handle server and client sock events
for (i = 0; i < epoll_ret; ++i) {
int flag = 0;
//server socket reay?
if (events[i].data.fd==serverfd){
struct sockaddr_in client;
socklen_t slen = sizeof(client);
if ((events[i].events& EPOLLIN) == EPOLLIN){
clientfd = accept(events[i].data.fd, (struct sockaddr *)&client, &slen);
if (clientfd<0){
perror("error accept");
exit(EXIT_FAILURE);
}
set_socket_nonblocking(clientfd);
ev.data.fd = clientfd;
ev.events = EPOLLIN;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &ev)<0){
perror("error epoll_ctl client");
exit(EXIT_FAILURE);
}
struct epoll_fd_state *temp = alloc_epoll_fd_state();
if(!temp){
exit(EXIT_FAILURE);
}
fds_state[i] = temp;
}
}else{
if ((events[i].events& EPOLLIN)== EPOLLIN){
fprintf(stdout, "in do_read\n");
fflush(stdout);
flag = do_read(events[i], fds_state[i]);
printf("read flag:%d\n",flag);
fflush(stdout);
}
if (flag==0&&((events[i].events==EPOLLOUT)== EPOLLOUT)){
fprintf(stdout, "in do_write\n");
fflush(stdout);
flag = do_write(events[i], fds_state[i]);
if (!KEEP_ALIVE){
free_epoll_fd_state(fds_state[i]);
fds_state[i] = NULL;
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, events)<0){
perror("error epoll_ctl delete event");
exit(EXIT_FAILURE);
}
close(events[i].data.fd);
}
}
}
if (flag){
fprintf(stdout, "in error handle\n");
printf("read error flag:%d\n",flag);
fflush(stdout);
free_epoll_fd_state(fds_state[i]);
fds_state[i] = NULL;
close(events[i].data.fd);
}
}
}
}
/*
 * Create a non-blocking IPv4 listening socket on 0.0.0.0:8000 (backlog 20)
 * and hand it to do_epoll(). Any setup failure terminates the process,
 * printing the reason where one is available.
 */
void run(void)
{
    /* The designated initializer zeroes every other field, including sin_zero. */
    struct sockaddr_in sin = {
        .sin_family = AF_INET,
        .sin_port = htons(8000),
        .sin_addr.s_addr = htonl(INADDR_ANY), /* all local interfaces */
    };

    int serverfd = create_server_socket();
    if (serverfd < 0)
        exit(EXIT_FAILURE);
    /* Validate before use: an invalid fd must not reach fcntl(). */
    set_socket_nonblocking(serverfd);

    if (bind(serverfd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("error bind");
        exit(EXIT_FAILURE);
    }
    if (listen(serverfd, 20) < 0) {
        perror("error listen");
        exit(EXIT_FAILURE);
    }
    do_epoll(serverfd);
}