Concurrency · C++ · Verified
Producer-Consumer Pattern in C++
Decouple data production from data consumption using a shared buffer, allowing each side to operate at its own pace.
How to Implement the Producer-Consumer Pattern in C++
Step 1: Define a thread-safe bounded queue
/// Bounded FIFO queue shared between producer and consumer threads.
///
/// All access to the underlying std::queue happens while holding `mu_`.
/// `push` blocks while the queue holds `maxSize_` elements; `pop` blocks
/// while the queue is empty.
///
/// Note: with `maxSize == 0` the "not full" predicate can never become true,
/// so `push` would block forever.
template <typename T>
class BlockingQueue {
    std::queue<T> queue_;
    std::mutex mu_;
    std::condition_variable notEmpty_;  // signalled after an element is added
    std::condition_variable notFull_;   // signalled after an element is removed
    size_t maxSize_;

public:
    /// @param maxSize Maximum number of elements held at any one time.
    explicit BlockingQueue(size_t maxSize) : maxSize_(maxSize) {}

    // Step 2: Producer pushes items (blocks when full)
    /// Appends `item`, waiting until there is room in the queue.
    void push(T item) {
        std::unique_lock lock(mu_);
        // The predicate form of wait() re-checks the condition after every
        // wake-up, so spurious wake-ups are harmless.
        notFull_.wait(lock, [this] { return queue_.size() < maxSize_; });
        queue_.push(std::move(item));
        notEmpty_.notify_one();
    }

    // Step 3: Consumer pops items (blocks when empty)
    /// Removes and returns the oldest element, waiting until one is available.
    T pop() {
        std::unique_lock lock(mu_);
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        notFull_.notify_one();
        return item;
    }
};
// Demo for BlockingQueue: one producer thread pushes ten strings through a
// queue of capacity 5 while one consumer thread pops ten strings.
int main() {
    BlockingQueue<std::string> queue(5);

    // Producer thread
    std::thread producer([&queue] {
        for (int i = 0; i < 10; ++i) {
            queue.push("item-" + std::to_string(i));
            // Printed after the push returns, so the matching "Consumed" line
            // may appear on the console before this one.
            std::cout << "Produced: item-" << i << "\n";
        }
    });

    // Consumer thread
    std::thread consumer([&queue] {
        for (int i = 0; i < 10; ++i) {
            auto item = queue.pop();
            std::cout << "Consumed: " << item << "\n";
        }
    });

    // Both lambdas capture `queue` by reference, so join before it goes out
    // of scope.
    producer.join();
    consumer.join();
}
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <functional>
#include <optional>
#include <format>
#include <chrono>
// [step] Thread-safe bounded queue with shutdown support
/// Bounded FIFO channel with shutdown support.
///
/// `send` blocks while the channel holds `capacity_` elements and `receive`
/// blocks while it is empty. `close()` wakes every blocked caller: senders
/// then fail, receivers drain what is left and afterwards get `std::nullopt`.
template <typename T>
class Channel {
    std::queue<T> queue_;
    mutable std::mutex mu_;               // mutable so size() can lock in a const method
    std::condition_variable producerCv_;  // senders wait here for free space
    std::condition_variable consumerCv_;  // receivers wait here for data
    size_t capacity_;
    std::atomic<bool> closed_{false};     // atomic so isClosed() can read it without the lock

public:
    /// @param capacity Maximum number of elements buffered at any one time.
    explicit Channel(size_t capacity) : capacity_(capacity) {}

    /// Enqueues `item`, waiting for free space if the channel is full.
    /// @return false if the channel is closed (the item is discarded),
    ///         true once the item has been enqueued.
    bool send(T item) {
        std::unique_lock lock(mu_);
        producerCv_.wait(lock, [this] {
            return queue_.size() < capacity_ || closed_.load();
        });
        if (closed_) return false;
        queue_.push(std::move(item));
        consumerCv_.notify_one();
        return true;
    }

    /// Dequeues the oldest element, waiting if the channel is empty.
    /// @return the element, or std::nullopt once the channel is closed and
    ///         empty. Elements buffered before close() are still delivered.
    std::optional<T> receive() {
        std::unique_lock lock(mu_);
        consumerCv_.wait(lock, [this] {
            return !queue_.empty() || closed_.load();
        });
        if (queue_.empty()) return std::nullopt;
        T item = std::move(queue_.front());
        queue_.pop();
        producerCv_.notify_one();
        return item;
    }

    /// Marks the channel closed and wakes all blocked senders and receivers.
    /// Calling it more than once is harmless.
    void close() {
        {
            // The flag must change while holding the same mutex the waiters
            // use, even though it is atomic. Otherwise a waiter can evaluate
            // its predicate (still false), then close() flips the flag and
            // notifies, and only then does the waiter block -- having missed
            // the wake-up for good.
            std::lock_guard lock(mu_);
            closed_ = true;
        }
        producerCv_.notify_all();
        consumerCv_.notify_all();
    }

    /// @return true once close() has been called.
    bool isClosed() const { return closed_.load(); }

    /// @return the number of elements currently buffered.
    size_t size() const {
        std::lock_guard lock(mu_);
        return queue_.size();
    }
};
// [step] Multi-producer multi-consumer pipeline
/// Unit of work handed from a producer to a worker.
/// Remains an aggregate, so `WorkItem{id, data}` keeps working.
struct WorkItem {
    int id{};          // value-initialised so a default-constructed item has a defined id
    std::string data;  // payload to be processed
};
/// Result produced by a worker for one work item.
/// Remains an aggregate, so `ProcessedItem{id, result, time}` keeps working.
struct ProcessedItem {
    int id{};            // value-initialised so a default-constructed item has a defined id
    std::string result;  // output of the processing step
    // std::chrono::duration's default constructor leaves the tick count
    // uninitialised; `{}` makes it zero.
    std::chrono::milliseconds processingTime{};
};
/// Sends up to `count` work items into `out`.
///
/// Item ids are `producerId * 1000 + index` and the payload is
/// "data-<producerId>-<index>". Stops early as soon as `send` reports that
/// the channel is closed.
void producer(Channel<WorkItem>& out, int producerId, int count) {
    int seq = 0;
    while (seq < count) {
        std::string payload = std::format("data-{}-{}", producerId, seq);
        WorkItem next{producerId * 1000 + seq, std::move(payload)};
        const bool accepted = out.send(std::move(next));
        if (!accepted) {
            return;  // channel closed: nothing more can be delivered
        }
        ++seq;
    }
}
/// Drains `in` until it is closed and empty, turning each WorkItem into a
/// ProcessedItem that is forwarded to `out`.
///
/// `workerId` is accepted but not used by the body. The result of
/// `out.send` is not checked, so the loop keeps draining `in` even if `out`
/// has been closed.
void worker(Channel<WorkItem>& in, Channel<ProcessedItem>& out, int workerId) {
    using Clock = std::chrono::steady_clock;
    for (;;) {
        auto job = in.receive();
        if (!job) {
            return;  // input closed and fully drained
        }
        const auto begin = Clock::now();
        // Simulate work
        std::string output = std::format("processed({})", job->data);
        const auto elapsed =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - begin);
        out.send({job->id, std::move(output), elapsed});
    }
}
/// Receives processed items from `in` until it is closed and empty, counting
/// each one in `totalProcessed` and printing a progress line at every
/// multiple of five.
///
/// @param in             Channel of finished items.
/// @param totalProcessed Counter incremented once per received item.
void consumer(Channel<ProcessedItem>& in, std::atomic<int>& totalProcessed) {
    while (in.receive().has_value()) {
        // Use the value returned by the increment itself. Re-reading the
        // atomic afterwards is a separate operation: if another thread bumps
        // the counter in between, a multiple of five can be skipped or
        // reported twice.
        const int count = ++totalProcessed;
        if (count % 5 == 0) {
            std::cout << std::format("[Consumer] Processed {} items so far\n",
                                     count);
        }
    }
}
// [step] Wire up the pipeline with multiple producers and workers
// Runs the pipeline: 2 producers -> workQueue -> 3 workers -> resultQueue ->
// 1 consumer. Stages are shut down in pipeline order so every queued item is
// drained before the next stage is told to stop.
int main() {
    Channel<WorkItem> workQueue(10);
    Channel<ProcessedItem> resultQueue(10);
    std::atomic<int> totalProcessed{0};

    // Start workers
    std::vector<std::thread> workers;
    for (int i = 0; i < 3; ++i)
        workers.emplace_back(worker, std::ref(workQueue),
                             std::ref(resultQueue), i);

    // Start consumer
    std::thread consumerThread(consumer, std::ref(resultQueue),
                               std::ref(totalProcessed));

    // Start producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 2; ++i)
        producers.emplace_back(producer, std::ref(workQueue), i, 10);

    // Wait for producers to finish; only then is it safe to close the work
    // queue, because a send on a closed channel is rejected.
    for (auto& t : producers) t.join();
    workQueue.close();

    // Wait for workers to finish; they exit once the work queue is closed
    // and empty, after which no more results can arrive.
    for (auto& t : workers) t.join();
    resultQueue.close();

    // Wait for consumer
    consumerThread.join();
    std::cout << std::format("Total processed: {}\n", totalProcessed.load());
}
Producer-Consumer Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Producer-Consumer Pattern in the Real World
“Think of a bakery where bakers (producers) place fresh loaves on a display shelf (the queue) and customers (consumers) pick them up at their leisure. The shelf decouples the baking schedule from customer arrival times — bakers keep baking even when no customer is present, and customers keep shopping even when bakers are on break.”