Agents Planning System

Introduction

The Agents Planning System is a modular, two-stage task decomposition and execution engine designed to dynamically select and utilize intelligent agents, tools, DSLs, or LLMs to fulfill high-level job goals. The system is powered by a centralized Behavior Controller that uses LLMs to reason about job goals, decompose them into actionable plans, and generate structured inputs for execution.

It consists of:

  1. Stage 1 Planning – Decomposes job-level goals into a list of executable sub-tasks (PlannerTasks).
  2. Stage 2 Planning & Execution – For each sub-task, selects the appropriate execution mechanism (e.g., Agent, Tool, DSL, LLM), prepares the input format, and executes it.

This system is used in decentralized or modular AI infrastructures to offload intelligent decisions to suitable agents or functions in a scalable, pluggable way.


Schema

The Agents Planning System is centered around two key data structures:

  1. PlannerTask – represents an individual executable unit of work.
  2. TaskData – aggregates multiple PlannerTasks and tracks overall planning/execution state.

Both are implemented as Python dataclasses (@dataclass) for structured access and serialization.


PlannerTask

Represents a single task generated by the planner. It contains metadata for execution, dependencies, schema info, and optional routing fields.

DataClass Definition

@dataclass
class PlannerTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    main_task_id: Optional[str] = ""
    description: Optional[str] = ""
    execution_type: Optional[str] = ""
    dependencies: List[str] = field(default_factory=list)
    tool_id: Optional[str] = None
    function_id: Optional[str] = None
    agent_id: Optional[str] = None
    parameters: Dict = field(default_factory=dict)
    arguments: Dict = field(default_factory=dict)
    instructions: Optional[str] = None
    convertor_code: Optional[str] = None
    estimate_data: Any = None
    mark_replan: bool = False
    status: str = "pending"
    output: Dict = field(default_factory=dict)
    creation_time: datetime = field(default_factory=datetime.utcnow)
    last_update_time: datetime = field(default_factory=datetime.utcnow)

Field Breakdown

| Field | Type | Description |
| --- | --- | --- |
| task_id | str | Unique identifier for the task |
| main_task_id | Optional[str] | Refers to the parent job or TaskData ID |
| description | Optional[str] | Human-readable description of the task |
| execution_type | Optional[str] | Type of execution target: agent, tool, dsl, or llm |
| dependencies | List[str] | Task IDs that must complete before this task is eligible |
| tool_id | Optional[str] | ID of the tool (if any) to be executed |
| function_id | Optional[str] | ID of the function (if relevant) |
| agent_id | Optional[str] | Target agent ID (if applicable) |
| parameters | Dict | Task configuration, including sub_type |
| arguments | Dict | Runtime arguments required by the executor |
| instructions | Optional[str] | Free-form instruction block (optional) |
| convertor_code | Optional[str] | Optional code snippet to convert input to schema |
| estimate_data | Any | Cost/latency estimates (if any) |
| mark_replan | bool | Flag indicating if replanning is required |
| status | str | Current status (pending, running, completed, etc.) |
| output | Dict | Final output from task execution |
| creation_time | datetime | When the task was created |
| last_update_time | datetime | Last status update timestamp |
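
The default_factory fields above matter in practice: without them, every task instance would share one id and one mutable dict. A minimal, runnable sketch using a trimmed-down stand-in (MiniTask is illustrative, not the real PlannerTask):

```python
from dataclasses import dataclass, field
from typing import Dict
import uuid

@dataclass
class MiniTask:
    # default_factory runs once per instance, so each task gets its own
    # UUID and its own parameters dict (no shared mutable defaults)
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    parameters: Dict = field(default_factory=dict)
    status: str = "pending"

a, b = MiniTask(), MiniTask()
a.parameters["sub_type"] = "agent"

print(a.task_id != b.task_id)  # True: fresh UUID per task
print(b.parameters)            # {}: dicts are not shared between tasks
```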

TaskData

Represents an entire planning session, containing multiple PlannerTasks and overall input/output tracking.

DataClass Definition

@dataclass
class TaskData:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    task_input_data: Dict = field(default_factory=dict)
    completed_tasks: List[str] = field(default_factory=list)
    output_data: Dict = field(default_factory=dict)
    creation_time: datetime = field(default_factory=datetime.utcnow)
    update_time: datetime = field(default_factory=datetime.utcnow)
    planned_tasks: List[PlannerTask] = field(default_factory=list)
    status: str = "pending"
    original_job_data: Optional[Dict] = None

Field Breakdown

| Field | Type | Description |
| --- | --- | --- |
| task_id | str | ID of the entire job or plan |
| task_input_data | Dict | Initial input provided to the planner |
| planned_tasks | List[PlannerTask] | List of decomposed tasks (phase 1 output) |
| completed_tasks | List[str] | IDs of tasks that have completed |
| output_data | Dict | Aggregated output from the job (if any) |
| creation_time | datetime | Timestamp when this plan was initiated |
| update_time | datetime | Timestamp of the last update |
| status | str | Status of the overall task (pending, in_progress, done, etc.) |
| original_job_data | Optional[Dict] | The job description before planning began |
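
The completed_tasks list is what drives the overall status field. A hedged sketch of how the aggregate status could be derived (the helper name and exact status transitions are assumptions, not from the source):

```python
def overall_status(planned_ids, completed_ids):
    """Derive a TaskData-style status from per-task completion (illustrative)."""
    planned = set(planned_ids)
    if not planned:
        return "pending"
    done = set(completed_ids) & planned
    if done == planned:
        return "done"
    return "in_progress" if done else "pending"

print(overall_status(["t1", "t2"], []))            # pending
print(overall_status(["t1", "t2"], ["t1"]))        # in_progress
print(overall_status(["t1", "t2"], ["t1", "t2"]))  # done
```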

Planning and Execution Flow

The Agents Planning System uses a two-stage planning architecture to transform high-level job specifications into executable actions across agents, tools, LLMs, or workflows. Each stage plays a distinct role: Stage 1 performs abstract reasoning, while Stage 2 performs concrete execution.


Stage 1 Planning – Abstract Task Decomposition

Purpose: Stage 1 is responsible for breaking down a job into discrete, logical tasks with metadata indicating how each task should be executed.

Component: Planner

Input:

  • job_goal: The primary objective of the job.
  • job_objectives: Supporting targets or sub-goals.
  • job_steerability_data: Constraints, preferences, or behavioral hints.
  • Resource metadata: agents, tools, LLMs, DSLs.

Process:

  • Uses an LLM via the /process-phase-1-task API of the Behavior Controller.
  • Produces a list of structured PlannerTask instances.

Output:

  • A set of tasks, each with:
      • execution_type: Indicates how the task should be executed (agent, tool, llm, or dsl)
      • parameters: Configuration values for further processing
      • agent_id, tool_id, function_id: Routing metadata
      • description, dependencies, status, etc.

Usefulness in the context of agents:

  • Determines which sub-tasks are suitable for agent execution.
  • Assigns tasks to specific agents based on traits, skills, or declared capabilities.
  • Provides enough metadata to delegate to the correct stage 2 planner.

Stage 2 Planning – Task Specialization and Execution

Purpose: Stage 2 is responsible for translating a PlannerTask into a concrete input schema and dispatching it to the appropriate backend executor.

Component: One of the following:

  • AgentsPlanner
  • ToolsPlanner
  • LLMSelectorPlanner
  • DSLPlanner

Input:

  • A PlannerTask from Stage 1
  • Input payload or user-provided runtime arguments

Process:

  • Uses the /process-phase-2-task API of the Behavior Controller.
  • Injects agent/tool-specific metadata, schema information, and transformation logic into the planning prompt.
  • Generates a finalized input format that the backend executor can consume.

Output:

  • A response containing:
      • uid: The selected agent, tool, or function identifier
      • input_data: Input parameters structured according to the expected schema

Execution:

  • The selected backend is invoked using:
      • P2P messaging (for agents)
      • Tool execution framework
      • LLM inference client
      • DSL workflow engine

Usefulness in the context of planning:

  • Stage 2 planning refines the task boundaries set by Stage 1.
  • Converts abstract tasks into concrete, executable payloads using available metadata.
  • Ensures the agent or tool receives well-structured, validated, and contextually relevant input.

Summary

| Stage | Role | Responsible Component | Key Output |
| --- | --- | --- | --- |
| Stage 1 | High-level task decomposition | Planner | PlannerTask list with execution metadata |
| Stage 2 | Task-specific input generation | Sub-type planners | Structured input for agent/tool/LLM/DSL |

Stage 1 answers the question: What tasks need to be done and who should do them? Stage 2 answers the question: How should the task be executed and with what input?




Stage 1 Planning

Overview

Stage 1 Planning is responsible for analyzing a high-level job specification and decomposing it into a set of executable units called PlannerTasks. This process is abstract and declarative—it identifies what needs to be done, the order of execution, and which executor type (e.g., agent, tool, LLM, or DSL) is appropriate for each task.

The planning is LLM-assisted and centralized through the Behavior Controller, which responds to semantic prompts with structured planning output.


Component: Planner

The Planner class encapsulates the logic for:

  • Collecting job-specific input (goals, objectives, constraints)
  • Describing available execution resources (agents, tools, LLMs, DSLs)
  • Constructing a planning prompt
  • Submitting the prompt to the Behavior Controller
  • Receiving a structured list of PlannerTask objects

Inputs

| Input Name | Type | Description |
| --- | --- | --- |
| job_goal | dict | The overarching goal of the task |
| job_objectives | dict | Specific measurable outcomes or criteria |
| job_steerability_data | dict | Guidance data such as constraints, preferences, or limits |
| task_description | str | Additional contextual description for LLM planning |
| agents | list | Available agents retrieved from KnownSubjects |
| tools/functions | list | Available tools and functions from ToolsFunctionsRegistry |
| custom_descriptions | dict | Optional custom textual notes to inject into the prompt |

Key Methods

prepare_job_data(job_goal, job_objectives, job_steerability_data)

Combines and formats the job input into a prompt-ready block.

prepare_resources_description()

Adds structured descriptions of known agents and tools to the planning prompt.

add_task_description(task_description)

Adds a textual description of the task to the planning prompt.

generate_final_input(...)

Calls the above methods in sequence and returns the complete prompt sent to the LLM.

create_plan(...)

Calls the Behavior Controller’s /process-phase-1-task endpoint with the full prompt. Returns the LLM’s output in parsed dictionary form.
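
These methods compose into a single prompt string. A rough sketch of the assembly order (the exact formatting below is an assumption; the real Planner builds its own layout):

```python
def generate_final_input(job_goal, job_objectives, steerability, resources, task_description):
    # Illustrative assembly order: job data first, then resources, then the task text
    parts = [
        f"Job goal: {job_goal}",
        f"Objectives: {job_objectives}",
        f"Steerability: {steerability}",
        f"Available resources:\n{resources}",
        f"Task: {task_description}",
    ]
    return "\n\n".join(parts)

prompt = generate_final_input(
    {"objective": "Summarize dataset"},
    {"quality": "high"},
    {"time_limit": "2 minutes"},
    "- agent-42: summarizer",
    "Decompose the summarization job.",
)
print("agent-42" in prompt)  # True
```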


Output

The output of Stage 1 is a list of serialized PlannerTask objects, each with:

  • A unique task_id
  • A description field
  • Execution metadata:
      • execution_type – e.g., "agent", "tool", "llm", "dsl"
      • parameters – e.g., { "sub_type": "agent", ... }
  • Optional fields like tool_id, agent_id, function_id
  • Dependency metadata
  • Default status = "pending"

These tasks are then forwarded to Stage 2 for execution preparation and dispatching.
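
Before dispatching to Stage 2, an executor has to respect the dependency metadata. A minimal sketch of a readiness check over serialized tasks (dict-based here for brevity; field names match the schema above):

```python
def ready_tasks(tasks, completed_ids):
    """Pending tasks whose dependencies are all in completed_ids."""
    done = set(completed_ids)
    return [t for t in tasks
            if t["status"] == "pending" and set(t["dependencies"]) <= done]

plan = [
    {"task_id": "t1", "dependencies": [], "status": "pending"},
    {"task_id": "t2", "dependencies": ["t1"], "status": "pending"},
]
print([t["task_id"] for t in ready_tasks(plan, [])])  # ['t1']

plan[0]["status"] = "completed"
print([t["task_id"] for t in ready_tasks(plan, ["t1"])])  # ['t2']
```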


Behavior Controller Integration

The LLM prompt is sent to the Behavior Controller’s API:

Endpoint: POST /process-phase-1-task

Payload:

{
  "search_id": "<uuid>",
  "phase_1_data": {
    "task_input_data": "<structured prompt>",
    "n_history_samples": 0,
    "llm_data": { ... }
  }
}

Response:

{
  "success": true,
  "data": [
    {
      "task_id": "...",
      "execution_type": "agent",
      "parameters": { ... },
      ...
    },
    ...
  ]
}

Use Case Example

planner = Planner(
    behavior_controller_url="http://controller.local",
    known_subjects=subject_registry,
    tools_functions_registry=tool_registry
)

planner.set_custom_description("note", "Use only verified agents.")
planner.generate_final_input(
    job_goal={"objective": "Summarize dataset"},
    job_objectives={"quality": "high"},
    job_steerability_data={"time_limit": "2 minutes"},
    task_description="Decompose the summarization job."
)

task_plan = planner.create_plan(
    task_description=planner.get_full_description()
)

Stage 2 Planning

Overview

Stage 2 Planning takes each individual task from the Stage 1 plan (PlannerTask) and translates it into a concrete, executable input tailored for a specific executor—such as an agent, tool, LLM, or DSL workflow. While Stage 1 answers "what needs to be done and who should do it?", Stage 2 answers "how should it be done and with what input?"

Each task’s execution_type and parameters["sub_type"] determine which Stage 2 planner will handle the task.


Purpose

  • Convert high-level task metadata into detailed, structured input for downstream execution systems.
  • Enrich execution context using schema information, traits, metadata, and task descriptions.
  • Interface with the Behavior Controller's Phase 2 API to guide an LLM in generating well-formed inputs.
  • Return the resolved target identifier (uid) and prepared input (input_data) for direct execution.

Key Components

| Planner Class | Handles Sub-Type | Execution Target |
| --- | --- | --- |
| AgentsPlanner | "agent" | Agent via P2PManager |
| ToolsPlanner | "tools" | Tool via execute_tool() |
| LLMSelectorPlanner | "llm" | LLM via run_ai_inference() |
| DSLPlanner | "dsl" | DSL via execute() |

All planners follow a common structure:

  1. Construct the full planning prompt using resource descriptions, schemas, and task descriptions.
  2. Call /process-phase-2-task via TaskProcessorClient.
  3. Parse the LLM response and return structured execution data.

Input to Stage 2 Planner

Each planner consumes:

  • A PlannerTask with execution_type and parameters["sub_type"]
  • Input data (Dict) from the calling execution framework
  • Optional LLM configuration (model name, parameters, etc.)

Output from Stage 2 Planner

Each planner returns a dictionary with:

  • uid: Identifier of the agent/tool/llm/workflow to execute
  • input_data: Transformed input ready for direct execution

Behavior Controller Integration

Endpoint: POST /process-phase-2-task

Payload:

{
  "search_id": "<uuid>",
  "phase_2_data": {
    "task_input_data": "<full planning prompt>",
    "n_history_samples": 0,
    "llm_data": { ... }
  }
}

Response:

{
  "success": true,
  "data": {
    "uid": "<agent/tool id>",
    "input_data": { ... },
    "plan": "<optional reasoning steps>"
  }
}

Planner Breakdown

1. AgentsPlanner

  • Describes available agents (ID, traits, schema, description)
  • Provides example input for conversion
  • Delegates final input to P2PManager.send_recv(...)

2. ToolsPlanner

  • Describes available tools/functions
  • Provides metadata and input schemas
  • Delegates execution to known_tools.execute_tool(...)

3. LLMSelectorPlanner

  • Describes available LLM models
  • Formats input for direct inference
  • Calls known_llms.run_ai_inference(...)

4. DSLPlanner

  • Describes available workflows and DSL steps
  • Maps task inputs to schema-defined parameters
  • Calls known_dsls.execute(...)

Example Flow

For a task of type agent, the flow would be:

  1. PlanAndExecuteL2 receives a PlannerTask with parameters["sub_type"] = "agent"
  2. Initializes AgentsPlanner
  3. Builds the full prompt with agents + schema + description
  4. Sends it to /process-phase-2-task
  5. Receives uid = "agent-42" and input_data = { "doc": "..." }
  6. Sends to P2PManager.send_recv("agent-42", input_data)

Use Case

agents_planner = AgentsPlanner(controller_url)
agents_planner.prepare_agents(agents=known_subjects.get_searchable_representation())
agents_planner.add_task_description("Summarize the following input...")
agents_planner.add_input_conversion({"text": "Full report goes here"})

response = agents_planner.create_plan(
    task_description=agents_planner.get_full_description(),
    llm_data={"model": "gpt-4"}
)

subject_id = response['uid']
input_data = response['input_data']



Unified Execution Layer

Component: PlanAndExecuteL2

The PlanAndExecuteL2 class is the execution bridge between planning and action. It takes a single PlannerTask—typically produced by Stage 1—and delegates it to the appropriate Stage 2 planner and executor based on the task’s parameters["sub_type"].

Responsibilities

  • Select the correct Stage 2 planner (AgentsPlanner, ToolsPlanner, etc.)
  • Construct and send phase-2 prompts to the Behavior Controller
  • Invoke the corresponding execution backend (agent, tool, LLM, or DSL)
  • Return structured output including execution result and, optionally, the generated plan

Input

| Parameter | Type | Description |
| --- | --- | --- |
| task | PlannerTask | The task to execute (must be of type "l2_task") |
| known_llms | KnownLLMs | Registry of LLMs |
| known_tools | ToolsFunctionsRegistry | Registered tools and functions |
| known_subjects | KnownSubjects | Registered agents |
| known_dsls | DSLWorkflowsManager | Available DSL workflows |
| llm_config | dict | Optional LLM model config |

Supported Sub-Types and Routing Logic

| sub_type | Stage 2 Planner | Executor Function |
| --- | --- | --- |
| "agent" | AgentsPlanner | P2PManager.send_recv(subject_id, input) |
| "llm" | LLMSelectorPlanner | known_llms.run_ai_inference(...) |
| "tools" | ToolsPlanner | known_tools.execute_tool(uid, input) |
| "dsl" | DSLPlanner | known_dsls.execute(uid, input, ...) |
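
The routing above amounts to a lookup on parameters["sub_type"]. A hedged sketch (planner names are plain strings here; the real PlanAndExecuteL2 instantiates the planner classes):

```python
# Illustrative routing table mirroring the sub_type -> planner mapping
ROUTES = {
    "agent": "AgentsPlanner",
    "llm": "LLMSelectorPlanner",
    "tools": "ToolsPlanner",
    "dsl": "DSLPlanner",
}

def select_planner(parameters):
    sub_type = parameters.get("sub_type")
    if sub_type not in ROUTES:
        raise ValueError(f"Unsupported sub_type: {sub_type!r}")
    return ROUTES[sub_type]

print(select_planner({"sub_type": "agent"}))  # AgentsPlanner
```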

Method: execute_module(input_data: dict) -> dict

Executes the task using the proper planner and backend, returning a dictionary with:

  • output: result from the target system
  • plan: optional reasoning metadata from the planner (if available)

Execution Backends

After Stage 2 planning completes, execution is performed by one of the following mechanisms, depending on the task’s sub-type.

1. Agent Execution (P2P)

Trigger: P2PManager.send_recv(subject_id, input_data)

  • Sends the input data to the target agent using a peer-to-peer communication channel.
  • Waits synchronously for a response.
  • Expects agent to process the task and return a {"success": true, "data": ...} payload.
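
Since the backend is expected to return a {"success": ..., "data": ...} envelope, callers typically unwrap it defensively. A minimal sketch (the helper name is illustrative, not part of the SDK):

```python
def unwrap(response):
    """Return data from a success envelope, or raise on failure (illustrative)."""
    if not response.get("success"):
        raise RuntimeError(f"backend reported failure: {response}")
    return response["data"]

print(unwrap({"success": True, "data": {"summary": "ok"}}))  # {'summary': 'ok'}
```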

2. Tool Execution

Trigger: known_tools.execute_tool(tool_id, input_data)

  • Executes the specified tool (local function, API wrapper, etc.)
  • Supports input transformation based on schema prepared in planning phase.

3. LLM Execution

Trigger: known_llms.run_ai_inference(name, input_data, session_id, frame_ptr)

  • Selects the LLM to invoke based on planner response.
  • Executes prompt or structured input against the registered model backend.

4. DSL Workflow Execution

Trigger: known_dsls.execute(uid, input_data, output_module="")

  • Invokes a DSL-defined multi-step pipeline.
  • Useful for internal control flow, multi-stage validation, or compound logic execution.

End-to-End Usage Example

# Step 1: Define your job inputs
job_goal = {"objective": "Summarize research papers"}
job_objectives = {"accuracy": "high", "summary_length": "short"}
steerability = {"tone": "formal", "max_tokens": 200}

# Step 2: Instantiate the planner and generate a task plan
planner = Planner(
    behavior_controller_url=os.getenv("BEHAVIOR_CONTROLLER_URL"),
    known_subjects=known_subjects,
    tools_functions_registry=known_tools
)
full_description = planner.generate_final_input(
    job_goal=job_goal,
    job_objectives=job_objectives,
    job_steerability_data=steerability,
    task_description="Break this job into tasks for agents or tools"
)
response = planner.create_plan(full_description)

# Step 3: Parse PlannerTask(s) from response
task_plan = [PlannerTask.from_dict(t) for t in response['data']]

# Step 4: Execute each task using PlanAndExecuteL2
for task in task_plan:
    if task.execution_type == "l2_task":
        executor = PlanAndExecuteL2(
            task=task,
            known_llms=known_llms,
            known_tools=known_tools,
            known_subjects=known_subjects,
            known_dsls=known_dsls,
            llm_config={"model": "gpt-4"}
        )
        result = executor.execute_module(input_data={"text": "Paper text..."})
        print(f"Task Output:\n{result['output']}")

Optimization of Workflow DAG

Overview

As workflows become more complex, a flat list of planned tasks is often inefficient to execute. The DAG Optimization module enables intelligent grouping of tasks into SubGraphs—units that can be executed independently or in parallel based on dependency satisfaction.

This is especially valuable in agent-based distributed systems, where tasks can be dispatched to multiple agents simultaneously.


When to Use

Use the optimization module:

  • After Stage 1 planning is complete (you have a TaskData instance)
  • Before Stage 2 planning or execution begins
  • When you want to run tasks in batches or parallel, preserving their dependency order

Available Options

Option 1: LLM-Based Optimizer (OptimizationPlanner)

Use this when:

  • Task dependency structures are non-trivial
  • You want smart grouping decisions (semantic understanding of task purposes)
  • You are fine with slightly longer planning time in exchange for better graph structure

Example:

from agent_sdk.optimization import OptimizationPlanner

optimizer = OptimizationPlanner(llm_api)
subgraphs = optimizer.generate_subgraphs(task_data)

Result: A list of SubGraph objects, each containing tasks and dependencies between subgraphs.


Option 2: Manual Rule-Based Optimizer (ManualOptimizer)

Use this when:

  • You want deterministic, fast grouping
  • Task dependencies are simple and straightforward
  • You're debugging or working in resource-constrained environments

Example:

from agent_sdk.optimization import ManualOptimizer

optimizer = ManualOptimizer()
subgraphs = optimizer.generate_subgraphs(task_data.planned_tasks)

What Is a SubGraph?

A SubGraph is a lightweight wrapper around a set of PlannerTasks that can be executed as a unit. It includes:

  • A unique subgraph_id
  • List of tasks
  • List of other subgraph IDs it depends on

Once generated, SubGraphs can be dispatched for execution or passed to the replanning module for refinement.
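
A minimal sketch of how dependency-driven grouping can work, in the spirit of the rule-based ManualOptimizer (the function below is illustrative, not the SDK implementation): tasks whose dependencies are already covered by earlier layers form the next batch.

```python
def layer_tasks(tasks):
    """Group tasks into layers whose dependencies are satisfied by earlier layers."""
    layers, placed = [], set()
    remaining = list(tasks)
    while remaining:
        layer = [t for t in remaining if set(t["dependencies"]) <= placed]
        if not layer:
            raise ValueError("cyclic or unsatisfiable dependencies")
        layers.append([t["task_id"] for t in layer])
        placed |= {t["task_id"] for t in layer}
        remaining = [t for t in remaining if t["task_id"] not in placed]
    return layers

tasks = [
    {"task_id": "a", "dependencies": []},
    {"task_id": "b", "dependencies": []},
    {"task_id": "c", "dependencies": ["a", "b"]},
]
print(layer_tasks(tasks))  # [['a', 'b'], ['c']]
```

Each layer here corresponds to a batch that could be dispatched in parallel, with the layer order preserving dependency constraints.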


Replanning

Overview

Replanning allows you to selectively improve tasks that need revision after initial planning. Instead of re-running the entire Stage 1 pipeline, you can replace only the problematic parts of the plan.

This is useful when:

  • A task fails or returns unexpected output
  • You’ve introduced a new agent, tool, or model
  • You want to fine-tune performance or instructions on a per-task basis

Step-by-Step Workflow

Step 1: Mark the Tasks

Before invoking the replanner, make sure that tasks requiring revision are marked with:

task.mark_replan = True

Only tasks with this flag will be included in the replanning request.


Step 2: Call the Replanner

from agent_sdk.replanning import SubGraphRePlanner

replanner = SubGraphRePlanner(
    llm_api=llm_inference_api,
    known_subjects=subject_registry,
    known_tools=tools_registry,
    known_workflows=dsl_registry,
    known_llms=llm_registry,
    known_codegens=codegen_registry
)

updated_subgraph = replanner.replace_replan_tasks(
    subgraph=original_subgraph,
    custom_append_text="Ensure memory efficiency and clarity."
)

What Gets Updated?

Only:

  • Tasks marked with mark_replan = True
  • Their task_id, description, execution_type, parameters, arguments

Preserved:

  • main_task_id
  • All dependencies
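
A hedged sketch of that replacement rule over dict-serialized tasks (the real SubGraphRePlanner generates the new tasks via the LLM; the merge logic below is only illustrative):

```python
def apply_replanned(tasks, replacements):
    """Swap in replanned tasks; keep main_task_id links and dependencies intact."""
    out = []
    for t in tasks:
        new = replacements.get(t["task_id"])
        if t.get("mark_replan") and new is not None:
            merged = dict(new)
            merged["main_task_id"] = t["task_id"]       # link back to the original task
            merged["dependencies"] = t["dependencies"]  # dependencies are preserved
            out.append(merged)
        else:
            out.append(t)
    return out

old = [{"task_id": "T003", "dependencies": ["T001"], "mark_replan": True}]
new = {"T003": {"task_id": "T9001", "description": "Query API and validate."}}
result = apply_replanned(old, new)
print(result[0]["task_id"], result[0]["main_task_id"])  # T9001 T003
```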

Use Cases

| Scenario | Replanning Value |
| --- | --- |
| Tool endpoint changes | Update tool parameters without redoing the full plan |
| Agent skill upgrades | Reassign task to an improved agent with new traits |
| LLM returns noisy/incomplete output | Retry planning only for affected components |
| Tasks require tighter performance constraints | Inject steerability hints for task-specific tuning |

Example Task Before Replanning

{
  "task_id": "T003",
  "description": "Query API and store result.",
  "execution_type": "tool_call",
  "parameters": { "url": "http://api/endpoint" },
  "mark_replan": true
}

After Replanning

{
  "task_id": "T9001",
  "main_task_id": "T003",
  "description": "Query API, validate JSON schema, and store normalized results.",
  "execution_type": "tool_call",
  "parameters": {
    "url": "http://api/endpoint",
    "validation_schema": "schema_v2"
  }
}

Integration Guidance

End-to-End Flow with Optimization + Replanning

# 1. Generate initial plan
planner = Planner(...)
task_data = planner.create_plan(...)

# 2. Optimize the task plan
optimizer = OptimizationPlanner(llm_api)
subgraphs = optimizer.generate_subgraphs(task_data)

# 3. Mark problematic tasks for replanning
for sg in subgraphs:
    for task in sg.tasks:
        if some_custom_check(task):
            task.mark_replan = True

# 4. Replan only the marked tasks
replanner = SubGraphRePlanner(...)
updated_subgraphs = [replanner.replace_replan_tasks(sg) for sg in subgraphs]

# 5. Continue to Stage 2 planning and execution