把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(5)(★firecat推荐★)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(5)(★firecat推荐★)

10、main.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/resource.h>    /*setrlimit */
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include "mytimer.h"
#include "client.h"
#define IPADDRESS               "127.0.0.1"
#define PORT                    1883
#define LISTENQ                 512
#define FDSIZE                  80000
#define EPOLLEVENTS             100
#define CONFIG_MIN_RESERVED_FDS 32 //come from redis src
#define CONFIG_FDSET_INCR       (CONFIG_MIN_RESERVED_FDS+96)
#define CLIENT_TIMEOUT          60 * 1000//ms
int stop_server = 0;
int timer_id = 0;
//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd);
//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd);
//写处理
static void do_write(int epollfd,int fd);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state);
//other
static int do_error(int fd, int *error);
static int setnonblocking(int fd);
static void daemonize(void);
static int set_fdlimit();
static void signal_exit_handler();
static void signal_exit_func(int signo);
static void handle_timer();
static int timerfun_callback(int arg);
static int user_read(client_t *client, buffer_t *rbuffer);
static int user_write(client_t *client, unsigned char* data, int len);
int main(int argc,char *argv[])
{
    //设置每个进程允许打开的最大文件数,socket
    if (set_fdlimit() < 0)
    {
        return -1;
    }
    int background = 0;
    if (background)
    {
        daemonize();
    }
    //设置信号处理,SIG_IGN表示忽略信号,SIG_DFL表示使用信号的默认处理方式
    //signal(SIGHUP, SIG_IGN); //开启的话,就捕获不到终端窗口关闭的信号了。即窗口关闭,进程仍然进行。
    signal(SIGPIPE, SIG_IGN);
    /*
    if (argc != 2) {
        fprintf(stderr, "Usage: %s port\n", argv[0]);
        return 1;
    }
    int port = atoi(argv[1]);*/
    create_fileEvent(FDSIZE + CONFIG_FDSET_INCR);
    int  listenfd;
    listenfd = socket_bind(IPADDRESS,PORT);
    listen(listenfd,LISTENQ);
    printf("start listening...\n");
    signal_exit_handler();
    timer_init();
    do_epoll(listenfd);
    timer_destroy();
    destroy_fileEvent(FDSIZE + CONFIG_FDSET_INCR);
    return 0;
}
static int socket_bind(const char* ip,int port)
{
    int  listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if (listenfd == -1)
    {
        perror("socket error:");
        exit(1);
    }
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    //inet_pton(AF_INET,ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    int error;
    int reuse = 1;
    int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    if (ret == -1)
    {
        return do_error(listenfd, &error);
    }
    if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error: ");
        exit(1);
    }
    return listenfd;
}
static void do_epoll(int listenfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    //创建一个描述符
    int error;
    epollfd = epoll_create(1024);//1024 is just a hint for the kernel
    if (epollfd == -1)
    {
        return do_error(epollfd, &error);
    }
    //添加监听描述符事件
    add_event(epollfd,listenfd,EPOLLIN);
    struct event *event;
    //struct timeval now;
    struct timeval tv;
    struct timeval *tvp = NULL;
    while ( stop_server == 0 )
    {
        if ((event = timer_top()) != NULL)
        {
            long now_sec, now_ms;//come from redis src "ae.c", int aeProcessEvents(aeEventLoop *eventLoop, int flags)
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms = (event->ev_timeout.tv_sec - now_sec) * 1000 +
                    (event->ev_timeout.tv_usec / 1000 - now_ms);
            if (ms > 0) {
                tvp->tv_sec = ms / 1000;
                tvp->tv_usec = (ms % 1000) * 1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
            /* maybe error
            gettime(&now);
            tv.tv_sec = event->ev_timeout.tv_sec - now.tv_sec;;
            tv.tv_usec = event->ev_timeout.tv_usec - now.tv_usec;
            if ( tv.tv_usec < 0 )
            {
                tv.tv_usec += 1000000;
                tv.tv_sec--;
                printf("tv.tv_usec < 0\n");
            }
            tvp = &tv;
            */
        }
        else
        {
            tvp = NULL;
            printf("tvp == NULL\n");
        }
        //获取已经准备好的描述符事件
        if (tvp == NULL)
        {
            ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
        }
        else
        {
            printf("timer_wait:%d\n", tvp->tv_sec*1000 + tvp->tv_usec/1000);//ms
            ret = epoll_wait(epollfd, events, EPOLLEVENTS, tvp->tv_sec*1000 + tvp->tv_usec/1000);//ms
        }
        handle_events(epollfd,events,ret,listenfd);
        handle_timer();
    }
    close(epollfd);
}
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd)
{
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd);
    }
}
static void handle_timer()
{
    timer_process();
}
static void handle_accpet(int epollfd,int listenfd)
{
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t  cliaddrlen = sizeof(cliaddr);
    clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
    if (clifd == -1)
        perror("accpet error:");
    else
    {
        printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
        client_t *client = alloc_client();
        if (!client) {
            printf("alloc client error...close socket\n");
            close(clifd);
            return;
        }
        client->fd = clifd;
        client->epollfd = epollfd;
        client->timerId = timer_id;
        fileev[clifd].clientData = client;
        //添加一个客户描述符和事件
        add_event(epollfd,clifd,EPOLLIN);
        //add timer
        timer_add(timer_id, 1000, timerfun_callback, clifd, CYCLE_TIMER, 0);//ms
        timer_id++;
    }
}
static void do_read(int epollfd,int fd)
{
    client_t *client = fileev[fd].clientData;
    buffer_t *rbuffer = client->read_buffer;
    check_buffer_size(rbuffer, DEFAULT_BUFF_SIZE / 2);
    size_t avlid_size = rbuffer->size - rbuffer->write_idx;
    //ssize_t readn = anetRead(fd, rbuffer->buff + rbuffer->write_idx, avlid_size);
    //不能调用anetRead这个函数
    //1.客户端下线不好判断
    //2.该函数适合linux epoll是边缘模式(ET),数据一定要一次性收完,anetRead里面有while循环
    //3.redis源码自身也没有调用anetRead
    //把读到的网络数据写入活塞缓存
    ssize_t readn = read(fd, rbuffer->buff + rbuffer->write_idx, avlid_size);
    if (readn > 0)
    {
        rbuffer->write_idx += readn;
        user_read(client, rbuffer);
        client->last_recv_tick = get_tick_count();
    }
    else if (readn == 0)
    {
        printf("fd=%d, client disconnect, close it.\n", client->fd);
        delete_event(epollfd,fd,EPOLLIN);
        delete_event(epollfd,fd,EPOLLOUT);
        timer_remove(client->timerId);
        free_client(client);
    }
    else if (readn == -1)
    {
        if (errno == EAGAIN) {
            return;
        } else {
            printf("read error,%s.\n", strerror(errno));
            delete_event(epollfd,fd,EPOLLIN);
            delete_event(epollfd,fd,EPOLLOUT);
            timer_remove(client->timerId);
            free_client(client);
        }
    }
}
static void do_write(int epollfd,int fd)
{
    client_t *client = fileev[fd].clientData;
    buffer_t *wbuffer = client->write_buffer;
    int data_size = (int)get_readable_size(wbuffer);
    if (data_size == 0) {
        delete_event(epollfd,fd,EPOLLOUT);
        return;
    }
    //int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size);
    int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size);
    if (writen > 0) {
        wbuffer->read_idx += writen;
    } else if (writen == 0) {
        printf("Writing 0\n");
    } else { //-1
        if (errno != EWOULDBLOCK) {
            printf("Writing error: %s\n", strerror(errno));
        } else {
            printf("Writing EWOULDBLOCK\n");
        }
    }
    if (get_readable_size(wbuffer) == 0) {
        delete_event(epollfd,fd,EPOLLOUT);
    }
}
static int user_read(client_t *client, buffer_t *rbuffer)
{
    size_t len = get_readable_size(rbuffer);
    unsigned char data[DEFAULT_BUFF_SIZE];
    if (len > DEFAULT_BUFF_SIZE)
    {
        len = DEFAULT_BUFF_SIZE;
    }
    //把活塞缓存读取出来,作为用户数据
    memcpy(data, rbuffer->buff + rbuffer->read_idx, len);
    rbuffer->read_idx += len;
    int i = 0;
    for (i = 0; i < len; i++)
    {
        printf("%c", data[i]);
    }
    printf("\n");
    user_write(client, data, len);//echo
    return 0;
}
static int user_write(client_t *client, unsigned char* data, int len)
{
    //把用户数据写入活塞缓存
    buffer_t *wbuffer = client->write_buffer;
    check_buffer_size(wbuffer, len);
    memcpy((char *)wbuffer->buff + wbuffer->write_idx, data, len);
    wbuffer->write_idx += len;
    //把活塞缓存的有效数据通过网络发送出去
    //int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer));
    int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer));
    if (writen > 0) {
        wbuffer->read_idx += writen;
    } else if (writen == 0) {
        printf("Writing 0\n");
    } else { //-1
        if (errno != EWOULDBLOCK) {
            printf("Writing error: %s\n", strerror(errno));
        } else {
            printf("Writing EWOULDBLOCK\n");
        }
    }
    //如果writen==-1,表示当前tcp窗口容量不够,需要等待下次机会再发,errno == EWOULDBLOCK
    //因为活塞缓存的有效数据没有发完,遗留部分需要再给机会
    if (get_readable_size(wbuffer) != 0) {
        /*
        if (aeCreateFileEvent(client->loop, client->fd,
                              AE_WRITABLE, writeEventHandler, client) == AE_ERR) {
            printf("create socket writeable event error, close it.\n");
            free_client(client);
        }*/
        //modify_event(client->epollfd,client->fd,EPOLLOUT);
        add_event(client->epollfd,client->fd,EPOLLOUT);
    }
    return 0;
}
static void add_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
    setnonblocking(fd);
}
static void delete_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
static int do_error(int fd, int *error)
{
    fprintf(stderr, "error: %s\n", strerror(errno));
    *error = errno;
    while ((close(fd) == -1) && (errno == EINTR));
    errno = *error;
    return 1;
}
static int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}
static void daemonize(void) { //come from /redis/server.c/daemonize()
    int fd;
    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */
    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}
static int set_fdlimit()
{
    //设置每个进程允许打开的最大文件数
    //这项功能等价于linux终端命令 "ulimit -n 102400"
    struct rlimit rt;
    rt.rlim_max = rt.rlim_cur = FDSIZE + CONFIG_FDSET_INCR;
    if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
    {
        perror("setrlimit error");
        return -1;
    }
    return 0;
}
static void signal_exit_handler()
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = signal_exit_func;
    sigaction(SIGINT, &sa, NULL);//当按下ctrl+c时,它的效果就是发送SIGINT信号
    sigaction(SIGTERM, &sa, NULL);//kill pid
    sigaction(SIGQUIT, &sa, NULL);//ctrl+\代表退出SIGQUIT
    //SIGSTOP和SIGKILL信号是不可捕获的,所以下面两句话写了等于没有写
    sigaction(SIGKILL, &sa, NULL);//kill -9 pid
    sigaction(SIGSTOP, &sa, NULL);//ctrl+z代表停止
    //#define    SIGTERM        15
    //#define    SIGKILL        9
    //kill和kill -9,两个命令在linux中都有杀死进程的效果,然而两命令的执行过程却大有不同,在程序中如果用错了,可能会造成莫名其妙的现象。
    //执行kill pid命令,系统会发送一个SIGTERM信号给对应的程序。
    //执行kill -9 pid命令,系统给对应程序发送的信号是SIGKILL,即exit。exit信号不会被系统阻塞,所以kill -9能顺利杀掉进程。
}
static void signal_exit_func(int signo)
{
    printf("exit signo is %d\n", signo);
    stop_server = 1;
}
static int timerfun_callback(int arg)
{
    client_t *client = fileev[arg].clientData;
    assert(arg == client->fd);
    printf("timer_id=%d, fd=%d\n", client->timerId, client->fd);
    //心跳机制:定时检测,如果没有数据来则踢除客户端
    uint64_t curr_tick = get_tick_count();
    if (curr_tick > client->last_recv_tick + CLIENT_TIMEOUT)
    {
        printf("timeout, fd=%d\n", client->fd);
        //timer_remove(arg);//不能在这里删除timer,因为是回调函数,timer其实已经先pop掉了,我们要做的是不让它再次加入堆
        delete_event(client->epollfd, client->fd, EPOLLIN);
        delete_event(client->epollfd, client->fd, EPOLLOUT);
        free_client(client);
        return 0;//kill timer,返回值为0,目的是不让它再次加入堆
    }
    return 1;
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
设计模式 JavaScript API
Nodejs 第十七章(events)
Nodejs 第十七章(events)
55 0
|
2月前
|
JavaScript 前端开发 开发者
探索yocto-queue库:替代数组的实现原理与方法
在需要高性能队列结构的场景下,yocto-queue提供了一个轻量级且高效的解决方案。它的实现原理优雅且有效,使得在实际应用中,特别是在性能敏感的环境下,成为了数组的一个强大替代者。通过减少性能开销,yocto-queue使得JavaScript开发者能够构建更快、更可靠的应用程序,从而提高用户体验和应用性能。
43 2
|
6月前
|
缓存 PHP Apache
一起从根上学习Swoole
一起从根上学习Swoole
76 0
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(2)(★firecat推荐★)
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(2)(★firecat推荐★)
189 0
composer一共有哪些命令?底层原理是什么?
composer一共有哪些命令?底层原理是什么?
|
存储 Python 容器
Python标准库collections库:超好用的counter计数器,不接受反驳!
Python标准库collections库:超好用的counter计数器,不接受反驳!
Python标准库collections库:超好用的counter计数器,不接受反驳!
|
区块链 C++ Windows
写了个VC++ MFC小工具,一键整理Resource.h文件,重新排序并去重复ID(★firecat推荐★)
写了个VC++ MFC小工具,一键整理Resource.h文件,重新排序并去重复ID(★firecat推荐★)
993 0
写了个VC++ MFC小工具,一键整理Resource.h文件,重新排序并去重复ID(★firecat推荐★)
|
Linux
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(3)(★firecat推荐★)
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(3)(★firecat推荐★)
145 0
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(4)(★firecat推荐★)
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(4)(★firecat推荐★)
104 0
|
Linux C++
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(1)(★firecat推荐★)
把libevent 2.1.8源码的最小堆提取出来,自己封装成定时器使用(1)(★firecat推荐★)
130 0