Concurrency · Python · Verified
Thread Pool Pattern in Python
Maintain a fixed set of reusable worker threads that pick up tasks from a queue, avoiding the overhead of spawning a new thread per task.
How to Implement the Thread Pool Pattern in Python
Step 1: Define the task type and implement the pool using asyncio
import asyncio
from typing import Callable, Awaitable, TypeVar
T = TypeVar("T")
Task = Callable[[], Awaitable[T]]

Step 2: Implement the pool with a semaphore for concurrency control
class AsyncPool:
    """Bounded-concurrency task runner backed by an asyncio.Semaphore.

    At most `size` submitted tasks execute at once; the rest wait for a
    slot. `pending` and `active` expose instantaneous queue depth and
    in-flight count (safe to mutate without a lock: asyncio is
    single-threaded and the updates happen between awaits).
    """

    def __init__(self, size: int) -> None:
        self._semaphore = asyncio.Semaphore(size)
        self._pending = 0
        self._active = 0

    async def submit(self, task: Task[T]) -> T:
        """Wait for a free slot, run *task*, and return its result."""
        self._pending += 1
        async with self._semaphore:
            self._pending -= 1
            self._active += 1
            try:
                return await task()
            finally:
                # Decrement even when the task raises, or the slot leaks.
                self._active -= 1

    @property
    def pending(self) -> int:
        """Tasks submitted but still waiting for a semaphore slot."""
        return self._pending

    @property
    def active(self) -> int:
        """Tasks currently executing."""
        return self._active

# Step 3: Submit tasks and observe pooled execution
async def main() -> None:
    """Push ten short jobs through a 4-slot AsyncPool and print the results."""
    pool = AsyncPool(4)

    async def work(i: int) -> str:
        await asyncio.sleep(0.01)
        return f"task-{i}-done"

    # `i=i` binds the loop variable as a default argument; without it every
    # lambda would late-bind to the final value of i.
    jobs = [pool.submit(lambda i=i: work(i)) for i in range(10)]
    results = await asyncio.gather(*jobs)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

"""Worker Pool using concurrent.futures with graceful shutdown."""
import logging
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)
# [step] Define task and pool stats types
@dataclass(frozen=True)
class PoolStats:
    """Immutable point-in-time snapshot of worker-pool activity."""

    active: int     # futures currently running
    idle: int       # workers with no task (pool size minus active)
    queued: int     # submitted but not yet started
    completed: int  # tasks that returned normally
    failed: int     # tasks that raised
@dataclass
class WorkerTask:
    """One unit of work: a callable plus the arguments to invoke it with."""

    id: str                 # caller-chosen identifier used to track the task
    fn: Callable[..., Any]  # the work itself
    args: tuple[Any, ...] = ()
    # default_factory so each task gets its own dict (no shared mutable default)
    kwargs: dict[str, Any] = field(default_factory=dict)
# [step] Implement the WorkerPool wrapping ThreadPoolExecutor
class WorkerPool:
    """Thread pool over ThreadPoolExecutor with task tracking and graceful shutdown.

    Counts completed/failed tasks and exposes a PoolStats snapshot.
    `task_timeout_s` is advisory only: Python threads cannot be killed, so a
    task that overruns is logged after the fact, not cancelled.
    """

    def __init__(self, size: int, task_timeout_s: float | None = None) -> None:
        self._size = size
        self._task_timeout = task_timeout_s
        self._executor = ThreadPoolExecutor(max_workers=size)
        self._futures: dict[str, Future[Any]] = {}
        self._completed = 0
        self._failed = 0
        self._shutting_down = False
        # Guards the counters and the futures dict: `+=` is a non-atomic
        # read-modify-write and worker threads update these concurrently.
        self._lock = threading.Lock()

    def submit(self, task: WorkerTask) -> Future[Any]:
        """Schedule *task* on the pool and return its Future.

        Raises:
            RuntimeError: if the pool is shutting down.
        """
        if self._shutting_down:
            raise RuntimeError("Pool is shutting down")

        def _run() -> Any:
            start = time.monotonic()
            try:
                result = task.fn(*task.args, **task.kwargs)
            except Exception:
                with self._lock:
                    self._failed += 1
                raise
            elapsed_ms = (time.monotonic() - start) * 1000
            with self._lock:
                self._completed += 1
            if self._task_timeout is not None and elapsed_ms > self._task_timeout * 1000:
                # Advisory: the work already ran to completion, we only report.
                logger.warning("Task %s exceeded timeout (%.1fms)", task.id, elapsed_ms)
            logger.debug("Task %s completed in %.1fms", task.id, elapsed_ms)
            return result

        future = self._executor.submit(_run)
        with self._lock:
            # Prune finished futures so the dict does not grow without bound
            # over the pool's lifetime; stats use the running counters.
            for tid in [tid for tid, f in self._futures.items() if f.done()]:
                del self._futures[tid]
            self._futures[task.id] = future
        return future

    def pool_stats(self) -> PoolStats:
        """Return a point-in-time snapshot of pool activity."""
        with self._lock:
            futures = list(self._futures.values())
            completed, failed = self._completed, self._failed
        active = sum(1 for f in futures if f.running())
        done = sum(1 for f in futures if f.done())
        return PoolStats(
            active=active,
            idle=self._size - active,
            queued=len(futures) - active - done,
            completed=completed,
            failed=failed,
        )

    def shutdown(self, wait: bool = True) -> None:
        """Reject further submissions and shut the executor down."""
        self._shutting_down = True
        self._executor.shutdown(wait=wait)
# [step] Alternative: ProcessPoolExecutor for CPU-bound work
from concurrent.futures import ProcessPoolExecutor
class ProcessWorkerPool:
"""Uses multiple processes for CPU-bound tasks, bypassing the GIL."""
def __init__(self, size: int | None = None) -> None:
self._executor = ProcessPoolExecutor(max_workers=size)
self._shutting_down = False
def submit(self, fn: Callable[..., Any], *args: Any) -> Future[Any]:
if self._shutting_down:
raise RuntimeError("Pool is shutting down")
return self._executor.submit(fn, *args)
def map(self, fn: Callable[..., Any], *iterables: Any, timeout: float | None = None):
return self._executor.map(fn, *iterables, timeout=timeout)
def shutdown(self, wait: bool = True) -> None:
self._shutting_down = True
self._executor.shutdown(wait=wait)Thread Pool Pattern Architecture
(Architecture diagram not included in this text export.)
Thread Pool Pattern in the Real World
“A hotel concierge desk staffed by three concierges represents the thread pool. No matter how many guests check in, only three requests are handled simultaneously. Other guests wait in the lobby queue. When a concierge finishes, they immediately assist the next waiting guest — the staff are never created or dismissed per guest, they simply stay on duty.”