实现目标
利用线程池多线程并发实现基于TCP通信的多个客户端与服务端之间的交互,客户端发送数据,服务端接收后处理数据并返回。服务端为守护进程
实现步骤
- 封装一个记录日志的类,将程序运行的信息保存到文件
- 封装线程类、服务端处理任务类以及将锁进行封装,为方便实现线程池
- 实现服务端,使服务端能接收客户端所发来的数据,处理数据后返回。服务端采用多线程并发处理
- 封装守护进程方法,使服务端为守护进程
- 实现客户端,可以向服务端发送数据,并接收到服务端发送回来的数据
封装日志类
将程序运行的信息保存到指定文件,例如创建套接字成功或者失败等信息。以【状态】【时间】【信息】的格式保存。
状态可分为五种:“DEBUG”,“NORMAL”,“WARNING”,“ERROR”,“FATAL”
日志类保存的信息需带有可变参数
#pragma once #include <iostream> #include <string> #include <cstdarg> #include <ctime> #include <unistd.h> using namespace std; #define DEBUG 0 #define NORMAL 1 #define WARNING 2 #define ERROR 3 #define FATAL 4 const char *to_levelstr(int level) { switch (level) { case DEBUG: return "DEBUG"; case NORMAL: return "NORMAL"; case WARNING: return "WARNING"; case ERROR: return "ERROR"; case FATAL: return "FATAL"; default: return nullptr; } } void LogMessage(int level, const char *format, ...) { #define NUM 1024 char logpre[NUM]; snprintf(logpre, sizeof(logpre), "[%s][%ld][%d]", to_levelstr(level), (long int)time(nullptr), getpid()); char line[NUM]; // 可变参数 va_list arg; va_start(arg, format); vsnprintf(line, sizeof(line), format, arg); // 保存至文件 FILE* log = fopen("log.txt", "a"); FILE* err = fopen("log.error", "a"); if(log && err) { FILE *curr = nullptr; if(level == DEBUG || level == NORMAL || level == WARNING) curr = log; if(level == ERROR || level == FATAL) curr = err; if(curr) fprintf(curr, "%s%s\n", logpre, line); fclose(log); fclose(err); } }
封装线程池
封装线程
将线程的创建,等待封装成类的成员函数。不再需要单个的条用线程库接口,以对象的方式创建。
需要注意:在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员
#pragma once #include <iostream> #include <string> #include <cstring> #include <cassert> #include <functional> #include <pthread.h> typedef std::function<void *(void *)> func_t; class Thread { private: // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static static void *start_routine(void *args) // 类内成员,有缺省参数! { Thread *_this = static_cast<Thread *>(args); return _this->callback(); } public: // 构造函数里直接生成线程名,利用静态变量从1开始 Thread() { char namebuffer[1024]; snprintf(namebuffer, sizeof namebuffer, "thread-NO.%d", threadnum++); _name = namebuffer; } // 线程启动 void start(func_t func, void *args = nullptr) { _func = func; _args = args; // 由于静态的方法是不能访问类内成员的, // 因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员 // 也就是this指针 int n = pthread_create(&_tid, nullptr, start_routine, this); assert(n == 0); (void)n; } // 线程等待 void join() { int n = pthread_join(_tid, nullptr); assert(n == 0); (void)n; } ~Thread() { } void *callback() { return _func(_args); } private: std::string _name; // 类名 func_t _func; // 线程回调函数 void *_args; // 线程回调函数的参数 pthread_t _tid; // 线程id static int threadnum; // 线程的编号,为生成线程名 }; // static的成员需在类外初始化 int Thread::threadnum = 1;
封装锁
同样的为了不再需要一直调用系统接口,可以将整个方法封装成类,通过类的对象实现加锁过程
#pragma once #include <iostream> #include <pthread.h> // 加锁 解锁 class Mutex { public: Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p) { } // 加锁 void lock() { if (_lock_p) pthread_mutex_lock(_lock_p); } // 解锁 void unlock() { if (_lock_p) pthread_mutex_unlock(_lock_p); } ~Mutex() { } private: pthread_mutex_t *_lock_p; }; // 锁的类 class LockGuard { public: LockGuard(pthread_mutex_t *mutex) : _mutex(mutex) { _mutex.lock(); // 在构造函数中进行加锁 } ~LockGuard() { _mutex.unlock(); // 在析构函数中进行解锁 } private: Mutex _mutex; };
封装线程池
在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员。
线程池需要实现为单例模式:
- 第一步就是把构造函数私有,再把拷贝构造和赋值运算符重载delete
- 在设置获取单例对象的函数的时候,注意要设置成静态成员函数,因为在获取对象前根本没有对象,无法调用非静态成员函数
- 可能会出现多个线程同时申请资源的场景,所以还需要一把锁来保护这块资源,而这把锁也得设置成静态,因为单例模式的函数是静态的
#pragma once #include "Thread.hpp" #include "log.hpp" #include "Lock.hpp" #include <vector> #include <queue> #include <mutex> #include <pthread.h> #include <unistd.h> using namespace std; // 线程池类定义位于下面,因此属性类想要获取到 // 就必须在前面声明 template <class T> class ThreadPool; template <class T> class ThreadData { public: ThreadPool<T> *threadpool; // 线程所在的线程池,获取到线程的this指针 std::string _name; // 线程的名字 public: ThreadData(ThreadPool<T> *tp, const std::string &name) : threadpool(tp), _name(name) { } }; template <class T> class ThreadPool { private: // 线程最终实现的方法 static void *handlerTask(void *args) { ThreadData<T> *td = (ThreadData<T> *)args; while (true) { T t; { LockGuard lockguard(td->threadpool->mutex()); while (td->threadpool->isQueueEmpty()) { td->threadpool->threadWait(); } t = td->threadpool->pop(); } t(); } delete td; return nullptr; } ThreadPool(const int &num = 10) : _num(num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); for (int i = 0; i < _num; i++) { _threads.push_back(new Thread()); } } void operator=(const ThreadPool &) = delete; ThreadPool(const ThreadPool &) = delete; public: // 将加锁 解锁 判断任务队列是否为空 和条件变量等待全部封装成类内方法 // 方便在线程的回调方法中通过对象直接调用 void lockQueue() { pthread_mutex_lock(&_mutex); } void unlockQueue() { pthread_mutex_unlock(&_mutex); } bool isQueueEmpty() { return _task_queue.empty(); } void threadWait() { pthread_cond_wait(&_cond, &_mutex); } // 任务队列删除队头,并返回队头的任务 T pop() { T t = _task_queue.front(); _task_queue.pop(); return t; } pthread_mutex_t *mutex() { return &_mutex; } public: // 让每个线程对象调用其启动函数,并将线程辅助类和最终执行的任务方法传入函数中 // 线程的辅助类对象里包含了线程当前线程池对象,也就是可以 // 通过辅助类对象可以调用到线程池对象里的成员 void run() { for (const auto &t : _threads) { ThreadData<T> *td = new ThreadData<T>(this, t->threadname()); t->start(handlerTask, td); // 创建成功后打印日志 LogMessage(DEBUG, "%s start ...", t->threadname().c_str()); } } // 往任务队列里插入一个任务 void push(const T &in) { LockGuard lockguard(&_mutex); _task_queue.push(in); pthread_cond_signal(&_cond); } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); for (const auto &t : _threads) delete t; } // 实现单例模式 static ThreadPool<T> *getInstance() { if (nullptr == tp) { _singlock.lock(); if (nullptr == tp) { tp = new ThreadPool<T>(); } _singlock.unlock(); } return tp; } private: int _num;//线程的数量 std::vector<Thread *> _threads;//线程组 std::queue<T> _task_queue;//任务队列 pthread_mutex_t _mutex;//锁 pthread_cond_t _cond;//条件变量 static ThreadPool<T> *tp; static std::mutex _singlock; }; template <class T> ThreadPool<T> *ThreadPool<T>::tp = nullptr; template <class T> std::mutex ThreadPool<T>::_singlock;
TCP通信的接口和注意事项
为了实现TCP版的通信,首先来了解一下相关接口和注意事项
- TCP需要在通信前先创建链接,因此在TCP没有链接之前其创建的套接字并不是用来通信的,而是用来监听的。一旦创建链接成功后,才会返回一个用来通信的套接字
- TCP时面向字节流的,因此其通信就是往文件上IO,因此不用指定的调用某接口去完成,直接用文件接口读写就可以完成
accept
#include <sys/types.h> /* See NOTES */ #include <sys/socket.h> int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
这就是用来创建链接的接口
参数一为负责监听的套接字
参数二就是socket的结构体
参数三为结构体的大小
返回值,成功创建链接之后会返回一个值,这个值就是负责通信的套接字,也就是后面利用文件通信的文件描述符
TCP
封装任务
因为上述说到TCP是可以直接使用文件操作来完成通信的,那么也就是说其通信根本就用不到其他的成员了,只需要知道一个套接字即可。那么这个方法就可以不放在类内,因为这就是线程最后的执行目的,因此可以将这个任务单独放到一个头文件中。因为线程池是一个模板类,则可以封装一个任务类。
#pragma once #include <iostream> #include <string> #include <cstdio> #include <functional> #include "log.hpp" // TCP的通信 // 线程的最终执行方法 void ServerIO(int sock) { char buffer[1024]; while (true) { ssize_t n = read(sock, buffer, sizeof(buffer) - 1); if (n > 0) { // read buffer[n] = 0; std::cout << "recv message: " << buffer << std::endl; // write std::string outbuffer = buffer; outbuffer += " server[echo]"; write(sock, outbuffer.c_str(), outbuffer.size()); } else if (n == 0) { // 代表client退出 LogMessage(NORMAL, "client quit, me too!"); break; } } close(sock); } // 任务类 // 为了最终执行的方法而服务 class Task { using func_t = std::function<void(int)>; public: Task() { } Task(int sock, func_t func) : _sock(sock), _callback(func) { } void operator()() { _callback(_sock); } private: int _sock; // 通信套接字 func_t _callback; };