10、main.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <sys/resource.h> /*setrlimit */ #include <unistd.h> #include <string.h> #include <fcntl.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <signal.h> #include <pthread.h> #include "mytimer.h" #include "client.h" #define IPADDRESS "127.0.0.1" #define PORT 1883 #define LISTENQ 512 #define FDSIZE 80000 #define EPOLLEVENTS 100 #define CONFIG_MIN_RESERVED_FDS 32 //come from redis src #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) #define CLIENT_TIMEOUT 60 * 1000//ms int stop_server = 0; int timer_id = 0; //函数声明 //创建套接字并进行绑定 static int socket_bind(const char* ip,int port); //IO多路复用epoll static void do_epoll(int listenfd); //事件处理函数 static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd); //处理接收到的连接 static void handle_accpet(int epollfd,int listenfd); //读处理 static void do_read(int epollfd,int fd); //写处理 static void do_write(int epollfd,int fd); //添加事件 static void add_event(int epollfd,int fd,int state); //修改事件 static void modify_event(int epollfd,int fd,int state); //删除事件 static void delete_event(int epollfd,int fd,int state); //other static int do_error(int fd, int *error); static int setnonblocking(int fd); static void daemonize(void); static int set_fdlimit(); static void signal_exit_handler(); static void signal_exit_func(int signo); static void handle_timer(); static int timerfun_callback(int arg); static int user_read(client_t *client, buffer_t *rbuffer); static int user_write(client_t *client, unsigned char* data, int len); int main(int argc,char *argv[]) { //设置每个进程允许打开的最大文件数,socket if (set_fdlimit() < 0) { return -1; } int background = 0; if (background) { daemonize(); } //设置信号处理,SIG_IGN表示忽略信号,SIG_DFL表示使用信号的默认处理方式 //signal(SIGHUP, SIG_IGN); //开启的话,就捕获不到终端窗口关闭的信号了。即窗口关闭,进程仍然进行。 signal(SIGPIPE, SIG_IGN); /* if (argc != 2) { fprintf(stderr, "Usage: %s port\n", argv[0]); return 1; } int port = atoi(argv[1]);*/ create_fileEvent(FDSIZE + CONFIG_FDSET_INCR); int listenfd; listenfd = socket_bind(IPADDRESS,PORT); listen(listenfd,LISTENQ); printf("start listening...\n"); signal_exit_handler(); timer_init(); do_epoll(listenfd); timer_destroy(); destroy_fileEvent(FDSIZE + CONFIG_FDSET_INCR); return 0; } static int socket_bind(const char* ip,int port) { int listenfd; struct sockaddr_in servaddr; listenfd = socket(AF_INET,SOCK_STREAM,0); if (listenfd == -1) { perror("socket error:"); exit(1); } bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; //inet_pton(AF_INET,ip,&servaddr.sin_addr); servaddr.sin_port = htons(port); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int error; int reuse = 1; int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); if (ret == -1) { return do_error(listenfd, &error); } if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) { perror("bind error: "); exit(1); } return listenfd; } static void do_epoll(int listenfd) { int epollfd; struct epoll_event events[EPOLLEVENTS]; int ret; //创建一个描述符 int error; epollfd = epoll_create(1024);//1024 is just a hint for the kernel if (epollfd == -1) { return do_error(epollfd, &error); } //添加监听描述符事件 add_event(epollfd,listenfd,EPOLLIN); struct event *event; //struct timeval now; struct timeval tv; struct timeval *tvp = NULL; while ( stop_server == 0 ) { if ((event = timer_top()) != NULL) { long now_sec, now_ms;//come from redis src "ae.c", int aeProcessEvents(aeEventLoop *eventLoop, int flags) aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (event->ev_timeout.tv_sec - now_sec) * 1000 + (event->ev_timeout.tv_usec / 1000 - now_ms); if (ms > 0) { tvp->tv_sec = ms / 1000; tvp->tv_usec = (ms % 1000) * 1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } /* maybe error gettime(&now); tv.tv_sec = event->ev_timeout.tv_sec - now.tv_sec;; tv.tv_usec = event->ev_timeout.tv_usec - now.tv_usec; if ( tv.tv_usec < 0 ) { tv.tv_usec += 1000000; tv.tv_sec--; printf("tv.tv_usec < 0\n"); } tvp = &tv; */ } else { tvp = NULL; printf("tvp == NULL\n"); } //获取已经准备好的描述符事件 if (tvp == NULL) { ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1); } else { printf("timer_wait:%d\n", tvp->tv_sec*1000 + tvp->tv_usec/1000);//ms ret = epoll_wait(epollfd, events, EPOLLEVENTS, tvp->tv_sec*1000 + tvp->tv_usec/1000);//ms } handle_events(epollfd,events,ret,listenfd); handle_timer(); } close(epollfd); } static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd) { int i; int fd; //进行选好遍历 for (i = 0;i < num;i++) { fd = events[i].data.fd; //根据描述符的类型和事件类型进行处理 if ((fd == listenfd) &&(events[i].events & EPOLLIN)) handle_accpet(epollfd,listenfd); else if (events[i].events & EPOLLIN) do_read(epollfd,fd); else if (events[i].events & EPOLLOUT) do_write(epollfd,fd); } } static void handle_timer() { timer_process(); } static void handle_accpet(int epollfd,int listenfd) { int clifd; struct sockaddr_in cliaddr; socklen_t cliaddrlen = sizeof(cliaddr); clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen); if (clifd == -1) perror("accpet error:"); else { printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port); client_t *client = alloc_client(); if (!client) { printf("alloc client error...close socket\n"); close(clifd); return; } client->fd = clifd; client->epollfd = epollfd; client->timerId = timer_id; fileev[clifd].clientData = client; //添加一个客户描述符和事件 add_event(epollfd,clifd,EPOLLIN); //add timer timer_add(timer_id, 1000, timerfun_callback, clifd, CYCLE_TIMER, 0);//ms timer_id++; } } static void do_read(int epollfd,int fd) { client_t *client = fileev[fd].clientData; buffer_t *rbuffer = client->read_buffer; check_buffer_size(rbuffer, DEFAULT_BUFF_SIZE / 2); size_t avlid_size = rbuffer->size - rbuffer->write_idx; //ssize_t readn = anetRead(fd, rbuffer->buff + rbuffer->write_idx, avlid_size); //不能调用anetRead这个函数 //1.客户端下线不好判断 //2.该函数适合linux epoll是边缘模式(ET),数据一定要一次性收完,anetRead里面有while循环 //3.redis源码自身也没有调用anetRead //把读到的网络数据写入活塞缓存 ssize_t readn = read(fd, rbuffer->buff + rbuffer->write_idx, avlid_size); if (readn > 0) { rbuffer->write_idx += readn; user_read(client, rbuffer); client->last_recv_tick = get_tick_count(); } else if (readn == 0) { printf("fd=%d, client disconnect, close it.\n", client->fd); delete_event(epollfd,fd,EPOLLIN); delete_event(epollfd,fd,EPOLLOUT); timer_remove(client->timerId); free_client(client); } else if (readn == -1) { if (errno == EAGAIN) { return; } else { printf("read error,%s.\n", strerror(errno)); delete_event(epollfd,fd,EPOLLIN); delete_event(epollfd,fd,EPOLLOUT); timer_remove(client->timerId); free_client(client); } } } static void do_write(int epollfd,int fd) { client_t *client = fileev[fd].clientData; buffer_t *wbuffer = client->write_buffer; int data_size = (int)get_readable_size(wbuffer); if (data_size == 0) { delete_event(epollfd,fd,EPOLLOUT); return; } //int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size); int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size); if (writen > 0) { wbuffer->read_idx += writen; } else if (writen == 0) { printf("Writing 0\n"); } else { //-1 if (errno != EWOULDBLOCK) { printf("Writing error: %s\n", strerror(errno)); } else { printf("Writing EWOULDBLOCK\n"); } } if (get_readable_size(wbuffer) == 0) { delete_event(epollfd,fd,EPOLLOUT); } } static int user_read(client_t *client, buffer_t *rbuffer) { size_t len = get_readable_size(rbuffer); unsigned char data[DEFAULT_BUFF_SIZE]; if (len > DEFAULT_BUFF_SIZE) { len = DEFAULT_BUFF_SIZE; } //把活塞缓存读取出来,作为用户数据 memcpy(data, rbuffer->buff + rbuffer->read_idx, len); rbuffer->read_idx += len; int i = 0; for (i = 0; i < len; i++) { printf("%c", data[i]); } printf("\n"); user_write(client, data, len);//echo return 0; } static int user_write(client_t *client, unsigned char* data, int len) { //把用户数据写入活塞缓存 buffer_t *wbuffer = client->write_buffer; check_buffer_size(wbuffer, len); memcpy((char *)wbuffer->buff + wbuffer->write_idx, data, len); wbuffer->write_idx += len; //把活塞缓存的有效数据通过网络发送出去 //int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer)); int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer)); if (writen > 0) { wbuffer->read_idx += writen; } else if (writen == 0) { printf("Writing 0\n"); } else { //-1 if (errno != EWOULDBLOCK) { printf("Writing error: %s\n", strerror(errno)); } else { printf("Writing EWOULDBLOCK\n"); } } //如果writen==-1,表示当前tcp窗口容量不够,需要等待下次机会再发,errno == EWOULDBLOCK //因为活塞缓存的有效数据没有发完,遗留部分需要再给机会 if (get_readable_size(wbuffer) != 0) { /* if (aeCreateFileEvent(client->loop, client->fd, AE_WRITABLE, writeEventHandler, client) == AE_ERR) { printf("create socket writeable event error, close it.\n"); free_client(client); }*/ //modify_event(client->epollfd,client->fd,EPOLLOUT); add_event(client->epollfd,client->fd,EPOLLOUT); } return 0; } static void add_event(int epollfd,int fd,int state) { struct epoll_event ev; ev.events = state; ev.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev); setnonblocking(fd); } static void delete_event(int epollfd,int fd,int state) { struct epoll_event ev; ev.events = state; ev.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev); } static void modify_event(int epollfd,int fd,int state) { struct epoll_event ev; ev.events = state; ev.data.fd = fd; epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev); } static int do_error(int fd, int *error) { fprintf(stderr, "error: %s\n", strerror(errno)); *error = errno; while ((close(fd) == -1) && (errno == EINTR)); errno = *error; return 1; } static int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } static void daemonize(void) { //come from /redis/server.c/daemonize() int fd; if (fork() != 0) exit(0); /* parent exits */ setsid(); /* create a new session */ /* Every output goes to /dev/null. If Redis is daemonized but * the 'logfile' is set to 'stdout' in the configuration file * it will not log at all. */ if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } } static int set_fdlimit() { //设置每个进程允许打开的最大文件数 //这项功能等价于linux终端命令 "ulimit -n 102400" struct rlimit rt; rt.rlim_max = rt.rlim_cur = FDSIZE + CONFIG_FDSET_INCR; if (setrlimit(RLIMIT_NOFILE, &rt) == -1) { perror("setrlimit error"); return -1; } return 0; } static void signal_exit_handler() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_handler = signal_exit_func; sigaction(SIGINT, &sa, NULL);//当按下ctrl+c时,它的效果就是发送SIGINT信号 sigaction(SIGTERM, &sa, NULL);//kill pid sigaction(SIGQUIT, &sa, NULL);//ctrl+\代表退出SIGQUIT //SIGSTOP和SIGKILL信号是不可捕获的,所以下面两句话写了等于没有写 sigaction(SIGKILL, &sa, NULL);//kill -9 pid sigaction(SIGSTOP, &sa, NULL);//ctrl+z代表停止 //#define SIGTERM 15 //#define SIGKILL 9 //kill和kill -9,两个命令在linux中都有杀死进程的效果,然而两命令的执行过程却大有不同,在程序中如果用错了,可能会造成莫名其妙的现象。 //执行kill pid命令,系统会发送一个SIGTERM信号给对应的程序。 //执行kill -9 pid命令,系统给对应程序发送的信号是SIGKILL,即exit。exit信号不会被系统阻塞,所以kill -9能顺利杀掉进程。 } static void signal_exit_func(int signo) { printf("exit signo is %d\n", signo); stop_server = 1; } static int timerfun_callback(int arg) { client_t *client = fileev[arg].clientData; assert(arg == client->fd); printf("timer_id=%d, fd=%d\n", client->timerId, client->fd); //心跳机制:定时检测,如果没有数据来则踢除客户端 uint64_t curr_tick = get_tick_count(); if (curr_tick > client->last_recv_tick + CLIENT_TIMEOUT) { printf("timeout, fd=%d\n", client->fd); //timer_remove(arg);//不能在这里删除timer,因为是回调函数,timer其实已经先pop掉了,我们要做的是不让它再次加入堆 delete_event(client->epollfd, client->fd, EPOLLIN); delete_event(client->epollfd, client->fd, EPOLLOUT); free_client(client); return 0;//kill timer,返回值为0,目的是不让它再次加入堆 } return 1; }