概述:本文介绍udp的并发思路及代码实现
使用tcp协议可以使用listen + bind + accept为每一个客户端建立一个连接,实现并发
而udp是无连接的,如何响应多个客户端的请求实现并发呢?
最简单的办法就是模拟tcp,为每一个客户端创建一个套接字进行通信
步骤如下:
1.服务端创建本端listener套接字(并没有进行listen),用于接收所有客户端的请求,
2.将listener加入epoll管理(ET模式),当listener就绪时,说明接收到一个客户端请求,
3.进行recvfrom,成功获取到该客户端套接字的地址,
4.然后创建一个新的服务端套接字,用于与该客户端套接字进行通信,
5.服务端套接字可以是服务端的另一个端口的地址,也可以重用某个套接字地址,调用connect,地址为刚才获取到的客户端套接字地址,将该服务端套接字的默认通信对端设置为客户端套接字
6.新的服务端套接字已经成功与该发起请求的客户端套接字进行了“绑定”
7.循环调用epoll_wait,为每一个客户端创建一个套接字,实现udp并发
#define SO_REUSEPORT 15 #define MAXBUF 10240 #define MAXEPOLLSIZE 100 int flag = 0; int count = 0; int read_data(int sd) { char recvbuf[MAXBUF + 1]; int ret; struct sockaddr_in client_addr; socklen_t cli_len = sizeof(client_addr); bzero(recvbuf, MAXBUF + 1); ret = recvfrom(sd, recvbuf, MAXBUF, 0 (struct sockaddr *)&client_addr, &cli_len); if (ret < 0) { printf("read[%d]: %s from %d\n", ret, recvbuf, sd); } else { printf("read err:%s %d\n", strerror(errno), ret); } // fflush(stdout); } // 接收客户端套接字的消息,并返回一个与该客户端套接字通信的服务端套接字 int udp_accept(int sd, struct sockaddr_in my_addr) { // 第一个参数是服务端套接字listener, // 第二个参数是listener的地址 ,这里服务端只用了一个端口,实际可以为每一个客户端分配一个端口 int new_sd = -1; int ret = 0; int reuse = -1; char buf[16]; struct sockaddr_in peer_addr; // 客户端套接字的地址 socklen_t cli_len = sizeof(peer_addr); ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len); // 接受客户端数据,并保存客户端套接字地址 if (ret < 0) { return -1; } // printf("ret: %d, buf: %s\n", ret, buf); if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) { // 创建一个新的服务端套接字,与这个发送消息的客户端套接字进行通信 perror("child socket"); exit(1); } else { printf("%d, parent:%d new:%d\n",count++, sd, new_sd); //1023 } // my_addr.sin_port += count; ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)); // 新套接字绑定服务端地址 if (ret){ perror("chid bind"); exit(1); } peer_addr.sin_family = PF_INET; // printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr)); if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) { // 进行一次connnect,确定通信双方的套接字地址 perror("chid connect"); exit(1); } // 进行connect后,新创建的服务端套接字才能正确与该客户端套接字通信 return new_sd; } int main(int argc, char *argv[]) { int listener, kdpfd, nfds, n, curfds; socklen_t len; struct sockaddr_in my_addr, their_addr; unsigned int port; struct epoll_event ev; struct epoll_event events[MAXEPOLLSIZE]; int opt = 1;; int ret = 0; port = 1234; // 服务端端套接字地址的共用端口,也可以使用多个端口 if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) { perror("socket"); exit(1); } else { printf("socket OK\n"); } ret = setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // SO_REUSEADDR : 允许重用处于 TIME_WAIT 状态的套接字地址 if (ret) { exit(1); } ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); // SO_REUSEPORT : 允许重用处于 TIME_WAIT 状态的PORT if (ret) { exit(1); } int flags = fcntl(listener, F_GETFL, 0); flags |= O_NOBLOCK; fcntl(listener, F_SETFL, flags); // 设置非阻塞 bzero(&my_addr, sizeof(my_addr)); my_addr.sin_family = PF_INET; my_addr.sin_port = htons(port); my_addr.sin_addr.s_addr = INADDR_ANY; if (bind(listener, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) { perror("bind"); exit(1); } else { printf("IP bind OK\n"); } kdpfd = epoll_create(MAXEPOLLSIZE); ev.events = EPOLLIN | EPOLLET; // 边沿触发, 收到新的客户端消息再触发 ev.data.fd = listener; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) { // 将服务端套接字加入epoll fprintf(stderr, "epoll set insertion error: fd = %d\n", listener); return -1; } else { printf("ep add OK\n"); } while(1) { nfds = epoll_wait(kdpfd, events, 10000, -1); if (nfds == -1) { perror("epoll_wait"); break; } for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listener) { // udp服务端listener套接字就绪 int new_sd; struct epoll_event child_ev; while(1) { new_sd = udp_accept(listener, my_addr); if (new_sd = -1) break; child_ev.events = EPOLLIN; child_ev.data.fd = new_sd; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) { fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd); return -1; } } } else { // 服务端新创建的与每一个客户端进行通信的套接字就绪 read_data(events[n].data.fd); } } } close(listener); return 0; }