Concurrency | Python | Verified
Mutex / Lock Pattern in Python
Guarantee that only one thread at a time can access a shared resource by requiring threads to acquire an exclusive lock before proceeding.
How to Implement the Mutex / Lock Pattern in Python
Step 1: Implement the Mutex using asyncio.Lock
import asyncio
from collections.abc import Awaitable, Callable
class Mutex:
def __init__(self) -> None:
self._lock = asyncio.Lock()
async def acquire(self) -> None:
await self._lock.acquire()
def release(self) -> None:
self._lock.release()
async def with_lock[T](self, fn: ...) -> T:
"""Use as a context manager instead for idiomatic Python."""
async with self._lock:
return await fn()2Step 2: Protect a shared counter with the mutex (context manager)
# Shared state for the demo: the value that every increment() call updates.
counter = 0
# Single lock guarding the read-modify-write on `counter` inside increment().
lock = asyncio.Lock()
async def increment() -> None:
    """Add one to the module-level ``counter`` while holding ``lock``.

    The read and the write are separated by an ``await``; holding the lock
    across both keeps another task from reading the same stale value in
    between.
    """
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0.001)  # simulate async work
        counter = current + 1
async def main() -> None:
    """Run ten ``increment()`` calls concurrently and print the final counter."""
    # `counter` is only read here, so no `global` declaration is required.
    await asyncio.gather(*(increment() for _ in range(10)))
    print(counter)  # always 10
# Script entry point for the Step 2 demo.
if __name__ == "__main__":
    asyncio.run(main())

"""Async Mutex with timeout, ownership tracking, and cache refresh example."""
import asyncio
import logging
import time
from typing import Any, Callable, Awaitable
logger = logging.getLogger(__name__)
# [step] Define timeout error and mutex options
class MutexTimeoutError(Exception):
    """Raised when a mutex could not be acquired within the allowed time.

    Attributes:
        waited_ms: The number of milliseconds passed to the constructor.
    """

    def __init__(self, waited_ms: float) -> None:
        self.waited_ms = waited_ms
        super().__init__(f"Mutex acquisition timed out after {waited_ms:.0f}ms")
# [step] Implement the AsyncMutex with timeout and owner tracking
class AsyncMutex:
def __init__(self, label: str = "mutex") -> None:
self._label = label
self._lock = asyncio.Lock()
self._owner: str | None = None
self._acquisition_count = 0
async def acquire(
self, *, timeout_ms: float | None = None, label: str | None = None
) -> Callable[[], None]:
if timeout_ms is not None:
try:
await asyncio.wait_for(self._lock.acquire(), timeout=timeout_ms / 1000)
except asyncio.TimeoutError:
raise MutexTimeoutError(timeout_ms)
else:
await self._lock.acquire()
self._owner = label
self._acquisition_count += 1
return self._lock.release
async def with_lock[T](
self,
fn: Callable[[], Awaitable[T]],
*,
timeout_ms: float | None = None,
label: str | None = None,
) -> T:
release = await self.acquire(timeout_ms=timeout_ms, label=label)
try:
return await fn()
finally:
release()
self._owner = None
@property
def is_locked(self) -> bool:
return self._lock.locked()
@property
def current_owner(self) -> str | None:
return self._owner
@property
def total_acquisitions(self) -> int:
return self._acquisition_count
# [step] Example: serialized cache refresh preventing stampede
# Mutex used by get_or_refresh() to serialize cache refreshes; constructed
# with the label "cache-refresh".
cache_mutex = AsyncMutex("cache-refresh")
# Maps a cache key to a (value, expiry) pair. get_or_refresh() stores the
# expiry as time.monotonic() + ttl_s and compares it against time.monotonic().
cache: dict[str, tuple[Any, float]] = {}
async def get_or_refresh[T](
key: str,
fetcher: Callable[[], Awaitable[T]],
ttl_s: float = 60.0,
) -> T:
cached = cache.get(key)
if cached and cached[1] > time.monotonic():
return cached[0]
async def _refresh() -> T:
# Re-check after acquiring lock (another waiter may have refreshed)
fresh = cache.get(key)
if fresh and fresh[1] > time.monotonic():
return fresh[0]
value = await fetcher()
cache[key] = (value, time.monotonic() + ttl_s)
return value
return await cache_mutex.with_lock(
_refresh, timeout_ms=5000, label=f"refresh:{key}"
)Mutex / Lock Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Mutex / Lock Pattern in the Real World
“A single-occupancy public restroom with a door latch is a perfect mutex. The latch (lock) ensures only one person (thread) occupies the restroom (critical section) at a time. Anyone who finds the door locked must wait outside; when the occupant leaves and unlatches the door, one waiting person may enter.”