Concurrency · PHP · Verified
Producer-Consumer Pattern in PHP
Decouple data production from data consumption using a shared buffer, allowing each side to operate at its own pace.
How to Implement the Producer-Consumer Pattern in PHP
Step 1: Define the bounded queue using SplQueue
/**
 * Fixed-capacity FIFO buffer shared between a producer and a consumer.
 *
 * The current size is always read from the underlying SplQueue, so there is a
 * single source of truth for "how many items are stored".
 */
class BoundedQueue
{
    private \SplQueue $items;

    /**
     * @param int $capacity Maximum number of items the queue may hold at once.
     *
     * @throws \InvalidArgumentException If $capacity is less than 1. Such a queue
     *                                   would be permanently full and any producer
     *                                   waiting on it would never make progress.
     */
    public function __construct(
        private readonly int $capacity,
    ) {
        if ($capacity < 1) {
            throw new \InvalidArgumentException("Capacity must be at least 1, got {$capacity}");
        }

        $this->items = new \SplQueue();
    }

    /**
     * Appends an item to the tail of the queue.
     *
     * @return bool True when the item was stored, false when the queue is full.
     */
    public function enqueue(mixed $item): bool
    {
        if ($this->isFull()) {
            return false; // Queue full
        }

        $this->items->enqueue($item);

        return true;
    }

    /**
     * Removes and returns the item at the head of the queue.
     *
     * Returns null when the queue is empty. A stored null is therefore
     * indistinguishable from "empty"; check isEmpty() first if null is a
     * legitimate item.
     */
    public function dequeue(): mixed
    {
        if ($this->items->isEmpty()) {
            return null;
        }

        return $this->items->dequeue();
    }

    /** True when the number of stored items has reached the capacity. */
    public function isFull(): bool
    {
        return $this->items->count() >= $this->capacity;
    }

    /** True when no items are stored. */
    public function isEmpty(): bool
    {
        return $this->items->isEmpty();
    }

    /** Number of items currently stored. */
    public function getSize(): int
    {
        return $this->items->count();
    }
}

// Step 2: Simulate producer and consumer using Fibers
/**
 * Builds (but does not start) a fiber that feeds the integers 0 .. $count - 1
 * into the queue, one per resume.
 *
 * The fiber suspends with 'queue_full' each time it finds the queue full, and
 * with 'produced' after every item it manages to enqueue.
 */
function producer(BoundedQueue $queue, int $count): \Fiber
{
    $body = static function () use ($queue, $count): void {
        $i = 0;

        while ($i < $count) {
            // Hand control back to the scheduler until a slot frees up.
            while ($queue->isFull()) {
                \Fiber::suspend('queue_full');
            }

            $queue->enqueue($i);
            echo "Produced: {$i}\n";

            // Yield after each item so the consumer gets a turn.
            \Fiber::suspend('produced');

            $i++;
        }
    };

    return new \Fiber($body);
}
/**
 * Builds (but does not start) a fiber that removes $count items from the queue,
 * one per resume, echoing each.
 *
 * The fiber suspends with 'queue_empty' each time it finds nothing to read, and
 * with 'consumed' after every item it dequeues.
 */
function consumer(BoundedQueue $queue, int $count): \Fiber
{
    $body = static function () use ($queue, $count): void {
        for ($remaining = $count; $remaining > 0; $remaining--) {
            // Hand control back to the scheduler until something is available.
            while ($queue->isEmpty()) {
                \Fiber::suspend('queue_empty');
            }

            $item = $queue->dequeue();
            echo "Consumed: {$item}\n";

            // Yield after each item so the producer gets a turn.
            \Fiber::suspend('consumed');
        }
    };

    return new \Fiber($body);
}

// Step 3: Coordinate fibers to simulate concurrency
// One producer and one consumer share a queue that holds at most 3 items.
$queue = new BoundedQueue(3);
$p = producer($queue, 5);
$c = consumer($queue, 5);

// Run each fiber up to its first suspension point.
$p->start();
$c->start();

// Cooperative round-robin: give the producer a turn, then the consumer,
// until both fibers have run to completion.
while (!($p->isTerminated() && $c->isTerminated())) {
    foreach ([$p, $c] as $fiber) {
        if (!$fiber->isTerminated()) {
            $fiber->resume();
        }
    }
}
<?php
declare(strict_types=1);
// [step] Define task and result types
/**
 * Immutable unit of work submitted to the task queue.
 */
final readonly class Task
{
    /**
     * @param string $id         Identifier carried through to logs and results.
     * @param mixed  $payload    Opaque data handed to the task handler.
     * @param int    $priority   Scheduling priority; higher values are taken first.
     * @param int    $maxRetries Number of retries allowed after the first failed
     *                           attempt (0 means "try once, never retry").
     *
     * @throws \InvalidArgumentException If $maxRetries is negative. A negative
     *                                   budget is meaningless and makes retry
     *                                   loops of the form `attempts <= maxRetries`
     *                                   skip the task without ever running it.
     */
    public function __construct(
        public string $id,
        public mixed $payload,
        public int $priority = 0,
        public int $maxRetries = 3,
    ) {
        if ($maxRetries < 0) {
            throw new \InvalidArgumentException("maxRetries must be >= 0, got {$maxRetries}");
        }
    }
}
/**
 * Immutable outcome of processing one task.
 *
 * - $taskId:     id of the task this result belongs to.
 * - $success:    true when the handler returned, false when every attempt threw.
 * - $result:     value returned by the handler (left null on failure).
 * - $error:      message of the last exception on failure (left null on success).
 * - $attempts:   how many times the handler was invoked for this task.
 * - $durationMs: elapsed time in milliseconds, covering all attempts.
 */
final readonly class TaskResult
{
public function __construct(
public string $taskId,
public bool $success,
public mixed $result = null,
public ?string $error = null,
public int $attempts = 1,
public float $durationMs = 0,
) {}
}
/**
 * Policy applied by TaskQueue::submit() when the queue is already at capacity.
 */
enum BackpressureStrategy: string
{
// submit() returns false; the caller is expected to retry later.
case Block = 'block';
// submit() returns false and the task is discarded.
case Drop = 'drop';
// submit() throws an \OverflowException.
case Error = 'error';
}
/**
 * Minimal logging contract used by the queue and its workers.
 *
 * Each method takes a message plus an optional array of structured context.
 */
interface LoggerInterface
{
// Records an informational event (e.g. a task being submitted).
public function info(string $message, array $context = []): void;
// Records a failure (e.g. a task attempt that threw).
public function error(string $message, array $context = []): void;
}
// [step] Implement the priority task queue with backpressure
/**
 * In-memory, capacity-limited task queue ordered by priority, which also
 * collects the results reported back by workers.
 */
final class TaskQueue
{
    /** @var Task[] Pending tasks, highest priority first. */
    private array $queue = [];

    private bool $shutdown = false;

    /** @var TaskResult[] Results in the order they were reported. */
    private array $results = [];

    /**
     * @param int                  $capacity     Maximum number of pending tasks.
     * @param BackpressureStrategy $backpressure What submit() does when full.
     * @param LoggerInterface      $logger       Receives a message per accepted task.
     */
    public function __construct(
        private readonly int $capacity,
        private readonly BackpressureStrategy $backpressure,
        private readonly LoggerInterface $logger,
    ) {}

    /**
     * Offers a task to the queue.
     *
     * @return bool True when the task was accepted; false when the queue is full
     *              and the strategy is Drop or Block.
     *
     * @throws \RuntimeException  If the queue has been shut down.
     * @throws \OverflowException If the queue is full and the strategy is Error.
     */
    public function submit(Task $task): bool
    {
        if ($this->shutdown) {
            throw new \RuntimeException('Queue is shut down');
        }

        $hasRoom = count($this->queue) < $this->capacity;
        if (!$hasRoom) {
            return match ($this->backpressure) {
                BackpressureStrategy::Error => throw new \OverflowException(
                    "Queue full (capacity: {$this->capacity})"
                ),
                // Both strategies just report "not accepted"; with Block the
                // caller is expected to retry.
                BackpressureStrategy::Drop, BackpressureStrategy::Block => false,
            };
        }

        $this->queue[] = $task;

        // Re-sort so the highest priority sits at the front. Sorting is stable
        // in PHP 8+, so tasks of equal priority keep their submission order.
        usort(
            $this->queue,
            static fn (Task $left, Task $right): int => $right->priority <=> $left->priority,
        );

        $this->logger->info("Task submitted", [
            'taskId' => $task->id,
            'queueSize' => count($this->queue),
        ]);

        return true;
    }

    /** Removes and returns the highest-priority pending task, or null if none. */
    public function take(): ?Task
    {
        return $this->queue === [] ? null : array_shift($this->queue);
    }

    /** Records the outcome of a processed task. */
    public function addResult(TaskResult $result): void
    {
        $this->results[] = $result;
    }

    /** Stops the queue from accepting new tasks; pending tasks can still be taken. */
    public function shutdown(): void
    {
        $this->shutdown = true;
    }

    public function isShutdown(): bool
    {
        return $this->shutdown;
    }

    public function isEmpty(): bool
    {
        return $this->queue === [];
    }

    /** @return TaskResult[] */
    public function getResults(): array
    {
        return $this->results;
    }
}
// [step] Implement workers using Fibers
/**
 * Cooperative worker that pulls tasks from a TaskQueue inside a Fiber and
 * reports one TaskResult per task.
 */
final class Worker
{
    /**
     * @param int             $id      Worker number, used only in log messages.
     * @param TaskQueue       $queue   Source of tasks and sink for results.
     * @param Closure         $handler Invoked as $handler(Task $task); its return
     *                                 value becomes TaskResult::$result.
     * @param LoggerInterface $logger  Receives one error entry per failed attempt.
     */
    public function __construct(
        private readonly int $id,
        private readonly TaskQueue $queue,
        private readonly Closure $handler,
        private readonly LoggerInterface $logger,
    ) {}

    /**
     * Builds (but does not start) the fiber that runs this worker's loop.
     *
     * The fiber suspends with 'idle' when no task is available and with
     * 'processed' after each task. It terminates once the queue is both shut
     * down and empty.
     */
    public function createFiber(): \Fiber
    {
        return new \Fiber(function (): void {
            while (!$this->queue->isShutdown() || !$this->queue->isEmpty()) {
                $task = $this->queue->take();
                if ($task === null) {
                    \Fiber::suspend('idle');
                    continue;
                }

                $this->queue->addResult($this->process($task));
                \Fiber::suspend('processed');
            }
        });
    }

    /**
     * Runs the handler for one task, retrying on failure, and always returns a result.
     *
     * The handler is invoked at most maxRetries + 1 times. A negative maxRetries
     * is treated as 0 so the task is still attempted once and still yields a
     * result instead of vanishing.
     */
    private function process(Task $task): TaskResult
    {
        $start = hrtime(true);
        $maxAttempts = max(0, $task->maxRetries) + 1;
        $lastError = null;

        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            try {
                $result = ($this->handler)($task);

                return new TaskResult(
                    taskId: $task->id,
                    success: true,
                    result: $result,
                    attempts: $attempt,
                    durationMs: (hrtime(true) - $start) / 1e6, // ns -> ms
                );
            } catch (\Throwable $e) {
                // Deliberately broad: a failing handler must not kill the worker.
                // The failure is logged and surfaces in the final TaskResult.
                $lastError = $e->getMessage();
                $this->logger->error("Worker {$this->id}: task {$task->id} failed (attempt {$attempt})", [
                    'error' => $lastError,
                ]);
            }
        }

        return new TaskResult(
            taskId: $task->id,
            success: false,
            error: $lastError,
            attempts: $maxAttempts,
            durationMs: (hrtime(true) - $start) / 1e6, // ns -> ms
        );
    }
}

// Producer-Consumer Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Producer-Consumer Pattern in the Real World
“Think of a bakery where bakers (producers) place fresh loaves on a display shelf (the queue) and customers (consumers) pick them up at their leisure. The shelf decouples the baking schedule from customer arrival times — bakers keep baking even when no customer is present, and customers keep shopping even when bakers are on break.”