sock.hpp
#pragma once #include <iostream> #include <string> #include <cstring> #include <cerrno> #include <cassert> #include <unistd.h> #include <memory> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <ctype.h> class Sock { private: // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1 const static int gbacklog = 10; public: Sock() {} static int Socket() { int listensock = socket(AF_INET, SOCK_STREAM, 0); if (listensock < 0) { exit(2); } int opt = 1; setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); return listensock; } static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0") { struct sockaddr_in local; memset(&local, 0, sizeof local); local.sin_family = AF_INET; local.sin_port = htons(port); inet_pton(AF_INET, ip.c_str(), &local.sin_addr); if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0) { exit(3); } } static void Listen(int sock) { if (listen(sock, gbacklog) < 0) {exit(4);} } // 一般经验 // const std::string &: 输入型参数 // std::string *: 输出型参数 // std::string &: 输入输出型参数 static int Accept(int listensock, std::string *ip, uint16_t *port) { struct sockaddr_in src; socklen_t len = sizeof(src); int servicesock = accept(listensock, (struct sockaddr *)&src, &len); if (servicesock < 0) { return -1; } if(port) *port = ntohs(src.sin_port); if(ip) *ip = inet_ntoa(src.sin_addr); return servicesock; } static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(server_port); server.sin_addr.s_addr = inet_addr(server_ip.c_str()); if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true; else return false; } ~Sock() {} };
main.cc
#include "pollserver.hpp" using namespace std; int main() { PollServer pollserver; cout<<"runring......."<<endl; pollserver.start(); }
/O多路转接之epoll
epoll初识
epoll也是系统提供的一个多路转接接口。
epoll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位是一样的,适用场景也相同。
epoll在命名上比poll多了一个e,这个e可以理解成是extend,epoll就是为了同时处理大量文件描述符而改进的poll。
epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll的相关系统调用
epoll有三个相关的系统调用,分别是epoll_create、epoll_ctl和epoll_wait。
epoll工作原理
epoll.hpp
#pragma once #include <iostream> #include <sys/epoll.h> #include <unistd.h> using namespace std; class Epoll { public: static const int gsize = 256; public: static int CreateEpoll() { int epfd = epoll_create(gsize); if(epfd > 0) return epfd; exit(5); } static bool CtrlEpoll(int epfd,int oper,int sock,uint32_t events) { struct epoll_event ev; ev.events = events; ev.data.fd = sock; int n = epoll_ctl(epfd,oper,sock,&ev); return n == 0; } static int WaitEpoll(int epfd,struct epoll_event* revs,int num,int timeout) { // 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿 // 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有 // 就绪的event按照顺序放入到revs数组中!一共有返回值个! return epoll_wait(epfd,revs,num,timeout); } };
log.hpp
#pragma once #include <iostream> #include <cstdio> #include <cstdarg> #include <ctime> #include <string> // 日志是有日志级别的 #define DEBUG 0 #define NORMAL 1 #define WARNING 2 #define ERROR 3 #define FATAL 4 const char *gLevelMap[] = { "DEBUG", "NORMAL", "WARNING", "ERROR", "FATAL" }; #define LOGFILE "./selectServer.log" // 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名) void logMessage(int level, const char *format, ...) { // va_list ap; // va_start(ap, format); // while() // int x = va_arg(ap, int); // va_end(ap); //ap=nullptr char stdBuffer[1024]; //标准部分 time_t timestamp = time(nullptr); // struct tm *localtime = localtime(×tamp); snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp); char logBuffer[1024]; //自定义部分 va_list args; va_start(args, format); // vprintf(format, args); vsnprintf(logBuffer, sizeof logBuffer, format, args); va_end(args); // FILE *fp = fopen(LOGFILE, "a"); printf("%s%s\n", stdBuffer, logBuffer); // fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // fclose(fp); }
sock.hpp
#pragma once #include <iostream> #include <string> #include <cstring> #include <cerrno> #include <cassert> #include <unistd.h> #include <memory> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <ctype.h> //全加静态成员让他变成一个方法 class Sock { private: // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1 const static int gbacklog = 10; public: Sock() {} static int Socket() { int listensock = socket(AF_INET, SOCK_STREAM, 0); if (listensock < 0) { exit(2); } int opt = 1; setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); return listensock; } static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0") { struct sockaddr_in local; memset(&local, 0, sizeof local); local.sin_family = AF_INET; local.sin_port = htons(port); inet_pton(AF_INET, ip.c_str(), &local.sin_addr); if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0) { exit(3); } } static void Listen(int sock) { if (listen(sock, gbacklog) < 0) { exit(4); } } // 一般经验 // const std::string &: 输入型参数 // std::string *: 输出型参数 // std::string &: 输入输出型参数 static int Accept(int listensock, std::string *ip, uint16_t *port) { struct sockaddr_in src; socklen_t len = sizeof(src); int servicesock = accept(listensock, (struct sockaddr *)&src, &len); if (servicesock < 0) { return -1; } if(port) *port = ntohs(src.sin_port); if(ip) *ip = inet_ntoa(src.sin_addr); return servicesock; } static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(server_port); server.sin_addr.s_addr = inet_addr(server_ip.c_str()); if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true; else return false; } ~Sock() {} };
epollserver.hpp
#include "sock.hpp" #include "log.hpp" #include <memory> #include <sys/types.h> #include <cstring> #include <functional> #include <string> #include "epoll.hpp" using namespace std; const static int default_port = 8080; const static int gnum = 64; using func_t = function<void(string)>; class EpollServer { public: EpollServer(func_t HandlerRequese,const int &port = default_port) :_port(port),_HandlerRequest(HandlerRequese),_revs_num(gnum) { // 0. 申请对应的空间 _revs = new struct epoll_event[_revs_num]; // 1. 创建listensock _listensock = Sock::Socket(); Sock::Bind(_listensock,_port); Sock::Listen(_listensock); //2.创建epoll模型 _epfd = Epoll::CreateEpoll(); logMessage(DEBUG,"init success,listensock: %d,epfd: %d",_listensock,_epfd); //3.将listensock加入到epoll模型中,以关系读的事务加入 if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,_listensock,EPOLLIN)) exit(6);// //EPOLLIN是读事务 logMessage(DEBUG, "add listensock to epoll success."); // 3, 4 } void Accepter(int listensock) { string clientip; uint16_t clientport; int sock = Sock::Accept(listensock,&clientip,&clientport); if(sock < 0) { logMessage(WARNING, "accept error!"); return; } // 能不能直接读取?不能,因为你并不清楚,底层是否有数据! // 将新的sock,添加给epoll if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,sock,EPOLLIN)) exit(6); logMessage(DEBUG, "add new sock : %d to epoll success", sock); } void Recver(int sock) { char buffer[10240]; ssize_t n = recv(sock,buffer,sizeof(buffer) - 1,0); if(n > 0) { //假设这里就是读到了一个完整的报文 // 如何保证?? buffer[n] = 0; _HandlerRequest(buffer); // 2. 处理数据 } else if(n == 0) { // 1. 先在epoll中去掉对sock的关 bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0); assert(res); (void)res; //2.在close文件 close(sock); logMessage(NORMAL,"client %d quit,me too...",sock); } else { //1.先在epoll中去掉对sock的关心 bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0); assert(res); (void)res; close(sock); logMessage(NORMAL, "client recv %d error, close error sock", sock); } } void HandlerEvents(int n) { assert(n > 0); for(int i = 0;i < n;i++) { uint32_t revents = _revs[i].events; int sock = _revs[i].data.fd;//看看是哪个fd就绪了 if(revents & EPOLLIN) { if(sock == _listensock) Accepter(_listensock); else Recver(sock); } if(revents & EPOLLOUT) { //TODO } } } void looponce(int timeout) { int n = Epoll::WaitEpoll(_epfd,_revs,_revs_num,timeout); //if(n == _revs_num) //扩容 switch (n)//返回值n,代表有一个关心的事务就绪 { case 0: logMessage(DEBUG, "timeout..."); // 3, 4 break; case -1: logMessage(WARNING, "epoll wait error: %s", strerror(errno)); break; default: //等待成功,有至少一个关系的事务已经就绪 //去执行 logMessage(DEBUG, "get a event"); HandlerEvents(n); break; } } void start() { int timeout = 1000; while(1) { looponce(timeout); } } ~EpollServer() { if (_listensock >= 0) close(_listensock); if (_epfd >= 0) close(_epfd); if (_revs) delete[] _revs; } private: int _listensock; uint16_t _port; int _epfd; struct epoll_event* _revs; int _revs_num; func_t _HandlerRequest; };
main.cc
#include "epollserver.hpp" #include <memory> #include <iostream> #include <string> using namespace std; void change(string str) { cout<<str<<endl; } int main() { EpollServer epollserver(change); cout<<"runring......."<<endl; epollserver.start(); }