Behavioral · Python · Verified
Observer Pattern in Python
Defines a one-to-many dependency so that when one object changes state, all its dependents are notified and updated automatically.
How to Implement the Observer Pattern in Python
# Step 1: Define the generic event listener type
from typing import Callable, TypeVar
T = TypeVar("T")
Listener = Callable[[T], None]2Step 2: Create the EventEmitter class with subscribe/unsubscribe
class EventEmitter[T]:
def __init__(self) -> None:
self._listeners: list[Listener[T]] = []
def subscribe(self, listener: Listener[T]) -> Callable[[], None]:
self._listeners.append(listener)
return lambda: self.unsubscribe(listener)
def unsubscribe(self, listener: Listener[T]) -> None:
self._listeners = [l for l in self._listeners if l is not listener]
def emit(self, event: T) -> None:
for listener in self._listeners:
listener(event)3Step 3: Define the event data and wire up notifications
from dataclasses import dataclass
@dataclass
class StockUpdate:
    """Event payload pairing a stock symbol with a price."""

    symbol: str
    price: float
# One emitter instance carries every StockUpdate event.
stock_feed: EventEmitter[StockUpdate] = EventEmitter()

# subscribe() hands back the callable that later cancels this listener.
unsubscribe = stock_feed.subscribe(lambda update: print(f"{update.symbol}: ${update.price}"))

# Both of these reach the listener registered above.
stock_feed.emit(StockUpdate(symbol="AAPL", price=182.5))
stock_feed.emit(StockUpdate(symbol="GOOG", price=141.3))

# Cancel the subscription; later emits are no longer delivered to it.
unsubscribe()
stock_feed.emit(StockUpdate("AAPL", 183.0))  # not received
"""Typed Event Bus with filtering and once-only subscriptions."""
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
Unsubscribe = Callable[[], None]
# [step] Implement the TypedEventBus with filter and once support
class TypedEventBus:
def __init__(self) -> None:
self._handlers: dict[str, list[dict[str, Any]]] = {}
def on(
self,
event: str,
handler: Callable[[Any], None],
*,
filter_fn: Callable[[Any], bool] | None = None,
once: bool = False,
) -> Unsubscribe:
if event not in self._handlers:
self._handlers[event] = []
entry = {"fn": handler, "filter": filter_fn, "once": once}
self._handlers[event].append(entry)
def unsubscribe() -> None:
entries = self._handlers.get(event, [])
if entry in entries:
entries.remove(entry)
return unsubscribe
def once(
self,
event: str,
handler: Callable[[Any], None],
*,
filter_fn: Callable[[Any], bool] | None = None,
) -> Unsubscribe:
return self.on(event, handler, filter_fn=filter_fn, once=True)
def emit(self, event: str, payload: Any) -> None:
entries = self._handlers.get(event)
if entries is None:
return
to_remove: list[dict[str, Any]] = []
for entry in list(entries):
if entry["filter"] and not entry["filter"](payload):
continue
entry["fn"](payload)
if entry["once"]:
to_remove.append(entry)
for entry in to_remove:
entries.remove(entry)
def off(self, event: str) -> None:
self._handlers.pop(event, None)
# [step] Define events and subscribe handlers
@dataclass(frozen=True)
class OrderPlaced:
    """Immutable event payload describing a placed order."""

    order_id: str
    total: float
    user_id: str
@dataclass(frozen=True)
class UserLogin:
    """Immutable event payload describing a user login."""

    user_id: str
    timestamp: float  # NOTE(review): unit/epoch not shown here — confirm with the emitter
bus = TypedEventBus()

# Unfiltered handler: runs for every "order:placed" payload.
bus.on(
    "order:placed",
    lambda e: print(f"New order {e.order_id} for ${e.total}"),
)

# Filtered handler: runs only when the payload's total exceeds 500.
bus.on(
    "order:placed",
    lambda e: print(f"High-value order alert: {e.order_id}"),
    filter_fn=lambda order: order.total > 500,
)
bus.once("user:login", lambda e: print(f"Welcome back, {e.user_id}!"))
Observer Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Observer Pattern in the Real World
“Think of a newspaper subscription service. The publisher (subject) doesn't know exactly who its subscribers (observers) are—it just maintains a list. When a new edition is printed, it delivers a copy to every subscriber on the list automatically. Subscribers can cancel at any time without the publisher needing to change anything.”