2高并发服务器:多路IO之poll

简介:  1 poll A 依赖的头文件 #include <poll.h> B 函数声明 int poll(struct pollfd *fds, nfds_t nfds,int timeout);   struct pollfd { int fd; /* 文件描述符*/ short events; /* 监控的事件*


1 poll

A 依赖的头文件

#include <poll.h>

B 函数声明

int poll(struct pollfd *fds, nfds_t nfds,int timeout);

 

struct pollfd {

int fd; /* 文件描述符*/

short events; /* 监控的事件*/

short revents; /* 监控事件中满足条件返回的事件*/

};

POLLIN普通或带外优先数据可读,POLLRDNORM |POLLRDBAND

POLLRDNORM-数据可读

POLLRDBAND-优先级带数据可读

POLLPRI 高优先级可读数据

 

POLLOUT普通或带外数据可写

POLLWRNORM-数据可写

POLLWRBAND-优先级带数据可写

 

POLLERR 发生错误

POLLHUP 发生挂起

POLLNVAL 描述字不是一个打开的文件

 

nfds监控数据中有多少文件描述符需要被监控

timeout 毫秒级等待

-1:阻塞等,#defineINFTIM -1 Linux中没有定义此宏

0 :立即返回,不阻塞进程

>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

 

如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1poll不再监控此pollfd,下次返回时,把revents设置为0

    ppollGNU定义了ppoll(POSIX标准),可以支持设置信号屏蔽字,大家可参考poll模型自行实现C/S

#define _GNU_SOURCE /* Seefeature_test_macros(7) */

#include <poll.h>

int ppoll(struct pollfd *fds, nfds_t nfds,

const struct timespec *timeout_ts, constsigset_t *sigmask);

 

案例说明:

Server.c

#include<stdio.h>

#include<stdlib.h>

#include<string.h>

#include<netinet/in.h>

#include<arpa/inet.h>

#include<poll.h>

#include<errno.h>

#include<ctype.h>

#include"wrap.h"

 

#define MAXLINE 80

#define SERV_PORT 8000

#define OPEN_MAX 1024

 

int main(void)

{

    int i, j, maxi, listenfd, connfd, sockfd;

    int nready;

    ssize_t n;

    char buf[MAXLINE], str[INET_ADDRSTRLEN];

    socklen_t clilen;

    struct pollfd client[OPEN_MAX];

    struct sockaddr_in cliaddr, servaddr;

    listenfd = Socket(AF_INET, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));

    servaddr.sin_family = AF_INET;

    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    servaddr.sin_port = htons(SERV_PORT);

    Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

    Listen(listenfd, 20);

    client[0].fd = listenfd;

    client[0].events = POLLRDNORM; /* listenfd监听普通读事件*/

    for (i = 1; i < OPEN_MAX; i++)

        client[i].fd = -1; /* -1初始化client[]里剩下元素*/

    maxi = 0; /* client[]数组有效元素中最大元素下标*/

    for ( ; ; ) {

        nready = poll(client, maxi+1, -1); /* 阻塞*/

        if (client[0].revents & POLLRDNORM) { /* 有客户端链接请求*/

            clilen = sizeof(cliaddr);

            connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);

            printf("received from %s at PORT %d\n",

                    inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),

                    ntohs(cliaddr.sin_port));

            for (i = 1; i < OPEN_MAX; i++)

                if (client[i].fd < 0) {

                    client[i].fd = connfd; /* 找到client[]中空闲的位置,存放accept返回的connfd */

                    break;

                }

            if (i == OPEN_MAX)

                perr_exit("too many clients");

            client[i].events = POLLRDNORM; /* 设置刚刚返回的connfd,监控读事件*/

            if (i > maxi)

                maxi = i; /* 更新client[]中最大元素下标*/

            if (--nready <= 0)

                continue; /* 没有更多就绪事件时,继续回到poll阻塞*/

        }

        for (i = 1; i <= maxi; i++) { /* 检测client[] */

            if ( (sockfd = client[i].fd) < 0)

                continue;

            if (client[i].revents & (POLLRDNORM | POLLERR)) {

                if ( (n = Read(sockfd, buf, MAXLINE)) < 0) {

                    if (errno == ECONNRESET) { /* 当收到RST标志时*/

                        /* connection reset by client */

                        printf("client[%d] aborted connection\n", i);

                        Close(sockfd);

                        client[i].fd = -1;

                    } else

                        perr_exit("read error");

                } else if (n == 0) {

                    /* connection closed by client */

                    printf("client[%d] closed connection\n", i);

                    Close(sockfd);

                    client[i].fd = -1;

                } else {

                    for (j = 0; j < n; j++)

                        buf[j] = toupper(buf[j]);

                    Writen(sockfd, buf, n);

                }

                if (--nready <= 0)

                    break; /* no more readable descriptors */

            }

        }

    }

    return 0;

}

Client.c

#include<stdio.h>

#include<string.h>

#include<unistd.h>

#include<arpa/inet.h>

#include<netinet/in.h>

#include"wrap.h"

 

#define MAXLINE 80

#define SERV_PORT 8000

 

int main(void)

{

    struct sockaddr_in servaddr;

    char buf[MAXLINE];

    int sockfd,n;

 

    sockfd = Socket(AF_INET,SOCK_STREAM,0);

 

    bzero(&servaddr,sizeof(servaddr));

    servaddr.sin_family = AF_INET;

    inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr);

    servaddr.sin_port = htons(SERV_PORT);

 

    Connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));

 

    while(fgets(buf,MAXLINE,stdin) != NULL) {

        Write(sockfd,buf,strlen(buf));

        n = Read(sockfd,buf,MAXLINE);

        if(n==0) {

            printf("the other side has been closed\n");

        } else {

            Write(STDOUT_FILENO,buf,n);

        }

    }

 

    Close(sockfd);

    return 0;

}

Wrap.h

#ifndef __WRAP_H_

#define __WRAP_H_

 

void perr_exit(const char *s);

int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);

void Bind(int fd, const struct sockaddr *sa, socklen_t salen);

void Connect(int fd, const struct sockaddr *sa, socklen_t salen);

void Listen(int fd, int backlog);

int Socket(int family, int type, int protocol);

ssize_t Read(int fd, void *ptr, size_t nbytes);

ssize_t Write(int fd, const void *ptr, size_t nbytes);

void Close(int fd);

ssize_t Readn(int fd, void *vptr, size_t n);

ssize_t Writen(int fd, const void *vptr, size_t n);

static ssize_t my_read(int fd, char *ptr);

ssize_t Readline(int fd, void *vptr, size_t maxlen);

 

#endif

Wrap.c

#include <stdlib.h>

#include <errno.h>

#include <sys/socket.h>

 

void perr_exit(const char *s)

{

         perror(s);

         exit(1);

}

 

int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)

{

         int n;

 

again:

         if ( (n = accept(fd, sa, salenptr)) < 0) {

                   if ((errno == ECONNABORTED) || (errno == EINTR))

                            goto again;

                   else

                            perr_exit("accept error");

         }

         return n;

}

 

void Bind(int fd, const struct sockaddr *sa, socklen_t salen)

{

         if (bind(fd, sa, salen) < 0)

                   perr_exit("bind error");

}

 

void Connect(int fd, const struct sockaddr *sa, socklen_t salen)

{

         if (connect(fd, sa, salen) < 0)

                   perr_exit("connect error");

}

 

void Listen(int fd, int backlog)

{

         if (listen(fd, backlog) < 0)

                   perr_exit("listen error");

}

 

int Socket(int family, int type, int protocol)

{

         int n;

 

         if ( (n = socket(family, type, protocol)) < 0)

                   perr_exit("socket error");

         return n;

}

 

ssize_t Read(int fd, void *ptr, size_t nbytes)

{

         ssize_t n;

 

again:

         if ( (n = read(fd, ptr, nbytes)) == -1) {

                   if (errno == EINTR)

                            goto again;

                   else

                            return -1;

         }

         return n;

}

 

ssize_t Write(int fd, const void *ptr, size_t nbytes)

{

         ssize_t n;

 

again:

         if ( (n = write(fd, ptr, nbytes)) == -1) {

                   if (errno == EINTR)

                            goto again;

                   else

                            return -1;

         }

         return n;

}

 

void Close(int fd)

{

         if (close(fd) == -1)

                   perr_exit("close error");

}

ssize_t Readn(int fd, void *vptr, size_t n)

{

         size_t  nleft;

         ssize_t nread;

         char   *ptr;

 

         ptr = vptr;

         nleft = n;

         while (nleft > 0) {

                   if ( (nread = read(fd, ptr, nleft)) < 0) {

                            if (errno == EINTR)

                                     nread = 0;

                            else

                                     return -1;

                   } else if (nread == 0)

                            break;

 

                   nleft -= nread;

                   ptr += nread;

         }

         return n - nleft;

}

 

ssize_t Writen(int fd, const void *vptr, size_t n)

{

         size_t nleft;

         ssize_t nwritten;

         const char *ptr;

 

         ptr = vptr;

         nleft = n;

         while (nleft > 0) {

                   if ( (nwritten = write(fd, ptr, nleft)) <= 0) {

                            if (nwritten < 0 && errno == EINTR)

                                     nwritten = 0;

                            else

                                     return -1;

                   }

 

                   nleft -= nwritten;

                   ptr += nwritten;

         }

         return n;

}

static ssize_t my_read(int fd, char *ptr)

{

         static int read_cnt;

         static char *read_ptr;

         static char read_buf[100];

 

         if (read_cnt <= 0) {

again:

                   if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {

                            if (errno == EINTR)

                                     goto again;

                            return -1;

                   } else if (read_cnt == 0)

                            return 0;

                   read_ptr = read_buf;

         }

         read_cnt--;

         *ptr = *read_ptr++;

         return 1;

}

 

ssize_t Readline(int fd, void *vptr, size_t maxlen)

{

         ssize_t n, rc;

         char    c, *ptr;

 

         ptr = vptr;

         for (n = 1; n < maxlen; n++) {

                   if ( (rc = my_read(fd, &c)) == 1) {

                            *ptr++ = c;

                            if (c  == '\n')

                                     break;

                   } else if (rc == 0) {

                            *ptr = 0;

                            return n - 1;

                   } else

                            return -1;

         }

         *ptr  = 0;

         return n;

}

 

目录
相关文章
|
2月前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
82 2
|
2月前
|
存储 网络协议
TCP服务器 IO多路复用的实现:select、poll、epoll
TCP服务器 IO多路复用的实现:select、poll、epoll
52 0
|
2月前
|
API C++
socket编程之常用api介绍与socket、select、poll、epoll高并发服务器模型代码实现(1)
前言   本文旨在学习socket网络编程这一块的内容,epoll是重中之重,后续文章写reactor模型是建立在epoll之上的。
58 0
|
2月前
|
监控 安全 Linux
socket编程之常用api介绍与socket、select、poll、epoll高并发服务器模型代码实现(3)
高并发服务器模型-poll poll介绍   poll跟select类似, 监控多路IO, 但poll不能跨平台。其实poll就是把select三个文件描述符集合变成一个集合了。
51 0
|
2月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
326 2
|
2月前
|
前端开发 Java API
构建异步高并发服务器:Netty与Spring Boot的完美结合
构建异步高并发服务器:Netty与Spring Boot的完美结合
|
2月前
|
消息中间件 Java Linux
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
|
1月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
110 0
|
26天前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
32 1