Agent DSL Manager — Client Developer Documentation
Introduction
The DSL Manager module provides a unified interface to register, describe, and execute DSL-based workflows. It also includes utilities to render LLM-readable task descriptions and to plan which DSL/agent/tool to use for a goal.
It provides:
- Pluggable workflow backends (local loader / remote registry / custom)
- Consistent execution API (execute, deploy, execute_command, etc.)
- LLM-friendly descriptions via DSLPlanner
- Optional human-in-the-loop through NATS helpers
A typical workflow is:
- Declare a DSLItem in the subject spec.
- Build/resolve a DSL executor from the item.
- Register the executor in DSLWorkflowsManager.
- Describe DSLs with DSLPlanner, optionally plan with Planner.
- Execute the chosen DSL and collect results/feedback.
Registering DSLs in Subject Specification
DSLs are registered using a DSLItem.
Data Class Reference
from dataclasses import dataclass, field
from typing import Dict, Any
@dataclass
class DSLItem:
    dsl_type: str  # e.g. "registry", "local", "remote"
    dsl_workflow_id: str  # workflow identifier/URI
    dsl_parameters: Dict[str, Any] = field(default_factory=dict)
- dsl_type: application-specific loader/executor type (the examples in this document use "registry", "local", and "remote").
- dsl_workflow_id: the logical route/URI (e.g., "registry:add-workflow@1.0", "local:math.add", "remote://dsl-executor/add").
- dsl_parameters: backend-specific settings (e.g., registry URL, module path, auth headers, timeouts).
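As a rough illustration of how a raw spec entry might be turned into a DSLItem, the sketch below repeats the data class and adds a small constructor helper. The helper name dsl_item_from_dict and its error handling are assumptions made for this example, not part of the SDK.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class DSLItem:
    dsl_type: str
    dsl_workflow_id: str
    dsl_parameters: Dict[str, Any] = field(default_factory=dict)


def dsl_item_from_dict(raw: Dict[str, Any]) -> DSLItem:
    """Hypothetical helper: build a DSLItem from a plain dict entry."""
    missing = [k for k in ("dsl_type", "dsl_workflow_id") if k not in raw]
    if missing:
        raise ValueError(f"DSL entry is missing required keys: {missing}")
    # Copy the parameters so later mutation does not leak into the source dict.
    return DSLItem(
        dsl_type=raw["dsl_type"],
        dsl_workflow_id=raw["dsl_workflow_id"],
        dsl_parameters=dict(raw.get("dsl_parameters", {})),
    )


item = dsl_item_from_dict({"dsl_type": "local", "dsl_workflow_id": "math.add"})
```

Omitting dsl_parameters yields an empty dict, matching the default_factory on the data class.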
Minimal Required Configuration
{
  "dsl_type": "registry",
  "dsl_workflow_id": "add-workflow@1.0",
  "dsl_parameters": {
    "registry_url": "http://dsl-registry-service"
  }
}
Field Descriptions
Field | Location | Type | Description |
---|---|---|---|
dsl_type | root | str | Loader/executor type: "registry", "local", or "remote". |
dsl_workflow_id | root | str | Workflow name/URI/version. |
dsl_parameters | root | dict | Backend-specific config: registry_url, module_path, executor_url, headers, timeout_s, etc. |
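A lightweight pre-flight check can catch malformed entries before any executor is built. The sketch below is one possible approach; the function name validate_dsl_entry and the mapping of required parameters per type are assumptions inferred from the parameter names in the table above, not a documented SDK contract.

```python
from typing import Any, Dict, List

# Assumed required keys per dsl_type (inferred from the field table, not confirmed by the SDK).
REQUIRED_PARAMS: Dict[str, List[str]] = {
    "registry": ["registry_url"],
    "local": ["module_path"],
    "remote": ["executor_url"],
}


def validate_dsl_entry(entry: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the entry looks usable."""
    problems: List[str] = []
    dsl_type = entry.get("dsl_type")
    if dsl_type not in REQUIRED_PARAMS:
        problems.append(f"unknown dsl_type: {dsl_type!r}")
        return problems
    if not entry.get("dsl_workflow_id"):
        problems.append("dsl_workflow_id is empty or missing")
    params = entry.get("dsl_parameters", {})
    for key in REQUIRED_PARAMS[dsl_type]:
        if key not in params:
            problems.append(f"dsl_parameters is missing {key!r} for type {dsl_type!r}")
    return problems
```

Returning a list of messages rather than raising on the first problem makes it easier to report every issue in a spec at once.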
Example in a Subject Specification
{
  "subject_id": "agent-dsl-executor",
  "subject_type": "agent",
  "dsls": [
    {
      "dsl_type": "registry",
      "dsl_workflow_id": "add-workflow@1.0",
      "dsl_parameters": {
        "registry_url": "http://dsl-registry-service",
        "extras": { "x": 10 }
      }
    },
    {
      "dsl_type": "local",
      "dsl_workflow_id": "math.add",
      "dsl_parameters": {
        "module_path": "agents.dsl.math.add",
        "output_module": "default_output"
      }
    },
    {
      "dsl_type": "remote",
      "dsl_workflow_id": "transform.text.summarize",
      "dsl_parameters": {
        "executor_url": "http://dsl-executor.svc.cluster.local:8080/execute",
        "headers": { "Authorization": "Bearer $RUNTIME_TOKEN" },
        "timeout_s": 20
      }
    }
  ]
}
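The spec above contains a $RUNTIME_TOKEN placeholder, but this document does not say how it is resolved. The sketch below shows one plausible approach using the standard library's string.Template; the helper names expand_placeholders and load_dsl_entries, and the idea of substituting from a caller-supplied mapping, are assumptions for illustration only.

```python
import json
from string import Template
from typing import Any, Dict, List, Mapping


def expand_placeholders(value: Any, env: Mapping[str, str]) -> Any:
    """Recursively replace $NAME placeholders in strings; unknown names are left untouched."""
    if isinstance(value, str):
        return Template(value).safe_substitute(env)
    if isinstance(value, dict):
        return {k: expand_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(v, env) for v in value]
    return value


def load_dsl_entries(spec_json: str, env: Mapping[str, str]) -> List[Dict[str, Any]]:
    """Parse a subject spec and return its "dsls" entries with placeholders expanded."""
    spec = json.loads(spec_json)
    return [expand_placeholders(entry, env) for entry in spec.get("dsls", [])]


spec_text = json.dumps({
    "subject_id": "agent-dsl-executor",
    "subject_type": "agent",
    "dsls": [{
        "dsl_type": "remote",
        "dsl_workflow_id": "transform.text.summarize",
        "dsl_parameters": {
            "headers": {"Authorization": "Bearer $RUNTIME_TOKEN"},
            "timeout_s": 20,
        },
    }],
})
entries = load_dsl_entries(spec_text, {"RUNTIME_TOKEN": "example-token"})
```

In practice the mapping could be os.environ or a secrets store; safe_substitute is used so that an unresolved placeholder stays visible instead of raising.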
Import and Setup
from agent_sdk.utils.dsl_manager import (
    DSLWorkflowsManager,
    DSLPlanner,
    HumanInteractionClient,
)
from agent_sdk.utils.planner import Planner # LLM-based planning
Building an Executor from a DSLItem
def build_executor_from_dsl_item(item: dict):
    dsl_type = item["dsl_type"]
    workflow_id = item["dsl_workflow_id"]
    params = item.get("dsl_parameters", {})

    if dsl_type == "registry":
        # Resolve via Registry service
        registry_url = params["registry_url"]
        return DSLWorkflowsManager(registry_url).get_workflow(workflow_id)

    if dsl_type == "local":
        # Dynamically import & construct local executor
        module_path = params["module_path"]  # e.g. "agents.dsl.math.add"
        from importlib import import_module
        mod = import_module(module_path)
        return getattr(mod, "Executor")(**params)

    if dsl_type == "remote":
        # Wrap a remote HTTP/gRPC executor
        from agent_sdk.utils.remote_dsl import RemoteDSLWorkflowExecutor
        return RemoteDSLWorkflowExecutor(
            executor_url=params["executor_url"],
            headers=params.get("headers"),
            timeout_s=params.get("timeout_s", 15),
        )

    raise NotImplementedError(f"Unknown dsl_type: {dsl_type}")
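The "local" branch relies on dynamic import: a module path is resolved at runtime and an Executor class is looked up on it. The sketch below isolates that pattern so it can be run without the SDK. The module name demo_dsl_math_add, the stand-in Executor class, and the helper build_local_executor are all hypothetical and exist only for this demonstration.

```python
import sys
import types
from importlib import import_module


class Executor:
    """Stand-in local executor; real modules would define their own."""

    def __init__(self, **kwargs):
        self.config = kwargs

    def execute(self, input_data: dict, output_module: str = None):
        return {"result": input_data["x"] + input_data["y"]}


# Register a throwaway module under a hypothetical name for the demo.
fake_module = types.ModuleType("demo_dsl_math_add")
fake_module.Executor = Executor
sys.modules["demo_dsl_math_add"] = fake_module


def build_local_executor(params: dict):
    """Import params["module_path"] and construct its Executor class."""
    mod = import_module(params["module_path"])
    executor_cls = getattr(mod, "Executor", None)
    if executor_cls is None:
        raise ImportError(f"{params['module_path']} does not define an Executor class")
    return executor_cls(**params)


local_exec = build_local_executor({"module_path": "demo_dsl_math_add"})
result = local_exec.execute({"x": 5, "y": 7})
```

Checking for the class explicitly gives a clearer error than the bare AttributeError that a direct getattr would raise when a module lacks an Executor.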
DSLWorkflowsManager
manager = DSLWorkflowsManager("http://dsl-registry-service")
# Register
manager.register_new_dsl_workflow("add_workflow", "add-workflow@1.0", extras={"x": 10})
# Execute (returns output of the workflow's configured output module)
result = manager.execute("add_workflow", {"x": 5}, output_module="default_output")
# Other operations
manager.deploy("add_workflow") # preload modules / warmup
manager.check_execute("add_workflow") # dry-run validation
manager.execute_command("add_workflow", "reindex", {"force": True})
wf = manager.get_workflow("add_workflow") # access underlying executor
all_dsls = manager.list() # list registered
search_meta = manager.get_searchable_representation()
manager.unregister_dsl_workflow("add_workflow")
Common parameters
- name: local alias in the manager’s registry.
- workflow_id: backing workflow reference (same as dsl_workflow_id).
- output_module: which terminal output node/module to read from if multiple.
- extras: optional config passed to executor (e.g., constants, defaults).
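For unit tests it can be handy to have an in-memory stand-in that mirrors a few of the manager's method names. The sketch below is a test double, not the SDK's DSLWorkflowsManager: the class InMemoryDSLManager, its resolver argument, and the rule that extras act as defaults overridden by explicit inputs are all assumptions.

```python
from typing import Any, Callable, Dict, List, Optional


class InMemoryDSLManager:
    """Test double mirroring a few DSLWorkflowsManager method names. Not the SDK class."""

    def __init__(self, resolver: Callable[[str], Any]):
        self._resolver = resolver  # maps workflow_id -> executor-like object
        self._entries: Dict[str, Dict[str, Any]] = {}

    def register_new_dsl_workflow(self, name: str, workflow_id: str,
                                  extras: Optional[Dict[str, Any]] = None) -> None:
        self._entries[name] = {
            "workflow_id": workflow_id,
            "executor": self._resolver(workflow_id),
            "extras": dict(extras or {}),
        }

    def execute(self, name: str, input_data: Dict[str, Any],
                output_module: Optional[str] = None) -> Any:
        entry = self._entries[name]  # KeyError if the alias was never registered
        # Assumption: extras act as defaults that explicit inputs override.
        merged = {**entry["extras"], **input_data}
        return entry["executor"].execute(merged, output_module=output_module)

    def list(self) -> List[str]:
        return sorted(self._entries)

    def unregister_dsl_workflow(self, name: str) -> None:
        self._entries.pop(name, None)


class _AddExecutor:
    def execute(self, input_data, output_module=None):
        return input_data["x"] + input_data["y"]


fake = InMemoryDSLManager(resolver=lambda workflow_id: _AddExecutor())
fake.register_new_dsl_workflow("add_workflow", "add-workflow@1.0", extras={"y": 10})
total = fake.execute("add_workflow", {"x": 5})
```

The example separates the alias (name) from the backing reference (workflow_id), which is the distinction the parameter list above draws.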
DSLPlanner
Use DSLPlanner to generate LLM- and human-readable summaries of your available DSLs and how to call them.
planner = DSLPlanner(dsl_workflows_manager=manager)
planner.prepare_dsl_descriptions() # scans manager & builds schema overview
planner.add_task_description("Add two numbers") # free-form task
planner.add_input_conversion({"x": 5}) # example input mapping
full = planner.get_full_description() # feed to an LLM or show to users
print(full)
What it does
- Renders each registered DSL’s name, purpose, inputs/outputs, and example calls.
- Helps LLMs choose between DSLs based on a task description.
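To make the idea of an LLM-readable description concrete, the sketch below renders a small text overview from DSL metadata. The output layout, the metadata keys (name, purpose, inputs), and the function render_dsl_descriptions are hypothetical; the actual format produced by DSLPlanner.get_full_description() is not specified in this document.

```python
from typing import Any, Dict, List


def render_dsl_descriptions(dsls: List[Dict[str, Any]], task: str) -> str:
    """Render a plain-text overview of DSLs followed by the task description."""
    lines = ["Available DSLs:"]
    for dsl in dsls:
        inputs = ", ".join(f"{k}: {v}" for k, v in dsl.get("inputs", {}).items())
        lines.append(f"- {dsl['name']}: {dsl.get('purpose', 'no description')}")
        lines.append(f"  inputs: {inputs or 'none'}")
    lines.append(f"Task: {task}")
    return "\n".join(lines)


overview = render_dsl_descriptions(
    [{"name": "add_workflow", "purpose": "Add two numbers", "inputs": {"x": "int", "y": "int"}}],
    task="Add two numbers",
)
```

Keeping one DSL per bullet with its inputs on the following line gives a model a compact, uniform structure to choose from.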
Planner (LLM-integrated DSL Selector)
Planner composes job metadata, DSL descriptions, and agents/tools into a single prompt and calls a behavior controller (LLM) to propose a plan (e.g., “Use add_workflow with inputs {x: 5, y: 8}”).
planner = Planner(
    behavior_controller_url="http://behavior.svc/planner",
    known_subjects=[...],
    tools_functions_registry=[...]
)
planner.set_custom_description("project_type", "NLP utilities")
final_input = planner.generate_final_input(
    job_goal={"intent": "summarization"},
    job_objectives={"accuracy": "high"},
    job_steerability_data={"style": "concise"},
    task_description="Summarize customer reviews"
)
plan_response = planner.create_plan("Summarize customer reviews")
Highlights
- Prompt construction with job goal/objectives + DSL/agent/tool metadata.
- Behavior controller call via TaskProcessorClient (implementation detail).
- Hooks via set_custom_job_transformer for custom formatting/filters.
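The prompt construction and transformer hook described above might look roughly like the following. This is a sketch under assumptions: the payload keys, the function build_planner_input, and the transformer signature (a function from job dict to job dict) are invented for illustration and may differ from what set_custom_job_transformer actually expects.

```python
import json
from typing import Any, Callable, Dict, Optional

JobTransformer = Callable[[Dict[str, Any]], Dict[str, Any]]


def build_planner_input(job: Dict[str, Any], task_description: str,
                        dsl_overview: str,
                        transformer: Optional[JobTransformer] = None) -> str:
    """Compose one JSON prompt payload; the transformer may reshape or filter job data."""
    job_view = transformer(job) if transformer else job
    payload = {
        "task_description": task_description,
        "job": job_view,
        "dsl_overview": dsl_overview,
    }
    return json.dumps(payload, sort_keys=True)


def drop_steerability(job: Dict[str, Any]) -> Dict[str, Any]:
    # Example filter: hide steerability data from the prompt.
    return {k: v for k, v in job.items() if k != "job_steerability_data"}


prompt = build_planner_input(
    job={
        "job_goal": {"intent": "summarization"},
        "job_objectives": {"accuracy": "high"},
        "job_steerability_data": {"style": "concise"},
    },
    task_description="Summarize customer reviews",
    dsl_overview="- summarize: condense text",
    transformer=drop_steerability,
)
```

Passing the transformer as a plain callable keeps formatting and filtering decisions outside the prompt builder itself.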
Human Interaction (Optional)
Use these if a workflow requires approval, data fixes, or confirmations:
hic = HumanInteractionClient(nats_url="nats://nats:4222", topic="dsl.human")
# Send question → await response via MessageResponseWaiter (internal utility)
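The question/response flow can be pictured as a correlation-id pattern: each question gets an id, and a reply carrying that id resolves the waiting caller. The sketch below demonstrates this in-process with asyncio only. InProcessResponseWaiter is a hypothetical stand-in, not the SDK's MessageResponseWaiter, and no NATS connection is involved.

```python
import asyncio
import uuid
from typing import Dict


class InProcessResponseWaiter:
    """Correlates questions with replies by id. A local stand-in for illustration."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    def ask(self, question: str) -> str:
        """Record a pending question and return its correlation id."""
        # A real client would publish the question to a message subject here.
        request_id = uuid.uuid4().hex
        self._pending[request_id] = asyncio.get_running_loop().create_future()
        return request_id

    def reply(self, request_id: str, answer: str) -> None:
        """Resolve the pending question; unknown ids are ignored."""
        future = self._pending.get(request_id)
        if future is not None and not future.done():
            future.set_result(answer)

    async def wait(self, request_id: str, timeout_s: float) -> str:
        """Wait for the reply, raising asyncio.TimeoutError if none arrives in time."""
        try:
            return await asyncio.wait_for(self._pending[request_id], timeout_s)
        finally:
            self._pending.pop(request_id, None)


async def demo() -> str:
    waiter = InProcessResponseWaiter()
    request_id = waiter.ask("Approve running add_workflow?")
    # Simulate a human answering shortly after the question is sent.
    asyncio.get_running_loop().call_later(0.01, waiter.reply, request_id, "approved")
    return await waiter.wait(request_id, timeout_s=1.0)


answer = asyncio.run(demo())
```

A timeout on the wait matters for human-in-the-loop steps, since a workflow should not block forever if nobody responds.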
Writing a Custom DSL Executor
Implement a simple interface: execute is required, while deploy, check_execute, and execute_command are optional.
class MyCustomDSLExecutor:
    def __init__(self, **kwargs):
        ...

    def deploy(self):
        # module preload / cache warmup
        ...

    def check_execute(self, input_data: dict) -> bool:
        # validate inputs; return True/raise
        return True

    def execute(self, input_data: dict, output_module: str = None):
        # perform the workflow logic and return output
        return {"result": ...}

    def execute_command(self, command_name: str, data: dict):
        # custom control-plane actions (e.g., reindex)
        return {"ok": True}
Register with the manager:
executor = MyCustomDSLExecutor(...)
manager.register_new_dsl_workflow("custom.flow", "custom.flow@1.0", extras={"mode": "fast"})
manager.deploy("custom.flow")
out = manager.execute("custom.flow", {"x": 1, "y": 2})
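For a concrete, runnable instance of the interface above, the sketch below fills in each method for a trivial addition workflow. The class AddExecutor, its offset option, and the set_offset command are hypothetical and serve only to show how the methods relate; it runs standalone, without the manager.

```python
from typing import Any, Dict, Optional


class AddExecutor:
    """Illustrative executor that adds two numbers; follows the method names above."""

    def __init__(self, **kwargs: Any) -> None:
        # Hypothetical option: a constant added to every result.
        self.offset = kwargs.get("offset", 0)
        self.deployed = False

    def deploy(self) -> None:
        # Nothing to preload here; just record that deploy was called.
        self.deployed = True

    def check_execute(self, input_data: Dict[str, Any]) -> bool:
        for key in ("x", "y"):
            if not isinstance(input_data.get(key), (int, float)):
                raise ValueError(f"input {key!r} must be a number")
        return True

    def execute(self, input_data: Dict[str, Any],
                output_module: Optional[str] = None) -> Dict[str, Any]:
        self.check_execute(input_data)
        return {"result": input_data["x"] + input_data["y"] + self.offset}

    def execute_command(self, command_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
        if command_name == "set_offset":
            self.offset = data["offset"]
            return {"ok": True}
        return {"ok": False, "error": f"unknown command: {command_name}"}


adder = AddExecutor(offset=1)
adder.deploy()
out = adder.execute({"x": 1, "y": 2})
```

Calling check_execute from execute keeps validation in one place, so a dry run and a real run reject the same inputs.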
Quick Subject Spec Example (two DSLs)
{
  "subject_id": "agent-dsl",
  "subject_type": "agent",
  "dsls": [
    {
      "dsl_type": "registry",
      "dsl_workflow_id": "add-workflow@1.0",
      "dsl_parameters": {
        "registry_url": "http://dsl-registry-service"
      }
    },
    {
      "dsl_type": "remote",
      "dsl_workflow_id": "transform.text.summarize",
      "dsl_parameters": {
        "executor_url": "http://dsl-executor.svc.cluster.local:8080/execute",
        "timeout_s": 20
      }
    }
  ]
}
Minimal End-to-End Example
# 1) Build executors from DSLItems (e.g., loaded from subject spec)
items = [
    {"dsl_type": "registry", "dsl_workflow_id": "add-workflow@1.0",
     "dsl_parameters": {"registry_url": "http://dsl-registry-service"}},
    {"dsl_type": "remote", "dsl_workflow_id": "transform.text.summarize",
     "dsl_parameters": {"executor_url": "http://dsl-exec:8080/execute"}}
]
manager = DSLWorkflowsManager("http://dsl-registry-service")
for it in items:
    exec_obj = build_executor_from_dsl_item(it)
    alias = it["dsl_workflow_id"]
    manager.register_new_dsl_workflow(alias, it["dsl_workflow_id"])
    # Optionally bind the resolved executor to alias if needed
    # manager.override_executor(alias, exec_obj)
# 2) Describe for LLM
planner = DSLPlanner(dsl_workflows_manager=manager)
planner.prepare_dsl_descriptions()
planner.add_task_description("Add two numbers")
print(planner.get_full_description())
# 3) Execute
out = manager.execute("add-workflow@1.0", {"x": 5, "y": 7}, output_module="default_output")
print(out)