四、线程池版TCP网络程序
单纯多线程存在的问题
- 每当有新连接到来时,服务端的主线程都会重新为该客户端创建为其提供服务的新线程,而当服务结束后又会将该新线程销毁。这样不仅麻烦,且效率低下,每当连接到来的时候服务端才创建对应提供服务的线程
- 若有大量的客户端连接请求,此时服务端要为每一个客户端创建对应的服务线程。计算机中的线程越多,CPU的压力就越大,因为CPU要不断在这些线程之间来回切换,此时CPU在调度线程的时候,线程和线程之间切换的成本就会变得很高。此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也体验也会变差
解决方案
- 可以在服务端预先创建一批线程,当有客户端请求连接时就让这些线程为客户端提供服务,此时客户端一来就有线程为其提供服务,而不是当客户端来了才创建对应的服务线程
- 当某个线程为客户端提供完服务后,不要让该线程退出,而是让该线程继续为下一个客户端提供服务,若当前没有客户端连接请求,则可以让该线程先进入休眠状态,当有客户端连接到来时再将该线程唤醒
- 服务端创建的这一批线程的数量不能太多,CPU的压力也就不会太大。此外,若有客户端连接到来,但此时这一批线程都在给其他客户端提供服务,这时服务端不应该再创建线程,而应该让这个新来的连接请求在全连接队列进行排队,等服务端这一批线程中有空闲线程后,再将该连接请求获取上来并为其提供服务
引入线程池
要解决问题就需在服务端引入线程池,线程池的存在就是为了避免处理短时间任务时创建与销毁线程的代价,还能够保证内核充分利用,防止过分调度(调度周期过长)
在线程池中存在一个任务队列,当有新的任务到来的时候,就可以将任务Push到线程池中,在线程池中默认创建10个线程,这些线程不断检测任务队列中是否有任务,若有任务就取出任务,然后调用该任务对应的Run函数对该任务进行处理,若线程池中没有任务当前线程就会进入休眠状态
下面直接将线程池的代码接入到当前的TCP服务器,下面只会讲解线程池接入的方法,若对线程池的实现有疑问的可以去阅读博主的博客《理解与实现线程池》
服务类新增线程池成员
服务端引入线程池,因此在服务类中需要新增一个指向线程池的指针成员:
- 在构造线程池对象时可以指定线程池中线程的个数,此时默认线程的个数为10
- 构造线程池时,线程池中的若干线程就会创建出来,而这些线程创建出来后就会不断检测任务队列,从任务队列中取出任务进行处理
- 当服务进程调用accept函数获取到一个连接请求后,就会根据该客户端的套接字、IP地址以及端口号构建出一个任务,然后调用线程池提供的Push接口将该任务塞入任务队列
实际上就是一个生产者消费者模型,其中服务进程就作为了任务的生产者,而后端线程池中的若干线程就不断从任务队列当中获取任务进行处理,承担的就是消费者的角色,其中生产者和消费者的交易场所就是线程池中的任务队列
class TcpServer { public: TcpServer(uint16_t port):_socket_listen_fd(-1),_server_port(port),_thread_pool(ThreadPool<Task>::GetThreadPool()) {} void InitServer(); void StartUp(); static void* HandlerClient(void*); static void Service(int, string, uint16_t); ~TcpServer(); private: int _socket_listen_fd; uint16_t _server_port; unique_ptr<ThreadPool<Task>> _thread_pool; };
void TcpServer::StartUp() { _thread_pool->Run();//启动线程池 while(true) { //获取连接 struct sockaddr_in foreign; memset(&foreign, '\0', sizeof foreign); socklen_t length = sizeof foreign; int server_socket_fd = accept(_socket_listen_fd, (struct sockaddr*)&foreign, &length); if(server_socket_fd < 0) { cerr << "accept fail" << endl; continue; } string client_ip = inet_ntoa(foreign.sin_addr); uint16_t client_port = ntohs(foreign.sin_port); cout << "New Link: [" << server_socket_fd << "] [" << client_ip << "] [" << client_port << "]" << endl; //构造任务并推送到任务队列中 Task task(server_socket_fd, client_ip, client_port, Service); _thread_pool->PushTask(task); } }
设计任务类
该任务类中需要包含accept客户端对应的套接字、IP地址、端口号,表示该任务是为哪一个客户端提供服务,对应操作的套接字是哪一个
任务类中需包含一个仿函数方法,当线程池中的线程取到任务后就会直接调用仿函数对该任务进行处理,而实际处理这个任务的方法是服务类中的Service函数,服务端就是通过调用Service函数为客户端提供服务的
typedef void(*fun_t)(int, std::string, uint16_t); class Task { public: Task() {} Task(int sock, std::string client_ip, int client_port, fun_t handler) : _server_socket_fd(sock) , _client_ip(client_ip), _client_port(client_port), _handler(handler) {} //任务处理函数 void operator()(const std::string& name) { _handler(_server_socket_fd, _client_ip, _client_port); } private: int _server_socket_fd; std::string _client_ip; uint16_t _client_port; fun_t _handler; };
实际可以让服务器处理不同的任务,当前服务器只是在进行字符串的回显处理,而实际要怎么处理这个任务完全是由任务类中的_handler成员来决定的
若想要让服务器处理其他任务,只需要修改()的重载函数就行了,而服务器的初始化、启动服务器以及线程池的代码都是不需要更改的,这被称为把通信功能和业务逻辑在软件上做解耦
网络测试
while :; do ps -aL|head -1&&ps -aL|grep tcp_server;echo "####################";sleep 1;done
运行服务端后,就算没有客户端发来连接请求,此时在服务端就已经有了11个线程,其中有一个是接收新连接的服务线程,而其余的5个是线程池中为客户端提供服务的线程
当客户端连接服务器后,服务端的主线程就会获取该客户端的连接请求,并将其封装为一个任务对象后放入任务队列,此时线程池中的10个线程就会有一个线程从任务队列当中获取到该任务,并执行该任务的处理函数为客户端提供服务
当第二个客户端发起连接请求时,服务端也会将其封装为一个任务类放入任务队列,然后线程池中的线程再从任务队列中获取到该任务进行处理,此时也是不同的执行流为这两个客户端提供的服务,因此这两个客户端是能够同时享受服务的
无论有多少客户端发来请求,在服务端都只会有线程池中的10个线程为之提供服务,线程池中的线程个数不会随着客户端连接的增多而增多,这些线程也不会因为客户端的退出而退出
五、地址转换函数
5.1 字符串IP转整数IP
inet_aton函数
int inet_aton(const char *cp, struct in_addr *inp);
- cp:待转换的字符串IP。
- inp:转换后的整数IP,输出型参数
返回值:若转换成功则返回一个非零值,若输入的地址不正确则返回零值
inet_addr函数
in_addr_t inet_addr(const char *cp);
参数cp:待转换的字符串IP
返回值:若输入地址有效,则返回转换后的整数IP;若无效,则返回INADDR_NONE(-1)
inet_pton函数
int inet_pton(int af, const char *src, void *dst);
- af参数:协议家族
- src参数:待转换的字符串IP
- dst参数:转换后的整数IP,输出型参数
返回值说明:
- 若转换成功,则返回1
- 若输入的字符串IP无效,则返回0
- 若输入的协议家族af无效,则返回-1,并将errno设置为EAFNOSUPPORT
5.2 整数IP转字符串IP
char *inet_ntoa(struct in_addr in);
参数in:待转换的整数IP
返回值:返回转换后的字符串IP
inet_ntop函数
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
- af参数:协议家族
- src参数:待转换的整数IP
- dst参数:转换后的字符串IP,输出型参数
- size参数:用于指明dst中可用的字节数
返回值:若转换成功,则返回一个指向dst的非空指针;若转换失败,则返回NULL。
注意
- 最常用的两个转换函数是inet_addr和inet_ntoa,因为这两个函数足够简单。这两个函数的参数就是需要转换的字符串IP或整数IP,而这两个函数的返回值就是对应的整数IP和字符串IP
- 其中inet_pton和inet_ntop函数不仅可以转换IPv4的in_addr,还可以转换IPv6的in6_addr,因此这两个函数中对应的参数类型是void*
- 转换函数都是为了满足某些打印场景的或者做某些数据分析,如网络安全方面的数据分析
5.3 inet_ntoa函数问题
inet_ntoa函数可以将4字节的整数IP转换成字符串IP,其中该函数返回的这个转换后的字符串IP是存储在静态存储区的,不需要调用者手动进行释放。若多次调用inet_ntoa函数,此时就会出现数据覆盖的问题
inet_ntoa函数内部只在静态存储区申请了一块区域,导致inet_ntoa函数第二次转换的结果就会覆盖第一次转换的结果
若要多次调用inet_ntoa函数,那么就要及时保存inet_ntoa的转换结果
并发场景下的inet_ntoa函数
inet_ntoa函数内部只在静态存储区申请了一块区域,用于存储转换后的字符串IP,那么在线程场景下这块区域就叫做临界区,多线程在不加锁的情况下同时访问临界区必然会出现异常情况。并且在APUE中,也明确提出inet_ntoa不是线程安全的函数
#include <iostream> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include <unistd.h> void* Func1(void* arg) { struct sockaddr_in* p = (struct sockaddr_in*)arg; while (1){ char* ptr1 = inet_ntoa(p->sin_addr); std::cout << "ptr1: " << ptr1 << std::endl; sleep(1); } } void* Func2(void* arg) { struct sockaddr_in* p = (struct sockaddr_in*)arg; while (1){ char* ptr2 = inet_ntoa(p->sin_addr); std::cout << "ptr2: " << ptr2 << std::endl; sleep(1); } } int main() { struct sockaddr_in addr1; struct sockaddr_in addr2; addr1.sin_addr.s_addr = 0; addr2.sin_addr.s_addr = 0xffffffff; pthread_t tid1 = 0; pthread_create(&tid1, nullptr, Func1, &addr1); sleep(1); pthread_t tid2 = 0; pthread_create(&tid2, nullptr, Func2, &addr2); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); return 0; }
但是实际在centos7上测试时,在多线程场景下调用inet_ntoa函数并没有出现问题,可能是该函数内部的实现加了互斥锁,这就跟接口本身的设计也是有关系的
在多线程环境下更加推荐使用inet_ntop函数进行转换,因为该函数是由调用者自己提供缓冲区保存转换结果的,可以规避线程安全的问题