example_firecat.h
#ifndef ANET_SERVER_H #define ANET_SERVER_H //https://github.com/antirez/redis //https://github.com/meili/TeamTalk #include "anet.h" #include "ae.h" #include "config.h" #include "../buffer.h" #include <time.h> #include <sys/time.h> #include <stdint.h> //eg. uint64_t typedef struct { aeEventLoop *loop; int listen_fd; int port; int tcp_backlog; int maxclients; int curclients; char err_info[ANET_ERR_LEN]; } server_t; typedef struct { aeEventLoop *loop; int fd; int timerId; uint64_t last_recv_tick; buffer_t *read_buffer; buffer_t *write_buffer; } client_t; void init_server(server_t *server); void wait_server(server_t *server); uint64_t get_tick_count(); #endif //ANET_SERVER_H
example_firecat.c
#include "example_firecat.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <bits/sigaction.h>
#include <sys/resource.h> /* setrlimit */
#include <fcntl.h>        /* open(), used by daemonize() */

#define TIMING_CYCLE_SINGLE 5000  /* ms, period of the per-client timer */
#define TIMING_CYCLE_ALL    60000 /* ms, period of the server-wide timer */
#define CLIENT_TIMEOUT      30000 /* ms without data before a client is dropped */

#define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */
#define MAX_ACCEPTS_PER_CALL 1000

/* When configuring the server eventloop, we setup it so that the total number
 * of file descriptors we can handle are server.maxclients + RESERVED_FDS +
 * a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96
 * in order to make sure of not over provisioning more than 128 fds. */
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)

#define CONFIG_DEFAULT_SERVER_PORT 1883
#define CONFIG_DEFAULT_TCP_BACKLOG 511
#define CONFIG_DEFAULT_MAX_CLIENTS 100000

#define UNUSED(V) ((void)(V))

/* Prototypes for the functions of this file that are used before their
 * definition (calling them undeclared is an error since C99). */
client_t *alloc_client(void);
void free_client(client_t *client);
void acceptCommonHandler(aeEventLoop *loop, server_t *server, int cfd);
int user_read(client_t *client, buffer_t *rbuffer);
int user_write(client_t *client, unsigned char *data, int len);
void daemonize(void);
void adjustOpenFilesLimit(server_t *server);
void checkTcpBacklogSettings(server_t *server);
void signal_exit_func(int signo);
void signal_exit_handler(void);

/* The single server instance of this process. It lets free_client() keep
 * server_t.curclients accurate and lets the signal handler stop the loop.
 * Set by init_server() / acceptCommonHandler(). */
static server_t *volatile g_server = NULL;

/* Number of the termination signal received, 0 while none was received. */
static volatile sig_atomic_t g_exit_signo = 0;

/*
 * Allocate a client with both buffers and no socket attached yet.
 *
 * Returns the new client, or NULL when any allocation fails (nothing is
 * leaked in that case). Release it with free_client().
 */
client_t *alloc_client(void)
{
    client_t *client = zmalloc(sizeof(*client));
    if (client == NULL) {
        return NULL;
    }

    client->loop = NULL;
    client->fd = -1;
    client->timerId = -1;
    client->last_recv_tick = get_tick_count();
    client->read_buffer = alloc_buffer();
    client->write_buffer = alloc_buffer();

    if (client->read_buffer == NULL || client->write_buffer == NULL) {
        free_client(client);
        return NULL;
    }
    return client;
}

/*
 * Destroy a client: unregister its events, close its socket, update the
 * server's connected-client count and release its memory. NULL is a no-op.
 * The pointer must not be used afterwards.
 */
void free_client(client_t *client)
{
    if (client == NULL) {
        return;
    }

    /* -1 is the "no socket" sentinel; descriptor 0 is a valid socket. */
    if (client->fd >= 0) {
        aeDeleteFileEvent(client->loop, client->fd, AE_READABLE);
        aeDeleteFileEvent(client->loop, client->fd, AE_WRITABLE);
        if (client->timerId != -1) {
            aeDeleteTimeEvent(client->loop, client->timerId);
            client->timerId = -1;
        }
        close(client->fd);
        client->fd = -1;

        /* A client with a socket was counted by acceptCommonHandler(). */
        server_t *server = g_server;
        if (server != NULL && server->curclients > 0) {
            server->curclients--;
        }
    }

    /* Either buffer may be NULL when alloc_client() failed half-way. */
    if (client->read_buffer != NULL) {
        free_buffer(client->read_buffer);
    }
    if (client->write_buffer != NULL) {
        free_buffer(client->write_buffer);
    }
    zfree(client);
}

/*
 * Per-client heartbeat timer: drops the client when no data arrived for
 * more than CLIENT_TIMEOUT ms.
 *
 * Returns the delay in ms until the next firing, or AE_NOMORE once the
 * client was freed so the event loop discards the timer.
 *
 * NOTE: acceptCommonHandler() does not currently arm this timer.
 */
static int onTimer_single(struct aeEventLoop *loop, long long id, void *data)
{
    UNUSED(loop);
    UNUSED(id);

    client_t *client = (client_t *)data;
    printf("onTimer_single, fd=%d, timerId=%d\n", client->fd, client->timerId);

    uint64_t curr_tick = get_tick_count();
    if (curr_tick > client->last_recv_tick + CLIENT_TIMEOUT) {
        /* This timer is the one firing right now: let the event loop remove
         * it through AE_NOMORE instead of deleting it from its own callback,
         * and never reschedule a timer whose client no longer exists. */
        client->timerId = -1;
        free_client(client);
        return AE_NOMORE;
    }

    return TIMING_CYCLE_SINGLE;
}

/* Server-wide periodic timer; only logs. Returns the next delay in ms. */
static int onTimer_all(struct aeEventLoop *loop, long long id, void *data)
{
    UNUSED(loop);
    UNUSED(data);

    printf("onTimer_all, timerId=%lld\n", id);

    return TIMING_CYCLE_ALL;
}

/*
 * Return a millisecond tick counter (adapted from teamtalk/util.cpp).
 * On non-Windows builds it is derived from gettimeofday().
 */
uint64_t get_tick_count(void)
{
#ifdef _WIN32
    LARGE_INTEGER liCounter;
    LARGE_INTEGER liCurrent;

    if (!QueryPerformanceFrequency(&liCounter))
        return GetTickCount();

    QueryPerformanceCounter(&liCurrent);
    return (uint64_t)(liCurrent.QuadPart * 1000 / liCounter.QuadPart);
#else
    struct timeval tval;

    gettimeofday(&tval, NULL);

    /* Widen before multiplying so a 32-bit long cannot overflow. */
    return (uint64_t)tval.tv_sec * 1000u + (uint64_t)tval.tv_usec / 1000u;
#endif
}

/*
 * Try once to send the pending bytes of the client's write buffer.
 *
 * Returns 0 when the socket is still usable (all, some or no bytes were
 * sent) and -1 on an unrecoverable write error; the caller then frees the
 * client.
 */
static int flush_write_buffer(client_t *client)
{
    buffer_t *wbuffer = client->write_buffer;
    size_t pending = get_readable_size(wbuffer);

    if (pending == 0) {
        return 0;
    }

    ssize_t written = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, pending);
    if (written > 0) {
        wbuffer->read_idx += (size_t)written;
    } else if (written == 0) {
        printf("Writing 0\n");
    } else if (errno == EWOULDBLOCK || errno == EAGAIN) {
        /* Send window is full; retry when the socket becomes writable. */
        printf("Writing EWOULDBLOCK\n");
    } else if (errno == EINTR) {
        /* Interrupted by a signal; retry on the next writable event. */
    } else {
        printf("Writing error: %s\n", strerror(errno));
        return -1;
    }
    return 0;
}

/*
 * Writable-event callback: sends what is left in the write buffer and
 * unregisters itself once the buffer is empty. A client whose socket
 * reports a fatal error is freed instead of being polled forever.
 */
static void writeEventHandler(aeEventLoop *loop, int fd, void *data, int mask)
{
    UNUSED(loop);
    UNUSED(fd);
    UNUSED(mask);

    client_t *client = (client_t *)data;

    if (flush_write_buffer(client) != 0) {
        free_client(client);
        return;
    }

    if (get_readable_size(client->write_buffer) == 0) {
        aeDeleteFileEvent(client->loop, client->fd, AE_WRITABLE);
    }
}

/*
 * Readable-event callback: appends received bytes to the read buffer and
 * hands them to user_read(). The client is freed when the peer closed the
 * connection or on an unrecoverable read error.
 */
static void readEventHandler(aeEventLoop *loop, int fd, void *data, int mask)
{
    UNUSED(loop);
    UNUSED(mask);

    client_t *client = (client_t *)data;
    buffer_t *rbuffer = client->read_buffer;

    /* Presumably makes sure at least this much free space is available. */
    check_buffer_size(rbuffer, DEFAULT_BUFF_SIZE / 2);

    size_t avail_size = rbuffer->size - rbuffer->write_idx;

    /* anetRead() is deliberately not used here:
     * 1. it makes a peer disconnect hard to detect;
     * 2. it loops until everything is read, which suits edge-triggered
     *    epoll rather than this handler;
     * 3. the Redis sources do not call it either. */
    ssize_t readn = read(fd, rbuffer->buff + rbuffer->write_idx, avail_size);

    if (readn > 0) {
        rbuffer->write_idx += readn;
        /* Record the activity first: user_read() may free the client. */
        client->last_recv_tick = get_tick_count();
        user_read(client, rbuffer);
    } else if (readn == 0) {
        printf("fd=%d, client disconnect, close it.\n", client->fd);
        free_client(client);
    } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
        /* Nothing to read right now; wait for the next readable event. */
        return;
    } else {
        printf("read error,%s.\n", strerror(errno));
        free_client(client);
    }
}

/*
 * Accept callback of the listening socket: accepts up to
 * MAX_ACCEPTS_PER_CALL pending connections per invocation
 * (adapted from redis/networking.c acceptTcpHandler()).
 */
static void acceptTcpHandler(aeEventLoop *loop, int fd, void *data, int mask)
{
    UNUSED(mask);

    server_t *server = (server_t *)data;
    char cip[NET_IP_STR_LEN];
    int cfd;
    int cport;
    int max = MAX_ACCEPTS_PER_CALL;

    while (max--) {
        cfd = anetTcpAccept(server->err_info, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK) {
                printf("Accepting client connection: %s\n", server->err_info);
            }
            return;
        }

        printf("accepted ip: %s:%d\n", cip, cport);
        acceptCommonHandler(loop, server, cfd);
    }
}

/*
 * Turn an accepted socket into a registered client.
 *
 * loop   - event loop the client is registered on
 * server - server owning the connection; its curclients is incremented
 * cfd    - accepted socket; it is closed here on every failure path
 */
void acceptCommonHandler(aeEventLoop *loop, server_t *server, int cfd)
{
    g_server = server;

    /* Refuse the connection once maxclients clients are connected. */
    if (server->curclients >= server->maxclients) {
        close(cfd);
        return;
    }

    anetNonBlock(NULL, cfd);
    anetEnableTcpNoDelay(NULL, cfd);

    client_t *client = alloc_client();
    if (!client) {
        printf("alloc client error...close socket\n");
        close(cfd);
        return;
    }

    /* From here on free_client() closes cfd and undoes the count. */
    client->loop = loop;
    client->fd = cfd;
    server->curclients++;

    if (aeCreateFileEvent(loop, cfd, AE_READABLE, readEventHandler, client) == AE_ERR) {
        if (errno == ERANGE) {
            /* or use aeResizeSetSize(server->loop, cfd) to raise this limit */
            printf("so many client, close new.\n");
        } else {
            printf("create socket readable event error, close it.\n");
        }
        free_client(client);
        return;
    }

    /* No per-client heartbeat timer is armed, so client->timerId stays -1
     * and free_client() will not try to delete a timer that was never
     * created. */
}

/*
 * Consume the readable bytes of `rbuffer` as user data: print them and
 * echo them back to the client, DEFAULT_BUFF_SIZE bytes at a time.
 *
 * Returns 0 on success, -1 when the client was freed while echoing (neither
 * `client` nor `rbuffer` may be used afterwards).
 */
int user_read(client_t *client, buffer_t *rbuffer)
{
    unsigned char data[DEFAULT_BUFF_SIZE];
    size_t len;

    while ((len = get_readable_size(rbuffer)) > 0) {
        if (len > DEFAULT_BUFF_SIZE) {
            len = DEFAULT_BUFF_SIZE;
        }

        /* Copy the bytes out of the buffer as user data. */
        memcpy(data, rbuffer->buff + rbuffer->read_idx, len);
        rbuffer->read_idx += len;

        fwrite(data, 1, len, stdout);
        printf("\n");

        if (user_write(client, data, (int)len) != 0) {
            return -1;
        }
    }

    return 0;
}

/*
 * Queue `len` bytes of `data` for the client and try to send them at once.
 * Whatever could not be sent is flushed later by writeEventHandler().
 *
 * Returns 0 on success, -1 when the client had to be freed (fatal write
 * error or failure to register the writable event).
 */
int user_write(client_t *client, unsigned char *data, int len)
{
    buffer_t *wbuffer = client->write_buffer;

    /* Append the user data to the write buffer. */
    if (len > 0) {
        check_buffer_size(wbuffer, len);
        memcpy((char *)wbuffer->buff + wbuffer->write_idx, data, (size_t)len);
        wbuffer->write_idx += len;
    }

    /* Send as much of the buffered data as the socket accepts now. */
    if (flush_write_buffer(client) != 0) {
        free_client(client);
        return -1;
    }

    /* Data left over (e.g. EWOULDBLOCK because the send window is full)
     * is sent when the socket becomes writable again. */
    if (get_readable_size(wbuffer) != 0) {
        if (aeCreateFileEvent(client->loop, client->fd, AE_WRITABLE, writeEventHandler, client) == AE_ERR) {
            printf("create socket writeable event error, close it.\n");
            free_client(client);
            return -1;
        }
    }

    return 0;
}

/*
 * Detach from the terminal and run in the background
 * (adapted from redis/server.c daemonize()).
 */
void daemonize(void)
{
    int fd;
    pid_t pid = fork();

    if (pid < 0) {
        /* fork() failed: do not pretend to be the exiting parent. */
        perror("fork");
        exit(1);
    }
    if (pid != 0) {
        exit(0); /* parent exits */
    }
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) {
            close(fd);
        }
    }
}

/* This function will try to raise the max number of open files accordingly to
 * the configured max number of clients. It also reserves a number of file
 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 * persistence, listening sockets, log files and so forth.
 *
 * If it will not be possible to set the limit accordingly to the configured
 * max number of clients, the function will do the reverse setting
 * server.maxclients to the value that we can actually handle.
 *
 * Adapted from redis/server.c adjustOpenFilesLimit(). */
void adjustOpenFilesLimit(server_t *server)
{
    rlim_t maxfiles = server->maxclients + CONFIG_MIN_RESERVED_FDS;
    struct rlimit limit;

    if (getrlimit(RLIMIT_NOFILE, &limit) == -1) {
        printf("Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
               strerror(errno));
        server->maxclients = 1024 - CONFIG_MIN_RESERVED_FDS;
        return;
    }

    rlim_t oldlimit = limit.rlim_cur;

    /* Nothing to do when the current limit is already enough. */
    if (oldlimit >= maxfiles) {
        return;
    }

    rlim_t bestlimit = maxfiles;
    int setrlimit_error = 0;

    /* Try to set the file limit to match 'maxfiles' or at least
     * to the higher value supported less than maxfiles. */
    while (bestlimit > oldlimit) {
        rlim_t decr_step = 16;

        limit.rlim_cur = bestlimit;
        limit.rlim_max = bestlimit;
        if (setrlimit(RLIMIT_NOFILE, &limit) != -1) {
            break;
        }
        setrlimit_error = errno;

        /* We failed to set file limit to 'bestlimit'. Try with a
         * smaller limit decrementing by a few FDs per iteration. */
        if (bestlimit < decr_step) {
            break;
        }
        bestlimit -= decr_step;
    }

    /* Assume that the limit we get initially is still valid if
     * our last try was even lower. */
    if (bestlimit < oldlimit) {
        bestlimit = oldlimit;
    }

    if (bestlimit < maxfiles) {
        int old_maxclients = server->maxclients;

        /* Checked on bestlimit before subtracting so the new maxclients
         * can never become zero or negative. */
        if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
            printf("Your current 'ulimit -n' "
                   "of %llu is not enough for the server to start. "
                   "Please increase your open file limit to at least "
                   "%llu. Exiting.",
                   (unsigned long long) oldlimit,
                   (unsigned long long) maxfiles);
            exit(1);
        }
        server->maxclients = (int)(bestlimit - CONFIG_MIN_RESERVED_FDS);

        printf("You requested maxclients of %d "
               "requiring at least %llu max file descriptors.",
               old_maxclients,
               (unsigned long long) maxfiles);
        printf("Server can't set maximum open files "
               "to %llu because of OS error: %s.",
               (unsigned long long) maxfiles, strerror(setrlimit_error));
        printf("Current maximum open files is %llu. "
               "maxclients has been reduced to %d to compensate for "
               "low ulimit. "
               "If you need higher maxclients increase 'ulimit -n'.",
               (unsigned long long) bestlimit, server->maxclients);
    } else {
        printf("Increased maximum number of open files "
               "to %llu (it was originally set to %llu).",
               (unsigned long long) maxfiles,
               (unsigned long long) oldlimit);
    }
}

/* Check that server.tcp_backlog can be actually enforced in Linux according
 * to the value of /proc/sys/net/core/somaxconn, or warn about it.
 * Compiled to a no-op unless HAVE_PROC_SOMAXCONN is defined.
 *
 * Adapted from redis/server.c checkTcpBacklogSettings(). */
void checkTcpBacklogSettings(server_t *server)
{
#ifdef HAVE_PROC_SOMAXCONN
    FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
    char buf[1024];

    if (!fp) {
        return;
    }
    if (fgets(buf, sizeof(buf), fp) != NULL) {
        int somaxconn = atoi(buf);
        if (somaxconn > 0 && somaxconn < server->tcp_backlog) {
            printf("WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.",
                   server->tcp_backlog, somaxconn);
        }
    }
    fclose(fp);
#else
    UNUSED(server);
#endif
}

/*
 * Create the event loop, open the listening socket and register the accept
 * handler and the server-wide timer. Exits the process with status 1 when
 * any of these steps fails.
 */
void init_server(server_t *server)
{
    g_server = server;

    server->loop = aeCreateEventLoop(server->maxclients + CONFIG_FDSET_INCR);
    if (server->loop == NULL) {
        printf("Failed creating the event loop. Error message: '%s'", strerror(errno));
        exit(1);
    }

    /* TCP server; a NULL bind address means INADDR_ANY (all interfaces). */
    server->listen_fd = anetTcpServer(server->err_info, server->port, NULL, server->tcp_backlog);
    if (server->listen_fd == ANET_ERR) {
        printf("Creating server TCP listening socket on port %d: %s\n", server->port, server->err_info);
        exit(1);
    }
    /* anetTcpServer() already sets SO_REUSEADDR itself. */
    anetNonBlock(NULL, server->listen_fd);

    if (aeCreateFileEvent(server->loop, server->listen_fd, AE_READABLE, acceptTcpHandler, server) == AE_ERR) {
        /* Without the accept handler the server could never serve anyone. */
        printf("Unrecoverable error creating the listening socket file event.\n");
        exit(1);
    }

    char conn_info[64];
    anetFormatSock(server->listen_fd, conn_info, sizeof(conn_info));
    printf("listen on: %s\n", conn_info);

    /* Server-wide timer; it fires for the first time after 1 ms. */
    if (aeCreateTimeEvent(server->loop, 1, onTimer_all, NULL, NULL) == AE_ERR) {
        printf("Can't create event loop timers.\n");
        exit(1);
    }
}

/*
 * Run the event loop until it is stopped (e.g. by a termination signal),
 * report the signal that stopped it, then release the loop.
 */
void wait_server(server_t *server)
{
    /* aeMain() clears the stop flag on entry, so do not even start when a
     * termination signal already arrived. */
    if (g_exit_signo == 0) {
        aeMain(server->loop);
    }

    if (g_exit_signo != 0) {
        printf("exit signo is %d\n", (int)g_exit_signo);
    }

    /* The signal handler must not touch the loop once it is gone. */
    g_server = NULL;
    aeDeleteEventLoop(server->loop);
}

/*
 * Handler for termination signals. It only records the signal number and
 * asks the event loop to stop; the message is printed by wait_server()
 * because printf() is not async-signal-safe.
 */
void signal_exit_func(int signo)
{
    g_exit_signo = signo;

    server_t *server = g_server;
    if (server != NULL && server->loop != NULL) {
        aeStop(server->loop);
    }
}

/*
 * Install signal_exit_func() for SIGINT (Ctrl+C), SIGTERM (plain `kill`)
 * and SIGQUIT (Ctrl+\). SIGKILL and SIGSTOP cannot be caught, so no handler
 * is installed for them.
 */
void signal_exit_handler(void)
{
    static const int exit_signals[] = { SIGINT, SIGTERM, SIGQUIT };
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = signal_exit_func;
    sigemptyset(&sa.sa_mask);

    for (size_t i = 0; i < sizeof(exit_signals) / sizeof(exit_signals[0]); i++) {
        if (sigaction(exit_signals[i], &sa, NULL) == -1) {
            perror("sigaction");
        }
    }
}

int main(void)
{
    int background = 0;
    if (background) {
        daemonize();
    }

    /* Ignoring SIGHUP as well would keep the process alive after its
     * terminal window is closed:
     * signal(SIGHUP, SIG_IGN); */
    signal(SIGPIPE, SIG_IGN);

    server_t server;
    memset(&server, 0, sizeof(server));
    server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
    server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
    server.curclients = 0;
    server.port = CONFIG_DEFAULT_SERVER_PORT;

    adjustOpenFilesLimit(&server);
    init_server(&server);
    signal_exit_handler();
    checkTcpBacklogSettings(&server);
    wait_server(&server);

    return 0;
}
完整的工程源码下载(malloc使用原生的libc):https://download.csdn.net/download/libaineu2004/10468733
完整的工程源码下载(malloc使用jemalloc):https://download.csdn.net/download/libaineu2004/10468734
如果不清楚libc和jemalloc的概念,请看http://blog.csdn.net/libaineu2004/article/details/79400357
优化改进建议:
1、epoll_wait主线程用于处理io事件,用户的业务数据处理可以采用线程池来解决。
2、判断客户端心跳超时机制,这里用的是笨方法--轮询,效率低下。建议使用时间轮,即循环链表。nginx使用的是红黑树,libevent使用的是最小堆。可以参考文章《10w定时任务,如何高效触发超时》