我正在努力建立一个单一的生产者多重消费者框架。
Q1.消费者是否需要在生产者发布它的条件_Variable.Notificone()之前,将其可用性告知生产者(其他一些条件_变量)?
Q2.我需要采取哪些重要的额外步骤,使消费者不重复地分享生产者的产出?
#include <cstdint>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <string>
#include <vector>
#include <queue>
#include <random>
#include <functional>
#include <thread>
#include <chrono>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
int main() {
std::condition_variable conditionVar;
std::mutex mtx;
std::queue<int64_t> messageQueue;
bool stopped = false;
const std::size_t workSize = 4096;
std::function<void(int64_t)> producerLambda = [&](int64_t id) {
// Prepare a random number generator and push to the queue
std::default_random_engine randomNumberGen{};
std::uniform_int_distribution<int64_t> uniformDistribution{};
for (auto count = 0; count < workSize; count++){
//Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
std::lock_guard<std::mutex> lockGuard{ mtx };
//Push a random number onto the queue
messageQueue.push(uniformDistribution(randomNumberGen));
//Notify the consumer
conditionVar.notify_one();
}
//Production finished
//Acquire the lock, set the stopped flag, inform the consumer
std::lock_guard<std::mutex> lockGuard {mtx };
std::cout << "Producer is done!" << std::endl;
stopped = true;
conditionVar.notify_one();
};
std::function<void(int64_t)> consumerLambda = [&](int64_t id) {
do {
std::unique_lock<std::mutex> uniqueLock{ mtx };
//Acquire the lock only if stopped or the queue isn't empty
conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });
//This thread owns the mutex here; pop the queue until it is empty
while (!messageQueue.empty()) {
const auto val = messageQueue.front(); messageQueue.pop();
std::cout << "Consumer obtained: " << val << std::endl;
}
uniqueLock.unlock();
if (stopped) {
//Producer has signaled a stop
std::cout << "Consumer is done!" << std::endl;
break;
}
} while (true);
};
std::thread consumer{ consumerLambda, 1 };
std::thread producer{ producerLambda, 2 };
consumer.join();
producer.join();
std::cout << "singleProducerSingleConsumer() finished" << std::endl;
return 0;
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。