利用线程池多线程并发实现TCP两端通信交互,并将服务端设为守护进程(一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 利用线程池多线程并发实现TCP两端通信交互,并将服务端设为守护进程(一)

实现目标

利用线程池多线程并发实现基于TCP通信的多个客户端与服务端之间的交互,客户端发送数据,服务端接收后处理数据并返回。服务端为守护进程

实现步骤

  1. 封装一个记录日志的类,将程序运行的信息保存到文件
  2. 封装线程类、服务端处理任务类以及将锁进行封装,为方便实现线程池
  3. 实现服务端,使服务端能接收客户端所发来的数据,处理数据后返回。服务端采用多线程并发处理
  4. 封装守护进程方法,使服务端为守护进程
  5. 实现客户端,可以向服务端发送数据,并接收到服务端发送回来的数据

封装日志类

将程序运行的信息保存到指定文件,例如创建套接字成功或者失败等信息。以【状态】【时间】【信息】的格式保存。

状态可分为五种:“DEBUG”,“NORMAL”,“WARNING”,“ERROR”,“FATAL”

日志类保存的信息需带有可变参数

#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *to_levelstr(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}
void LogMessage(int level, const char *format, ...)
{
#define NUM 1024
    char logpre[NUM];
    snprintf(logpre, sizeof(logpre), "[%s][%ld][%d]", to_levelstr(level), (long int)time(nullptr), getpid());
    char line[NUM];
    // 可变参数
    va_list arg;
    va_start(arg, format);
    vsnprintf(line, sizeof(line), format, arg);
    // 保存至文件
    FILE* log = fopen("log.txt", "a");
    FILE* err = fopen("log.error", "a");
    if(log && err)
    {
        FILE *curr = nullptr;
        if(level == DEBUG || level == NORMAL || level == WARNING) 
            curr = log;
        if(level == ERROR || level == FATAL) 
            curr = err;
        if(curr) fprintf(curr, "%s%s\n", logpre, line);
        fclose(log);
        fclose(err);
    }
}

封装线程池

封装线程

将线程的创建,等待封装成类的成员函数。不再需要单个的条用线程库接口,以对象的方式创建。

需要注意:在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
typedef std::function<void *(void *)> func_t;
class Thread
{
private:
    // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
    static void *start_routine(void *args) // 类内成员,有缺省参数!
    {
        Thread *_this = static_cast<Thread *>(args);
        return _this->callback();
    }
public:
    // 构造函数里直接生成线程名,利用静态变量从1开始
    Thread()
    {
        char namebuffer[1024];
        snprintf(namebuffer, sizeof namebuffer, "thread-NO.%d", threadnum++);
        _name = namebuffer;
    }
    // 线程启动
    void start(func_t func, void *args = nullptr)
    {
        _func = func;
        _args = args;
        // 由于静态的方法是不能访问类内成员的,
        // 因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员
        // 也就是this指针
        int n = pthread_create(&_tid, nullptr, start_routine, this);
        assert(n == 0);
        (void)n;
    }
    // 线程等待
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        assert(n == 0);
        (void)n;
    }
    ~Thread()
    {
    }
    void *callback()
    {
        return _func(_args);
    }
private:
    std::string _name; // 类名
    func_t _func;      // 线程回调函数
    void *_args;       // 线程回调函数的参数
    pthread_t _tid;    // 线程id
    static int threadnum; // 线程的编号,为生成线程名
};
// static的成员需在类外初始化
int Thread::threadnum = 1;

封装锁

同样的为了不再需要一直调用系统接口,可以将整个方法封装成类,通过类的对象实现加锁过程

#pragma once
#include <iostream>
#include <pthread.h>
// 加锁 解锁
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p)
    {
    }
    // 加锁
    void lock()
    {
        if (_lock_p)
            pthread_mutex_lock(_lock_p);
    }
    // 解锁
    void unlock()
    {
        if (_lock_p)
            pthread_mutex_unlock(_lock_p);
    }
    ~Mutex()
    {
    }
private:
    pthread_mutex_t *_lock_p;
};
// 锁的类
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
    {
        _mutex.lock(); // 在构造函数中进行加锁
    }
    ~LockGuard()
    {
        _mutex.unlock(); // 在析构函数中进行解锁
    }
private:
    Mutex _mutex;
};

封装线程池

在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员。

线程池需要实现为单例模式:

  1. 第一步就是把构造函数私有,再把拷贝构造和赋值运算符重载delete
  2. 在设置获取单例对象的函数的时候,注意要设置成静态成员函数,因为在获取对象前根本没有对象,无法调用非静态成员函数
  3. 可能会出现多个线程同时申请资源的场景,所以还需要一把锁来保护这块资源,而这把锁也得设置成静态,因为单例模式的函数是静态的
#pragma once
#include "Thread.hpp"
#include "log.hpp"
#include "Lock.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 线程池类定义位于下面,因此属性类想要获取到
// 就必须在前面声明
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
    ThreadPool<T> *threadpool; // 线程所在的线程池,获取到线程的this指针
    std::string _name;         // 线程的名字
public:
    ThreadData(ThreadPool<T> *tp, const std::string &name) : threadpool(tp), _name(name)
    {
    }
};
template <class T>
class ThreadPool
{
private:
    // 线程最终实现的方法
    static void *handlerTask(void *args)
    {
        ThreadData<T> *td = (ThreadData<T> *)args;
        while (true)
        {
            T t;
            {
                LockGuard lockguard(td->threadpool->mutex());
                while (td->threadpool->isQueueEmpty())
                {
                    td->threadpool->threadWait();
                }
                t = td->threadpool->pop();
            }
            t();
        }
        delete td;
        return nullptr;
    }
    ThreadPool(const int &num = 10) : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(new Thread());
        }
    }
    void operator=(const ThreadPool &) = delete;
    ThreadPool(const ThreadPool &) = delete;
public:
    // 将加锁 解锁 判断任务队列是否为空 和条件变量等待全部封装成类内方法
    // 方便在线程的回调方法中通过对象直接调用
    void lockQueue() { pthread_mutex_lock(&_mutex); }
    void unlockQueue() { pthread_mutex_unlock(&_mutex); }
    bool isQueueEmpty() { return _task_queue.empty(); }
    void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
    // 任务队列删除队头,并返回队头的任务
    T pop()
    {
        T t = _task_queue.front();
        _task_queue.pop();
        return t;
    }
    pthread_mutex_t *mutex()
    {
        return &_mutex;
    }
public:
    // 让每个线程对象调用其启动函数,并将线程辅助类和最终执行的任务方法传入函数中
    // 线程的辅助类对象里包含了线程当前线程池对象,也就是可以
    // 通过辅助类对象可以调用到线程池对象里的成员
    void run()
    {
        for (const auto &t : _threads)
        {
            ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);
            // 创建成功后打印日志
            LogMessage(DEBUG, "%s start ...", t->threadname().c_str());
        }
    }
    // 往任务队列里插入一个任务
    void push(const T &in)
    {
        LockGuard lockguard(&_mutex);
        _task_queue.push(in);
        pthread_cond_signal(&_cond);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (const auto &t : _threads)
            delete t;
    }
    // 实现单例模式
    static ThreadPool<T> *getInstance()
    {
        if (nullptr == tp)
        {
            _singlock.lock();
            if (nullptr == tp)
            {
                tp = new ThreadPool<T>();
            }
            _singlock.unlock();
        }
        return tp;
    }
private:
    int _num;//线程的数量
    std::vector<Thread *> _threads;//线程组
    std::queue<T> _task_queue;//任务队列
    pthread_mutex_t _mutex;//锁
    pthread_cond_t _cond;//条件变量
    static ThreadPool<T> *tp;
    static std::mutex _singlock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template <class T>
std::mutex ThreadPool<T>::_singlock;

TCP通信的接口和注意事项

为了实现TCP版的通信,首先来了解一下相关接口和注意事项

  1. TCP需要在通信前先创建链接,因此在TCP没有链接之前其创建的套接字并不是用来通信的,而是用来监听的。一旦创建链接成功后,才会返回一个用来通信的套接字
  2. TCP时面向字节流的,因此其通信就是往文件上IO,因此不用指定的调用某接口去完成,直接用文件接口读写就可以完成

accept

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

这就是用来创建链接的接口

参数一为负责监听的套接字

参数二就是socket的结构体

参数三为结构体的大小

返回值,成功创建链接之后会返回一个值,这个值就是负责通信的套接字,也就是后面利用文件通信的文件描述符

TCP

封装任务

因为上述说到TCP是可以直接使用文件操作来完成通信的,那么也就是说其通信根本就用不到其他的成员了,只需要知道一个套接字即可。那么这个方法就可以不放在类内,因为这就是线程最后的执行目的,因此可以将这个任务单独放到一个头文件中。因为线程池是一个模板类,则可以封装一个任务类。

#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
#include "log.hpp"
// TCP的通信
// 线程的最终执行方法
void ServerIO(int sock)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            // read
            buffer[n] = 0;
            std::cout << "recv message: " << buffer << std::endl;
            // write
            std::string outbuffer = buffer;
            outbuffer += " server[echo]";
            write(sock, outbuffer.c_str(), outbuffer.size());
        }
        else if (n == 0)
        {
            // 代表client退出
            LogMessage(NORMAL, "client quit, me too!");
            break;
        }
    }
    close(sock);
}
// 任务类
// 为了最终执行的方法而服务
class Task
{
    using func_t = std::function<void(int)>;
public:
    Task()
    {
    }
    Task(int sock, func_t func)
        : _sock(sock), _callback(func)
    {
    }
    void operator()()
    {
        _callback(_sock);
    }
private:
    int _sock; // 通信套接字
    func_t _callback;
};


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
24天前
|
调度 开发者 Python
深入浅出操作系统:进程与线程的奥秘
在数字世界的底层,操作系统扮演着不可或缺的角色。它如同一位高效的管家,协调和控制着计算机硬件与软件资源。本文将拨开迷雾,深入探索操作系统中两个核心概念——进程与线程。我们将从它们的诞生谈起,逐步剖析它们的本质、区别以及如何影响我们日常使用的应用程序性能。通过简单的比喻,我们将理解这些看似抽象的概念,并学会如何在编程实践中高效利用进程与线程。准备好跟随我一起,揭开操作系统的神秘面纱,让我们的代码运行得更加流畅吧!
|
4天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
15 1
|
24天前
|
消息中间件 Unix Linux
【C语言】进程和线程详解
在现代操作系统中,进程和线程是实现并发执行的两种主要方式。理解它们的区别和各自的应用场景对于编写高效的并发程序至关重要。
49 6
|
25天前
|
调度 开发者
深入理解:进程与线程的本质差异
在操作系统和计算机编程领域,进程和线程是两个核心概念。它们在程序执行和资源管理中扮演着至关重要的角色。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
52 5
|
22天前
|
算法 调度 开发者
深入理解操作系统:进程与线程的管理
在数字世界的复杂编织中,操作系统如同一位精明的指挥家,协调着每一个音符的奏响。本篇文章将带领读者穿越操作系统的幕后,探索进程与线程管理的奥秘。从进程的诞生到线程的舞蹈,我们将一起见证这场微观世界的华丽变奏。通过深入浅出的解释和生动的比喻,本文旨在揭示操作系统如何高效地处理多任务,确保系统的稳定性和效率。让我们一起跟随代码的步伐,走进操作系统的内心世界。
|
25天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
45 4
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
58 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####