
Behavior Controller

Introduction

The Behavior Controller module is a two-phase behavior orchestration system designed to generate, process, and evaluate structured tasks using Large Language Models (LLMs). It serves as the intelligent backend for any application that requires:

  • Automated planning of tasks (e.g., multi-step execution flows)
  • Decision-making based on prior context and structured rules
  • Integration of internal (AIOS) and external (OpenAI) inference systems
  • History-aware behavior execution using FrameDB pointers or local context

The system supports two distinct phases in task reasoning:

  1. Phase 1: Behavior Planning

    • Generates a structured plan containing multiple tasks.
    • Each task includes attributes such as execution type, dependencies, and configuration.

  2. Phase 2: Behavior Execution

    • Selects the most appropriate action to execute next from the Phase 1 plan.
    • Provides a uid referencing the chosen executable behavior.
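
As an illustration, a Phase 1 plan might look like the following. The exact schema is defined by the prompt templates, so the field names here (tasks, depends_on, config) are hypothetical; only dsl, tool_id, execution_type, and uid are named elsewhere in this document:

```json
{
  "tasks": [
    {
      "uid": "task-001",
      "execution_type": "dsl",
      "dsl": "fetch-weather",
      "depends_on": [],
      "config": {"location": "Berlin"}
    },
    {
      "uid": "task-002",
      "execution_type": "tool",
      "tool_id": "allocate-node",
      "depends_on": ["task-001"],
      "config": {}
    }
  ]
}
```

Phase 2 would then return the uid of the task to execute next (e.g., "task-001").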

Key Features

  • LLM Agnostic: Supports both AIOS-based and OpenAI-based inference.
  • Structured Output: All outputs follow a strict JSON format based on provided templates.
  • Historical Context Support: Retrieves and reuses data from past executions.
  • Queue-Based Execution: Tasks are handled asynchronously by background workers.
  • Full Lifecycle Management: Tracks task status, logs metadata, and stores results for traceability.

Architecture

[Diagram: Behavior Controller architecture]

The Behavior Controller is structured as a modular, two-phase reasoning pipeline designed to generate and select behavior plans using LLMs, DSL logic, and contextual memory. Each component plays a distinct role in orchestrating, enriching, executing, and finalizing behavior tasks.

Component Overview

  • Behavior Selection API: The external entry point for the controller. Accepts POST requests for Phase 1 and Phase 2 via /process-phase-1-task and /process-phase-2-task. It initiates the task lifecycle, prepares metadata, and dispatches the request to the internal controller.
  • Previous Results Populator: Before planning begins, this module performs similarity-based or goal-matching retrieval from historical results. It enriches the current input with prior knowledge (context JSON or FrameDB pointers), enabling memory-aware behavior planning.
  • Stage 1 Executor: Executes Phase 1 logic using the Phase1Executor. It applies a prompt template and invokes either the internal AIOSLLM (gRPC) or external OpenAILLM (HTTP) to generate a structured plan: a list of behavior candidates with metadata such as dsl, tool_id, or execution_type.
  • Stage 1 Results Collector: Receives the generated candidate plan from Stage 1 and stores it as a PreviousResults entry in the database. This makes the candidate pool available to Phase 2 and reusable for future queries.
  • Stage 2 Executor: Runs Phase 2 logic using the Phase2Executor. Given a list of behavior candidates, it applies a second LLM prompt to select the most suitable uid representing the best next action.
  • Stage 2 Results Collector: Captures and persists the selected behavior uid in the PreviousResultsPhase2 table. This final result is returned to the client or passed downstream.
  • Feedback Executor: Triggered when Stage 2 fails to select a valid action. It adapts or rewrites the input (e.g., modifying the LLM prompt or removing invalid candidates) and re-invokes Stage 1 to regenerate a behavior plan.
  • Result Notifier: Delivers the final outcome to downstream consumers, e.g., sending it to another system, pushing a status update, or logging the behavior decision.
  • FrameDB Client: Resolves framedb_ptr references by fetching the actual context data from FrameDB (a distributed Redis-based store). This lets large historical context blocks be dereferenced without storing them inline in the SQL database.


Key Architectural Components

  1. Flask API Layer

    • Receives external task requests via HTTP POST endpoints.
    • Provides /process-phase-1-task and /process-phase-2-task.
    • Passes incoming data to the BehaviorController for handling.

  2. Behavior Controller

    • Acts as the central orchestrator.
    • Validates and persists incoming tasks.
    • Submits tasks to the background processor.
    • Waits for the result and updates task status.

  3. TaskProcessor

    • Asynchronous background engine using ThreadPoolExecutor.
    • Continuously monitors a queue for incoming tasks.
    • Executes task logic (e.g., invoking the LLM).
    • Pushes results to an output queue.

  4. Phase Executors

    • Phase1Executor: generates structured plans from inputs.
    • Phase2Executor: selects the most appropriate action.
    • Both use configurable templates and can call either:
      • AIOSLLM (internal gRPC-based LLM)
      • OpenAILLM (OpenAI API)

  5. Database Layer

    • SQLAlchemy models persist:
      • Task metadata (Phase1Data / Phase2Data)
      • Output results (PreviousResults / PreviousResultsPhase2)
    • CRUD interfaces support creation, updates, and queries.

  6. Results Handler

    • Maintains historical execution results.
    • Supports fetching the latest results for reuse (e.g., context construction).
    • Integrates with FrameDB for dereferencing framedb_ptr.

  7. FrameDB Client

    • Resolves and fetches contextual data from a distributed Redis setup.
    • Used when results store references to prior frame-based data.

Task Lifecycle

Phase 1 (Behavior Planning)

  1. Request Received

    • The client sends a POST request to /process-phase-1-task with search_id and phase_1_data.

  2. Task Initialized

    • A Phase1TaskData object is created.
    • It is stored in the database with status pending.

  3. Submitted to Processor

    • The task is queued via TaskProcessor.submit_task.

  4. Execution

    • A worker thread picks up the task.
    • It prepares the input using the latest Phase 1 results (context-aware).
    • It calls Phase1Executor.run_task(...) using the configured LLM backend.
    • A structured plan is returned.

  5. Result Stored

    • The result is persisted.
    • Task status is updated to complete or failed.
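
The submission step above can be exercised with a small HTTP client. This is a minimal sketch: the host, port, and the exact payload shape are assumptions based on the endpoint descriptions in this document, not a confirmed API contract:

```python
import json
import urllib.request


def build_phase_1_request(search_id, phase_1_data):
    """Assemble the POST body for /process-phase-1-task (assumed field names)."""
    return {"search_id": search_id, "phase_1_data": phase_1_data}


def submit_phase_1_task(payload, base_url="http://localhost:5000"):
    """POST the payload to the Behavior Controller (hypothetical host/port)."""
    req = urllib.request.Request(
        base_url + "/process-phase-1-task",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


payload = build_phase_1_request("search-123", {"goal": "plan a deployment"})
# submit_phase_1_task(payload)  # requires a running Behavior Controller instance
```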

Phase 2 (Behavior Selection)

  1. Request Received

    • The client sends a POST request to /process-phase-2-task with search_id and phase_2_data.

  2. Task Initialized

    • A Phase2TaskData object is created and stored.

  3. Submitted to Processor

    • The task is sent to the background queue.

  4. Execution

    • The executor fetches the task.
    • It calls Phase2Executor.initialize_llm(...) to select the next action.

  5. Result Stored

    • The uid of the selected action is stored as PreviousResultsPhase2.
    • Status is updated accordingly.

Schema

This section explains the core SQLAlchemy models used in the Behavior Controller system. These models define how tasks and results are stored in a relational database.

We will cover the following models:

  • Phase1Data
  • Phase2Data
  • PreviousResults

Phase1Data

Model Definition

import time

from sqlalchemy import JSON, Column, ForeignKey, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Phase1Data(Base):
    __tablename__ = "phase_1_data"

    search_id = Column(String, primary_key=True)
    phase_1_data = Column(JSON, nullable=True)
    # Callable defaults are evaluated per insert/update, not once at import
    # time; time.time() replaces the non-existent time.now().
    entry_time = Column(Integer, default=lambda: int(time.time()))
    last_update_time = Column(Integer, default=lambda: int(time.time()),
                              onupdate=lambda: int(time.time()))
    status = Column(String, nullable=True)
    log_runtime_trace = Column(JSON, nullable=True)
    n_repititions = Column(String, nullable=True)

Field Descriptions

  • search_id (String): Unique identifier for the task (primary key).
  • phase_1_data (JSON): Raw input data for the Phase 1 behavior planning task.
  • entry_time (Integer): Unix timestamp recording when the task was created.
  • last_update_time (Integer): Unix timestamp of the last status update; auto-updated.
  • status (String): Current status of the task (pending, complete, or failed).
  • log_runtime_trace (JSON): Optional structured log or debug metadata from execution.
  • n_repititions (String): How many repetitions of the plan are requested or executed.
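
To illustrate how these rows behave, here is a self-contained sketch against an in-memory SQLite database. The model is re-declared locally in simplified form; the real project presumably wires its own Base and CRUD classes:

```python
import time

from sqlalchemy import JSON, Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Phase1Data(Base):
    __tablename__ = "phase_1_data"

    search_id = Column(String, primary_key=True)
    phase_1_data = Column(JSON, nullable=True)
    entry_time = Column(Integer, default=lambda: int(time.time()))
    status = Column(String, nullable=True)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    # Insert a task in its initial "pending" state, as the controller would.
    session.add(Phase1Data(search_id="search-123",
                           phase_1_data={"goal": "demo"},
                           status="pending"))
    session.commit()
    row = session.get(Phase1Data, "search-123")
    status = row.status          # "pending"
    data = row.phase_1_data      # {"goal": "demo"}
```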

Phase2Data

Model Definition

class Phase2Data(Base):
    __tablename__ = "phase_2_data"

    search_id = Column(String, ForeignKey("phase_1_data.search_id"), primary_key=True)
    phase_2_data = Column(JSON, nullable=True)
    entry_time = Column(Integer, default=lambda: int(time.time()))
    last_update_time = Column(Integer, default=lambda: int(time.time()),
                              onupdate=lambda: int(time.time()))
    status = Column(String, nullable=True)
    log_runtime_trace = Column(JSON, nullable=True)

Field Descriptions

  • search_id (String): Primary key and foreign key linking to Phase1Data.search_id.
  • phase_2_data (JSON): Input data for Phase 2 decision-making based on the Phase 1 plan.
  • entry_time (Integer): Task creation timestamp.
  • last_update_time (Integer): Timestamp of the last update (e.g., completion or failure).
  • status (String): Task execution status (pending, complete, or failed).
  • log_runtime_trace (JSON): Execution trace/logs for debugging Phase 2 decisions.

PreviousResults

Model Definition

class PreviousResults(Base):
    __tablename__ = "previous_results"

    result_id = Column(String, primary_key=True)
    context_json_or_context_framedb_ptr = Column(JSON, nullable=True)
    derived_search_tags = Column(Text, nullable=True)
    goal_data = Column(JSON, nullable=True)
    candidate_pool_behavior_dsls = Column(JSON, nullable=True)
    action_type = Column(String, nullable=True)
    action_sub_type = Column(String, nullable=True)

Field Descriptions

  • result_id (String): Unique identifier for the stored result.
  • context_json_or_context_framedb_ptr (JSON): Either the actual context (as JSON) or a FrameDB pointer (with dereferencing instructions).
  • derived_search_tags (Text): Tags derived from the search query or results, used for filtering/search.
  • goal_data (JSON): The original goal or objective metadata behind this task.
  • candidate_pool_behavior_dsls (JSON): A list or map of DSLs considered as possible behavior choices.
  • action_type (String): Broad category of action represented by this result (e.g., planning, routing).
  • action_sub_type (String): More specific type of action (e.g., "fetch-weather", "allocate-node").
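
The context_json_or_context_framedb_ptr column can therefore hold either form. The field names inside the pointer below (cluster, key) are assumptions for illustration; only framedb_ptr itself is named in this document. Inline context:

```json
{
  "context_json": {"last_action": "fetch-weather", "result": {"temp_c": 21}}
}
```

Or a FrameDB pointer to be dereferenced at read time:

```json
{
  "framedb_ptr": {"cluster": "frames-eu-1", "key": "ctx/search-123"}
}
```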

Processing Lifecycle

The Processing Lifecycle section outlines the full path a task follows, from submission via the REST API, through background processing and LLM invocation, to result persistence. This lifecycle applies to both Phase 1 (behavior planning) and Phase 2 (behavior selection) independently; the underlying mechanism is largely the same for both.


Step 1: Task Submission via API

  • A client sends a POST request to either:
    • /process-phase-1-task with search_id and phase_1_data
    • /process-phase-2-task with search_id and phase_2_data
  • These requests are handled by Flask routes, which extract the payload and call methods on BehaviorController.

Step 2: Task Initialization

  • The BehaviorController constructs an in-memory data object: Phase1TaskData or Phase2TaskData, depending on the phase.
  • This data is serialized using .to_dict() and stored in the corresponding SQLAlchemy model via the appropriate CRUD class (Phase1DataCRUD, Phase2DataCRUD).
  • The task is inserted into the database with a default status of "pending".

Step 3: Task Submission to Processor

  • The controller then submits the task to the background TaskProcessor:

    waiter = task_processor.submit_task(search_id, mode, task_data.to_dict())

  • This internally creates a Task object containing:
    • search_id
    • mode (either "phase1" or "phase2")
    • the full input data dictionary
  • The Task is placed into an input queue monitored by a worker thread.


Step 4: Asynchronous Processing

  • A worker thread inside the TaskProcessor picks up the task from the queue and executes it:

    result = task.execute()

  • Task.execute() calls the appropriate LLM executor:
    • Phase1Executor.run_task(...)
    • Phase2Executor.initialize_llm(...)
  • These executors use templates and configuration to invoke either:
    • AIOSLLM via gRPC
    • OpenAILLM via OpenAI's HTTP API
  • The LLM returns a JSON-formatted result, which is parsed and returned.
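
Steps 3 and 4 can be sketched with the standard library. The Task class and queue wiring below are simplified stand-ins for the real TaskProcessor, and the canned result replaces the actual LLM call:

```python
import queue
import threading


class Task:
    def __init__(self, search_id, mode, data):
        self.search_id, self.mode, self.data = search_id, mode, data

    def execute(self):
        # Stand-in for Phase1Executor.run_task / Phase2Executor.initialize_llm.
        return {"search_id": self.search_id, "mode": self.mode, "plan": ["task-001"]}


input_queue = queue.Queue()
output_queue = queue.Queue()


def worker():
    while True:
        task = input_queue.get()
        if task is None:  # sentinel: shut the worker down
            break
        output_queue.put(task.execute())
        input_queue.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

input_queue.put(Task("search-123", "phase1", {"goal": "demo"}))
result = output_queue.get(timeout=5)
input_queue.put(None)
thread.join()
```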


Step 5: Result Collection and Status Update

  • The result is placed in the output_queue.
  • The controller waits on TaskWaiter.get() to retrieve the result.
  • Once the result is available:
    • It is stored if needed (e.g., via create_phase_1_result or create_phase_2_result).
    • The corresponding SQLAlchemy record is updated:
      • If successful → status = "complete"
      • If failed or timed out → status = "failed"
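
The wait-and-update step might look like the following. TaskWaiter here is a minimal reimplementation of the idea (a per-task result queue with a timeout mapped to a status), not the project's actual class:

```python
import queue


class TaskWaiter:
    """Block until a result arrives; map a timeout to a 'failed' status."""

    def __init__(self):
        self._q = queue.Queue()

    def set_result(self, result):
        self._q.put(result)

    def get(self, timeout=None):
        try:
            return "complete", self._q.get(timeout=timeout)
        except queue.Empty:
            return "failed", None


waiter = TaskWaiter()
waiter.set_result({"uid": "task-001"})
status, result = waiter.get(timeout=1)
```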

Step 6: Historical Usage and FrameDB Support (Phase 1 only)

  • During Phase 1 input preparation, the system may call:

    get_latest_phase_1_results(history_value)

  • This returns previously stored PreviousResults, which may contain either:
    • raw context JSON, or
    • a framedb_ptr to fetch from FrameDB (via Redis).
  • If a framedb_ptr is present:
    • FrameDBFetchClient resolves the pointer using routing logic.
    • Data is fetched securely from the appropriate Redis node.
  • This context is embedded into the input data passed to the LLM executor, enabling memory-based reasoning.
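
The dereferencing logic can be illustrated with an in-memory stand-in for FrameDB. The pointer layout (cluster, key) and the resolver function are assumptions; the real FrameDBFetchClient routes requests to distributed Redis nodes:

```python
import json

# In-memory stand-in for the distributed Redis store behind FrameDB.
FAKE_FRAMEDB = {
    "ctx/search-123": json.dumps({"last_plan": ["task-001", "task-002"]}),
}


def resolve_context(previous_result):
    """Return inline context JSON, or dereference a framedb_ptr if present."""
    context = previous_result.get("context_json_or_context_framedb_ptr", {})
    if "framedb_ptr" in context:
        raw = FAKE_FRAMEDB[context["framedb_ptr"]["key"]]
        return json.loads(raw)
    return context.get("context_json", {})


inline = resolve_context(
    {"context_json_or_context_framedb_ptr": {"context_json": {"a": 1}}})
pointed = resolve_context(
    {"context_json_or_context_framedb_ptr":
         {"framedb_ptr": {"cluster": "frames-eu-1", "key": "ctx/search-123"}}})
```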


LLM Executor Interface

The Behavior Controller module uses a unified interface to interact with multiple LLM backends for executing structured behavior logic. This interface is abstracted via the BehaviorAPI protocol and implemented by two concrete classes:

  • AIOSLLM: Internal inference system using gRPC (used with AIOS framework)
  • OpenAILLM: External OpenAI models via REST API

These backends are invoked through the Phase1Executor and Phase2Executor depending on the phase of processing.


BehaviorAPI Interface

Located in generator.py, the BehaviorAPI class defines the abstract behavior expected from any LLM integration.

from abc import ABC, abstractmethod
from typing import Dict

class BehaviorAPI(ABC):
    @abstractmethod
    def search(self, task_description: str, descriptors: Dict[str, str]) -> str:
        ...

However, in practice, the interface is implemented through a more specific method called generate(...), which both AIOSLLM and OpenAILLM expose directly.
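
A minimal concrete backend illustrating the interface shape. The EchoLLM class and its canned JSON response are stand-ins for a real LLM call; only BehaviorAPI, search, and generate are named by the source:

```python
import json
from abc import ABC, abstractmethod
from typing import Dict


class BehaviorAPI(ABC):
    @abstractmethod
    def search(self, task_description: str, descriptors: Dict[str, str]) -> str:
        ...


class EchoLLM(BehaviorAPI):
    """Toy backend: returns a JSON string instead of calling a model."""

    def search(self, task_description: str, descriptors: Dict[str, str]) -> str:
        return json.dumps({"task": task_description, "descriptors": descriptors})

    # Mirrors the generate(...) method that AIOSLLM/OpenAILLM expose directly.
    def generate(self, task_description: str, descriptors: Dict[str, str]) -> str:
        return self.search(task_description, descriptors)


out = json.loads(EchoLLM().generate("plan deployment", {"env": "prod"}))
```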


AIOSLLM (gRPC-based Internal Inference)

Purpose

Used when behavior planning or execution is routed through an internal AIOS inference block.

Key Characteristics

  • Transport: gRPC
  • Message Format: Protobuf (BlockInferencePacket)
  • Channel: BlockInferenceServiceStub
  • Input: Pre-processed task description, serialized as JSON bytes
  • Output: Inference result (string, usually JSON)

Usage Flow

llm_instance = AIOSLLM(...)
result = llm_instance.generate(task_description, descriptors)

The generator builds and sends a gRPC request with fields such as:

  • block_id, instance_id, session_id, etc.
  • Optional frame_ptr and query_parameters
  • data: The actual task input in bytes

OpenAILLM (OpenAI via REST)

Purpose

Connects to OpenAI’s public API (e.g., GPT-4, GPT-4o) for generating structured task responses.

Key Characteristics

  • Transport: HTTPS
  • Library: openai Python client
  • API: chat.completions.create
  • Input: Task description passed as a user message
  • Output: Extracted message.content from the first choice
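
Putting these characteristics together, here is a sketch of the request shape. The model name, system prompt, and message composition are assumptions; the live call is commented out because it requires the openai package and an API key:

```python
import json


def build_messages(task_description, descriptors):
    """Compose chat messages: the task description goes in as the user message."""
    return [
        {"role": "system", "content": "Respond with strict JSON only."},
        {"role": "user",
         "content": task_description + "\n" + json.dumps(descriptors)},
    ]


messages = build_messages("select next action", {"phase": "2"})

# Live call (requires the openai package and OPENAI_API_KEY):
# from openai import OpenAI
# client = OpenAI()
# response = client.chat.completions.create(model="gpt-4o", messages=messages)
# result = response.choices[0].message.content  # first choice's message.content
```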