背景
公司业务需要,读取yuv个数的数据,发送到服务端。刚开始使用的阻塞的套接字(注意:创建的套接字默认是阻塞的),想着用非阻塞的模式试一试,经过一番摸索,将整个过程记录一下。
因为一笔yuv数据是12M,所以在非阻塞模式下,send或recv的时候会报错Resource temporarily unavailable,这是因为对方的接收缓冲满了或者己方的接收缓冲区没有数据。
引言
对于套接字来说,阻塞和非阻塞的只会影响以下几个网络编程api
1.客户端的 connect 函数
2.服务端的 accept 函数
3.发送数据 send 函数
4.接收数据 recv 函数
为了行文方便,先做以下说明:
- connfd: 客户端创建的套接字
- listenfd: 服务端创建的监听套接字
- clientfd: 客户端连接时,服务端accept函数返回的套接字
如果有对网络编程、IO模型和IO复用还不熟悉的小伙伴可以先看下我之前的文章:Posix API与网络协议栈实现原理、Linux五种IO模型
一、客户端
对于客户端 connfd 来说:
- 阻塞: connect 函数会一直阻塞直到连接成功或者超时或者出错
- 非阻塞:无论连接是否建立成功,connect函数都会立即返回。 如果返回值为0,表示连接成功;如果小于0,不代表一定失败,需根据实际的错误码errno来执行响应的操作。此时错误码errno可能为:EINTR 或 EINPROGRESS,如果是EINTR,表示连接被信号打断,可以继续尝试连接;如果是 EINPROGRESS 表示连接还在进行中,后面需要通过select或poll来判断connfd是否可写,如果可写表示连接成功。
二、 服务端
对于服务端 listenfd 来说
- 阻塞: accept函数会一直阻塞,直到pending连接队列中有连接要处理
- 非阻塞:accepte函数会立即返回,如果pending连接队列中有连接,则返回clientfd;如果没有要处理的连接,返回值小于0。错误码errno为:EWOULDBLOCK 或 EAGAIN,对应的错误为:Resource temporarily unavailable。当出现EWOULDBLOCK 或 EAGAIN 最好的办法是重试,重试一定次数后还不成功就退出操作。不能无限重试,导致线程卡死
三.、send 和 recv 系统调用
3.1 send函数
如果是sock是阻塞的,对于发送数据send函数来说,当对方的接收窗口太小,会一直卡在send函数,从现象看就是程序卡死了;如果是非阻塞的,send函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。
3.2 recv函数
如果是sock是阻塞的,对于接收数据recv函数来说,如果接收缓冲区中没有数据,会一直卡在recv函数,从现象看就是程序卡死了;如果是非阻塞的,recv函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。
3.3 send 和 recv 函数返回值分析
- ret > 0 send或recv成功的字节数
- ret == 0 对端关闭连接
- ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断
四、错误码
#define EAGAIN 11 /* Try again */ # EAGAIN表示:资源短暂不可用,这个操作可能等下重试后可用。EAGAIN的另一个名字叫做EWOULDAGAIN,这两个宏定义在GNU的c库中永远是同一个值。 #define EINTR 4 /* Interrupted system call */ #define EWOULDBLOCK EAGAIN /* Operation would block */
IO复用epoll + 非阻塞sokcet
服务端代码:server_epoll_noblock.cpp
// 编译方法: // g++ -o server_epoll_noblock server_epoll_noblock.cpp #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <fcntl.h> #include <sys/epoll.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/select.h> #include <netinet/in.h> #include <arpa/inet.h> #include <poll.h> #include <iostream> #define ANET_OK 0 #define ANET_ERR -1 int anetSetBlock(int fd, int non_block) { int flags; if ((flags = fcntl(fd, F_GETFL)) == -1) { return ANET_ERR; } /* Check if this flag has been set or unset, if so, * then there is no need to call fcntl to set/unset it again. */ if (!!(flags & O_NONBLOCK) == !!non_block) return ANET_OK; if (non_block) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) == -1) { return ANET_ERR; } return ANET_OK; } #define CLIENTCOUNT 2048 #define MAX_EVENTS 2048 #define MAX_EPOLL_EVENTS 1024 #define MAX_ACCEPT_PER_CALL 1000 #define NOBLOCK_ACCEPT_OPTIMAL 0 int main(int argc, char **argv) { int listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd < 0) { perror("socket"); return -1; } anetSetBlock(listenfd, true); unsigned short sport = 3000; if(argc == 2) { sport = atoi(argv[1]); } struct sockaddr_in addr; addr.sin_family = AF_INET; printf("port = %d\n", sport); addr.sin_port = htons(sport); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); if(bind(listenfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return -2; } if(listen(listenfd, 20) < 0) { perror("listen"); return -3; } struct sockaddr_in connaddr; socklen_t len = sizeof(connaddr); int i = 0, ret = 0; int epollfd = epoll_create(MAX_EVENTS); // 将listenfd加入到epoll集合 struct epoll_event event; event.events = EPOLLIN; event.data.fd = listenfd; if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0) { perror("epoll_ctl"); return -2; } struct epoll_event events[MAX_EPOLL_EVENTS + 1]; int count = 0; int nready = 0; char buf[1024] = {0}; int clientfd = 0; while(1) { nready = epoll_wait(epollfd, events, MAX_EPOLL_EVENTS, -1); if(nready == -1) { perror("epoll_wait"); return -3; } if(nready == 0) // 肯定不会走到这里,因为上面没设置超时时间 { continue; } for(i = 0; i < nready; i++) { std::cout << "current ready fd is: " << events[i].data.fd << std::endl; if(events[i].data.fd == listenfd) { // listenfd 是非阻塞,我们就可以一直在这里死循环处理pending连接队列中的客户端,直到pending队列为null // 也存在一个问题,当连接的客户端数量较多是,这里的处理会耗时 // Redis 在这里做了优化,每次最多处理1000条连接 #if NOBLOCK_ACCEPT_OPTIMAL int maxAccept = MAX_ACCEPT_PER_CALL; //Redis 优化使用 while (maxAccept--) //Redis 优化 #else while (true) #endif { clientfd = accept(listenfd, (struct sockaddr*)&connaddr, &len); if (clientfd == ANET_ERR){ // 已经没有连接要处理了,退出accept的while循环 if (errno == EWOULDBLOCK || errno == EAGAIN){ std::cout << "no client link need to handler" << std::endl; break; }else if(errno == EINTR){ continue; }else{ // error happend } }else { char strip[64] = {0}; char *ip = inet_ntoa(connaddr.sin_addr); strcpy(strip, ip); printf("client connect, clientfd:%d,ip:%s, port:%d, count:%d\n", clientfd, strip,ntohs(connaddr.sin_port), ++count); // 设置clientfd为非阻塞 anetSetBlock(clientfd, true); // 将clientfd加入到epoll集合,并监听它的读事件 event.data.fd = clientfd;// 这样在epoll_wait返回时就可以直接用了 event.events = EPOLLIN; // 水平触发 // event.events = EPOLLIN|EPOLLET; // 边沿触发 epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &event); } } } else if(events[i].events & EPOLLIN) { std::cout << "clientfd : " << clientfd << " on message" << std::endl; clientfd = events[i].data.fd; if(clientfd < 0) continue; char *buffer = new char[15*1024*1024]; if (!buffer){ std::cout << "malloc buffer failed!" << std::endl; return -1; } std::cout << "malloc buffer success。。。" << std::endl; memset(buffer, 0, 15*1024*1024); int total = 12441600; while (1) { // int ret; memset(buffer, 0, 15*1024*1024); int sendBytes = 0, recvBytes = 0, ret, offset = 0; // 接收客户端的请求报文。 while (recvBytes < total) { if ( (ret = recv(clientfd, buffer+offset, total-recvBytes, 0)) == 0) // 接收服务端的回应报文。 { printf("ret = %d , client disconected!!!\n", ret); return -1; } else if (ret < 0){ if (errno != EWOULDBLOCK || errno != EAGAIN){ std::cout << "read error" << std::endl; } // return -1; } else { offset += ret; recvBytes += ret; // printf("curent recv :%d bytes\n", ret); // break; } } printf("从客户端总共接收:%d 字节的数据\n", recvBytes); ret = 0; offset = 0; while (sendBytes < total) { ret = send(clientfd, buffer + offset, total-sendBytes, 0); if (ret == 0) // 向服务端发送请求报文。 { perror("send"); break; }else if (ret == ANET_ERR){ if (errno == EWOULDBLOCK || errno == EAGAIN) { // std::cout << "send kernal buffer full, retrying..." << std::endl; continue; } else { // 连接出错,关闭connfd close(clientfd); return -1; } } else { sendBytes += ret; offset += ret; // printf("已发送:%d 字节的数据\n", ret); } } printf("回复客户端总共:%d 字节的数据\n", sendBytes); } delete buffer; } } } close(listenfd); return 0; }
客户端代码:client_noblock.cpp
/** * Linux 下使用poll实现异步的connect,linux_nonblocking_connect_poll.cpp * zhangyl 2019.03.16 */ #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <poll.h> #include <iostream> #include <string.h> #include <stdio.h> #include <fcntl.h> #include <errno.h> #include <assert.h> #include <iostream> #include <fstream> #define SERVER_ADDRESS "127.0.0.1" #define SERVER_PORT 3000 #define ANET_OK 0 #define ANET_ERR -1 int main(int argc, char* argv[]) { //1.创建一个socket int connfd = socket(AF_INET, SOCK_STREAM, 0); assert(connfd >= 0); if (connfd == -1) { std::cout << "create client socket error." << std::endl; return -1; } //将 connfd 设置成非阻塞模式 int oldSocketFlag = fcntl(connfd, F_GETFL, 0); int newSocketFlag = oldSocketFlag | O_NONBLOCK; if (fcntl(connfd, F_SETFL, newSocketFlag) == -1) { close(connfd); std::cout << "set socket to nonblock error." << std::endl; return -1; } //2.连接服务器 struct sockaddr_in serveraddr; serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS); serveraddr.sin_port = htons(SERVER_PORT); for (;;) { // 由于connfd是非阻塞模式,无论连接是否建立成功,connect函数都会立即返回。 // 异步connect 或者叫 非阻塞connect int ret = connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); if (ret == 0) { std::cout << "connect to server successfully." << std::endl; // close(connfd); return 0; } else if (ret == ANET_ERR) { if (errno == EINTR) { //connect 动作被信号中断,重试connect std::cout << "connecting interruptted by signal, try again." << std::endl; continue; } else if (errno == EINPROGRESS) { std::cout << "connecting..." << std::endl; //连接正在尝试中 break; } else { // 连接出错,关闭connfd close(connfd); return -1; } } } // 使用poll函数判断socket是否可写,因为对于客户端connfd来说,connect的动作就是写操作 pollfd event; event.fd = connfd; event.events = POLLOUT; int timeout = 3000; if (poll(&event, 1, timeout) != 1) { close(connfd); std::cout << "[poll] connect to server error." << std::endl; return -1; } if (!(event.revents & POLLOUT)) { close(connfd); std::cout << "[POLLOUT] connect to server error." << std::endl; return -1; } int ret; socklen_t len = static_cast<socklen_t>(sizeof ret); // 使用 getsockopt 函数判断此时 connfd是否有错误 if (::getsockopt(connfd, SOL_SOCKET, SO_ERROR, &ret, &len) < 0) return -1; if (ret == 0){ std::cout << "connect to server successfully." << std::endl; } else{ std::cout << "connect to server error." << std::endl; } // 当connfd为非阻塞时,即使对方的窗口太小导致发送失败,send函数也会立即返回;即使己方的接收缓冲区没有数据 // recv函数也会立即返回,此时的返回值都是-1,错误码errno是:EWOULDBLOCK或EAGAIN // ret > 0 send或recv成功的字节数 // ret == 0 对端关闭连接 // ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断 char* buf = new char[15*1024*1024]; if (!buf){ std::cout << "malloc failed!" << std::endl; return -1; } memset(buf, 0, 15*1024*1024); std::ifstream file; std::string fileName = "test.yuv"; file.open (fileName.c_str(), std::ios::binary); // 获取filestr对应buffer对象的指针 std::filebuf *pbuf = file.rdbuf(); // 获取文件大小 size_t fileSize = pbuf->pubseekoff (0,std::ios::end,std::ios::in); pbuf->sgetn (buf, fileSize); file.close(); int total = fileSize; std::cout << "============== prepare total data.size [" << total << "] =============="<< std::endl; // 第3步:与服务端通信,发送一个报文后等待回复,然后再发下一个报文。 for (int i = 0; i < 1; i++) { memset(buf, 0, 15*1024*1024); int sendBytes = 0, recvBytes = 0, ret, offset = 0; while (sendBytes < total) { ret = send(connfd, buf + offset, total-sendBytes, 0); if (ret == 0) // 向服务端发送请求报文。 { perror("send"); break; }else if (ret == ANET_ERR){ if (errno == EWOULDBLOCK || errno == EAGAIN) { // std::cout << "send kernal buffer full, retrying..." << std::endl; continue; } else { // 连接出错,关闭connfd close(connfd); return -1; } } else { sendBytes += ret; offset += ret; // printf("已发送:%d 字节的数据\n", ret); } } std::cout << "============== send total [" << sendBytes <<" ] success. =============="<< std::endl; memset(buf, 0, 15*1024*1024); ret = 0; offset = 0; while (recvBytes < total) { ret = recv(connfd, buf+offset, total-recvBytes, 0); if (ret == 0) // 向服务端发送请求报文。 { perror("recv"); std::cout << "other end close"<< std::endl; break; }else if (ret == ANET_ERR){ if (errno == EWOULDBLOCK || errno == EAGAIN) { // std::cout << "recv kernal buffer full, retrying..." << std::endl; continue; } else { // 连接出错,关闭connfd close(connfd); return -1; } } else { recvBytes += ret; offset += ret; // printf("从服务端接收:%d 字节的数据\n", ret); } } std::cout << "###################### recv total [" << recvBytes <<" ] success. ######################"<< std::endl; // sleep(1); } delete buf; //5. 关闭socket close(connfd); return 0; }