开发者社区> 问答> 正文

单生产者多重消费者骨架

我正在努力建立一个单一的生产者多重消费者框架。

Q1.消费者是否需要在生产者发布它的条件_Variable.Notificone()之前,将其可用性告知生产者(其他一些条件_变量)?

Q2.我需要采取哪些重要的额外步骤,使消费者不重复地分享生产者的产出?

#include <cstdint>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <string>
#include <vector>
#include <queue>
#include <random>

#include <functional>
#include <thread>
#include <chrono>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
    int main() {
        std::condition_variable conditionVar;
        std::mutex mtx;
        std::queue<int64_t> messageQueue;
        bool stopped = false;
        const std::size_t workSize = 4096;

        std::function<void(int64_t)> producerLambda = [&](int64_t id) {
            // Prepare a random number generator and push to the queue
            std::default_random_engine randomNumberGen{};
            std::uniform_int_distribution<int64_t> uniformDistribution{};

            for (auto count = 0; count < workSize; count++){
                //Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
                std::lock_guard<std::mutex> lockGuard{ mtx };

                //Push a random number onto the queue
                messageQueue.push(uniformDistribution(randomNumberGen));

                //Notify the consumer
                conditionVar.notify_one();
            }
            //Production finished
            //Acquire the lock, set the stopped flag, inform the consumer
            std::lock_guard<std::mutex> lockGuard {mtx };

            std::cout << "Producer is done!" << std::endl;

            stopped = true;
            conditionVar.notify_one();
        };

        std::function<void(int64_t)> consumerLambda = [&](int64_t id) {

            do {
                std::unique_lock<std::mutex> uniqueLock{ mtx };
                //Acquire the lock only if stopped or the queue isn't empty
                conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });

                //This thread owns the mutex here; pop the queue until it is empty
                while (!messageQueue.empty()) {
                    const auto val = messageQueue.front(); messageQueue.pop();
                    std::cout << "Consumer obtained: " << val << std::endl;
                }
                uniqueLock.unlock();

                if (stopped) {
                    //Producer has signaled a stop
                    std::cout << "Consumer is done!" << std::endl;
                    break;
                }

            } while (true);
        };

        std::thread consumer{ consumerLambda, 1 };
        std::thread producer{ producerLambda, 2 };
        consumer.join();
        producer.join();

        std::cout << "singleProducerSingleConsumer() finished" << std::endl;
        return 0;
    }

展开
收起
aqal5zs3gkqgc 2019-12-19 20:42:52 1829 0
0 条回答
写回答
取消 提交回答
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载