Agentic AIPythonverifiedVerified
Multi-Agent Orchestration Pattern in Python
Coordinate a network of specialised AI agents under an orchestrator, where each agent owns a distinct capability or domain and agents communicate through structured messages.
How to Implement the Multi-Agent Orchestration Pattern in Python
1Step 1: Define the message and specialist agent interfaces
import uuid
from dataclasses import dataclass, field
from typing import Any, Protocol, Literal
@dataclass
class AgentMessage:
id: str
sender: str
receiver: str
type: Literal["task", "result", "error"]
payload: Any
class SpecialistAgent(Protocol):
id: str
capabilities: list[str]
async def handle(self, message: AgentMessage) -> AgentMessage: ...2Step 2: Build the Orchestrator with agent registration and routing
class Orchestrator:
def __init__(self) -> None:
self._agents: dict[str, SpecialistAgent] = {}
def register(self, agent: SpecialistAgent) -> None:
self._agents[agent.id] = agent
def find_agent(self, capability: str) -> SpecialistAgent | None:
for agent in self._agents.values():
if capability in agent.capabilities:
return agent
return None3Step 3: Implement task dispatch and pipeline execution
async def dispatch(
self, task: str, capability: str, payload: Any
) -> Any:
agent = self.find_agent(capability)
if agent is None:
raise ValueError(f"No agent for capability: {capability}")
message = AgentMessage(
id=str(uuid.uuid4()),
sender="orchestrator",
receiver=agent.id,
type="task",
payload={"task": task, "data": payload},
)
response = await agent.handle(message)
if response.type == "error":
raise RuntimeError(str(response.payload))
return response.payload
async def run_pipeline(
self, steps: list[dict[str, Any]]
) -> list[Any]:
results: list[Any] = []
for step in steps:
result = await self.dispatch(
step["capability"], step["capability"], step["payload"]
)
results.append(result)
return results"""Multi-Agent system with message passing, shared state, and coordination."""
import asyncio
import logging
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Literal
logger = logging.getLogger(__name__)
MessageType = Literal["task", "result", "error", "broadcast", "heartbeat"]
AgentStatus = Literal["idle", "busy", "error", "offline"]
# [step] Define message and shared state structures
@dataclass
class AgentMessage:
id: str
sender: str
receiver: str
type: MessageType
payload: Any
timestamp: float = field(default_factory=time.time)
correlation_id: str | None = None
ttl_ms: float | None = None
@dataclass(frozen=True)
class AgentCapability:
name: str
description: str
input_schema: dict[str, str]
class SharedState:
"""Thread-safe shared state for agent coordination."""
def __init__(self) -> None:
self._store: dict[str, Any] = {}
self._lock = asyncio.Lock()
async def get(self, key: str) -> Any | None:
async with self._lock:
return self._store.get(key)
async def set(self, key: str, value: Any) -> None:
async with self._lock:
self._store[key] = value
async def delete(self, key: str) -> None:
async with self._lock:
self._store.pop(key, None)
# [step] Implement the BaseAgent with message handling
class BaseAgent(ABC):
def __init__(
self,
agent_id: str,
capabilities: list[AgentCapability],
shared_state: SharedState,
) -> None:
self.id = agent_id
self.capabilities = capabilities
self._shared_state = shared_state
self._status: AgentStatus = "idle"
self._message_log: list[AgentMessage] = []
@abstractmethod
async def process_task(self, payload: Any, correlation_id: str) -> Any: ...
async def receive(self, message: AgentMessage) -> AgentMessage | None:
if message.ttl_ms and (time.time() - message.timestamp) * 1000 > message.ttl_ms:
return self._error_response(message, "Message expired")
self._message_log.append(message)
self._status = "busy"
try:
if message.type == "heartbeat":
self._status = "idle"
return self._reply(message, "heartbeat", {"status": self._status})
if message.type == "task":
result = await self.process_task(message.payload, message.id)
self._status = "idle"
return self._reply(message, "result", result)
return None
except Exception as exc:
self._status = "error"
return self._error_response(message, str(exc))
def _reply(self, original: AgentMessage, msg_type: MessageType, payload: Any) -> AgentMessage:
return AgentMessage(
id=str(uuid.uuid4()),
correlation_id=original.id,
sender=self.id,
receiver=original.sender,
type=msg_type,
payload=payload,
)
def _error_response(self, original: AgentMessage, error: str) -> AgentMessage:
return self._reply(original, "error", {"message": error})
@property
def status(self) -> AgentStatus:
return self._status
# [step] Implement the MultiAgentOrchestrator
class MultiAgentOrchestrator:
def __init__(self, shared_state: SharedState | None = None) -> None:
self._agents: dict[str, BaseAgent] = {}
self._shared_state = shared_state or SharedState()
def register(self, agent: BaseAgent) -> "MultiAgentOrchestrator":
self._agents[agent.id] = agent
return self
def find_by_capability(self, capability_name: str) -> BaseAgent | None:
for agent in self._agents.values():
if any(c.name == capability_name for c in agent.capabilities):
return agent
return None
async def send(
self,
to_id: str,
payload: Any,
*,
ttl_ms: float | None = None,
correlation_id: str | None = None,
) -> Any:
agent = self._agents.get(to_id)
if agent is None:
raise ValueError(f'Agent "{to_id}" not registered')
message = AgentMessage(
id=str(uuid.uuid4()),
correlation_id=correlation_id,
sender="orchestrator",
receiver=to_id,
type="task",
payload=payload,
ttl_ms=ttl_ms,
)
response = await agent.receive(message)
if response is None:
return None
if response.type == "error":
raise RuntimeError(str(response.payload))
return response.payload
async def broadcast(self, payload: Any) -> list[AgentMessage | None]:
tasks = [agent.receive(
AgentMessage(
id=str(uuid.uuid4()),
sender="orchestrator",
receiver=agent.id,
type="broadcast",
payload=payload,
)
) for agent in self._agents.values()]
return await asyncio.gather(*tasks)
async def health_check(self) -> dict[str, AgentStatus]:
results: dict[str, AgentStatus] = {}
for agent_id, agent in self._agents.items():
msg = AgentMessage(
id=str(uuid.uuid4()),
sender="orchestrator",
receiver=agent_id,
type="heartbeat",
payload=None,
)
response = await agent.receive(msg)
results[agent_id] = (
response.payload.get("status", "offline")
if response else "offline"
)
return resultsMulti-Agent Orchestration Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Multi-Agent Orchestration Pattern in the Real World
“A film director (Orchestrator) does not personally operate the camera, compose the score, or design costumes. Instead they delegate to specialist department heads — cinematographer, composer, costume designer — each expert in their domain. The director collects their work, gives feedback, and integrates it into a coherent film.”