Agents Planning System
Introduction
The Agents Planning System is a modular, two-stage task decomposition and execution engine designed to dynamically select and utilize intelligent agents, tools, DSLs, or LLMs to fulfill high-level job goals. The system is powered by a centralized Behavior Controller that uses LLMs to reason about job goals, decompose them into actionable plans, and generate structured inputs for execution.
It consists of:
- Stage 1 Planning – Decomposes job-level goals into a list of executable sub-tasks (`PlannerTask`s).
- Stage 2 Planning & Execution – For each sub-task, selects the appropriate execution mechanism (e.g., Agent, Tool, DSL, LLM), prepares the input format, and executes it.
This system is used in decentralized or modular AI infrastructures to offload intelligent decisions to suitable agents or functions in a scalable, pluggable way.
Schema
The Agents Planning System is centered around two key data structures:
- `PlannerTask` – represents an individual executable unit of work.
- `TaskData` – aggregates multiple `PlannerTask`s and tracks overall planning/execution state.
Both are implemented as Python `@dataclass`es for structured access and serialization.
PlannerTask
Represents a single task generated by the planner. It contains metadata for execution, dependencies, schema info, and optional routing fields.
DataClass Definition
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

@dataclass
class PlannerTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    main_task_id: Optional[str] = ""
    description: Optional[str] = ""
    execution_type: Optional[str] = ""
    dependencies: List[str] = field(default_factory=list)
    tool_id: Optional[str] = None
    function_id: Optional[str] = None
    agent_id: Optional[str] = None
    parameters: Dict = field(default_factory=dict)
    arguments: Dict = field(default_factory=dict)
    instructions: Optional[str] = None
    convertor_code: Optional[str] = None
    estimate_data: Any = None
    mark_replan: bool = False
    status: str = "pending"
    output: Dict = field(default_factory=dict)
    creation_time: datetime = field(default_factory=datetime.utcnow)
    last_update_time: datetime = field(default_factory=datetime.utcnow)
Field Breakdown
Field | Type | Description |
---|---|---|
`task_id` | `str` | Unique identifier for the task |
`main_task_id` | `Optional[str]` | Refers to the parent job or `TaskData` ID |
`description` | `Optional[str]` | Human-readable description of the task |
`execution_type` | `Optional[str]` | Type of execution target: `agent`, `tool`, `dsl`, or `llm` |
`dependencies` | `List[str]` | Task IDs that must complete before this task is eligible |
`tool_id` | `Optional[str]` | ID of the tool (if any) to be executed |
`function_id` | `Optional[str]` | ID of the function (if relevant) |
`agent_id` | `Optional[str]` | Target agent ID (if applicable) |
`parameters` | `Dict` | Task configuration, including `sub_type` |
`arguments` | `Dict` | Runtime arguments required by the executor |
`instructions` | `Optional[str]` | Free-form instruction block (optional) |
`convertor_code` | `Optional[str]` | Optional code snippet to convert input to schema |
`estimate_data` | `Any` | Cost/latency estimates (if any) |
`mark_replan` | `bool` | Flag indicating if replanning is required |
`status` | `str` | Current status (`pending`, `running`, `completed`, etc.) |
`output` | `Dict` | Final output from task execution |
`creation_time` | `datetime` | When the task was created |
`last_update_time` | `datetime` | Last status update timestamp |
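As a concrete illustration, the sketch below constructs a task and serializes it. A trimmed copy of the dataclass is included so the snippet runs standalone; the IDs, agent name, and field values are invented for the example.

```python
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Dict, List, Optional

# Trimmed copy of the PlannerTask schema above, so this sketch is self-contained.
@dataclass
class PlannerTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    main_task_id: Optional[str] = ""
    description: Optional[str] = ""
    execution_type: Optional[str] = ""
    dependencies: List[str] = field(default_factory=list)
    agent_id: Optional[str] = None
    parameters: Dict = field(default_factory=dict)
    status: str = "pending"
    creation_time: datetime = field(default_factory=datetime.utcnow)

# A task routed to an agent, depending on a prior (hypothetical) fetch task.
summarize = PlannerTask(
    description="Summarize the fetched document",
    execution_type="agent",
    agent_id="agent-42",
    parameters={"sub_type": "agent"},
    dependencies=["task-fetch-001"],
)

# @dataclass gives structured access plus easy serialization via asdict().
record = asdict(summarize)
```

Because `task_id` uses a `default_factory`, every instance gets a fresh UUID without the caller having to supply one.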
TaskData
Represents an entire planning session, containing multiple `PlannerTask`s and overall input/output tracking.
DataClass Definition
@dataclass
class TaskData:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    task_input_data: Dict = field(default_factory=dict)
    completed_tasks: List[str] = field(default_factory=list)
    output_data: Dict = field(default_factory=dict)
    creation_time: datetime = field(default_factory=datetime.utcnow)
    update_time: datetime = field(default_factory=datetime.utcnow)
    planned_tasks: List[PlannerTask] = field(default_factory=list)
    status: str = "pending"
    original_job_data: Optional[Dict] = None
Field Breakdown
Field | Type | Description |
---|---|---|
`task_id` | `str` | ID of the entire job or plan |
`task_input_data` | `Dict` | Initial input provided to the planner |
`planned_tasks` | `List[PlannerTask]` | List of decomposed tasks (phase 1 output) |
`completed_tasks` | `List[str]` | IDs of tasks that have completed |
`output_data` | `Dict` | Aggregated output from the job (if any) |
`creation_time` | `datetime` | Timestamp when this plan was initiated |
`update_time` | `datetime` | Timestamp of the last update |
`status` | `str` | Status of the overall task (`pending`, `in_progress`, `done`, etc.) |
`original_job_data` | `Optional[Dict]` | The job description before planning began |
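The sketch below shows how a `TaskData` instance can track progress across its planned tasks. The dataclasses are minimal stand-ins for the full definitions above, and the status-transition logic is illustrative rather than the engine's actual implementation.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List

# Minimal stand-ins for the schema dataclasses, so this sketch runs standalone.
@dataclass
class PlannerTask:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "pending"

@dataclass
class TaskData:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    task_input_data: Dict = field(default_factory=dict)
    completed_tasks: List[str] = field(default_factory=list)
    planned_tasks: List[PlannerTask] = field(default_factory=list)
    status: str = "pending"

# One TaskData aggregates the whole plan and tracks progress.
plan = TaskData(task_input_data={"objective": "Summarize dataset"})
plan.planned_tasks = [PlannerTask(), PlannerTask()]

# When a task finishes, record its ID and update the overall status.
done = plan.planned_tasks[0]
done.status = "completed"
plan.completed_tasks.append(done.task_id)
plan.status = (
    "done" if len(plan.completed_tasks) == len(plan.planned_tasks)
    else "in_progress"
)
```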
Planning and Execution Flow
The Agents Planning System uses a two-stage planning architecture to transform high-level job specifications into executable actions across agents, tools, LLMs, or workflows. Each stage plays a distinct role in abstract reasoning and concrete execution.
Stage 1 Planning – Abstract Task Decomposition
Purpose: Stage 1 is responsible for breaking down a job into discrete, logical tasks with metadata indicating how each task should be executed.
Component: Planner
Input:
- `job_goal`: The primary objective of the job.
- `job_objectives`: Supporting targets or sub-goals.
- `job_steerability_data`: Constraints, preferences, or behavioral hints.
- Resource metadata: agents, tools, LLMs, DSLs.
Process:
- Uses an LLM via the `/process-phase-1-task` API of the Behavior Controller.
- Produces a list of structured `PlannerTask` instances.
Output:
- A set of tasks, each with:
  - `execution_type`: Indicates how the task should be executed (`agent`, `tool`, `llm`, or `dsl`)
  - `parameters`: Configuration values for further processing
  - `agent_id`, `tool_id`, `function_id`: Routing metadata
  - `description`, `dependencies`, `status`, etc.
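Concretely, a Stage 1 result might look like the following two serialized tasks; all IDs and values here are hypothetical. A small helper shows how the dependency metadata gates eligibility.

```python
# Illustrative Stage 1 output: two PlannerTask records in serialized form.
# Task IDs, tool/agent names, and descriptions are made up for the example.
stage1_tasks = [
    {
        "task_id": "t-001",
        "description": "Fetch the dataset from the provided source",
        "execution_type": "tool",
        "tool_id": "http_fetcher",
        "parameters": {"sub_type": "tools"},
        "dependencies": [],
        "status": "pending",
    },
    {
        "task_id": "t-002",
        "description": "Summarize the fetched dataset",
        "execution_type": "agent",
        "agent_id": "agent-42",
        "parameters": {"sub_type": "agent"},
        "dependencies": ["t-001"],
        "status": "pending",
    },
]

# A task is eligible to run once all of its dependencies have completed.
def eligible(task, completed_ids):
    return all(dep in completed_ids for dep in task["dependencies"])
```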
Usefulness in the context of agents:
- Determines which sub-tasks are suitable for agent execution.
- Assigns tasks to specific agents based on traits, skills, or declared capabilities.
- Provides enough metadata to delegate to the correct stage 2 planner.
Stage 2 Planning – Task Specialization and Execution
Purpose: Stage 2 is responsible for translating a `PlannerTask` into a concrete input schema and dispatching it to the appropriate backend executor.
Component: One of the following:
- `AgentsPlanner`
- `ToolsPlanner`
- `LLMSelectorPlanner`
- `DSLPlanner`
Input:
- A `PlannerTask` from Stage 1
- Input payload or user-provided runtime arguments
Process:
- Uses the `/process-phase-2-task` API of the Behavior Controller.
- Injects agent/tool-specific metadata, schema information, and transformation logic into the planning prompt.
- Generates a finalized input format that the backend executor can consume.
Output:
- A response containing:
  - `uid`: The selected agent, tool, or function identifier
  - `input_data`: Input parameters structured according to the expected schema
Execution:
- The selected backend is invoked using:
  - P2P messaging (for agents)
  - Tool execution framework
  - LLM inference client
  - DSL workflow engine
Usefulness in the context of planning:
- Stage 2 planning refines the task boundaries set by Stage 1.
- Converts abstract tasks into concrete, executable payloads using available metadata.
- Ensures the agent or tool receives well-structured, validated, and contextually relevant input.
Summary
Stage | Role | Responsible Component | Key Output |
---|---|---|---|
Stage 1 | High-level task decomposition | `Planner` | `PlannerTask` list with execution metadata |
Stage 2 | Task-specific input generation | Sub-type planners | Structured input for agent/tool/LLM/DSL |
Stage 1 answers the question: What tasks need to be done and who should do them? Stage 2 answers the question: How should the task be executed and with what input?
Stage 1 Planning
Overview
Stage 1 Planning is responsible for analyzing a high-level job specification and decomposing it into a set of executable units called `PlannerTask`s. This process is abstract and declarative—it identifies what needs to be done, the order of execution, and which executor type (e.g., agent, tool, LLM, or DSL) is appropriate for each task.
The planning is LLM-assisted and centralized through the Behavior Controller, which responds to semantic prompts with structured planning output.
Component: Planner
The `Planner` class encapsulates the logic for:
- Collecting job-specific input (goals, objectives, constraints)
- Describing available execution resources (agents, tools, LLMs, DSLs)
- Constructing a planning prompt
- Submitting the prompt to the Behavior Controller
- Receiving a structured list of `PlannerTask` objects
Inputs
Input Name | Type | Description |
---|---|---|
`job_goal` | `dict` | The overarching goal of the task |
`job_objectives` | `dict` | Specific measurable outcomes or criteria |
`job_steerability_data` | `dict` | Guidance data such as constraints, preferences, or limits |
`task_description` | `str` | Additional contextual description for LLM planning |
`agents` | `list` | Available agents retrieved from KnownSubjects |
`tools/functions` | `list` | Available tools and functions from ToolsFunctionsRegistry |
`custom_descriptions` | `dict` | Optional custom textual notes to inject into the prompt |
Key Methods
`prepare_job_data(job_goal, job_objectives, job_steerability_data)`
Combines and formats the job input into a prompt-ready block.
`prepare_resources_description()`
Adds structured descriptions of known agents and tools to the planning prompt.
`add_task_description(task_description)`
Adds a textual description of the task to the planning prompt.
`generate_final_input(...)`
Calls the above methods in sequence and returns the complete prompt sent to the LLM.
`create_plan(...)`
Calls the Behavior Controller’s `/process-phase-1-task` endpoint with the full prompt. Returns the LLM’s output in parsed dictionary form.
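The methods above can be sketched as a prompt-assembly pipeline. This is a hypothetical skeleton: the section labels ("GOAL:", "AGENTS:", etc.) and joining logic are assumptions for illustration, not the actual `Planner` implementation.

```python
# Hypothetical skeleton of the Planner's prompt-assembly flow.
class PlannerSketch:
    def __init__(self):
        self.sections = []

    def prepare_job_data(self, job_goal, job_objectives, job_steerability_data):
        # Combine the job inputs into one prompt-ready block.
        self.sections.append(
            f"GOAL: {job_goal}\nOBJECTIVES: {job_objectives}\n"
            f"STEERABILITY: {job_steerability_data}"
        )

    def prepare_resources_description(self, agents, tools):
        # Describe the available execution resources.
        self.sections.append(f"AGENTS: {agents}\nTOOLS: {tools}")

    def add_task_description(self, task_description):
        self.sections.append(f"TASK: {task_description}")

    def generate_final_input(self, job_goal, job_objectives,
                             job_steerability_data, task_description,
                             agents=None, tools=None):
        # Call the pieces in sequence and return the complete prompt.
        self.prepare_job_data(job_goal, job_objectives, job_steerability_data)
        self.prepare_resources_description(agents or [], tools or [])
        self.add_task_description(task_description)
        return "\n\n".join(self.sections)

prompt = PlannerSketch().generate_final_input(
    job_goal={"objective": "Summarize dataset"},
    job_objectives={"quality": "high"},
    job_steerability_data={"time_limit": "2 minutes"},
    task_description="Decompose the summarization job.",
)
```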
Output
The output of Stage 1 is a list of serialized `PlannerTask` objects, each with:
- A unique `task_id`
- A `description` field
- Execution metadata:
  - `execution_type` – e.g., `"agent"`, `"tool"`, `"llm"`, `"dsl"`
  - `parameters` – e.g., `{ "sub_type": "agent", ... }`
- Optional fields like `tool_id`, `agent_id`, `function_id`
- Dependency metadata
- Default `status = "pending"`
These tasks are then forwarded to Stage 2 for execution preparation and dispatching.
Behavior Controller Integration
The LLM prompt is sent to the Behavior Controller’s API:
Endpoint: POST /process-phase-1-task
Payload:
{
"search_id": "<uuid>",
"phase_1_data": {
"task_input_data": "<structured prompt>",
"n_history_samples": 0,
"llm_data": { ... }
}
}
Response:
{
"success": true,
"data": [
{
"task_id": "...",
"execution_type": "agent",
"parameters": { ... },
...
},
...
]
}
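Building the request body shown above is mechanical. A minimal sketch, assuming the payload shape documented here; the `llm_data` contents are an assumption:

```python
import json
import uuid

# Assemble a phase-1 request payload matching the documented shape.
def build_phase1_payload(prompt: str, llm_data: dict) -> dict:
    return {
        "search_id": str(uuid.uuid4()),
        "phase_1_data": {
            "task_input_data": prompt,
            "n_history_samples": 0,
            "llm_data": llm_data,
        },
    }

payload = build_phase1_payload("<structured prompt>", {"model": "gpt-4"})
# This is the JSON body that would be POSTed to /process-phase-1-task.
body = json.dumps(payload)
```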
Use Case Example
planner = Planner(
behavior_controller_url="http://controller.local",
known_subjects=subject_registry,
tools_functions_registry=tool_registry
)
planner.set_custom_description("note", "Use only verified agents.")
planner.generate_final_input(
job_goal={"objective": "Summarize dataset"},
job_objectives={"quality": "high"},
job_steerability_data={"time_limit": "2 minutes"},
task_description="Decompose the summarization job."
)
task_plan = planner.create_plan(
task_description=planner.get_full_description()
)
Stage 2 Planning
Overview
Stage 2 Planning takes each individual task from the Stage 1 plan (`PlannerTask`) and translates it into a concrete, executable input tailored for a specific executor—such as an agent, tool, LLM, or DSL workflow. While Stage 1 answers "what needs to be done and who should do it?", Stage 2 answers "how should it be done and with what input?"
Each task’s `execution_type` and `parameters["sub_type"]` determine which Stage 2 planner will handle the task.
Purpose
- Convert high-level task metadata into detailed, structured input for downstream execution systems.
- Enrich execution context using schema information, traits, metadata, and task descriptions.
- Interface with the Behavior Controller's Phase 2 API to guide an LLM in generating well-formed inputs.
- Return the resolved target identifier (`uid`) and prepared input (`input_data`) for direct execution.
Key Components
Planner Class | Handles Sub-Type | Execution Target |
---|---|---|
`AgentsPlanner` | `"agent"` | Agent via `P2PManager` |
`ToolsPlanner` | `"tools"` | Tool via `execute_tool()` |
`LLMSelectorPlanner` | `"llm"` | LLM via `run_ai_inference()` |
`DSLPlanner` | `"dsl"` | DSL via `execute()` |
All planners follow a common structure:
- Construct the full planning prompt using resource descriptions, schemas, and task descriptions.
- Call `/process-phase-2-task` via `TaskProcessorClient`.
- Parse the LLM response and return structured execution data.
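The shared three-step structure can be sketched as follows. `StubTaskProcessorClient` is a stand-in returning a canned response in the documented shape; the real client's interface and the prompt format are assumptions.

```python
# Stub standing in for TaskProcessorClient; a real client would POST
# the prompt to /process-phase-2-task on the Behavior Controller.
class StubTaskProcessorClient:
    def process_phase_2_task(self, prompt: str, llm_data: dict) -> dict:
        return {
            "success": True,
            "data": {"uid": "agent-42", "input_data": {"doc": "..."}},
        }

def plan_stage2(task_description, resource_description, client, llm_data=None):
    # 1. Construct the full planning prompt from resources + task description.
    prompt = f"{resource_description}\n\n{task_description}"
    # 2. Call the Behavior Controller's phase-2 endpoint.
    response = client.process_phase_2_task(prompt, llm_data or {})
    # 3. Parse the response and return structured execution data.
    if not response.get("success"):
        raise RuntimeError("phase-2 planning failed")
    return response["data"]["uid"], response["data"]["input_data"]

uid, input_data = plan_stage2(
    "Summarize the following input...",
    "AGENTS: agent-42 (summarizer)",
    StubTaskProcessorClient(),
)
```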
Input to Stage 2 Planner
Each planner consumes:
- A `PlannerTask` with `execution_type` and `parameters["sub_type"]`
- Input data (`Dict`) from the calling execution framework
- Optional LLM configuration (model name, parameters, etc.)
Output from Stage 2 Planner
Each planner returns a dictionary with:
- `uid`: Identifier of the agent/tool/LLM/workflow to execute
- `input_data`: Transformed input ready for direct execution
Behavior Controller Integration
Endpoint: POST /process-phase-2-task
Payload:
{
"search_id": "<uuid>",
"phase_2_data": {
"task_input_data": "<full planning prompt>",
"n_history_samples": 0,
"llm_data": { ... }
}
}
Response:
{
"success": true,
"data": {
"uid": "<agent/tool id>",
"input_data": { ... },
"plan": "<optional reasoning steps>"
}
}
Planner Breakdown
1. `AgentsPlanner`
- Describes available agents (ID, traits, schema, description)
- Provides example input for conversion
- Delegates final input to `P2PManager.send_recv(...)`
2. `ToolsPlanner`
- Describes available tools/functions
- Provides metadata and input schemas
- Delegates execution to `known_tools.execute_tool(...)`
3. `LLMSelectorPlanner`
- Describes available LLM models
- Formats input for direct inference
- Calls `known_llms.run_ai_inference(...)`
4. `DSLPlanner`
- Describes available workflows and DSL steps
- Maps task inputs to schema-defined parameters
- Calls `known_dsls.execute(...)`
Example Flow
For a task of type `agent`, the flow would be:
- `PlanAndExecuteL2` receives a `PlannerTask` with `parameters["sub_type"] = "agent"`
- Initializes `AgentsPlanner`
- Builds full prompt with agents + schema + description
- Sends to `/process-phase-2-task`
- Receives:
  - `uid = "agent-42"`
  - `input_data = { "doc": "..." }`
- Sends to `P2PManager.send_recv("agent-42", input_data)`
Use Case
agents_planner = AgentsPlanner(controller_url)
agents_planner.prepare_agents(agents=known_subjects.get_searchable_representation())
agents_planner.add_task_description("Summarize the following input...")
agents_planner.add_input_conversion({"text": "Full report goes here"})
response = agents_planner.create_plan(
task_description=agents_planner.get_full_description(),
llm_data={"model": "gpt-4"}
)
subject_id = response['uid']
input_data = response['input_data']
Unified Execution Layer
Component: PlanAndExecuteL2
The `PlanAndExecuteL2` class is the execution bridge between planning and action. It takes a single `PlannerTask`—typically produced by Stage 1—and delegates it to the appropriate Stage 2 planner and executor based on the task’s `parameters["sub_type"]`.
Responsibilities
- Select the correct Stage 2 planner (`AgentsPlanner`, `ToolsPlanner`, etc.)
- Construct and send phase-2 prompts to the Behavior Controller
- Invoke the corresponding execution backend (agent, tool, LLM, or DSL)
- Return structured output including execution result and, optionally, the generated plan
Input
Parameter | Type | Description |
---|---|---|
`task` | `PlannerTask` | The task to execute (must be of type `"l2_task"`) |
`known_llms` | `KnownLLMs` | Registry of LLMs |
`known_tools` | `ToolsFunctionsRegistry` | Registered tools and functions |
`known_subjects` | `KnownSubjects` | Registered agents |
`known_dsls` | `DSLWorkflowsManager` | Available DSL workflows |
`llm_config` | `dict` | Optional LLM model config |
Supported Sub-Types and Routing Logic
`sub_type` | Stage 2 Planner | Executor Function |
---|---|---|
`"agent"` | `AgentsPlanner` | `P2PManager.send_recv(subject_id, input)` |
`"llm"` | `LLMSelectorPlanner` | `known_llms.run_ai_inference(...)` |
`"tools"` | `ToolsPlanner` | `known_tools.execute_tool(uid, input)` |
`"dsl"` | `DSLPlanner` | `known_dsls.execute(uid, input, ...)` |
Method: execute_module(input_data: dict) -> dict
Executes the task using the proper planner and backend, returning a dictionary with:
- `output`: result from the target system
- `plan`: optional reasoning metadata from the planner (if available)
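The routing logic behind `execute_module` can be sketched as a simple sub-type dispatch. The backends here are plain callables standing in for the planner/executor pairs listed in the table above; the real class wires in the registries from its constructor.

```python
# Illustrative sub_type dispatch, following the routing table above.
def execute_module_sketch(task_params: dict, input_data: dict, backends: dict) -> dict:
    sub_type = task_params.get("sub_type")
    if sub_type not in backends:
        raise ValueError(f"unsupported sub_type: {sub_type}")
    # In the real system a Stage 2 planner first resolves uid/input_data;
    # here a single callable stands in for planner + executor.
    output = backends[sub_type](input_data)
    return {"output": output, "plan": None}

# Stub backends keyed by sub_type (real ones call P2PManager, tools, etc.).
backends = {
    "agent": lambda data: f"agent handled {data['text']}",
    "tools": lambda data: f"tool handled {data['text']}",
}
result = execute_module_sketch({"sub_type": "agent"}, {"text": "hi"}, backends)
```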
Execution Backends
After Stage 2 planning completes, execution is performed by one of the following mechanisms, depending on the task’s sub-type.
1. Agent Execution (P2P)
Trigger: P2PManager.send_recv(subject_id, input_data)
- Sends the input data to the target agent using a peer-to-peer communication channel.
- Waits synchronously for a response.
- Expects agent to process the task and return a
{"success": true, "data": ...}
payload.
2. Tool Execution
Trigger: known_tools.execute_tool(tool_id, input_data)
- Executes the specified tool (local function, API wrapper, etc.)
- Supports input transformation based on schema prepared in planning phase.
3. LLM Execution
Trigger: known_llms.run_ai_inference(name, input_data, session_id, frame_ptr)
- Selects the LLM to invoke based on planner response.
- Executes prompt or structured input against the registered model backend.
4. DSL Workflow Execution
Trigger: known_dsls.execute(uid, input_data, output_module="")
- Invokes a DSL-defined multi-step pipeline.
- Useful for internal control flow, multi-stage validation, or compound logic execution.
End-to-End Usage Example
import os  # used below to read BEHAVIOR_CONTROLLER_URL

# Step 1: Define your job inputs
job_goal = {"objective": "Summarize research papers"}
job_objectives = {"accuracy": "high", "summary_length": "short"}
steerability = {"tone": "formal", "max_tokens": 200}
# Step 2: Instantiate the planner and generate a task plan
planner = Planner(
behavior_controller_url=os.getenv("BEHAVIOR_CONTROLLER_URL"),
known_subjects=known_subjects,
tools_functions_registry=known_tools
)
full_description = planner.generate_final_input(
job_goal=job_goal,
job_objectives=job_objectives,
job_steerability_data=steerability,
task_description="Break this job into tasks for agents or tools"
)
response = planner.create_plan(full_description)
# Step 3: Parse PlannerTask(s) from response
task_plan = [PlannerTask.from_dict(t) for t in response['data']]
# Step 4: Execute each task using PlanAndExecuteL2
for task in task_plan:
if task.execution_type == "l2_task":
executor = PlanAndExecuteL2(
task=task,
known_llms=known_llms,
known_tools=known_tools,
known_subjects=known_subjects,
known_dsls=known_dsls,
llm_config={"model": "gpt-4"}
)
result = executor.execute_module(input_data={"text": "Paper text..."})
print(f"Task Output:\n{result['output']}")
Optimization of Workflow DAG
Overview
As workflows become more complex, a flat list of planned tasks is often inefficient to execute. The DAG Optimization module enables intelligent grouping of tasks into SubGraphs—units that can be executed independently or in parallel based on dependency satisfaction.
This is especially valuable in agent-based distributed systems, where tasks can be dispatched to multiple agents simultaneously.
When to Use
Use the optimization module:
- After Stage 1 planning is complete (you have a `TaskData` instance)
- Before Stage 2 planning or execution begins
- When you want to run tasks in batches or in parallel, preserving their dependency order
Available Options
Option 1: LLM-Based Optimizer (`OptimizationPlanner`)
Use this when:
- Task dependency structures are non-trivial
- You want smart grouping decisions (semantic understanding of task purposes)
- You are fine with slightly longer planning time in exchange for better graph structure
Example:
from agent_sdk.optimization import OptimizationPlanner
optimizer = OptimizationPlanner(llm_api)
subgraphs = optimizer.generate_subgraphs(task_data)
Result: A list of `SubGraph` objects, each containing tasks and dependencies between subgraphs.
Option 2: Manual Rule-Based Optimizer (`ManualOptimizer`)
Use this when:
- You want deterministic, fast grouping
- Task dependencies are simple and straightforward
- You're debugging or working in resource-constrained environments
Example:
from agent_sdk.optimization import ManualOptimizer
optimizer = ManualOptimizer()
subgraphs = optimizer.generate_subgraphs(task_data.planned_tasks)
What Is a `SubGraph`?
A `SubGraph` is a lightweight wrapper around a set of `PlannerTask`s that can be executed as a unit. It includes:
- A unique `subgraph_id`
- List of tasks
- List of other subgraph IDs it depends on
Once generated, SubGraphs can be dispatched for execution or passed to the replanning module for refinement.
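To make the structure concrete, here is a hypothetical `SubGraph` shape plus a simple rule-based grouping pass that buckets tasks by dependency depth, so each level can run in parallel once the previous level's subgraph completes. The real `ManualOptimizer` may group differently.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical SubGraph shape matching the description above.
@dataclass
class SubGraph:
    subgraph_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tasks: List[str] = field(default_factory=list)
    depends_on: List[str] = field(default_factory=list)

def group_by_level(deps: Dict[str, List[str]]) -> List[SubGraph]:
    """Group task IDs into level-ordered SubGraphs.

    deps maps each task ID to the IDs it depends on (an acyclic graph).
    """
    levels: Dict[str, int] = {}

    def level(task_id: str) -> int:
        # A task's level is one more than its deepest dependency.
        if task_id not in levels:
            parents = deps.get(task_id, [])
            levels[task_id] = 1 + max((level(p) for p in parents), default=-1)
        return levels[task_id]

    for t in deps:
        level(t)

    graphs: List[SubGraph] = []
    for depth in range(max(levels.values()) + 1):
        sg = SubGraph(tasks=[t for t, d in levels.items() if d == depth])
        if graphs:  # each level waits on the previous one
            sg.depends_on = [graphs[-1].subgraph_id]
        graphs.append(sg)
    return graphs

# t1 and t2 are independent; t3 needs both, so it lands in the next level.
subgraphs = group_by_level({"t1": [], "t2": [], "t3": ["t1", "t2"]})
```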
Replanning
Overview
Replanning allows you to selectively improve tasks that need revision after initial planning. Instead of re-running the entire Stage 1 pipeline, you can replace only the problematic parts of the plan.
This is useful when:
- A task fails or returns unexpected output
- You’ve introduced a new agent, tool, or model
- You want to fine-tune performance or instructions on a per-task basis
Step-by-Step Workflow
Step 1: Mark the Tasks
Before invoking the replanner, make sure that tasks requiring revision are marked with:
task.mark_replan = True
Only tasks with this flag will be included in the replanning request.
Step 2: Call the Replanner
from agent_sdk.replanning import SubGraphRePlanner
replanner = SubGraphRePlanner(
llm_api=llm_inference_api,
known_subjects=subject_registry,
known_tools=tools_registry,
known_workflows=dsl_registry,
known_llms=llm_registry,
known_codegens=codegen_registry
)
updated_subgraph = replanner.replace_replan_tasks(
subgraph=original_subgraph,
custom_append_text="Ensure memory efficiency and clarity."
)
What Gets Updated?
Only:
- Tasks marked with `mark_replan = True`
- Their `task_id`, `description`, `execution_type`, `parameters`, `arguments`
Preserved:
- `main_task_id`
- All dependencies
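The update/preserve split above can be sketched as a merge over serialized tasks. This is illustrative: the real `SubGraphRePlanner` merge logic may differ, and the field names simply follow `PlannerTask`.

```python
# Illustrative merge of a replanned task into the original, keeping the
# preserved fields listed above.
def apply_replan(original: dict, replanned: dict) -> dict:
    updated = dict(original)
    # Fields the replanner is allowed to rewrite:
    for key in ("task_id", "description", "execution_type",
                "parameters", "arguments"):
        if key in replanned:
            updated[key] = replanned[key]
    # Preserved: link back to the original task and keep its dependencies.
    updated["main_task_id"] = original["task_id"]
    updated["dependencies"] = original.get("dependencies", [])
    updated["mark_replan"] = False  # the flag is consumed by the replan pass
    return updated

original = {"task_id": "T003", "description": "Query API and store result.",
            "dependencies": ["T001"], "mark_replan": True}
replanned = {"task_id": "T9001",
             "description": "Query API, validate schema, store results."}
merged = apply_replan(original, replanned)
```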
Use Cases
Scenario | Replanning Value |
---|---|
Tool endpoint changes | Update tool parameters without redoing full plan |
Agent skill upgrades | Reassign task to improved agent with new traits |
LLM returns noisy/incomplete output | Retry planning only for affected components |
Tasks require tighter performance constraints | Inject steerability hints for task-specific tuning |
Example Task Before Replanning
{
"task_id": "T003",
"description": "Query API and store result.",
"execution_type": "tool_call",
"parameters": { "url": "http://api/endpoint" },
"mark_replan": true
}
After Replanning
{
"task_id": "T9001",
"main_task_id": "T003",
"description": "Query API, validate JSON schema, and store normalized results.",
"execution_type": "tool_call",
"parameters": {
"url": "http://api/endpoint",
"validation_schema": "schema_v2"
}
}
Integration Guidance
End-to-End Flow with Optimization + Replanning
# 1. Generate initial plan
planner = Planner(...)
task_data = planner.create_plan(...)
# 2. Optimize the task plan
optimizer = OptimizationPlanner(llm_api)
subgraphs = optimizer.generate_subgraphs(task_data)
# 3. Mark problematic tasks for replanning
for sg in subgraphs:
for task in sg.tasks:
if some_custom_check(task):
task.mark_replan = True
# 4. Replan only the marked tasks
replanner = SubGraphRePlanner(...)
updated_subgraphs = [replanner.replace_replan_tasks(sg) for sg in subgraphs]
# 5. Continue to Stage 2 planning and execution