ConcurrencyPHPverifiedVerified
Semaphore Pattern in PHP
Control access to a finite pool of resources by maintaining a counter that threads atomically increment (release) and decrement (acquire), blocking when the count reaches zero.
How to Implement the Semaphore Pattern in PHP
1Step 1: Define a counting semaphore
class Semaphore
{
private int $permits;
public function __construct(int $maxPermits)
{
$this->permits = $maxPermits;
}2Step 2: Acquire a permit (decrement)
public function acquire(): bool
{
if ($this->permits <= 0) {
return false;
}
$this->permits--;
return true;
}3Step 3: Release a permit (increment)
public function release(): void
{
$this->permits++;
}
public function available(): int
{
return $this->permits;
}
}4Step 4: Use Fibers to simulate concurrent resource access
class ConnectionPool
{
private Semaphore $semaphore;
public function __construct(int $maxConnections)
{
$this->semaphore = new Semaphore($maxConnections);
}
public function withConnection(callable $work): \Fiber
{
return new \Fiber(function () use ($work): void {
while (!$this->semaphore->acquire()) {
\Fiber::suspend('waiting');
}
try {
$work();
} finally {
$this->semaphore->release();
}
});
}
}
// Usage
$pool = new ConnectionPool(2);
$fibers = [];
for ($i = 0; $i < 5; $i++) {
$id = $i;
$fibers[] = $pool->withConnection(function () use ($id): void {
echo "Worker {$id} using connection\n";
});
}
foreach ($fibers as $fiber) { $fiber->start(); }
for ($tick = 0; $tick < 20; $tick++) {
foreach ($fibers as $fiber) {
if ($fiber->isSuspended()) $fiber->resume();
}
}<?php
declare(strict_types=1);
// [step] Define the semaphore interface with timeout support
interface SemaphoreInterface
{
public function acquire(float $timeoutMs = 0): bool;
public function release(): void;
public function availablePermits(): int;
/** @template T
* @param callable(): T $callback
* @return T
*/
public function withPermit(callable $callback): mixed;
}
interface LoggerInterface
{
public function info(string $message, array $context = []): void;
public function warning(string $message, array $context = []): void;
}
// [step] Implement a System V semaphore wrapper for real inter-process coordination
final class CountingSemaphore implements SemaphoreInterface
{
private int $currentPermits;
private int $acquiredCount = 0;
private readonly string $name;
public function __construct(
string $name,
private readonly int $maxPermits,
private readonly LoggerInterface $logger,
) {
if ($maxPermits <= 0) {
throw new \InvalidArgumentException("Max permits must be positive, got {$maxPermits}");
}
$this->name = $name;
$this->currentPermits = $maxPermits;
}
public function acquire(float $timeoutMs = 0): bool
{
$deadline = $timeoutMs > 0 ? microtime(true) + ($timeoutMs / 1000) : 0;
do {
if ($this->currentPermits > 0) {
$this->currentPermits--;
$this->acquiredCount++;
$this->logger->info("Permit acquired", [
'semaphore' => $this->name,
'remaining' => $this->currentPermits,
]);
return true;
}
if ($deadline > 0 && microtime(true) >= $deadline) {
$this->logger->warning("Acquire timed out", ['semaphore' => $this->name]);
return false;
}
// In a real concurrent env, you'd use a proper wait mechanism
usleep(100);
} while ($deadline > 0);
return false;
}
public function release(): void
{
if ($this->currentPermits >= $this->maxPermits) {
throw new \LogicException("Cannot release: no permits currently acquired");
}
$this->currentPermits++;
$this->logger->info("Permit released", [
'semaphore' => $this->name,
'available' => $this->currentPermits,
]);
}
public function availablePermits(): int
{
return $this->currentPermits;
}
public function withPermit(callable $callback): mixed
{
if (!$this->acquire(timeoutMs: 5000)) {
throw new \RuntimeException("Failed to acquire semaphore permit: {$this->name}");
}
try {
return $callback();
} finally {
$this->release();
}
}
public function getTotalAcquired(): int { return $this->acquiredCount; }
}
// [step] Implement a rate limiter built on a semaphore
final class SemaphoreRateLimiter
{
private readonly CountingSemaphore $semaphore;
/** @var float[] Timestamps of released permits awaiting recycling */
private array $releaseTimestamps = [];
public function __construct(
private readonly int $maxConcurrent,
private readonly float $windowSeconds,
private readonly LoggerInterface $logger,
) {
$this->semaphore = new CountingSemaphore(
name: 'rate_limiter',
maxPermits: $maxConcurrent,
logger: $logger,
);
}
/** @template T
* @param callable(): T $work
* @return T
*/
public function execute(callable $work): mixed
{
$this->recycleExpired();
return $this->semaphore->withPermit(function () use ($work): mixed {
try {
return $work();
} finally {
$this->releaseTimestamps[] = microtime(true);
}
});
}
private function recycleExpired(): void
{
$now = microtime(true);
$this->releaseTimestamps = array_values(array_filter(
$this->releaseTimestamps,
fn(float $ts) => ($now - $ts) < $this->windowSeconds,
));
}
public function getAvailableSlots(): int
{
return $this->semaphore->availablePermits();
}
}Semaphore Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Semaphore Pattern in the Real World
“Imagine a car park with exactly three spaces. A ticket machine at the entrance (the semaphore) issues a ticket only if spaces remain, lifting the barrier; arriving drivers with no ticket available must wait. When a car exits, the machine automatically increments its counter and releases the next waiting driver — the car park never exceeds capacity.”