利用线程池多线程并发实现TCP两端通信交互,并将服务端设为守护进程(一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 利用线程池多线程并发实现TCP两端通信交互,并将服务端设为守护进程(一)

实现目标

利用线程池多线程并发实现基于TCP通信的多个客户端与服务端之间的交互,客户端发送数据,服务端接收后处理数据并返回。服务端为守护进程

实现步骤

  1. 封装一个记录日志的类,将程序运行的信息保存到文件
  2. 封装线程类、服务端处理任务类以及将锁进行封装,为方便实现线程池
  3. 实现服务端,使服务端能接收客户端所发来的数据,处理数据后返回。服务端采用多线程并发处理
  4. 封装守护进程方法,使服务端为守护进程
  5. 实现客户端,可以向服务端发送数据,并接收到服务端发送回来的数据

封装日志类

将程序运行的信息保存到指定文件,例如创建套接字成功或者失败等信息。以【状态】【时间】【信息】的格式保存。

状态可分为五种:“DEBUG”,“NORMAL”,“WARNING”,“ERROR”,“FATAL”

日志类保存的信息需带有可变参数

#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *to_levelstr(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}
void LogMessage(int level, const char *format, ...)
{
#define NUM 1024
    char logpre[NUM];
    snprintf(logpre, sizeof(logpre), "[%s][%ld][%d]", to_levelstr(level), (long int)time(nullptr), getpid());
    char line[NUM];
    // 可变参数
    va_list arg;
    va_start(arg, format);
    vsnprintf(line, sizeof(line), format, arg);
    // 保存至文件
    FILE* log = fopen("log.txt", "a");
    FILE* err = fopen("log.error", "a");
    if(log && err)
    {
        FILE *curr = nullptr;
        if(level == DEBUG || level == NORMAL || level == WARNING) 
            curr = log;
        if(level == ERROR || level == FATAL) 
            curr = err;
        if(curr) fprintf(curr, "%s%s\n", logpre, line);
        fclose(log);
        fclose(err);
    }
}

封装线程池

封装线程

将线程的创建,等待封装成类的成员函数。不再需要单个的条用线程库接口,以对象的方式创建。

需要注意:在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
typedef std::function<void *(void *)> func_t;
class Thread
{
private:
    // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
    static void *start_routine(void *args) // 类内成员,有缺省参数!
    {
        Thread *_this = static_cast<Thread *>(args);
        return _this->callback();
    }
public:
    // 构造函数里直接生成线程名,利用静态变量从1开始
    Thread()
    {
        char namebuffer[1024];
        snprintf(namebuffer, sizeof namebuffer, "thread-NO.%d", threadnum++);
        _name = namebuffer;
    }
    // 线程启动
    void start(func_t func, void *args = nullptr)
    {
        _func = func;
        _args = args;
        // 由于静态的方法是不能访问类内成员的,
        // 因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员
        // 也就是this指针
        int n = pthread_create(&_tid, nullptr, start_routine, this);
        assert(n == 0);
        (void)n;
    }
    // 线程等待
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        assert(n == 0);
        (void)n;
    }
    ~Thread()
    {
    }
    void *callback()
    {
        return _func(_args);
    }
private:
    std::string _name; // 类名
    func_t _func;      // 线程回调函数
    void *_args;       // 线程回调函数的参数
    pthread_t _tid;    // 线程id
    static int threadnum; // 线程的编号,为生成线程名
};
// static的成员需在类外初始化
int Thread::threadnum = 1;

封装锁

同样的为了不再需要一直调用系统接口,可以将整个方法封装成类,通过类的对象实现加锁过程

#pragma once
#include <iostream>
#include <pthread.h>
// 加锁 解锁
class Mutex
{
public:
    Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p)
    {
    }
    // 加锁
    void lock()
    {
        if (_lock_p)
            pthread_mutex_lock(_lock_p);
    }
    // 解锁
    void unlock()
    {
        if (_lock_p)
            pthread_mutex_unlock(_lock_p);
    }
    ~Mutex()
    {
    }
private:
    pthread_mutex_t *_lock_p;
};
// 锁的类
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
    {
        _mutex.lock(); // 在构造函数中进行加锁
    }
    ~LockGuard()
    {
        _mutex.unlock(); // 在析构函数中进行解锁
    }
private:
    Mutex _mutex;
};

封装线程池

在类里面的线程回调方法必须设为static类型,而静态的方法是不能访问类内成员的,因此传给回调函数的参数需要将整个对象传过去,通过对象来获取类内成员。

线程池需要实现为单例模式:

  1. 第一步就是把构造函数私有,再把拷贝构造和赋值运算符重载delete
  2. 在设置获取单例对象的函数的时候,注意要设置成静态成员函数,因为在获取对象前根本没有对象,无法调用非静态成员函数
  3. 可能会出现多个线程同时申请资源的场景,所以还需要一把锁来保护这块资源,而这把锁也得设置成静态,因为单例模式的函数是静态的
#pragma once
#include "Thread.hpp"
#include "log.hpp"
#include "Lock.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 线程池类定义位于下面,因此属性类想要获取到
// 就必须在前面声明
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
    ThreadPool<T> *threadpool; // 线程所在的线程池,获取到线程的this指针
    std::string _name;         // 线程的名字
public:
    ThreadData(ThreadPool<T> *tp, const std::string &name) : threadpool(tp), _name(name)
    {
    }
};
template <class T>
class ThreadPool
{
private:
    // 线程最终实现的方法
    static void *handlerTask(void *args)
    {
        ThreadData<T> *td = (ThreadData<T> *)args;
        while (true)
        {
            T t;
            {
                LockGuard lockguard(td->threadpool->mutex());
                while (td->threadpool->isQueueEmpty())
                {
                    td->threadpool->threadWait();
                }
                t = td->threadpool->pop();
            }
            t();
        }
        delete td;
        return nullptr;
    }
    ThreadPool(const int &num = 10) : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(new Thread());
        }
    }
    void operator=(const ThreadPool &) = delete;
    ThreadPool(const ThreadPool &) = delete;
public:
    // 将加锁 解锁 判断任务队列是否为空 和条件变量等待全部封装成类内方法
    // 方便在线程的回调方法中通过对象直接调用
    void lockQueue() { pthread_mutex_lock(&_mutex); }
    void unlockQueue() { pthread_mutex_unlock(&_mutex); }
    bool isQueueEmpty() { return _task_queue.empty(); }
    void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
    // 任务队列删除队头,并返回队头的任务
    T pop()
    {
        T t = _task_queue.front();
        _task_queue.pop();
        return t;
    }
    pthread_mutex_t *mutex()
    {
        return &_mutex;
    }
public:
    // 让每个线程对象调用其启动函数,并将线程辅助类和最终执行的任务方法传入函数中
    // 线程的辅助类对象里包含了线程当前线程池对象,也就是可以
    // 通过辅助类对象可以调用到线程池对象里的成员
    void run()
    {
        for (const auto &t : _threads)
        {
            ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);
            // 创建成功后打印日志
            LogMessage(DEBUG, "%s start ...", t->threadname().c_str());
        }
    }
    // 往任务队列里插入一个任务
    void push(const T &in)
    {
        LockGuard lockguard(&_mutex);
        _task_queue.push(in);
        pthread_cond_signal(&_cond);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (const auto &t : _threads)
            delete t;
    }
    // 实现单例模式
    static ThreadPool<T> *getInstance()
    {
        if (nullptr == tp)
        {
            _singlock.lock();
            if (nullptr == tp)
            {
                tp = new ThreadPool<T>();
            }
            _singlock.unlock();
        }
        return tp;
    }
private:
    int _num;//线程的数量
    std::vector<Thread *> _threads;//线程组
    std::queue<T> _task_queue;//任务队列
    pthread_mutex_t _mutex;//锁
    pthread_cond_t _cond;//条件变量
    static ThreadPool<T> *tp;
    static std::mutex _singlock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template <class T>
std::mutex ThreadPool<T>::_singlock;

TCP通信的接口和注意事项

为了实现TCP版的通信,首先来了解一下相关接口和注意事项

  1. TCP需要在通信前先创建链接,因此在TCP没有链接之前其创建的套接字并不是用来通信的,而是用来监听的。一旦创建链接成功后,才会返回一个用来通信的套接字
  2. TCP时面向字节流的,因此其通信就是往文件上IO,因此不用指定的调用某接口去完成,直接用文件接口读写就可以完成

accept

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

这就是用来创建链接的接口

参数一为负责监听的套接字

参数二就是socket的结构体

参数三为结构体的大小

返回值,成功创建链接之后会返回一个值,这个值就是负责通信的套接字,也就是后面利用文件通信的文件描述符

TCP

封装任务

因为上述说到TCP是可以直接使用文件操作来完成通信的,那么也就是说其通信根本就用不到其他的成员了,只需要知道一个套接字即可。那么这个方法就可以不放在类内,因为这就是线程最后的执行目的,因此可以将这个任务单独放到一个头文件中。因为线程池是一个模板类,则可以封装一个任务类。

#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
#include "log.hpp"
// TCP的通信
// 线程的最终执行方法
void ServerIO(int sock)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            // read
            buffer[n] = 0;
            std::cout << "recv message: " << buffer << std::endl;
            // write
            std::string outbuffer = buffer;
            outbuffer += " server[echo]";
            write(sock, outbuffer.c_str(), outbuffer.size());
        }
        else if (n == 0)
        {
            // 代表client退出
            LogMessage(NORMAL, "client quit, me too!");
            break;
        }
    }
    close(sock);
}
// 任务类
// 为了最终执行的方法而服务
class Task
{
    using func_t = std::function<void(int)>;
public:
    Task()
    {
    }
    Task(int sock, func_t func)
        : _sock(sock), _callback(func)
    {
    }
    void operator()()
    {
        _callback(_sock);
    }
private:
    int _sock; // 通信套接字
    func_t _callback;
};


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
21天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
127 59
|
6天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
13天前
|
存储 Unix Linux
进程间通信方式-----管道通信
【10月更文挑战第29天】管道通信是一种重要的进程间通信机制,它为进程间的数据传输和同步提供了一种简单有效的方法。通过合理地使用管道通信,可以实现不同进程之间的协作,提高系统的整体性能和效率。
|
12天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
13天前
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
23天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
16 3
|
23天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2
|
23天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2
|
4月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
4月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
162 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)

热门文章

最新文章

相关实验场景

更多