Agentic AIPythonverifiedVerified
Plan-and-Execute Pattern in Python
Separate high-level planning from step-by-step execution: one LLM call produces a structured plan, then individual executor calls carry out each step, with replanning triggered by unexpected results.
How to Implement the Plan-and-Execute Pattern in Python
1Step 1: Define the Step and Plan data structures
from dataclasses import dataclass, field
from typing import Callable, Awaitable, Literal
@dataclass
class Step:
id: str
description: str
status: Literal["pending", "running", "done", "failed"] = "pending"
result: str | None = None
@dataclass
class Plan:
goal: str
steps: list[Step]
Planner = Callable[[str], Awaitable[list[Step]]]
Executor = Callable[[Step], Awaitable[str]]2Step 2: Implement the plan-and-execute loop
async def plan_and_execute(
goal: str,
planner: Planner,
executor: Executor,
) -> Plan:
raw_steps = await planner(goal)
plan = Plan(goal=goal, steps=raw_steps)
for step in plan.steps:
step.status = "running"
try:
step.result = await executor(step)
step.status = "done"
except Exception as exc:
step.result = str(exc)
step.status = "failed"
break
return plan3Step 3: Run the pipeline with a sample goal
async def main() -> None:
async def planner(goal: str) -> list[Step]:
return [
Step("1", "Research the topic"),
Step("2", "Draft the outline"),
Step("3", "Write the post"),
Step("4", "Review and publish"),
]
async def executor(step: Step) -> str:
print(f"Executing: {step.description}")
return f"Completed: {step.description}"
plan = await plan_and_execute(
"Write and publish a blog post", planner, executor
)
for s in plan.steps:
print(f"[{s.status}] {s.description}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())"""Plan-and-Execute agent with replanning, dependency tracking, and progress."""
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable, Literal
logger = logging.getLogger(__name__)
StepStatus = Literal["pending", "blocked", "running", "done", "failed", "skipped"]
# [step] Define plan step and execution plan data structures
@dataclass
class PlanStep:
id: str
description: str
depends_on: list[str] = field(default_factory=list)
status: StepStatus = "pending"
result: Any = None
error: str | None = None
started_at: float | None = None
completed_at: float | None = None
attempts: int = 0
@dataclass
class ExecutionPlan:
id: str
goal: str
steps: list[PlanStep]
version: int = 1
created_at: float = field(default_factory=time.time)
@dataclass(frozen=True)
class ExecutorContext:
goal: str
completed_steps: dict[str, Any]
plan: ExecutionPlan
StepExecutor = Callable[[PlanStep, ExecutorContext], Awaitable[Any]]
PlannerFn = Callable[[str, list[PlanStep] | None], Awaitable[list[PlanStep]]]
ProgressCallback = Callable[[PlanStep, ExecutionPlan], None]
# [step] Implement the PlanAndExecuteAgent with replanning support
class PlanAndExecuteAgent:
MAX_REPLAN_ATTEMPTS = 2
def __init__(
self,
planner: PlannerFn,
executor: StepExecutor,
on_progress: ProgressCallback | None = None,
) -> None:
self._planner = planner
self._executor = executor
self._on_progress = on_progress
async def run(
self, goal: str, *, cancel_event: asyncio.Event | None = None
) -> ExecutionPlan:
plan = await self._build_plan(goal)
replan_count = 0
while True:
if cancel_event and cancel_event.is_set():
raise asyncio.CancelledError("Execution aborted")
ready = self._get_ready_steps(plan)
if not ready:
break
failed: list[PlanStep] = []
for step in ready:
if cancel_event and cancel_event.is_set():
raise asyncio.CancelledError("Execution aborted")
step.status = "running"
step.started_at = time.time()
step.attempts += 1
if self._on_progress:
self._on_progress(step, plan)
try:
ctx = ExecutorContext(
goal=goal,
completed_steps={
s.id: s.result
for s in plan.steps
if s.status == "done"
},
plan=plan,
)
step.result = await self._executor(step, ctx)
step.status = "done"
step.completed_at = time.time()
except Exception as exc:
step.error = str(exc)
step.status = "failed"
step.completed_at = time.time()
failed.append(step)
if self._on_progress:
self._on_progress(step, plan)
if failed and replan_count < self.MAX_REPLAN_ATTEMPTS:
replan_count += 1
logger.info(
"Replanning (attempt %d) due to %d failed step(s)",
replan_count, len(failed),
)
plan = await self._replan(plan, failed)
continue
if failed:
break
# Mark remaining steps as skipped
for s in plan.steps:
if s.status in ("pending", "blocked"):
s.status = "skipped"
return plan
async def _build_plan(self, goal: str) -> ExecutionPlan:
raw_steps = await self._planner(goal, None)
return ExecutionPlan(
id=str(uuid.uuid4()), goal=goal, steps=raw_steps
)
async def _replan(
self, current: ExecutionPlan, failed: list[PlanStep]
) -> ExecutionPlan:
raw_steps = await self._planner(current.goal, failed)
done_ids = {s.id for s in current.steps if s.status == "done"}
new_steps = [s for s in raw_steps if s.id not in done_ids]
return ExecutionPlan(
id=current.id,
goal=current.goal,
version=current.version + 1,
steps=[s for s in current.steps if s.status == "done"] + new_steps,
)
def _get_ready_steps(self, plan: ExecutionPlan) -> list[PlanStep]:
done = {s.id for s in plan.steps if s.status == "done"}
return [
s for s in plan.steps
if s.status == "pending" and all(d in done for d in s.depends_on)
]Plan-and-Execute Pattern Architecture
hourglass_empty
Rendering diagram...
lightbulb
Plan-and-Execute Pattern in the Real World
“A building contractor (Planner) reviews the architectural blueprints and produces a phased construction schedule: foundation, framing, electrical, finishing. Individual trade crews (Executors) carry out each phase. If an inspection fails (unexpected result), the contractor revises the remaining schedule rather than demolishing the entire building and starting over.”