ConcurrencyPythonverifiedVerified
Producer-Consumer Pattern in Python
Decouple data production from data consumption using a shared buffer, allowing each side to operate at its own pace.
How to Implement the Producer-Consumer Pattern in Python
1Step 1: Implement the bounded async queue using asyncio.Queue
import asyncio2Step 2: Create producer and consumer coroutines
async def producer(queue: asyncio.Queue[int]) -> None:
for i in range(10):
await queue.put(i)
print(f"Produced: {i}")
async def consumer(queue: asyncio.Queue[int]) -> None:
for _ in range(10):
item = await queue.get()
print(f"Consumed: {item}")
queue.task_done()3Step 3: Run producer and consumer concurrently
async def main() -> None:
queue: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
await asyncio.gather(producer(queue), consumer(queue))
if __name__ == "__main__":
asyncio.run(main())"""Task Queue with workers, backpressure, priorities, and retries."""
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable, Literal
logger = logging.getLogger(__name__)
# [step] Define task and result data structures
@dataclass
class Task:
id: str
payload: Any
priority: int = 0
retries: int | None = None
@dataclass
class WorkerResult:
task_id: str
result: Any = None
error: Exception | None = None
attempts: int = 0
duration_ms: float = 0.0
@dataclass
class QueueOptions:
capacity: int
workers: int
max_retries: int = 3
backpressure_strategy: Literal["block", "drop", "error"] = "block"
# [step] Implement the TaskQueue with worker pool and retry logic
class TaskQueue:
def __init__(
self,
handler: Callable[[Task], Awaitable[Any]],
options: QueueOptions,
) -> None:
self._handler = handler
self._options = options
self._queue: asyncio.PriorityQueue[tuple[int, Task]] = asyncio.PriorityQueue(
maxsize=options.capacity
)
self._results: list[WorkerResult] = []
self._shutdown = asyncio.Event()
self._workers: list[asyncio.Task[None]] = []
for _ in range(options.workers):
self._workers.append(asyncio.create_task(self._run_worker()))
async def submit(self, task: Task) -> None:
if self._shutdown.is_set():
raise RuntimeError("Queue is shut down")
match self._options.backpressure_strategy:
case "block":
await self._queue.put((-task.priority, task))
case "drop":
try:
self._queue.put_nowait((-task.priority, task))
except asyncio.QueueFull:
logger.warning("Queue full, dropping task %s", task.id)
case "error":
try:
self._queue.put_nowait((-task.priority, task))
except asyncio.QueueFull:
raise RuntimeError(
f"Queue full (capacity: {self._options.capacity})"
)
async def _run_worker(self) -> None:
while not self._shutdown.is_set() or not self._queue.empty():
try:
_, task = await asyncio.wait_for(self._queue.get(), timeout=0.1)
except asyncio.TimeoutError:
continue
start = time.monotonic()
max_retries = task.retries if task.retries is not None else self._options.max_retries
attempts = 0
last_error: Exception | None = None
while attempts <= max_retries:
try:
result = await self._handler(task)
self._results.append(WorkerResult(
task_id=task.id,
result=result,
attempts=attempts + 1,
duration_ms=(time.monotonic() - start) * 1000,
))
break
except Exception as exc:
last_error = exc
attempts += 1
if attempts <= max_retries:
await asyncio.sleep(0.1 * (2 ** attempts))
if last_error and attempts > max_retries:
self._results.append(WorkerResult(
task_id=task.id,
error=last_error,
attempts=attempts,
duration_ms=(time.monotonic() - start) * 1000,
))
self._queue.task_done()
async def drain(self) -> list[WorkerResult]:
await self._queue.join()
self._shutdown.set()
await asyncio.gather(*self._workers, return_exceptions=True)
return self._resultsProducer-Consumer Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Producer-Consumer Pattern in the Real World
“Think of a bakery where bakers (producers) place fresh loaves on a display shelf (the queue) and customers (consumers) pick them up at their leisure. The shelf decouples the baking schedule from customer arrival times — bakers keep baking even when no customer is present, and customers keep shopping even when bakers are on break.”