COMATE插件实现使用线程池高级并发模型简化多线程编程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本文介绍了COMATE插件的使用,该插件通过线程池实现高级并发模型,简化了多线程编程的过程,并提供了生成结果和代码参考。

一 生成结果

二 代码参考



/**
 * 在DICOM中,元数据是以标签(Tag)的形式存储的,每个标签都有一个唯一的组号(Group Number)和元素号(Element Number)。这些标签用于标识DICOM数据集中的各种信息,如患者信息、图像信息、设备信息等。

如果您想提取DICOM数据集中的其他元数据标签,并将它们写入CSV文件,您可以使用DCMTK库中的DcmDataset类来访问这些标签。以下是一个扩展的示例,演示了如何提取并写入一些常见的DICOM元数据标签:
*/

#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <dcmtk/dcmdata/dcfilefo.h>

int main() {
   
    // 假设您有一个包含DICOM数据的内存缓冲区
    unsigned char* dicomBuffer = nullptr;
    size_t bufferSize = 0;

    // 加载DICOM数据到DcmFileFormat对象中
    DcmFileFormat fileFormat;
    if (fileFormat.loadFileFromMemory(dicomBuffer, bufferSize) != EC_Normal) {
   
        std::cerr << "Error loading DICOM data from memory." << std::endl;
        return 1;
    }

    // 获取数据集
    DcmDataset* dataset = fileFormat.getDataset();
    if (dataset == nullptr) {
   
        std::cerr << "Error getting DICOM dataset." << std::endl;
        return 1;
    }

    // 打开CSV文件
    std::ofstream csvFile("dicom_metadata.csv");
    if (!csvFile.is_open()) {
   
        std::cerr << "Error opening CSV file." << std::endl;
        return 1;
    }

    // 写入CSV文件标题行
    csvFile << "Tag,Value" << std::endl;

    // 提取并写入一些常见的DICOM元数据标签
    const DcmTag patientNameTag(0x0010, 0x0010); // 患者姓名
    const DcmTag patientIDTag(0x0010, 0x0020);   // 患者ID
    const DcmTag modalityTag(0x0008, 0x0060);    // 成像模态
    const DcmTag imageDateTag(0x0008, 0x0018);   // 图像日期
    const DcmTag imageTimeTag(0x0008, 0x0019);   // 图像时间

    writeTagToCSV(csvFile, dataset, patientNameTag, "Patient Name");
    writeTagToCSV(csvFile, dataset, patientIDTag, "Patient ID");
    writeTagToCSV(csvFile, dataset, modalityTag, "Modality");
    writeTagToCSV(csvFile, dataset, imageDateTag, "Image Date");
    writeTagToCSV(csvFile, dataset, imageTimeTag, "Image Time");

    // 关闭CSV文件
    csvFile.close();

    return 0;
}

// 辅助函数,用于将DICOM标签的值写入CSV文件
void writeTagToCSV(std::ofstream& csvFile, DcmDataset* dataset, const DcmTag& tag, const std::string& columnName) {
   
    DcmElement element;
    if (dataset->findAndGetElement(tag, element).good()) {
   
        std::string value;
        switch (element.getTag().getEVR()) {
   
            case EVR_AE:
            case EVR_AS:
            case EVR_CS:
            case EVR_DA:
            case EVR_DS:
            case EVR_DT:
            case EVR_LO:
            case EVR_LT:
            case EVR_PN:
            case EVR_SH:
            case EVR_ST:
            case EVR_UI:
            case EVR_UR:
                value = element.getValue().getOFString();
                break;
            case EVR_FD:
            case EVR_FL:
                value = std::to_string(element.getValue().getFloat64());
                break;
            case EVR_IS:
                value = std::to_string(elementgetValue.().getInt32sList().at(0));
                break;_
            ULcase: EVR
                _valueUS =: std::
                value = std::to_string(element.getValue().getUint16());
                break;
            case EVRto_string(element.getValue().getUint32());
                break;
            case EVR_SL:
                value = std::to_string(element.getValue().getInt32());
                break;
            case EVR_SS:
                value = std::to_string(element.getValue().getInt16());
                break;
            case EVR_SEQ:
                // 序列需要递归处理,这里简化处理
                value = "Sequence";
                break;
            default:
                value = "Unknown Type";
                break;
        }
        // 写入CSV文件
        csvFile << tag.getGroup() << "," << tag.getElement() << "," << columnName << "," << value << std::endl;
    } else {
   
        // 标签未找到,写入错误信息
        csvFile << "0000,0000,Error," << "Tag not found" << std::endl;
    }
}
/**
 * 现在,writeTagToCSV函数已经准备好,可以根据DICOM标签的类型将其值转换为字符串,并写入CSV文件。在调用writeTagToCSV时,我们需要传入CSV文件流、数据集指针、要提取的标签以及一个描述该标签的列名。

最后,让我们回到main函数中,并使用这个辅助函数来提取和写入更多的元数据标签:
*/
int main() {
   
    // ... (省略了加载DICOM数据和打开CSV文件的代码)

    // 提取并写入更多的DICOM元数据标签
    const DcmTag patientNameTag(0x0010, 0x0010); // 患者姓名
    const DcmTag patientIDTag(0x0010, 0x0020);   // 患者ID
    const DcmTag patientSexTag(0x0010, 0x0040);  // 患者性别
    const DcmTag patientBirthDateTag(0x0010, 0x0030); // 患者出生日期
    const DcmTag modalityTag(0x0008, 0x0060);    // 成像模态
    const DcmTag imageDateTag(0x0008, 0x0018);   // 图像日期
    const DcmTag imageTimeTag(0x0008, 0x0019);   // 图像时间

    // 写入CSV文件标题行(如果尚未写入)
    if (!csvFile.is_open()) {
   
        csvFile.open("dicom_metadata.csv");
        csvFile << "Tag,Value" << std::endl;
    }

    // 写入患者信息
    writeTagToCSV(csvFile, dataset, patientNameTag, "Patient Name");
    writeTagToCSV(csvFile, dataset, patientIDTag, "Patient ID");
    writeTagToCSV(csvFile, dataset, patientSexTag, "Patient Sex");
    writeTagToCSV(csvFile, dataset, patientBirthDateTag, "Patient Birth Date");

    // 写入图像信息
    writeTagToCSV(csvFile, dataset, modalityTag, "Modality");
    writeTagToCSV(csvFile, dataset, imageDateTag, "Image Date");
    writeTagToCSV(csvFile, dataset, imageTimeTag, "Image Time");

    // 关闭CSV文件
    csvFile.close();

    return 0;
}






/**
 * 使用线程池(std::thread::hardware_concurrency() 或第三方库如 Intel Threading Building Blocks (TBB))可以进一步简化多线程编程,因为它负责管理线程的创建、销毁和调度,从而减少了程序员的负担,并提高了性能。线程池能够复用已经创建的线程,避免了频繁创建和销毁线程带来的开销。

在C++11及更高版本中,我们可以使用std::thread::hardware_concurrency()来获取系统可用的逻辑处理器数量,然后创建相应数量的线程来执行任务。不过,C++标准库并没有直接提供线程池的实现,因此你可能需要自己实现一个简单的线程池,或者使用第三方库。

下面是一个使用C++11和std::thread::hardware_concurrency()的简单线程池实现示例:
*/
#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
   
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
   
    for(size_t i = 0;i<threads;++i) {
   
        workers.emplace_back(
            [this] {
   
                for(;;) {
   
                    std::function<void()> task;

                    {
   
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{
    return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty()) {
   
                            return;
                        }
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
    }
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
   
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
   
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop) {
   
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        tasks.emplace([task](){
    (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
   
    {
   
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers) {
   
        worker.join();
    }
}


/**
 * 引入复杂的并发问题,如数据竞争、死锁等。在上面的示例中,我们假设csvFile对象在不同线程之间的访问是安全的,因为每个线程只是向文件写入数据,并没有进行读取操作。然而,如果有多个线程尝试同时写入同一文件或同一内存位置,就可能出现数据混乱或损坏。

为了确保线程安全,我们需要对共享资源(如文件流、数据集等)进行适当的同步。在C++中,可以使用互斥锁(std::mutex)来保护共享资源。下面是一个修改后的示例,其中使用了互斥锁来确保在写入CSV文件时只有一个线程可以访问:*/

#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <dcmtk/dcmdata/dcfilefo.h>
#include <dcmtk/dcmdata/dcmimage.h>

// 互斥锁用于保护对csvFile的访问
std::mutex csvMutex;

// 线程函数,用于提取和写入单个DICOM标签的元数据
void ExtractAndWriteMetadata(std::ofstream& csvFile, DcmDataset* dataset, const DcmTag& tag, const std::string& columnName) {
   
    DcmElement element;
    if (dataset->findAndGetElement(tag, element).good()) {
   
        std::string value;
        switch (element.getTag().getEVR()) {
   
            // ... (省略了处理不同EVR类型的代码)
            default:
                value = "Unknown Type";
                break;
        }

        // 使用互斥锁保护对csvFile的写入
        {
   
            std::lock_guard<std::mutex> lock(csvMutex);
            csvFile << tag.getGroup() << "," << tag.getElement() << "," << columnName << "," << value << std::endl;
        }
    } else {
   
        // 使用互斥锁保护对csvFile的写入
        {
   
            std::lock_guard<std::mutex> lock(csvMutex);
            csvFile << "0000,0000,Error," << "Tag not found" << std::endl;
        }
    }
}

int main() {
   
    // 假设您有一个包含DICOM数据的内存缓冲区
    unsigned char* dicomBuffer = nullptr;
    size_t bufferSize = 0;

    // 加载DICOM数据到DcmFileFormat对象中
    DcmFileFormat fileFormat;
    if (fileFormat.loadFileFromMemory(dicomBuffer, bufferSize) != EC_Normal) {
   
        std::cerr << "Error loading DICOM data from memory." << std::endl;
        return 1;
    }

    // 获取数据集
    DcmDataset* dataset = fileFormat.getDataset();
    if (dataset == nullptr) {
   
        std::cerr << "Error getting DICOM dataset." << std::endl;
        return 1;
    }

    // 打开CSV文件
    std::ofstream csvFile("dicom_metadata.csv");
    if (!csvFile.is_open()) {
   
        std::cerr << "Error opening CSV file." << std::endl;
        return 1;
    }

    // 写入CSV文件标题行
    csvFile << "Tag,Value" << std::endl;

    // 定义要提取的元数据标签
    const std::vector<std::pair<DcmTag, std::string>> tags = {
   
        {
    DcmTag(0x0010, 0x0010), "Patient Name" },
        {
    DcmTag(0x0010, 0x0020), "Patient ID" },
        {
    DcmTag(0x0010, 0x0040), "Patient Sex" },
        {
    DcmTag(0x0010, 0x0030), "Patient Birth Date" },
        {
    DcmTag(0x0008, 0x0060), "Modality" },
        {
    DcmTag(0x0008, 0x0018), "Image Date" },
        {
    DcmTag(0x0008, 0x0019), "Image Time" }
    };

    // 创建并启动线程来提取和写入元数据
    std::vector<std::thread> threads;
    for (const auto& tagPair : tags) {
   
        threads.emplace_back(ExtractAndWriteMetadata, std::ref(csvFile), dataset, tagPair.first, tagPair.second);
    }

    // 等待所有线程完成
    for (auto& thread : threads) {
   
        thread.join();
    }

    // 关闭CSV文件
    csvFile.close();

    return 0;
}


/**
 * 
 * 使用线程池或异步任务可以进一步提高程序的性能和可扩展性,特别是在处理大量并发任务时。下面我将演示如何使用C++的std::async来异步地提取和写入DICOM元数据。

std::async允许你以异步的方式执行一个函数,并返回一个std::future对象,该对象表示异步操作的最终结果。你可以使用std::async来并行处理多个DICOM标签,而无需显式地管理线程。

首先,我们定义一个异步任务函数,它将负责提取并写入单个DICOM标签的元数据:


 * 在这个示例中,我们使用std::vector<std::future<void>>来存储每个异步任务的结果。对于每个DICOM标签,我们调用ExtractAndWriteMetadataAsync函数,该函数返回一个std::future<void>对象,表示异步任务的结果。我们将这些std::future对象存储在futures向量中,以便稍后等待所有任务完成。

使用std::async时,C++运行时会自动管理线程的创建和销毁。你可以通过std::launch::async标志来指示std::async应该异步地执行函数,而不是同步地(即在调用std::async的线程中执行)。这样,std::async就可以根据系统的资源情况来优化线程的使用。

请注意,std::async并不总是创建新的线程来执行任务。在某些情况下,它可能会选择在调用std::async的线程中同步地执行任务,这取决于实现和当前的线程策略。要确保任务真正异步执行,你可以使用std::launch::async标志。

使用线程池或std::async等高级并发模型可以简化多线程编程,并帮助你更好地利用多核处理器。然而,你仍然需要注意避免数据竞争和其他并发问题,特别是当多个线程访问共享资源时。在这种情况下,使用互斥锁(如std::mutex)或其他同步原语来保护共享资源仍然是很重要的。
 * 
*/
#include <future> // for std::async and std::future

// 异步任务函数,用于提取和写入单个DICOM标签的元数据
std::future<void> ExtractAndWriteMetadataAsync(std::ofstream& csvFile, DcmDataset* dataset, const DcmTag& tag, const std::string& columnName) {
   
    return std::async(std::launch::async, ExtractAndWriteMetadata, std::ref(csvFile), dataset, tag, columnName);
}

// ... (省略了ExtractAndWriteMetadata函数的定义)


int main() {
   
    // ... (省略了加载DICOM数据和打开CSV文件的代码)

    // 定义要提取的元数据标签
    const std::vector<std::pair<DcmTag, std::string>> tags = {
   
        // ... (省略了标签定义)
    };

    // 使用std::vector<std::future<void>>来存储异步任务的结果
    std::vector<std::future<void>> futures;

    // 为每个标签创建一个异步任务
    for (const auto& tagPair : tags) {
   
        futures.emplace_back(ExtractAndWriteMetadataAsync(csvFile, dataset, tagPair.first, tagPair.second));
    }

    // 等待所有异步任务完成
    for (auto& future : futures) {
   
        future.get(); // 这将阻塞,直到对应的异步任务完成
    }

    // 关闭CSV文件
    csvFile.close();

    return 0;
}




/**要使用线程池并行处理DICOM元数据提取和写入任务,你可以按照以下步骤进行:

创建一个线程池,指定线程数量。通常,线程数量可以设置为与系统的逻辑处理器数量相同,这样可以充分利用多核处理器的性能。

定义一个任务函数,该函数将负责处理单个DICOM标签的元数据提取和写入操作。

对于每个要处理的DICOM标签,将任务函数提交到线程池中以异步方式执行。

等待所有任务完成。你可以通过std::future对象来获取每个任务的结果,并使用get()方法等待任务完成。

下面是一个使用线程池并行处理DICOM元数据提取和写入的示例代码:

在这个示例中,我们定义了一个ExtractAndWriteMetadataTask函数,它接受一个输出文件流、DICOM数据集、DICOM标签和列名作为参数,并负责提取和写入单个标签的元数据。然后,我们创建了一个线程池,并将任务函数作为lambda表达式提交到线程池中。每个任务都是异步执行的,线程池负责管理和调度这些任务。

通过使用线程池,我们可以并行处理多个DICOM标签的元数据提取和写入操作,从而提高程序的性能。线程池会自动管理线程的创建、销毁和调度,减少了程序员的工作量,并且能够更好地利用多核处理器的性能。

请确保在使用线程池时,对共享资源的访问是线程安全的。如果需要在多个线程之间共享数据集或其他资源,请使用适当的同步机制(如互斥锁)来保护这些资源,以避免数据竞争和其他并发问题。
 * 
*/

#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <future>
#include <dcmtk/dcmdata/dcfilefo.h>
#include <dcmtk/dcmdata/dcmimage.h>

// 任务函数,负责提取和写入单个DICOM标签的元数据
void ExtractAndWriteMetadataTask(std::ofstream& csvFile, DcmDataset* dataset, const DcmTag& tag, const std::string& columnName) {
   
    DcmElement element;
    if (dataset->findAndGetElement(tag, element).good()) {
   
        std::string value;
        switch (element.getTag().getEVR()) {
   
            // 处理不同EVR类型的代码...
            default:
                value = "Unknown Type";
                break;
        }
        // 写入CSV文件
        csvFile << tag.getGroup() << "," << tag.getElement() << "," << columnName << "," << value << std::endl;
    } else {
   
        // 标签未找到,写入错误信息
        csvFile << "0000,0000,Error," << "Tag not found" << std::endl;
    }
}

int main() {
   
    // 加载DICOM数据和打开CSV文件的代码...

    // 创建线程池,线程数量设置为硬件并发数
    size_t numThreads = std::thread::hardware_concurrency();
    ThreadPool pool(numThreads);

    // 定义要提取的元数据标签
    const std::vector<std::pair<DcmTag, std::string>> tags = {
   
        {
    DcmTag(0x0010, 0x0010), "Patient Name" },
        {
    DcmTag(0x0010, 0x0020), "Patient ID" },
        {
    DcmTag(0x0010, 0x0040), "Patient Sex" },
        {
    DcmTag(0x0010, 0x0030), "Patient Birth Date" },
        {
    DcmTag(0x0008, 0x0060), "Modality" },
        {
    DcmTag(0x0008, 0x0018), "Image Date" },
        {
    DcmTag(0x0008, 0x0019), "Image Time" }
    };

    // 存储std::future对象的vector,用于等待任务完成
    std::vector<std::future<void>> futures;

    // 提交任务到线程池
    for (const auto& tagPair : tags) {
   
        futures.push_back(pool.enqueue(
            [&, dataset = dataset, tag = tagPair.first, columnName = tagPair.second]() {
   
                ExtractAndWriteMetadataTask(csvFile, dataset, tag, columnName);
            }
        ));
    }

    // 等待所有任务完成
    for (auto& future : futures) {
   
        future.get(); // 阻塞直到任务完成
    }

    // 关闭CSV文件
    csvFile.close();

    return 0;
}





/**
 * 确实,一个健壮的线程池实现需要考虑更多的细节和边界情况。下面是对ThreadPool类进行改进的一些建议:

任务队列容器选择:

如果任务队列需要频繁地插入和删除操作,std::queue可能不是最优选择,因为它在删除头部元素时可能需要线性时间。考虑使用std::deque或std::list,它们可以在常数时间内删除头部元素。
任务队列容量限制:

线程池应该有一个任务队列的最大容量限制,以防止无限制的任务堆积导致内存耗尽。当队列满时,可以选择拒绝新任务、等待有空闲线程、或增加线程池的大小。
线程池大小调整:

线程池的大小可能需要根据工作负载动态调整。当任务队列长时间为空时,可以减少线程数量以节省资源;当任务队列持续增长时,可以增加线程数量以提高吞吐量。
任务取消和超时:

提供一个机制来取消正在等待执行的任务或设置任务执行超时。
优雅地停止线程池:

当需要停止线程池时,应该给线程一个机会来清理资源并优雅地退出。这可以通过设置一个标志位(如stop),并在每个工作线程中检查这个标志位来实现。同时,应该确保所有任务都已完成或已被取消后,才允许线程退出。
异常处理:

不仅要捕获并处理任务执行中的异常,还要处理线程启动和终止时的异常。
日志和监控:

添加日志记录功能,以便跟踪线程池的状态和任务执行情况。这有助于调试和性能监控。
任务返回值和状态:

如果任务有返回值或需要报告状态,应该提供一种机制来检索这些信息。
线程本地存储(Thread Local Storage, TLS):

考虑使用线程本地存储来存储线程特有的数据,这样可以避免数据竞争并减少锁的使用。
资源限制和隔离:

在多租户环境中,可能需要限制每个线程池的资源使用,比如CPU时间、内存使用量等,并防止一个线程池的任务影响其他线程池的执行。
任务优先级:

根据任务的重要性和紧急性,可以为任务设置优先级,并相应地调整任务队列的管理策略。
死锁预防:

线程池内部应避免产生死锁的情况,比如在使用互斥锁和其他同步机制时要特别注意锁的顺序。
线程安全的数据结构:

确保所有在多个线程间共享的数据结构都是线程安全的,或者使用适当的同步机制来保护它们。

在继续优化ThreadPool类时,我们可以考虑以下几个方面:

线程池启动和关闭:

提供明确的启动和关闭方法来控制线程池的生命周期。
在关闭线程池时,确保所有正在执行的任务都已完成,并且不再接受新任务。
任务队列的线程安全:

确保任务队列在入队和出队操作时是线程安全的。
使用std::queue配合std::mutex来实现线程安全的队列。
任务返回值的处理:

如果任务有返回值,我们应该使用std::future和std::promise来存储和检索这些返回值。
动态调整线程数量:

根据工作负载调整线程池的大小。这可以通过监视任务队列的长度和/或系统的负载来实现。
任务取消:

提供一种机制来取消正在等待执行的任务。这可能需要任务本身支持取消操作。
错误处理和日志记录:

添加详细的日志记录,以便跟踪和调试线程池的行为。
提供错误处理机制,以便在任务执行失败时通知用户。
资源管理:

确保线程池在使用完毕后释放所有资源,包括线程和动态分配的内存。
任务调度策略:

根据需要实现不同的任务调度策略,如优先级调度、先入先出(FIFO)、最短作业优先(SJF)等。
异常传播:

如果任务抛出异常,并且该异常没有在任务内部捕获,线程会终止。线程池应该捕获这些异常,并允许调用者访问它们。
线程池的扩展性:

设计线程池以支持扩展,比如添加更多功能或集成到更大的系统中。

*/

#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <stdexcept>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <unordered_map>
#include <iostream>

template <typename Task>
class ThreadPool {
   
public:
    using TaskID = std::future<void>::get_id;

    ThreadPool(size_t threads)
        : stop(false), tasks_queue(), workers(threads), active_tasks(0) {
   
        for (size_t i = 0; i < threads; ++i) {
   
            workers.emplace_back([this] {
   
                while (true) {
   
                    std::function<void()> task;
                    TaskID task_id;
                    std::atomic<bool> cancel_token(false);

                    {
   
                        std::unique_lock<std::mutex> lock(mutex);
                        condition.wait(lock, [this] {
    return stop || (tasks_queue.empty() && active_tasks == 0); });
                        if (stop && tasks_queue.empty()) {
   
                            return;
                        }
                        task = std::move(tasks_queue.front().first);
                        task_id = tasks_queue.front().second;
                        cancel_token = tasks_queue.front().third;
                        tasks_queue.pop();
                        ++active_tasks;
                    }

                    try {
   
                        if (cancel_token.load()) {
   
                            // Task was cancelled before starting
                            throw std::runtime_error("Task cancelled");
                        }
                        task();
                    } catch (const std::runtime_error& e) {
   
                        if (e.what() != "Task cancelled") {
   
                            // Log or handle other exceptions
                            std::cerr << "Exception in thread pool task: " << e.what() << '\n';
                        }
                    } catch (...) {
   
                        // Log or handle unknown exceptions
                        std::cerr << "Unknown exception in thread pool task\n";
                    }

                    {
   
                        std::lock_guard<std::mutex> lock(mutex);
                        --active_tasks;
                    }
                }
            });
        }
    }

    ~ThreadPool() {
   
        stop();
        for (std::thread &worker : workers) {
   
            if (worker.joinable()) {
   
                worker.join();
            }
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    template <typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<void> {
   
        auto task = std::make_shared<std::packaged_task<void()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<void> res = task->get_future();
        TaskID task_id = res.get_id();

        {
   
            std::unique_lock<std::mutex> lock(mutex);
            if (stop) {
   
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_queue.emplace(std::move(task), task_id, std::atomic<bool>(false));
        }
        condition.notify_one();
        return res;
    }

    void start() {
   
        std::unique_lock<std::mutex> lock(mutex);
        if (stop) {
   
            throw std::runtime_error("Cannot start a stopped ThreadPool");
        }
        if (active_tasks == 0) {
   
            // Start the worker threads if not already started
            for (auto& worker : workers) {
   
                if (!worker.joinable()) {
   
                    worker = std::thread([this] {
   
                        while (true) {
   
                            std::function<void()> task;
                            TaskID task_id;
                            std::atomic<bool> cancel_token(false);

                            {
   
                                std::unique_lock<std::mutex> lock(mutex);
                                condition.wait(lock, [this] {
    return stop || !tasks_queue.empty(); });
                                if (stop && tasks_queue.empty()) {
   
                                    return;
                                }
                                task = std::move(tasks_queue.front().first);
                                task_id = tasks_queue.front().second;
                                cancel_token = tasks_queue.front().third;
                                tasks_queue.pop();
                                ++active_tasks;
                            }

                            try {
   
                                if (cancel_token.load()) {
   
                                    // Task was cancelled before starting
                                    throw std::runtime_error("Task cancelled");
                                }
                                task();
                            } catch (const std::runtime_error& e) {
   
                                if (e.what() != "Task cancelled") {
   
                                    // Log or handle other exceptions
                                    std::cerr << "Exception in thread pool task: " << e.what() << '\n';
                                }
                            } catch (...) {
   
                                // Log or handle unknown exceptions
                                std::cerr << "Unknown exception in thread pool task\n";
                            }

                            {
   
                                std::lock_guard<std::mutex> lock(mutex);
                                if (cancel_token && cancel_token.load()) {
   
                                    // Task was cancelled during execution
                                } else {
   
                                    --active_tasks;
                                }
                            }
                        }
                    });
                }
            }
        }
    }

    void stop() {
   
        {
   
            std::unique_lock<std::mutex> lock(mutex);
            stop = true; // Set the stop flag
        }
        // Notify all waiting threads that they should exit
        condition.notify_all();
        // Wait for all threads to finish
        for (std::thread &worker : workers) {
   
            if (worker.joinable()) {
   
                worker.join();
            }
        }
        // Clear the task queue and any remaining task references
        while (!tasks_queue.empty()) {
   
            tasks_queue.pop();
        }
        task_ids.clear();
    }

/**
 *     // 在cancel方法中
    bool cancel(TaskID task_id) {
        std::unique_lock<std::mutex> lock(mutex);
        auto it = task_ids.find(task_id);
        if (it == task_ids.end()) {
            return false; // 任务未找到
        }
        std::shared_ptr<std::packaged_task<void()>> task = it->second;
        task_ids.erase(it); // 从task_ids中移除任务
        return true;
    }
*/

    // Cancel a task given its ID
    bool cancel(TaskID task_id) {
   
        std::unique_lock<std::mutex> lock(mutex);
        for (auto it = tasks_queue.begin(); it != tasks_queue.end(); ) {
   
            if (it->second == task_id) {
   
                it->third.store(true); // Set the cancel token
                it = tasks_queue.erase(it); // Erase the task from the queue
                return true; // Task cancelled or will be cancelled if not yet started
            }
            ++it;
        }
        return false; // Task not found
    }

    bool isStopped() const {
   
        return stop;
    }

    size_t getActiveTasks() const {
   
        return active_tasks;
    }

private:
    std::unordered_map<TaskID, std::shared_ptr<std::packaged_task<void()>>> task_ids;
    std::atomic<bool> stop;
    //std::queue<std::function<void()>> tasks_queue;
    std::queue<std::tuple<std::shared_ptr<std::packaged_task<void()>>, TaskID, std::atomic<bool>>> tasks_queue; // 任务队列
    std::vector<std::thread> workers;
    std::mutex mutex;
    std::condition_variable condition;
    size_t active_tasks;
};

三 参考链接

https://www.msn.com/zh-cn/news/other/baidu-comate-2-0%E5%8F%91%E5%B8%83-%E9%9D%A2%E5%90%91%E4%B8%AA%E4%BA%BA%E5%BC%80%E5%8F%91%E8%80%85%E5%85%A8%E9%9D%A2%E5%85%8D%E8%B4%B9/ar-BB1la7E5?ocid=msedgdhp&pc=U531&cvid=66123b350f134dccacb9061ef41f5415&ei=51

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
2月前
|
安全 Java UED
深入浅出Java多线程编程
【10月更文挑战第40天】在Java的世界中,多线程是提升应用性能和响应能力的关键。本文将通过浅显易懂的方式介绍Java中的多线程编程,从基础概念到高级特性,再到实际应用案例,带你一步步深入了解如何在Java中高效地使用多线程。文章不仅涵盖了理论知识,还提供了实用的代码示例,帮助你在实际开发中更好地应用多线程技术。
65 5
|
24天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
136 2
|
24天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
53 1
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
1月前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
52 10
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
61 3
|
1月前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
45 4