Concurrency · PHP · Verified
Thread Pool Pattern in PHP
Maintain a fixed set of reusable worker threads that pick up tasks from a queue, avoiding the overhead of spawning a new thread per task.
How to Implement the Thread Pool Pattern in PHP
Step 1: Define the task type
interface Task
{
public function run(): mixed;
}

Step 2: Implement a Fiber-based thread pool
class FiberPool
{
/** @var \Fiber[] */
private array $fibers = [];
/** @var Task[] */
private array $queue = [];
/** @var mixed[] */
private array $results = [];
private int $activeCount = 0;
/**
 * @param int $maxConcurrency Maximum number of fibers kept in flight at once; must be at least 1.
 *
 * @throws \InvalidArgumentException If $maxConcurrency is less than 1.
 */
public function __construct(
    private readonly int $maxConcurrency,
) {
    // With a non-positive limit run() can never launch a fiber, so its outer
    // loop would spin forever as soon as the queue is non-empty.
    if ($maxConcurrency < 1) {
        throw new \InvalidArgumentException('maxConcurrency must be positive');
    }
}
public function submit(Task $task): int
{
$taskId = count($this->queue);
$this->queue[] = $task;
return $taskId;
}

Step 3: Run all submitted tasks with concurrency limit
/**
 * Runs every submitted task, keeping at most $maxConcurrency fibers in flight.
 *
 * Each pass first starts new fibers while slots are free, then visits every
 * in-flight fiber once: a terminated fiber has its return value stored and
 * its slot released, a suspended fiber is resumed.
 *
 * @return array<int, array{id: int, result: mixed}> Entries keyed by the id
 *         returned from submit(), ordered by that id.
 *
 * @throws \Throwable Whatever a task throws. A fiber that fails while being
 *                    resumed is de-registered before the exception propagates.
 */
public function run(): array
{
    $taskIndex = 0;

    // count() is evaluated on every check, so tasks appended to the queue
    // while run() is in progress are still picked up.
    while ($taskIndex < count($this->queue) || $this->fibers !== []) {
        // Launch new fibers up to the concurrency limit.
        while ($this->activeCount < $this->maxConcurrency && $taskIndex < count($this->queue)) {
            $id = $taskIndex++;
            $task = $this->queue[$id];

            $fiber = new \Fiber(static fn (): array => ['id' => $id, 'result' => $task->run()]);
            // If the task throws before its first suspension, start() throws
            // here and the fiber is never registered.
            $fiber->start();

            $this->fibers[$id] = $fiber;
            $this->activeCount++;
        }

        // Collect finished fibers and give suspended ones another slice.
        foreach ($this->fibers as $id => $fiber) {
            if ($fiber->isTerminated()) {
                $this->results[$id] = $fiber->getReturn();
                unset($this->fibers[$id]);
                $this->activeCount--;
                continue;
            }

            if (!$fiber->isSuspended()) {
                continue;
            }

            try {
                $fiber->resume();
            } catch (\Throwable $e) {
                // The fiber is dead. Leaving it registered would make a later
                // run() call getReturn() on a fiber that threw, and would keep
                // a concurrency slot occupied forever.
                unset($this->fibers[$id]);
                $this->activeCount--;

                throw $e;
            }
        }
    }

    // Results are stored in completion order; sort so iteration follows the
    // ids handed out by submit().
    ksort($this->results);

    return $this->results;
}
}

<?php
declare(strict_types=1);
// [step] Define task and result types
/**
 * A unit of work that can be submitted to FiberWorkerPool.
 */
interface RunnableInterface
{
/**
 * Performs the work and returns its result.
 */
public function run(): mixed;
/**
 * Returns the identifier reported as PoolTaskResult::$taskId and in the
 * pool's log context.
 */
public function getId(): string;
}
/**
 * Immutable record of a single task's outcome.
 *
 * As populated by FiberWorkerPool: a successful task carries its return value
 * in $result, a failed one carries the exception message in $error.
 */
final readonly class PoolTaskResult
{
/**
 * @param string      $taskId     Identifier reported by the task's getId().
 * @param bool        $success    True when the task returned without throwing.
 * @param mixed       $result     Return value of the task; null by default.
 * @param string|null $error      Failure message; null by default.
 * @param float       $durationMs Elapsed time in milliseconds; 0 by default.
 */
public function __construct(
public string $taskId,
public bool $success,
public mixed $result = null,
public ?string $error = null,
public float $durationMs = 0,
) {}
}
/**
 * Immutable settings for FiberWorkerPool.
 */
final readonly class PoolConfig
{
    /**
     * @param int   $maxWorkers    Maximum number of workers active at once; must be at least 1.
     * @param int   $queueCapacity Maximum number of tasks allowed to wait in the queue; must be at least 1.
     * @param float $idleTimeoutMs Idle timeout in milliseconds. Stored as given; nothing in this file
     *                             reads or validates it.
     *
     * @throws \InvalidArgumentException If $maxWorkers or $queueCapacity is not positive.
     */
    public function __construct(
        public int $maxWorkers = 4,
        public int $queueCapacity = 100,
        public float $idleTimeoutMs = 1000,
    ) {
        if ($maxWorkers <= 0) {
            throw new \InvalidArgumentException("maxWorkers must be positive");
        }

        // A capacity of zero or less makes the pool's "queue full" check true
        // for an empty queue, so every submit() would be rejected.
        if ($queueCapacity <= 0) {
            throw new \InvalidArgumentException('queueCapacity must be positive');
        }
    }
}
/**
 * Minimal logging contract used by FiberWorkerPool.
 */
interface LoggerInterface
{
/**
 * Records an informational message.
 *
 * @param array $context Extra key/value data attached to the message.
 */
public function info(string $message, array $context = []): void;
/**
 * Records an error message.
 *
 * @param array $context Extra key/value data attached to the message.
 */
public function error(string $message, array $context = []): void;
}
// [step] Implement a Fiber-based worker pool with lifecycle management
final class FiberWorkerPool
{
/** @var \Fiber[] */
private array $workers = [];
/** @var RunnableInterface[] */
private array $pending = [];
/** @var PoolTaskResult[] */
private array $results = [];
private bool $shutdown = false;
private int $completedCount = 0;
/**
 * @param PoolConfig      $config Supplies the worker limit and queue capacity this pool reads.
 * @param LoggerInterface $logger Receives the pool's info and error messages.
 */
public function __construct(
private readonly PoolConfig $config,
private readonly LoggerInterface $logger,
) {}
/**
 * Appends a task to the pending queue; it is started by a later execute() call.
 *
 * @throws \RuntimeException  If shutdown() has already been called.
 * @throws \OverflowException If the pending queue already holds queueCapacity tasks.
 */
public function submit(RunnableInterface $task): void
{
    if ($this->shutdown === true) {
        throw new \RuntimeException('Pool is shut down');
    }

    $hasRoom = count($this->pending) < $this->config->queueCapacity;
    if (!$hasRoom) {
        throw new \OverflowException("Queue full (capacity: {$this->config->queueCapacity})");
    }

    array_push($this->pending, $task);

    $context = ['taskId' => $task->getId()];
    $this->logger->info("Task submitted", $context);
}
// [step] Execute all tasks with fiber-based concurrency
/**
 * Drains the pending queue, running tasks as fibers until none are pending or active.
 *
 * Every pass tops the worker set up to maxWorkers, then visits each worker
 * once: a terminated worker has its result collected and is counted as
 * completed, a suspended worker is resumed. Errors raised while collecting or
 * resuming are logged rather than rethrown; a worker whose resume fails is
 * dropped without being counted as completed.
 */
public function execute(): void
{
    while ($this->pending !== [] || $this->workers !== []) {
        // Fill free worker slots from the front of the queue.
        while ($this->pending !== [] && count($this->workers) < $this->config->maxWorkers) {
            $this->spawnWorker(array_shift($this->pending));
        }

        foreach ($this->workers as $key => $worker) {
            if (!$worker->isTerminated()) {
                if ($worker->isSuspended()) {
                    try {
                        $worker->resume();
                    } catch (\Throwable $failure) {
                        $this->logger->error("Worker resume error", ['error' => $failure->getMessage()]);
                        unset($this->workers[$key]);
                    }
                }

                continue;
            }

            // Finished worker: store its result, then release the slot.
            try {
                $this->results[] = $worker->getReturn();
            } catch (\Throwable $failure) {
                $this->logger->error("Worker result error", ['error' => $failure->getMessage()]);
            }

            unset($this->workers[$key]);
            ++$this->completedCount;
        }
    }

    $summary = ['completed' => $this->completedCount];
    $this->logger->info("Pool execution complete", $summary);
}
/**
 * Starts a fiber that runs $task and registers it as an active worker.
 *
 * The fiber always returns a PoolTaskResult: anything thrown by the task is
 * caught, logged, and turned into a failed result, so a task failure does not
 * escape from the fiber.
 */
private function spawnWorker(RunnableInterface $task): void
{
    $logger = $this->logger;

    // static: the closure only needs $task and $logger, not the pool itself.
    $fiber = new \Fiber(static function () use ($task, $logger): PoolTaskResult {
        $start = hrtime(true);

        try {
            $result = $task->run();

            return new PoolTaskResult(
                taskId: $task->getId(),
                success: true,
                result: $result,
                // hrtime(true) is in nanoseconds; 1e6 ns per millisecond.
                durationMs: (hrtime(true) - $start) / 1e6,
            );
        } catch (\Throwable $e) {
            $logger->error("Task failed", [
                'taskId' => $task->getId(),
                'error' => $e->getMessage(),
            ]);

            return new PoolTaskResult(
                taskId: $task->getId(),
                success: false,
                error: $e->getMessage(),
                durationMs: (hrtime(true) - $start) / 1e6,
            );
        }
    });

    $fiber->start();

    // Register under an auto-assigned key, not the task ID. Two tasks that
    // report the same ID would otherwise overwrite each other, and the
    // displaced fiber would never be resumed or have its result collected.
    $this->workers[] = $fiber;
}
/**
 * Marks the pool as shut down so that later submit() calls throw \RuntimeException.
 *
 * Only the flag is set here; this method does not itself stop or drain workers.
 */
public function shutdown(): void
{
$this->shutdown = true;
}
/**
 * Returns the results gathered so far by execute(), in the order they were collected.
 *
 * @return PoolTaskResult[]
 */
public function getResults(): array { return $this->results; }
/** Returns how many terminated workers execute() has processed so far. */
public function getCompletedCount(): int { return $this->completedCount; }
/** Returns the number of workers currently registered with the pool. */
public function getActiveWorkerCount(): int { return count($this->workers); }
/** Returns the number of submitted tasks that have not been started yet. */
public function getPendingCount(): int { return count($this->pending); }
}

Thread Pool Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Thread Pool Pattern in the Real World
“A hotel concierge desk staffed by three concierges represents the thread pool. No matter how many guests check in, only three requests are handled simultaneously. Other guests wait in the lobby queue. When a concierge finishes, they immediately assist the next waiting guest — the staff are never created or dismissed per guest, they simply stay on duty.”