Concurrency · C++ · Verified
Thread Pool Pattern in C++
Maintain a fixed set of reusable worker threads that pick up tasks from a queue, avoiding the overhead of spawning a new thread per task.
How to Implement the Thread Pool Pattern in C++
Step 1: Define a simple thread pool
class ThreadPool {
  std::vector<std::thread> workers_;         // fixed set of reusable workers
  std::queue<std::function<void()>> tasks_;  // pending work, FIFO
  std::mutex mu_;                            // guards tasks_ and stop_
  std::condition_variable cv_;
  bool stop_ = false;
public:
  /// Start numThreads workers; each loops: wait for a task, run it.
  explicit ThreadPool(size_t numThreads) {
    for (size_t i = 0; i < numThreads; ++i) {
      workers_.emplace_back([this] {
        while (true) {
          std::function<void()> task;
          {
            std::unique_lock lock(mu_);
            cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
            // Drain any remaining tasks before exiting on shutdown.
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // run outside the lock so other workers can dequeue
        }
      });
    }
  }
  // Step 2: Submit a task and get a future
  /// Enqueue a callable and return a future for its result.
  /// @throws std::runtime_error if the pool has been stopped — previously
  ///         such a task was silently queued and never executed.
  template <typename F>
  auto submit(F&& f) -> std::future<decltype(f())> {
    // packaged_task is move-only; a shared_ptr makes the wrapper lambda
    // copyable so it can be stored in std::function.
    auto task = std::make_shared<std::packaged_task<decltype(f())()>>(
        std::forward<F>(f));
    auto future = task->get_future();
    {
      std::lock_guard lock(mu_);
      if (stop_)
        throw std::runtime_error("Cannot submit to stopped pool");
      tasks_.push([task] { (*task)(); });
    }
    cv_.notify_one();
    return future;
  }
  // Step 3: Graceful shutdown
  /// Signal stop under the lock (avoids a lost wakeup), then join everyone.
  ~ThreadPool() {
    { std::lock_guard lock(mu_); stop_ = true; }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }
};
int main() {
  ThreadPool pool(4);
  // Fire off eight squaring jobs and keep their futures in submission order.
  std::vector<std::future<int>> results;
  for (int i = 0; i < 8; ++i) {
    results.push_back(pool.submit([i] { return i * i; }));
  }
  // Print results; future::get blocks until each task has run.
  for (auto& fut : results) {
    std::cout << fut.get() << " ";
  }
  std::cout << "\n";
}
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <atomic>
#include <format>
#include <chrono>
#include <stdexcept>
// [step] Define task priority and metadata
enum class Priority { Low = 0, Normal = 1, High = 2, Critical = 3 };
// Queue entry: the callable plus its scheduling metadata.
struct TaskEntry {
  std::function<void()> fn;
  Priority priority;
  int id;  // monotonically increasing submission id, used to break ties
  // Ordering for std::priority_queue (a max-heap): higher priority pops
  // first; among equal priorities, the LOWER id (earlier submission) pops
  // first.  Without the id tie-break, same-priority tasks ran in an
  // unspecified heap order rather than FIFO.
  bool operator<(const TaskEntry& other) const {
    if (priority != other.priority)
      return priority < other.priority;  // max-heap by priority
    return id > other.id;                // earlier submission outranks later
  }
};
// [step] Thread pool with priority queue, stats, and graceful shutdown
class ThreadPool {
  std::vector<std::thread> workers_;
  std::priority_queue<TaskEntry> tasks_;  // highest-priority task on top
  mutable std::mutex mu_;                 // guards tasks_ and all cv_ wakeup state
  std::condition_variable cv_;
  std::atomic<bool> stop_{false};
  std::atomic<bool> paused_{false};
  std::atomic<int> activeTasks_{0};       // tasks currently executing
  std::atomic<int> completedTasks_{0};
  std::atomic<int> failedTasks_{0};
  std::atomic<int> nextTaskId_{0};        // submission counter for TaskEntry::id
public:
  /// Spin up a fixed set of worker threads servicing the priority queue.
  explicit ThreadPool(size_t numThreads) {
    workers_.reserve(numThreads);
    for (size_t i = 0; i < numThreads; ++i) {
      workers_.emplace_back([this, i] { workerLoop(i); });
    }
  }
  // [step] Submit with priority and return a future
  /// Enqueue a callable at the given priority; returns a future for its result.
  /// @throws std::runtime_error if the pool has already been stopped.
  template <typename F>
  auto submit(F&& f, Priority priority = Priority::Normal)
      -> std::future<decltype(f())>
  {
    using ReturnType = decltype(f());
    // packaged_task is move-only; a shared_ptr makes the wrapper lambda
    // copyable so std::function can hold it.
    auto task = std::make_shared<std::packaged_task<ReturnType()>>(
        std::forward<F>(f));
    auto future = task->get_future();
    {
      std::lock_guard lock(mu_);
      if (stop_)
        throw std::runtime_error("Cannot submit to stopped pool");
      int id = nextTaskId_++;
      tasks_.push({[task] { (*task)(); }, priority, id});
    }
    cv_.notify_one();
    return future;
  }
  // Pausing needs no lock: it only makes workers go (and stay) idle.
  void pause() { paused_ = true; }
  void resume() {
    // Flip the flag while holding the mutex: otherwise a worker that just
    // evaluated the wait predicate could miss this notify and sleep
    // indefinitely (lost wakeup).
    { std::lock_guard lock(mu_); paused_ = false; }
    cv_.notify_all();
  }
  // [step] Wait for all tasks to complete
  /// Block until the queue is empty and no task is executing.
  void waitAll() {
    std::unique_lock lock(mu_);
    cv_.wait(lock, [this] {
      return tasks_.empty() && activeTasks_ == 0;
    });
  }
  struct Stats {
    size_t workerCount;
    int activeTasks;
    int completedTasks;
    int failedTasks;
    size_t pendingTasks;
  };
  /// Snapshot of the pool counters (mutex held for a consistent queue size).
  Stats stats() const {
    std::lock_guard lock(mu_);
    return {workers_.size(), activeTasks_.load(), completedTasks_.load(),
            failedTasks_.load(), tasks_.size()};
  }
  /// Graceful shutdown: workers drain the queue, then all are joined.
  ~ThreadPool() {
    // Set the flags under the mutex: a worker between its predicate check
    // and cv_.wait() would otherwise miss the notify and never exit.
    { std::lock_guard lock(mu_); stop_ = true; paused_ = false; }
    cv_.notify_all();
    for (auto& w : workers_) {
      if (w.joinable()) w.join();
    }
  }
private:
  void workerLoop(size_t workerId) {
    while (true) {
      TaskEntry entry;
      {
        std::unique_lock lock(mu_);
        cv_.wait(lock, [this] {
          return (stop_ || (!tasks_.empty() && !paused_));
        });
        if (stop_ && tasks_.empty()) return;
        if (tasks_.empty()) continue;  // raced with another worker; re-wait
        // priority_queue::top() returns const&, but the element itself is
        // not a const object, so casting to move it out (instead of
        // copying the std::function) is well-defined.
        entry = std::move(const_cast<TaskEntry&>(tasks_.top()));
        tasks_.pop();
        // Count the task as active BEFORE releasing the lock, so waitAll()
        // can never observe "queue empty, nothing active" mid-handoff and
        // return while this task is about to run.
        ++activeTasks_;
      }
      try {
        entry.fn();
        ++completedTasks_;
      } catch (...) {
        ++failedTasks_;
      }
      // Decrement under the lock, then notify: decrementing lock-free let
      // waitAll() evaluate its predicate, miss this notify, and block
      // forever (lost wakeup).
      { std::lock_guard lock(mu_); --activeTasks_; }
      cv_.notify_all();
    }
  }
};
// [step] Demonstrate priority scheduling and stats
int main() {
  ThreadPool pool(4);
  // Queue several slow low-priority jobs first...
  std::vector<std::future<std::string>> futures;
  for (int i = 0; i < 5; ++i) {
    futures.push_back(pool.submit(
        [i] {
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
          return std::format("low-{}", i);
        },
        Priority::Low));
  }
  // ...then higher-priority work, which jumps ahead of anything still queued.
  futures.push_back(pool.submit(
      [] { return std::string("CRITICAL task done"); }, Priority::Critical));
  futures.push_back(pool.submit(
      [] { return std::string("HIGH priority done"); }, Priority::High));
  // Collect results; the futures vector preserves submission order.
  for (auto& f : futures) {
    std::cout << f.get() << "\n";
  }
  const auto s = pool.stats();
  std::cout << std::format(
      "\nPool stats: workers={}, completed={}, failed={}, pending={}\n",
      s.workerCount, s.completedTasks, s.failedTasks, s.pendingTasks);
}
Thread Pool Pattern Architecture
[Diagram placeholder: the thread-pool architecture diagram did not render — it depicts tasks entering a shared queue and a fixed set of worker threads pulling from it.]
Tip: Thread Pool Pattern in the Real World
“A hotel concierge desk staffed by three concierges represents the thread pool. No matter how many guests check in, only three requests are handled simultaneously. Other guests wait in the lobby queue. When a concierge finishes, they immediately assist the next waiting guest — the staff are never created or dismissed per guest, they simply stay on duty.”