ConcurrencyPythonverifiedVerified
Semaphore Pattern in Python
Control access to a finite pool of resources by maintaining a counter that threads atomically increment (release) and decrement (acquire), blocking when the count reaches zero.
How to Implement the Semaphore Pattern in Python
1Step 1: Implement the Semaphore using asyncio.Semaphore
import asyncio2Step 2: Limit concurrent tasks using the semaphore
async def main() -> None:
semaphore = asyncio.Semaphore(3)
async def limited_task(task_id: int) -> str:
async with semaphore:
await asyncio.sleep(0.1)
return f"task-{task_id}"
results = await asyncio.gather(
*(limited_task(i) for i in range(10))
)
print(results)
if __name__ == "__main__":
asyncio.run(main())"""Rate Limiter using Semaphore for concurrent API calls with spacing."""
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Awaitable
logger = logging.getLogger(__name__)
# [step] Define rate limiter options and response types
@dataclass(frozen=True)
class RateLimiterOptions:
max_concurrent: int
min_interval_ms: float | None = None
@dataclass(frozen=True)
class ApiResponse:
data: Any
request_id: str
duration_ms: float
# [step] Implement the ApiRateLimiter with semaphore and spacing
class ApiRateLimiter:
def __init__(self, options: RateLimiterOptions) -> None:
self._options = options
self._semaphore = asyncio.Semaphore(options.max_concurrent)
self._last_call_time = 0.0
self._spacing_lock = asyncio.Lock()
async def call[T](
self,
fn: Callable[[], Awaitable[T]],
request_id: str | None = None,
) -> ApiResponse:
req_id = request_id or str(uuid.uuid4())
async with self._semaphore:
# Enforce minimum spacing between requests
if self._options.min_interval_ms:
async with self._spacing_lock:
elapsed_ms = (time.monotonic() - self._last_call_time) * 1000
if elapsed_ms < self._options.min_interval_ms:
wait_s = (self._options.min_interval_ms - elapsed_ms) / 1000
await asyncio.sleep(wait_s)
self._last_call_time = time.monotonic()
start = time.monotonic()
data = await fn()
return ApiResponse(
data=data,
request_id=req_id,
duration_ms=(time.monotonic() - start) * 1000,
)
@property
def available(self) -> int:
return self._semaphore._value
# [step] Usage: limit to 5 concurrent API calls with 100ms spacing
async def main() -> None:
limiter = ApiRateLimiter(RateLimiterOptions(
max_concurrent=5, min_interval_ms=100,
))
async def fetch_user(uid: str) -> dict[str, str]:
return {"id": uid, "name": f"User {uid}"}
user_ids = [str(i + 1) for i in range(20)]
responses = await asyncio.gather(
*(limiter.call(lambda uid=uid: fetch_user(uid), f"req-{uid}")
for uid in user_ids)
)
print(f"Fetched {len(responses)} users")
if __name__ == "__main__":
asyncio.run(main())Semaphore Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Semaphore Pattern in the Real World
“Imagine a car park with exactly three spaces. A ticket machine at the entrance (the semaphore) issues a ticket only if spaces remain, lifting the barrier; arriving drivers with no ticket available must wait. When a car exits, the machine automatically increments its counter and releases the next waiting driver — the car park never exceeds capacity.”