UDP构建服务器 x
此处我们也是对UDP(User Datagram Protocol 用户数据报协议)有一个直观的认识; 后面再详细讨论.
传输层协议
无连接
不可靠传输
面向数据报
网络字节序
我们已经知道,内存中的多字节数据相对于内存地址有大端和小端之分, 磁盘文件中的多字节数据相对于文件中的偏
移地址也有大端小端之分, 网络数据流同样有大端小端之分. 那么如何定义网络数据流的地址呢?
发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出;
- 接收主机把从网络上接到的字节依次保存在接收缓冲区中,也是按内存地址从低到高的顺序保存;
因此,网络数据流的地址应这样规定:先发出的数据是低地址,后发出的数据是高地址.
TCP/IP协议规定,网络数据流应采用大端字节序,即低地址高字节.
不管这台主机是大端机还是小端机, 都会按照这个TCP/IP规定的网络字节序来发送/接收数据;
如果当前发送主机是小端, 就需要先将数据转成大端; 否则就忽略, 直接发送即可;
#include <arpa/inet.h> uint32_t htonl(uint32_t hostlong); uint16_t htons(uint16_t hostshort); uint32_t ntohl(uint32_t netlong); uint16_t ntohs(uint16_t netshort);
这些函数名很好记,h表示host(本地),n表示network(网络),l表示32位长整数,s表示16位短整数。
例如htonl表示将32位的长整数从主机字节序转换为网络字节序,例如将IP地址转换后准备发送。
如果主机是小端字节序,这些函数将参数做相应的大小端转换然后返回;
如果主机是大端字节序,这些 函数不做转换,将参数原封不动地返回 。
socket编程接口
``// 创建 socket 文件描述符 (TCP/UDP, 客户端 + 服务器) int socket(int domain, int type, int protocol);`
// 绑定端口号 (TCP/UDP, 服务器)
int bind(int socket, const struct sockaddr *address,socklen_t address_len);*
// 开始监听socket (TCP, 服务器)
int listen(int socket, int backlog);
// 接收请求 (TCP, 服务器)
int accept(int socket, struct sockaddr* address,socklen_t* address_len);
// 建立连接 (TCP, 客户端)
int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
sockaddr结构
- Pv4和IPv6的地址格式定义在netinet/in.h中,IPv4地址用sockaddr_in结构体表示,包括16位地址类型, 16位端口号和32位IP地址.1
- IPv4、 IPv6地址类型分别定义为常数AF_INET、 AF_INET6. 这样,只要取得某种sockaddr结构体的首地址,不需要知道具体是哪种类型的sockaddr结构体,就可以根据地址类型字段确定结构体中的内容.
socket API可以都用struct sockaddr *类型表示, 在使用的时候需要强制转化成sockaddr_in; 这样的好处是程序的通用性, 可以接收IPv4, IPv6, 以及UNIX Domain Socket各种类型的sockaddr结构体指针做为参数;
- 有sockaddr 结构,sockaddr_in 结构, sockaddr_un 结构
sockaddr_in 结构 用于网络中通讯。
sockaddr_un 结构 用于本地进程间的通信。
我们应该要知道的是,sockaddr 结构,sockaddr_in 结构, sockaddr_un 结构,构成了多态
sockaddr 结构是基类,sockaddr_in 结构, sockaddr_un 结构是衍生类。
我们现在只需要了解sockaddr_in 结构。
构建服务器
初始化服务器
1 .创建socket接口,打开网络文件
2 .给服务器指明IP 和 PORT
3 .bind 将 sockaddr 套接字字段和 网络文件描述符 进行绑定
代码:
void Init() { //1. 创建socket接口,打开网络文件 _sock=socket(AF_INET,SOCK_DGRAM,0); if(_sock==-1) { std::cerr<<"socket error"<<strerror(errno)<<std::endl; exit(SOCKET_ERR); } std::cout<<"creat socket success:"<<_sock<<std::endl; //2.给服务器指明IP 和 PORT struct sockaddr_in local; memset(&local,0,sizeof(local)); local.sin_family=AF_INET; //端口号和IP地址都需要通过网络发送给目标,所以需要转换成网络序列 local.sin_port=htons(_port); //字符串风格的IP需要转换成4字节int ,1.1.1.1 -》》int local.sin_addr.s_addr=INADDR_ANY;//char* 类型的 c_str(); //此时的 local 还只是一个临时变量 // bind 将 addr套接字字段和 网络文件描述符 进行绑定 if(bind(_sock,(sockaddr*)&local,sizeof(local))==-1) { std::cerr<<"bind error"<<strerror(errno)<<std::endl; exit(BIND_ERR); } std::cout<<"bind success"<<std::endl; }
到这里,我们服务器的初始化就算成功了,接下来就是启动服务器了.
启动服务器
我们需要接收来自客户端的消息,并将消息进行处理后返回给客户端。
这就意味着,服务器需要有最基础的两项功能,
接收网络消息,并在网络上发送消息
- 定义一个缓冲区,里面用来存储接收的消息
- 我们同时也需要知道是谁给我发送的消息,定义一个sockaddr_in 结构,用于保存客户的IP和PORT
- 最后,将消息返回给客户端
代码:
void Start() { char buffer[1024]; while(true) { struct sockaddr_in client; socklen_t len=sizeof(client);//client是一个接收型参数,存储了给服务器发送消息的客户端的IP+端口号 int n=recvfrom(_sock,buffer,sizeof(buffer)-1,0,(sockaddr*)&client,&len); if(n>0) buffer[n]='\0'; else continue; std::string clientIp=inet_ntoa(client.sin_addr); int clientport=ntohs(client.sin_port); std::cout<<clientIp<<'-'<<clientport<<" "<<"client echo#"<<buffer<<std::endl; sendto(_sock,buffer,strlen(buffer),0,(sockaddr*)&client,len); } }
整个服务端的简易编写就此完成:
代码:
udp_server.hpp
#pragma once #include<sys/socket.h> #include<sys/types.h> #include<iostream> #include<cstring> #include<errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include"err.hpp" class UdpServer{ public: public: static const u_int16_t DEFAULT_PORT=8080; UdpServer(uint16_t port=DEFAULT_PORT):_port(port) { std::cout<<"PORT:"<<_port<<std::endl; } void Init() { //1. 创建socket接口,打开网络文件 _sock=socket(AF_INET,SOCK_DGRAM,0); if(_sock==-1) { std::cerr<<"socket error"<<strerror(errno)<<std::endl; exit(SOCKET_ERR); } std::cout<<"creat socket success:"<<_sock<<std::endl; //2.给服务器指明IP 和 PORT struct sockaddr_in local; memset(&local,0,sizeof(local)); local.sin_family=AF_INET; //端口号和IP地址都需要通过网络发送给目标,所以需要转换成网络序列 local.sin_port=htons(_port); //字符串风格的IP需要转换成4字节int ,1.1.1.1 -》》int local.sin_addr.s_addr=INADDR_ANY;//char* 类型的 c_str(); //此时的 local 还只是一个临时变量 // bind 将 addr套接字字段和 网络文件描述符 进行绑定 if(bind(_sock,(sockaddr*)&local,sizeof(local))==-1) { std::cerr<<"bind error"<<strerror(errno)<<std::endl; exit(BIND_ERR); } std::cout<<"bind success"<<std::endl; } void Start() { char buffer[1024]; while(true) { struct sockaddr_in client; socklen_t len=sizeof(client);//client是一个接收型参数,存储了给服务器发送消息的客户端的IP+端口号 int n=recvfrom(_sock,buffer,sizeof(buffer)-1,0,(sockaddr*)&client,&len); if(n>0) buffer[n]='\0'; else continue; std::string clientIp=inet_ntoa(client.sin_addr); int clientport=ntohs(client.sin_port); std::cout<<clientIp<<'-'<<clientport<<" "<<"client echo#"<<buffer<<std::endl; sendto(_sock,buffer,strlen(buffer),0,(sockaddr*)&client,len); } } private: int _sock; uint16_t _port; //std::string _ip;// 云服务器,或者一款服务器,一般不要指明某一个确定的IP, 让我们的udpserver在启动的时候,bind本主机上的任意IP };
udp_server.cc
#include"udp_server.hpp" #include<iostream> #include<memory> using namespace std; static void usage(string prc) { cout<<"Usage\n\t"<<prc<<"port\n"<<endl; } int main(int argc,char* argv[]) { if(argc!=2) { usage(argv[0]); exit(USAGE_ERR); } uint16_t port=atoi(argv[1]); unique_ptr<UdpServer> us(new UdpServer(port)); us->Init(); us->Start(); return 0; }
客户端简易实现
接下来我们进行客户端的编写,客户端的编写基本按照服务端的端口来进行
现阶段也是只需要向服务端发送消息,接收服务端的消息这两种功能
流程:
- 创建套接字
- 我们要向客户端发送消息,需要知道其IP和PORT,所以需要一个sockaddr_in结构,填充的是服务端的信息
- 然后就是发送消息和接收消息
但是客户端的编写有一些需要注意的地方
1.服务端的端口号和IP地址都sh是需要我们自己绑定的,那么客户端需要吗?
答:客户端也是需要绑定的,但是不需要我们自己手动绑定,而是OS帮我们进行操作。
原因:client的port要随机让OS分配防止client出现启动冲突(例如:我们的手机终端上有很多的软件(客户端),如果自己bind,会可能导致某些服务的端口号被占用,无法启动)
2.服务端为什么需要自己绑定?
原因:
1. server的端口不能随意改变,众所周知且不能随意改变的
2. 同一家公司的port号需要统一规范化
代码:
udp_client.cc
#include<sys/socket.h> #include<sys/types.h> #include<iostream> #include<cstring> #include<errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include"err.hpp" using namespace std; //127.0.0.1(本地回环) 表示的是当前主机,用于进行本地通讯或者测试 static void Usage(string prc) { cout<<"Usage\t\n"<<prc<<" serverip serverport\n"<<endl; } int main(int argc,char* argv[])//./server 目标IP 目标PORT { if(argc!=3) { Usage(argv[0]); exit(USAGE_ERR); } string serverip=argv[1]; uint16_t serverport=atoi(argv[2]); int sock=socket(AF_INET,SOCK_DGRAM,0); if(sock<0) { cerr<<"socket fail"<<strerror(errno)<<endl; exit(SOCKET_ERR); } struct sockaddr_in server;//目标服务器的IP+PORT memset(&server,0,sizeof(server)); server.sin_family=AF_INET; server.sin_port=htons(serverport); server.sin_addr.s_addr=inet_addr(serverip.c_str()); while(true) { char buffer[1024]; cout<<"请输入#"; cin>>buffer; //1. client 这里要不要bind呢?要的!socket通信的本质[clientip:clientport, serverip:serverport] // 2.client不需要自己bind,也不要自己bind,操作系统自动给我们进行bind -- 为什么?client的port要随机让OS分配防止client出现启动冲突(我们的手机终端上有很多的软件(客户端),如果自己bind,会可能导致某些服务的端口号被占用,无法启动) // 3.server 为什么要自己bind?1. server的端口不能随意改变,众所周知且不能随意改变的 2. 同一家公司的port号需要统一规范化 sendto(sock,buffer,strlen(buffer),0,(sockaddr*)&server,sizeof(server)); char Rebuffer[1024]; sockaddr_in tmp; socklen_t len=sizeof(tmp); int n=recvfrom(sock,Rebuffer,sizeof(Rebuffer)-1,0,(sockaddr*)&tmp,&len); if(n>0) Rebuffer[n]='\0'; std::cout<<serverip<<'-'<<serverport<<" "<<"server echo#"<<Rebuffer<<std::endl; } return 0; }
好需要对错误码进行处理:
err.hpp
#pragma once enum ERR{ SOCKET_ERR=1, BIND_ERR, USAGE_ERR };
在服务器中,我们实现了一个简单的echo型服务器,就不对消息进行处理,只是进行将消息简单的返回。
实现效果
构建一个群聊系统
在之前建设的服务器基础上,进行加工处理,便可以实现一个简易的群聊功能
我们可以这么理解群聊:
就是你把自己的消息发送给服务器,服务器在转发给每一个在群聊中的人
所以我们需要做如下改动:
- 需要一个储存客户消息的容器,把连接过来的客户都sockaddr_in结构保存
- 还需要一个缓冲区,保存发送过来的消息
- 同时需要注意的是,不同的客户来对同一个缓冲区发送消息,会有多线程并发的问题,需要加锁
- 服务器也需要进行多线程化,因为单一进程会阻塞在接收或者发送消息上
组件:
RingQueue.hpp
#pragma once #include<pthread.h> #include<semaphore.h> #include<vector> const int DEFAULT_CAP=10; template<class T> class RingQueue{ public: RingQueue(int num=DEFAULT_CAP):_cap(num),_rq(num),_consume_step(0),_produce_step(0) { sem_init(&_consume_sem,0,0); sem_init(&_produce_sem,0,_cap); } ~RingQueue() { sem_destroy(&_consume_sem); sem_destroy(&_produce_sem); } void lock(pthread_mutex_t& m) { pthread_mutex_lock(&m); } void unlock(pthread_mutex_t& m) { pthread_mutex_unlock(&m); } void P(sem_t& s) { sem_wait(&s); } void V(sem_t& s) { sem_post(&s); } void push(const T& in) { P(_produce_sem); lock(_produce_mutex); _rq[_produce_step++]=in; _produce_step%=_cap; unlock(_produce_mutex); V(_consume_sem); } void pop(T* out) { P(_consume_sem); lock(_consume_mutex); *out=_rq[_consume_step++]; _consume_step%=_cap; unlock(_consume_mutex); V(_produce_sem); } private: std::vector<T> _rq; int _cap; //消费者和生产者之间的同步关系通过信号量进行调节 sem_t _consume_sem; sem_t _produce_sem; //当多线程运行时,生产者和生产者,消费锁者和消费者之间的互斥关系需要通过锁来维持 pthread_mutex_t _consume_mutex; pthread_mutex_t _produce_mutex; int _consume_step; int _produce_step; };
Thread.hpp
#pragma once #include <pthread.h> #include <iostream> #include <string> #include <string.h> #include <functional> using namespace std; string to_hex(uint64_t n) { char arr[32]; snprintf(arr, sizeof(arr), "0x%x", n); return arr; } class Thread { public: // typedef void(*func_t)(); using func_t = std::function<void()>; typedef enum { NEW = 0, RUNNING, EXISTED } ThreadStatus; Thread(int num, func_t func) : _tid(0), _func(func) { char name[108]; snprintf(name, sizeof(name), "thread-%d", num); _name = name; _status = NEW; } static void *runHelper(void *args) { Thread *ts = (Thread *)args; ts->_func(); return nullptr; } void run() { int n = pthread_create(&_tid, nullptr, runHelper, this); if (n != 0) { exit(1); } _status = RUNNING; } int get_status() { return _status; } void join() { int n = pthread_join(_tid, nullptr); if (n != 0) exit(2); cout << _name << " " << "has joined" << endl; _status = EXISTED; } const string &get_name() { return _name; } string get_id() { return to_hex(_tid); } // pthread_t get_id() // { // return _tid; // } ~Thread() { } private: pthread_t _tid; string _name; ThreadStatus _status; func_t _func; // void* _args;//函数调用时需要传的参数 };
代码:
udp_server.hpp
#pragma once #include <sys/socket.h> #include <sys/types.h> #include <iostream> #include <cstring> #include <errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include <functional> #include <unordered_map> #include <pthread.h> #include "RingQueue.hpp" #include "LockGuard.hpp" #include "Thread.hpp" #include "err.hpp" using func_t = std::function<std::string(std::string)>; class UdpServer { public: public: static const u_int16_t DEFAULT_PORT = 8080; UdpServer(uint16_t port = DEFAULT_PORT) : _port(port) { std::cout << "PORT:" << _port << std::endl; pthread_mutex_init(&_lock, nullptr); // p=new Thread(1,); p = new Thread(1, std::bind(&UdpServer::Receive, this)); c = new Thread(1, std::bind(&UdpServer::send, this)); } void Start() { // 1. 创建socket接口,打开网络文件 _sock = socket(AF_INET, SOCK_DGRAM, 0); if (_sock == -1) { std::cerr << "socket error" << strerror(errno) << std::endl; exit(SOCKET_ERR); } std::cout << "creat socket success:" << _sock << std::endl; // 2.给服务器指明IP 和 PORT struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; // 端口号和IP地址都需要通过网络发送给目标,所以需要转换成网络序列 local.sin_port = htons(_port); // 字符串风格的IP需要转换成4字节int ,1.1.1.1 -》》int local.sin_addr.s_addr = INADDR_ANY; // char* 类型的 c_str(); // 此时的 local 还只是一个临时变量 // bind 将 addr套接字字段和 网络文件描述符 进行绑定 if (bind(_sock, (sockaddr *)&local, sizeof(local)) == -1) { std::cerr << "bind error" << strerror(errno) << std::endl; exit(BIND_ERR); } std::cout << "bind success" << std::endl; p->run(); c->run(); } void Add_user(std::string ip, int port, struct sockaddr_in user) { std::string name = ip + " + " + to_string(port); LockGuard lockguard(&_lock); _online[name] = user; } void Receive() { char buffer[1024]; while (true) { struct sockaddr_in client; socklen_t len = sizeof(client); // client是一个接收型参数,存储了给服务器发送消息的客户端的IP+端口号 int n = recvfrom(_sock, buffer, sizeof(buffer) - 1, 0, (sockaddr *)&client, &len); if (n > 0) buffer[n] = '\0'; else continue; std::string clientIp = inet_ntoa(client.sin_addr); int clientport = ntohs(client.sin_port); Add_user(clientIp, clientport, client); _rq.push(buffer); std::cout << clientIp << '-' << clientport << " "<< "client echo#" << buffer << std::endl; } } void send() { while (true) { std::string message; _rq.pop(&message); LockGuard lockguard(&_lock); //访问临界资源,需要锁 for (auto &user : _online) { sendto(_sock, message.c_str(), strlen(message.c_str()), 0, (sockaddr *)&user.second, sizeof(user.second)); } } } ~UdpServer() { pthread_mutex_destroy(&_lock); p->join(); c->join(); } private: int _sock; uint16_t _port; func_t _server; std::unordered_map<std::string, sockaddr_in> _online; RingQueue<std::string> _rq; pthread_mutex_t _lock; Thread *p; Thread *c; // std::string _ip;// 云服务器,或者一款服务器,一般不要指明某一个确定的IP, 让我们的udpserver在启动的时候,bind本主机上的任意IP };
udp_server.cc
#include "udp_server.hpp" #include <iostream> #include <memory> using namespace std; static void usage(string prc) { cout << "Usage\n\t" << prc << "port\n" << endl; } bool isPass(const string &command) { bool pass = true; auto pos = command.find("rm"); if (pos != std::string::npos) pass = false; pos = command.find("mv"); if (pos != std::string::npos) pass = false; pos = command.find("while"); if (pos != std::string::npos) pass = false; pos = command.find("kill"); if (pos != std::string::npos) pass = false; return pass; } string excuteCommand(const string &s) { // 可能有些人会传递一些比较恶劣的代码,如rm ,所以我们需要进行安全检查 if (!isPass(s)) return "you are a bad man!"; FILE *fp = popen((s.c_str()), "r"); if (fp == NULL) return "None"; // 获取结果 string result; char line[2048]; while (fgets(line, sizeof(line), fp) != NULL) { result += line; } pclose(fp); return result; } int main(int argc, char *argv[]) { if (argc != 2) { usage(argv[0]); exit(USAGE_ERR); } uint16_t port = atoi(argv[1]); // unique_ptr<UdpServer> us(new UdpServer(excuteCommand,port)); unique_ptr<UdpServer> us(new UdpServer(port)); us->Start(); return 0; }
udp_client.cc
#include <iostream> #include <bits/stdc++.h> #include <sys/socket.h> #include <sys/types.h> #include <string> #include <errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include <functional> #include "Thread.hpp" #include "err.hpp" #include <cstring> using namespace std; // 127.0.0.1(本地回环) 表示的是当前主机,用于进行本地通讯或者测试 static void Usage(string prc) { cout << "Usage\t\n" << prc << " serverip serverport\n" << endl; } void *recver(void *args) { int sock = *(static_cast<int *>(args)); while (true) { // 接受 char buffer[2048]; struct sockaddr_in temp; socklen_t len = sizeof(temp); int n = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&temp, &len); if (n > 0) { buffer[n] = 0; std::cout << buffer << std::endl; // 1 } } } int main(int argc, char *argv[]) //./server 目标IP 目标PORT { if (argc != 3) { Usage(argv[0]); exit(USAGE_ERR); } string serverip = argv[1]; uint16_t serverport = atoi(argv[2]); int sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { cerr << "socket fail" << strerror(errno) << endl; exit(SOCKET_ERR); } struct sockaddr_in server; // 目标服务器的IP+PORT memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverport); server.sin_addr.s_addr = inet_addr(serverip.c_str()); pthread_t tid; pthread_create(&tid, nullptr, recver, &sock); while (true) { char buffer[1024]; cout << "请输入#"; // cin>>buffer; cin.getline(buffer, 1024); // 1. client 这里要不要bind呢?要的!socket通信的本质[clientip:clientport, serverip:serverport] // 2.client不需要自己bind,也不要自己bind,操作系统自动给我们进行bind -- 为什么?client的port要随机让OS分配防止client出现启动冲突(我们的手机终端上有很多的软件(客户端),如果自己bind,会可能导致某些服务的端口号被占用,无法启动) // 3.server 为什么要自己bind?1. server的端口不能随意改变,众所周知且不能随意改变的 2. 同一家公司的port号需要统一规范化 sendto(sock, buffer, strlen(buffer), 0, (sockaddr *)&server, sizeof(server)); } return 0; }
展示效果: