ConcurrencyC++verifiedVerified
Semaphore Pattern in C++
Control access to a finite pool of resources by maintaining a counter that threads atomically increment (release) and decrement (acquire), blocking when the count reaches zero.
How to Implement the Semaphore Pattern in C++
1Step 1: Use counting_semaphore to limit concurrent access
std::counting_semaphore<3> pool(3); // max 3 concurrent
void accessResource(int id) {
pool.acquire();
std::cout << std::format("Thread {} acquired access\n", id);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << std::format("Thread {} releasing\n", id);
pool.release();
}2Step 2: Launch more threads than the semaphore allows
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 8; ++i)
threads.emplace_back(accessResource, i);
for (auto& t : threads) t.join();
std::cout << "All done\n";
}#include <iostream>
#include <thread>
#include <vector>
#include <semaphore>
#include <mutex>
#include <queue>
#include <format>
#include <chrono>
#include <atomic>
#include <functional>
#include <string>
// [step] RAII semaphore guard for exception-safe acquire/release
template <std::ptrdiff_t MaxCount>
class SemaphoreGuard {
std::counting_semaphore<MaxCount>& sem_;
public:
explicit SemaphoreGuard(std::counting_semaphore<MaxCount>& sem) : sem_(sem) {
sem_.acquire();
}
~SemaphoreGuard() { sem_.release(); }
SemaphoreGuard(const SemaphoreGuard&) = delete;
SemaphoreGuard& operator=(const SemaphoreGuard&) = delete;
};
// [step] Connection pool using semaphore for capacity management
class ConnectionPool {
static constexpr int MAX_CONNECTIONS = 5;
struct Connection {
int id;
bool inUse;
};
std::vector<Connection> connections_;
std::counting_semaphore<MAX_CONNECTIONS> semaphore_{MAX_CONNECTIONS};
std::mutex mu_;
std::atomic<int> totalAcquired_{0};
std::atomic<int> totalReleased_{0};
public:
ConnectionPool() {
for (int i = 0; i < MAX_CONNECTIONS; ++i)
connections_.push_back({i, false});
}
class ConnectionHandle {
ConnectionPool& pool_;
int connId_;
public:
ConnectionHandle(ConnectionPool& pool, int id) : pool_(pool), connId_(id) {}
~ConnectionHandle() { pool_.releaseConnection(connId_); }
int id() const { return connId_; }
ConnectionHandle(const ConnectionHandle&) = delete;
ConnectionHandle& operator=(const ConnectionHandle&) = delete;
};
// [step] Acquire blocks until a connection is available
std::unique_ptr<ConnectionHandle> acquire() {
semaphore_.acquire(); // blocks if all connections in use
std::lock_guard lock(mu_);
for (auto& conn : connections_) {
if (!conn.inUse) {
conn.inUse = true;
++totalAcquired_;
return std::make_unique<ConnectionHandle>(*this, conn.id);
}
}
// Should never reach here thanks to semaphore
semaphore_.release();
throw std::runtime_error("No available connections");
}
// [step] Try acquire with timeout
std::unique_ptr<ConnectionHandle> tryAcquire(std::chrono::milliseconds timeout) {
if (!semaphore_.try_acquire_for(timeout))
return nullptr;
std::lock_guard lock(mu_);
for (auto& conn : connections_) {
if (!conn.inUse) {
conn.inUse = true;
++totalAcquired_;
return std::make_unique<ConnectionHandle>(*this, conn.id);
}
}
semaphore_.release();
return nullptr;
}
struct Stats {
int totalAcquired;
int totalReleased;
int currentlyInUse;
};
Stats stats() const {
return {totalAcquired_.load(), totalReleased_.load(),
totalAcquired_.load() - totalReleased_.load()};
}
private:
void releaseConnection(int id) {
{
std::lock_guard lock(mu_);
connections_[id].inUse = false;
++totalReleased_;
}
semaphore_.release();
}
};
// [step] Rate limiter using semaphore with token refill
class RateLimiter {
static constexpr int MAX_TOKENS = 10;
std::counting_semaphore<MAX_TOKENS> tokens_{MAX_TOKENS};
std::atomic<bool> running_{true};
std::thread refillThread_;
public:
RateLimiter(std::chrono::milliseconds refillInterval, int tokensPerRefill) {
refillThread_ = std::thread([this, refillInterval, tokensPerRefill] {
while (running_) {
std::this_thread::sleep_for(refillInterval);
for (int i = 0; i < tokensPerRefill && running_; ++i)
tokens_.release();
}
});
}
bool tryConsume(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) {
return tokens_.try_acquire_for(timeout);
}
~RateLimiter() {
running_ = false;
if (refillThread_.joinable()) refillThread_.join();
}
};
// [step] Demonstrate connection pool and rate limiter
int main() {
ConnectionPool pool;
// Simulate 20 concurrent clients for 5 connections
std::vector<std::thread> clients;
for (int i = 0; i < 20; ++i) {
clients.emplace_back([&pool, i] {
auto conn = pool.acquire(); // blocks until available
std::cout << std::format("Client {} got connection {}\n", i, conn->id());
std::this_thread::sleep_for(std::chrono::milliseconds(50));
// ConnectionHandle releases automatically via RAII
});
}
for (auto& t : clients) t.join();
auto s = pool.stats();
std::cout << std::format(
"\nPool stats: acquired={}, released={}, in-use={}\n",
s.totalAcquired, s.totalReleased, s.currentlyInUse);
}Semaphore Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Semaphore Pattern in the Real World
“Imagine a car park with exactly three spaces. A ticket machine at the entrance (the semaphore) issues a ticket only if spaces remain, lifting the barrier; arriving drivers with no ticket available must wait. When a car exits, the machine automatically increments its counter and releases the next waiting driver — the car park never exceeds capacity.”