非阻塞socket网络编程之数据收发完整代码示例

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 非阻塞socket网络编程之数据收发完整代码示例

背景

公司业务需要,读取yuv个数的数据,发送到服务端。刚开始使用的阻塞的套接字(注意:创建的套接字默认是阻塞的),想着用非阻塞的模式试一试,经过一番摸索,将整个过程记录一下。

因为一笔yuv数据是12M,所以在非阻塞模式下,send或recv的时候会报错Resource temporarily unavailable,这是因为对方的接收缓冲满了或者己方的接收缓冲区没有数据。

引言

对于套接字来说,阻塞和非阻塞的只会影响以下几个网络编程api

1.客户端的 connect 函数
2.服务端的 accept 函数
3.发送数据 send 函数
4.接收数据 recv 函数

为了行文方便,先做以下说明:

  1. connfd: 客户端创建的套接字
  2. listenfd: 服务端创建的监听套接字
  3. clientfd: 客户端连接时,服务端accept函数返回的套接字

如果有对网络编程、IO模型和IO复用还不熟悉的小伙伴可以先看下我之前的文章:Posix API与网络协议栈实现原理Linux五种IO模型

一、客户端

对于客户端 connfd 来说:

  • 阻塞: connect 函数会一直阻塞直到连接成功或者超时或者出错
  • 非阻塞:无论连接是否建立成功,connect函数都会立即返回。 如果返回值为0,表示连接成功;如果小于0,不代表一定失败,需根据实际的错误码errno来执行响应的操作。此时错误码errno可能为:EINTR 或 EINPROGRESS,如果是EINTR,表示连接被信号打断,可以继续尝试连接;如果是 EINPROGRESS 表示连接还在进行中,后面需要通过select或poll来判断connfd是否可写,如果可写表示连接成功。

二、 服务端

对于服务端 listenfd 来说

  • 阻塞: accept函数会一直阻塞,直到pending连接队列中有连接要处理
  • 非阻塞:accepte函数会立即返回,如果pending连接队列中有连接,则返回clientfd;如果没有要处理的连接,返回值小于0。错误码errno为:EWOULDBLOCK 或 EAGAIN,对应的错误为:Resource temporarily unavailable。当出现EWOULDBLOCK 或 EAGAIN 最好的办法是重试,重试一定次数后还不成功就退出操作。不能无限重试,导致线程卡死

三.、send 和 recv 系统调用

3.1 send函数

如果是sock是阻塞的,对于发送数据send函数来说,当对方的接收窗口太小,会一直卡在send函数,从现象看就是程序卡死了;如果是非阻塞的,send函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。

3.2 recv函数

如果是sock是阻塞的,对于接收数据recv函数来说,如果接收缓冲区中没有数据,会一直卡在recv函数,从现象看就是程序卡死了;如果是非阻塞的,recv函数会立即返回,返回值是-1,errno是EWOULDBLOCK或EAGAIN。

3.3 send 和 recv 函数返回值分析
  1. ret > 0 send或recv成功的字节数
  2. ret == 0 对端关闭连接
  3. ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断

四、错误码

#define EAGAIN 11 /* Try again */ 
# EAGAIN表示:资源短暂不可用,这个操作可能等下重试后可用。EAGAIN的另一个名字叫做EWOULDAGAIN,这两个宏定义在GNU的c库中永远是同一个值。
#define EINTR 4 /* Interrupted system call */
 #define EWOULDBLOCK EAGAIN /* Operation would block */

IO复用epoll + 非阻塞sokcet

服务端代码:server_epoll_noblock.cpp

// 编译方法:
// g++ -o server_epoll_noblock server_epoll_noblock.cpp
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <iostream>
#define ANET_OK  0
#define ANET_ERR -1
int anetSetBlock(int fd, int non_block) {
    int flags;
    if ((flags = fcntl(fd, F_GETFL)) == -1) {
        return ANET_ERR;
    }
    /* Check if this flag has been set or unset, if so, 
     * then there is no need to call fcntl to set/unset it again. */
    if (!!(flags & O_NONBLOCK) == !!non_block)
        return ANET_OK;
    if (non_block)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        return ANET_ERR;
    }
    return ANET_OK;
}
#define CLIENTCOUNT 2048
#define MAX_EVENTS 2048
#define MAX_EPOLL_EVENTS 1024
#define MAX_ACCEPT_PER_CALL 1000
#define NOBLOCK_ACCEPT_OPTIMAL 0
int main(int argc, char **argv)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd < 0)
    {
        perror("socket");
        return -1;
    }
    anetSetBlock(listenfd, true);
    unsigned short sport = 3000;
    if(argc == 2)
    {
        sport = atoi(argv[1]);
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    printf("port = %d\n", sport);
    addr.sin_port = htons(sport);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if(bind(listenfd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
    {
        perror("bind");
        return -2;
    }
    if(listen(listenfd, 20) < 0)
    {
        perror("listen");
        return -3;
    }
    struct sockaddr_in connaddr;
    socklen_t len = sizeof(connaddr);
    int i = 0, ret = 0;
    int epollfd = epoll_create(MAX_EVENTS);
    // 将listenfd加入到epoll集合
    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = listenfd;
    if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0)
    {
        perror("epoll_ctl");
        return -2;
    }
    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int count = 0;
    int nready = 0;
    char buf[1024] = {0};
    int clientfd  = 0;
    while(1)
    {
        nready = epoll_wait(epollfd, events, MAX_EPOLL_EVENTS, -1);
        if(nready == -1)
        {
            perror("epoll_wait");
                        return -3;
        }
        if(nready == 0) // 肯定不会走到这里,因为上面没设置超时时间
        {
            continue;
        }
        for(i = 0; i < nready; i++)
        {
            std::cout << "current ready fd is: " << events[i].data.fd << std::endl;
            if(events[i].data.fd == listenfd)
            {
                // listenfd 是非阻塞,我们就可以一直在这里死循环处理pending连接队列中的客户端,直到pending队列为null
                // 也存在一个问题,当连接的客户端数量较多是,这里的处理会耗时
                // Redis 在这里做了优化,每次最多处理1000条连接
#if NOBLOCK_ACCEPT_OPTIMAL
                int maxAccept = MAX_ACCEPT_PER_CALL;  //Redis 优化使用
                while (maxAccept--)  //Redis 优化
#else
                while (true)
#endif
                {
                    clientfd = accept(listenfd, (struct sockaddr*)&connaddr, &len);
                    if (clientfd == ANET_ERR){
                        // 已经没有连接要处理了,退出accept的while循环
                        if (errno == EWOULDBLOCK || errno == EAGAIN){
                            std::cout << "no client link need to handler" << std::endl;
                            break;
                        }else if(errno == EINTR){
                            continue;
                        }else{
                            // error happend
                        }
                    }else {
                        char strip[64] = {0};
                        char *ip = inet_ntoa(connaddr.sin_addr);
                        strcpy(strip, ip);
                        printf("client connect, clientfd:%d,ip:%s, port:%d, count:%d\n", clientfd, strip,ntohs(connaddr.sin_port), ++count);
                        // 设置clientfd为非阻塞
                        anetSetBlock(clientfd, true);
                        // 将clientfd加入到epoll集合,并监听它的读事件
                        event.data.fd = clientfd;// 这样在epoll_wait返回时就可以直接用了
                        event.events = EPOLLIN;  // 水平触发
                        // event.events = EPOLLIN|EPOLLET; // 边沿触发
                        epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &event);
                    }
                }
            }
            else if(events[i].events & EPOLLIN)
            {
                std::cout << "clientfd : " << clientfd << " on message" << std::endl;
                clientfd = events[i].data.fd;
                if(clientfd < 0)
                    continue;
                char *buffer = new char[15*1024*1024];
                if (!buffer){
                    std::cout << "malloc buffer failed!" << std::endl;
                    return -1;
                }
                std::cout << "malloc buffer success。。。" << std::endl;
                memset(buffer, 0, 15*1024*1024);
                int total = 12441600;
                while (1)
                {
                    // int ret;
                    memset(buffer, 0, 15*1024*1024);
                    int sendBytes = 0, recvBytes = 0, ret, offset = 0;
                    // 接收客户端的请求报文。
                    while (recvBytes < total)
                    {
                        if ( (ret = recv(clientfd, buffer+offset, total-recvBytes, 0)) == 0) // 接收服务端的回应报文。
                        {
                            printf("ret = %d , client disconected!!!\n", ret); 
                            return -1;
                        }
                        else if (ret < 0){
                            if (errno != EWOULDBLOCK || errno != EAGAIN){
                                std::cout << "read error" << std::endl;
                            }
                            // return -1;
                        }
                        else
                        {
                            offset += ret;
                            recvBytes += ret;
                            // printf("curent recv :%d bytes\n", ret);
                            // break;
                        }
                    }
                    printf("从客户端总共接收:%d 字节的数据\n", recvBytes);
                    ret = 0;
                    offset = 0;
                    while (sendBytes < total)
                    {
                        ret = send(clientfd, buffer + offset, total-sendBytes, 0);
                        if (ret == 0) // 向服务端发送请求报文。
                        { 
                            perror("send"); 
                            break; 
                        }else if (ret == ANET_ERR){
                            if (errno == EWOULDBLOCK || errno == EAGAIN)
                            {
                                // std::cout << "send kernal buffer full, retrying..." << std::endl;
                                continue;
                            } else {
                                // 连接出错,关闭connfd
                                close(clientfd);
                                return -1;
                            }
                        }
                        else
                        {
                            sendBytes += ret;
                            offset += ret;
                            // printf("已发送:%d 字节的数据\n", ret);
                        }
                    }
                    printf("回复客户端总共:%d 字节的数据\n", sendBytes);  
                }
                delete buffer;
            }
        }    
    }
    close(listenfd);
    return 0;
}

客户端代码:client_noblock.cpp

/**
 * Linux 下使用poll实现异步的connect,linux_nonblocking_connect_poll.cpp
 * zhangyl 2019.03.16
 */
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <iostream>
#include <fstream>
#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT     3000
#define ANET_OK  0
#define ANET_ERR -1
int main(int argc, char* argv[])
{
    //1.创建一个socket
    int connfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(connfd >= 0);
    if (connfd == -1)
    {
        std::cout << "create client socket error." << std::endl;
        return -1;
    }
  //将 connfd 设置成非阻塞模式 
  int oldSocketFlag = fcntl(connfd, F_GETFL, 0);
  int newSocketFlag = oldSocketFlag | O_NONBLOCK;
  if (fcntl(connfd, F_SETFL,  newSocketFlag) == -1)
  {
    close(connfd);
    std::cout << "set socket to nonblock error." << std::endl;
    return -1;
  }
    //2.连接服务器
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
    serveraddr.sin_port = htons(SERVER_PORT);
  for (;;)
  {
        // 由于connfd是非阻塞模式,无论连接是否建立成功,connect函数都会立即返回。
        // 异步connect 或者叫 非阻塞connect
    int ret = connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    if (ret == 0)
    {
      std::cout << "connect to server successfully." << std::endl;
      // close(connfd);
      return 0;
    } 
    else if (ret == ANET_ERR) 
    {
      if (errno == EINTR)
      {
        //connect 动作被信号中断,重试connect
        std::cout << "connecting interruptted by signal, try again." << std::endl;
        continue;
      } else if (errno == EINPROGRESS)
      {
                std::cout << "connecting..." << std::endl;
        //连接正在尝试中
        break;
      } else {
        // 连接出错,关闭connfd
        close(connfd);
        return -1;
      }
    }
  }
    // 使用poll函数判断socket是否可写,因为对于客户端connfd来说,connect的动作就是写操作
  pollfd event;
  event.fd = connfd;
  event.events = POLLOUT;
  int timeout = 3000;
  if (poll(&event, 1, timeout) != 1)
  {
    close(connfd);
    std::cout << "[poll] connect to server error." << std::endl;
    return -1;
  }
  if (!(event.revents & POLLOUT))
  {
    close(connfd);
    std::cout << "[POLLOUT] connect to server error." << std::endl;
    return -1;
  }
  int ret;
    socklen_t len = static_cast<socklen_t>(sizeof ret);
    // 使用 getsockopt 函数判断此时 connfd是否有错误
    if (::getsockopt(connfd, SOL_SOCKET, SO_ERROR, &ret, &len) < 0)
        return -1;
    if (ret == 0){
        std::cout << "connect to server successfully." << std::endl;
    } else{
        std::cout << "connect to server error." << std::endl;
    }
    // 当connfd为非阻塞时,即使对方的窗口太小导致发送失败,send函数也会立即返回;即使己方的接收缓冲区没有数据
    // recv函数也会立即返回,此时的返回值都是-1,错误码errno是:EWOULDBLOCK或EAGAIN
    // ret > 0  send或recv成功的字节数
    // ret == 0 对端关闭连接
    // ret < 0 对端Tcp窗口太小或者缓冲器无数据可读;或者出错或者被信号中断
    char* buf = new char[15*1024*1024];
    if (!buf){
        std::cout << "malloc failed!" << std::endl;
        return -1;
    }
    memset(buf, 0, 15*1024*1024);
    std::ifstream file;
    std::string fileName = "test.yuv";
    file.open (fileName.c_str(), std::ios::binary);
    // 获取filestr对应buffer对象的指针
    std::filebuf *pbuf = file.rdbuf();
    // 获取文件大小
    size_t fileSize = pbuf->pubseekoff (0,std::ios::end,std::ios::in);
    pbuf->sgetn (buf, fileSize);
    file.close();
    int total = fileSize;
    std::cout << "============== prepare total data.size [" << total << "] =============="<< std::endl;
  // 第3步:与服务端通信,发送一个报文后等待回复,然后再发下一个报文。
  for (int i = 0; i < 1; i++)
  {
        memset(buf, 0, 15*1024*1024);
    int sendBytes = 0, recvBytes = 0, ret, offset = 0;
        while (sendBytes < total)
        {
            ret = send(connfd, buf + offset, total-sendBytes, 0);
            if (ret == 0) // 向服务端发送请求报文。
            { 
                perror("send"); 
                break; 
            }else if (ret == ANET_ERR){
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    // std::cout << "send kernal buffer full, retrying..." << std::endl;
                    continue;
                } else {
                    // 连接出错,关闭connfd
                    close(connfd);
                    return -1;
                }
            }
            else
            {
                sendBytes += ret;
                offset += ret;
                // printf("已发送:%d 字节的数据\n", ret);
            }
        }
        std::cout << "============== send total [" << sendBytes <<" ] success. =============="<< std::endl;
        memset(buf, 0, 15*1024*1024);
    ret = 0;
        offset = 0;
        while (recvBytes < total)
        {
            ret = recv(connfd, buf+offset, total-recvBytes, 0);
            if (ret == 0) // 向服务端发送请求报文。
            { 
                perror("recv"); 
                std::cout << "other end close"<< std::endl;
                break; 
            }else if (ret == ANET_ERR){
                if (errno == EWOULDBLOCK || errno == EAGAIN)
                {
                    // std::cout << "recv kernal buffer full, retrying..." << std::endl;
                    continue;
                } else {
                    // 连接出错,关闭connfd
                    close(connfd);
                    return -1;
                }
            }
            else
            {
                recvBytes += ret;
                offset += ret;
                // printf("从服务端接收:%d 字节的数据\n", ret);
            }
        }
        std::cout << "###################### recv total [" << recvBytes <<" ] success. ######################"<< std::endl;
    // sleep(1);
  }
    delete buf;
  //5. 关闭socket
  close(connfd);
    return 0;
}


推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习:

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
17天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
17天前
|
程序员 开发者 Python
Python网络编程基础(Socket编程) 错误处理和异常处理的最佳实践
【4月更文挑战第11天】在网络编程中,错误处理和异常管理不仅是为了程序的健壮性,也是为了提供清晰的用户反馈以及优雅的故障恢复。在前面的章节中,我们讨论了如何使用`try-except`语句来处理网络错误。现在,我们将深入探讨错误处理和异常处理的最佳实践。
|
1月前
|
机器学习/深度学习 算法 PyTorch
RPN(Region Proposal Networks)候选区域网络算法解析(附PyTorch代码)
RPN(Region Proposal Networks)候选区域网络算法解析(附PyTorch代码)
240 1
|
17天前
|
存储 算法 Linux
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
40 6
|
4天前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
|
16天前
|
网络协议 Java API
Python网络编程基础(Socket编程)Twisted框架简介
【4月更文挑战第12天】在网络编程的实践中,除了使用基本的Socket API之外,还有许多高级的网络编程库可以帮助我们更高效地构建复杂和健壮的网络应用。这些库通常提供了异步IO、事件驱动、协议实现等高级功能,使得开发者能够专注于业务逻辑的实现,而不用过多关注底层的网络细节。
|
20天前
|
Python
Python网络编程基础(Socket编程)UDP服务器编程
【4月更文挑战第8天】Python UDP服务器编程使用socket库创建UDP套接字,绑定到特定地址(如localhost:8000),通过`recvfrom`接收客户端数据报,显示数据长度、地址和内容。无连接的UDP协议使得服务器无法主动发送数据,通常需应用层实现请求-响应机制。当完成时,用`close`关闭套接字。
|
1月前
|
机器学习/深度学习 PyTorch 算法框架/工具
卷积神经元网络中常用卷积核理解及基于Pytorch的实例应用(附完整代码)
卷积神经元网络中常用卷积核理解及基于Pytorch的实例应用(附完整代码)
20 0
|
1月前
|
机器学习/深度学习 数据采集 人工智能
m基于深度学习网络的手势识别系统matlab仿真,包含GUI界面
m基于深度学习网络的手势识别系统matlab仿真,包含GUI界面
43 0
|
1月前
|
机器学习/深度学习 算法 计算机视觉
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真