
Orchestration Engine

The Orchestration Engine is the core agent execution runtime for AEGIS. It uses LangGraph to build a stateful graph pipeline that processes user messages through system prompts, memory retrieval, LLM calls, tool execution, skill injection, HITL approval checkpoints, and output formatting.

Overview

This service is the brain of AEGIS. When a user sends a message to an AI agent, the orchestration engine:

  1. Loads the agent’s system prompt and conversation history
  2. Retrieves working memory and episodic memories
  3. Calls the LLM (via LiteLLM) with budget enforcement
  4. Executes any tool calls the LLM requests
  5. Detects and injects relevant skills (3-tier architecture)
  6. Checks if HITL approval is required
  7. Runs a synthesis LLM call with injected skill context
  8. Formats the final output

Beyond the core agent pipeline, the orchestration engine also hosts the compliance dashboard API, checklist management, rule lifecycle, and workspace SSE endpoints.

Port & Language

| Property | Value |
| --- | --- |
| Port | 8001 |
| Language | Python 3.12 |
| Framework | FastAPI |
| Entry point | src/orchestration/main.py |

Key Endpoints

Agent Execution

| Method | Path | Description |
| --- | --- | --- |
| POST | /execute | Synchronous full pipeline execution. Runs the entire LangGraph graph and returns the final state. |
| GET | /conversations/{conversation_id}/stream | SSE streaming endpoint. Streams node_end events as each LangGraph node completes, then a done event. |
| GET | /health | Health check. Returns service version. |
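As a rough illustration of consuming the streaming endpoint, the sketch below parses raw SSE lines and collects node names until the done event arrives. The event names node_end and done come from the table above; the JSON payload shape (a "node" field) and the helper names are assumptions for illustration, not the service's documented contract.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) pairs from raw SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":                       # a blank line terminates one event
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith(":"):           # SSE comment / keep-alive ping
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


def collect_until_done(lines: Iterable[str]) -> list[str]:
    """Return the names of completed nodes, stopping at the `done` event."""
    completed = []
    for event, payload in iter_sse_events(lines):
        if event == "done":
            break
        if event == "node_end":
            # Assumption: the payload names the node under a "node" key.
            completed.append(payload.get("node", "?"))
    return completed


# Hypothetical stream content; the real payload fields may differ.
sample = [
    "event: node_end", 'data: {"node": "system_prompt_node"}', "",
    "event: node_end", 'data: {"node": "memory_node"}', "",
    "event: done", 'data: {"status": "completed"}', "",
]
completed_nodes = collect_until_done(sample)
```

In a real client, the lines would come from an HTTP response streamed from GET /conversations/{conversation_id}/stream rather than from a list.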

Conversations

| Method | Path | Description |
| --- | --- | --- |
| GET | /conversations | List conversations (filterable by user_id). |
| POST | /conversations | Create a new conversation. |
| GET | /conversations/{id} | Get conversation details with messages. |
| PUT | /conversations/{id} | Update conversation title or status. |
| DELETE | /conversations/{id} | Soft-delete a conversation. |

Compliance Dashboard (R1.2)

| Method | Path | Description |
| --- | --- | --- |
| GET | /compliance/summary | Summary cards (total entities, domain counts, status distribution). |
| GET | /compliance/matrix | Entity x compliance domain matrix with color-coded status cells. |
| GET | /compliance/charts/* | Chart data (deadline bar, compliance trend, filing status, flaring volume). |
| POST | /compliance/assess | Trigger compliance assessment for an entity. |

Checklists (R1.3)

| Method | Path | Description |
| --- | --- | --- |
| GET | /checklists | List filing checklists. |
| POST | /checklists | Create a new checklist from a template. |
| GET | /checklists/{id} | Get checklist with items. |
| PUT | /checklists/{id}/items/{idx} | Update a checklist item status. |
| POST | /checklists/{id}/submit | Submit a completed checklist for review. |

Artifacts

| Method | Path | Description |
| --- | --- | --- |
| GET | /checklists/{id}/items/{idx}/artifacts | List artifacts for a checklist item. |
| POST | /checklists/{id}/items/{idx}/artifacts | Upload or create an artifact. |

Rules (R1.5)

| Method | Path | Description |
| --- | --- | --- |
| GET | /rules | List rule versions. |
| POST | /rules | Create a new rule version (immutable). |
| GET | /rules/{identifier}/versions | Get all versions of a specific rule. |
| GET | /rules/changes | Detect rules that changed since a reference date. |
| POST | /rules/monitor/scan | Trigger a rule change monitoring scan. |

Workspaces (R1.4)

| Method | Path | Description |
| --- | --- | --- |
| POST | /workspaces/{id}/assess | Start an automated compliance assessment. |
| GET | /workspaces/{id}/stream | SSE stream for workspace events. |
| POST | /workspaces/{id}/items/{idx}/agent | Send a message to the scoped agent for a checklist item. |

Architecture

LangGraph Pipeline

The core execution follows a StateGraph with 9 nodes:

START
  -> system_prompt_node
  -> memory_node
  -> initial_llm_call
  -> [tool_node -> initial_llm_call]* (tool loop)
  -> skill_select_node
  -> [skill_inject_node]
  -> approval_node
  -> synthesis_llm_call
  -> [tool_node -> synthesis_llm_call]*
  -> output_format
  -> END

Routing logic between nodes:

  • After initial_llm_call: If there are pending tool calls, route to tool_node. If there is an error, route to output_format. Otherwise, route to skill_select_node.
  • After tool_node: Check iteration limit. Route back to the appropriate LLM call based on the current phase (initial or synthesis).
  • After skill_select_node: If skills were detected, route to skill_inject_node. Otherwise, skip to approval_node.
  • After approval_node: If HITL is required, route to output_format with awaiting_hitl status. Otherwise, continue to synthesis_llm_call.
  • After synthesis_llm_call: If tool calls are needed, route to tool_node. Otherwise, route to output_format.
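The routing rules above can be sketched as plain functions over the state dictionary, in the shape LangGraph expects for conditional edges (each returns the name of the next node). This is a minimal sketch, not the engine's actual code: the function names are hypothetical, and two behaviours are assumptions the text does not spell out, namely that an error is signalled by status == "failed" and that hitting the iteration limit routes to output_format.

```python
MAX_GRAPH_ITERATIONS = 20  # default from the Configuration table


def route_after_initial_llm(state: dict) -> str:
    if state.get("pending_tool_calls"):
        return "tool_node"
    if state.get("status") == "failed":   # assumption: errors surface via status
        return "output_format"
    return "skill_select_node"


def route_after_tool(state: dict) -> str:
    if state.get("iteration", 0) >= MAX_GRAPH_ITERATIONS:
        return "output_format"            # assumption: stop once the limit is hit
    # Return to whichever LLM call requested the tools.
    if state.get("llm_phase") == "synthesis":
        return "synthesis_llm_call"
    return "initial_llm_call"


def route_after_skill_select(state: dict) -> str:
    return "skill_inject_node" if state.get("selected_skill_ids") else "approval_node"


def route_after_approval(state: dict) -> str:
    # When HITL is required the run ends early with status "awaiting_hitl".
    return "output_format" if state.get("hitl_required") else "synthesis_llm_call"


def route_after_synthesis(state: dict) -> str:
    return "tool_node" if state.get("pending_tool_calls") else "output_format"
```

Keeping each router a pure function of the state makes the branching easy to unit-test without compiling the graph.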

Module Breakdown

src/orchestration/
├── main.py                  # FastAPI app, /execute and /stream endpoints
├── engine.py                # LangGraph StateGraph construction and compilation
├── nodes.py                 # Node implementations (1 function per node)
├── tools.py                 # Tool definitions and execution for LLM function calling
├── state.py                 # GraphState TypedDict (flows through the graph)
├── schemas.py               # Request/response Pydantic models
├── services.py              # HTTP clients for memory, KG, approval services
├── budget.py                # Token/cost budget enforcement
├── config.py                # Settings from environment variables
├── conversation_routes.py   # Conversation CRUD (list, create, get, update, delete)
├── skills/                  # Checklist-driven skill implementations
│   ├── __init__.py          # Skill registry (register_skill, list_registered_skills)
│   ├── base.py              # Base skill class
│   ├── rule37/              # Rule 37 spacing exception skills (11 skills)
│   ├── rule32/              # Rule 32 flaring exception skills (10 skills)
│   ├── form_pr/             # Form PR production report skills (8 skills)
│   ├── flaring_monitor/     # Flaring monitor skills (6 skills)
│   └── field_event/         # Field event conversation skills
├── compliance/              # Compliance dashboard and workspace
│   ├── routes.py            # /compliance/summary, /matrix, /charts
│   ├── engine.py            # Compliance status computation engine
│   ├── queries.py           # Cypher query templates for compliance
│   ├── checklist_routes.py  # Checklist CRUD routes
│   ├── artifact_routes.py   # Artifact management routes
│   ├── rule_routes.py       # Rule version CRUD routes
│   ├── rule_lifecycle.py    # Immutable rule versioning logic
│   ├── rule_monitor/        # Rule change monitoring
│   └── workspace/           # Entity workspace (assessment, SSE, agent)
│       ├── routes.py        # Workspace API routes
│       ├── assessment.py    # Automated assessment logic
│       └── events.py        # WorkspaceEventType enum, SSE event formatting
├── seed_skills.py           # Skill registry seeding script
├── seed_checklists.py       # Checklist template seeding script
├── seed_rules.py            # Compliance rule data seeding script
├── seed_conversations.py    # Demo conversation seeding
└── seed_demo_data.py        # Combined demo data seeder

GraphState

The GraphState TypedDict carries all execution state through the graph:

from typing import TypedDict


class GraphState(TypedDict, total=False):
    execution_id: str
    conversation_id: str
    agent_id: str
    agent_type: str              # "rule_37", "rule_32", etc.
    tenant_id: str
    status: str                  # "pending", "running", "completed", "awaiting_hitl", "failed"
    messages: list[dict]         # Conversation messages (role/content/tool_calls)
    system_prompt: str
    model: str                   # LLM model to use (default: gpt-4o)
    injected_skill_ids: list     # Skills already injected
    active_skills: dict          # skill_id -> tier level
    tokens_used: int
    cost_usd: float
    max_tokens: int
    max_cost_usd: float
    pending_tool_calls: list     # Tool calls awaiting execution
    iteration: int               # Guards against infinite loops
    memory_context: dict         # Working + episodic memory data
    selected_skill_ids: list     # Skills detected from LLM output
    llm_phase: str               # "initial" or "synthesis"
    hitl_required: bool
    hitl_checkpoint_type: str
    hitl_approval_id: str | None

Budget Enforcement

The budget.py module enforces per-execution limits:

  • Token budget: Raises BudgetExceededError when tokens_used >= max_tokens
  • Cost budget: Raises BudgetExceededError when cost_usd >= max_cost_usd

Budget is checked before every LLM call. When exceeded, the /execute endpoint returns HTTP 429. The SSE stream emits an error event.
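A minimal sketch of the check described above, assuming the limits live on the state under the GraphState field names and fall back to the documented defaults. BudgetExceededError is named in the text; check_budget and its exact signature are hypothetical.

```python
class BudgetExceededError(Exception):
    """Raised when an execution has used up its token or cost budget."""


def check_budget(state: dict) -> None:
    """Raise BudgetExceededError if either limit has been reached.

    Intended to run before each LLM call. The fallback values mirror the
    documented defaults (100000 tokens, 5.0 USD).
    """
    tokens_used = state.get("tokens_used", 0)
    max_tokens = state.get("max_tokens", 100_000)
    if tokens_used >= max_tokens:
        raise BudgetExceededError(
            f"token budget exhausted: {tokens_used} >= {max_tokens}"
        )

    cost_usd = state.get("cost_usd", 0.0)
    max_cost_usd = state.get("max_cost_usd", 5.0)
    if cost_usd >= max_cost_usd:
        raise BudgetExceededError(
            f"cost budget exhausted: {cost_usd} >= {max_cost_usd}"
        )
```

Note that the comparison is >=, so an execution that lands exactly on its limit is stopped before the next call rather than after it.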

Skill Injection (3-Tier Architecture)

When the LLM output contains SKILL_SELECT:{skill_id} patterns, the skill injection pipeline activates:

  1. Ledger check — Query the memory service’s injection ledger to prevent duplicate injection
  2. Tier 2 — Load the skill definition (description, steps, requirements, output format) from the PostgreSQL skills table
  3. Tier 3 — Load reference artifacts (templates, examples, regulations) from the skill_artifacts table
  4. Tier 3.5 — Load entity context from the knowledge graph (well data, lease data, relationships)
  5. Mark ledger — Record the injection to prevent re-injection in subsequent turns

All injected content is appended as system messages to the conversation.
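The detection and ledger steps can be illustrated with a small sketch. It assumes the marker appears in the LLM output as SKILL_SELECT: followed by an identifier made of letters, digits, underscores, and hyphens, and it stands in an in-memory set for the memory service's injection ledger. The function names are hypothetical.

```python
import re

# Assumption: skill IDs consist of letters, digits, underscores and hyphens.
SKILL_SELECT_RE = re.compile(r"SKILL_SELECT:([A-Za-z0-9_-]+)")


def detect_skill_ids(llm_output: str) -> list[str]:
    """Return skill IDs mentioned in the output, de-duplicated, in order."""
    seen: dict[str, None] = {}
    for match in SKILL_SELECT_RE.finditer(llm_output):
        seen.setdefault(match.group(1), None)
    return list(seen)


def select_for_injection(skill_ids: list[str], ledger: set[str]) -> list[str]:
    """Drop skills already recorded in the ledger, then record the rest.

    The set stands in for the injection ledger, which in the real service
    is queried and updated over HTTP.
    """
    fresh = [skill_id for skill_id in skill_ids if skill_id not in ledger]
    ledger.update(fresh)
    return fresh


ledger: set[str] = {"spacing-overview"}
output = (
    "I need SKILL_SELECT:rule37-filing-assembly and "
    "SKILL_SELECT:spacing-overview, then SKILL_SELECT:rule37-filing-assembly."
)
detected = detect_skill_ids(output)
to_inject = select_for_injection(detected, ledger)
```

Here to_inject contains only the skill that was not yet in the ledger, and a second call with the same IDs would return an empty list.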

HITL Approval Checkpoints

The approval node auto-detects HITL requirements from:

  • Injected skills with known checkpoint mappings (e.g., rule37-filing-assembly triggers pre_filing)
  • Executed tools (e.g., rule37_filing_assembly triggers pre_filing)
  • Metadata overrides (hitl_checkpoint_type in request metadata)

When HITL is required, the engine creates an approval request via the approval service and pauses execution with status: "awaiting_hitl".
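A sketch of how the three detection sources might be combined. The two mappings are the examples given above; the lookup tables are otherwise hypothetical, and the precedence (metadata override first, then skills, then tools) is an assumption the text does not confirm.

```python
from typing import Optional

# Only the mappings mentioned in the text; the real tables are likely larger.
SKILL_CHECKPOINTS = {"rule37-filing-assembly": "pre_filing"}
TOOL_CHECKPOINTS = {"rule37_filing_assembly": "pre_filing"}


def detect_checkpoint(
    injected_skill_ids: list[str],
    executed_tools: list[str],
    metadata: Optional[dict] = None,
) -> Optional[str]:
    """Return the HITL checkpoint type, or None if no approval is needed."""
    # Assumed precedence: an explicit request override wins.
    override = (metadata or {}).get("hitl_checkpoint_type")
    if override:
        return override
    for skill_id in injected_skill_ids:
        if skill_id in SKILL_CHECKPOINTS:
            return SKILL_CHECKPOINTS[skill_id]
    for tool_name in executed_tools:
        if tool_name in TOOL_CHECKPOINTS:
            return TOOL_CHECKPOINTS[tool_name]
    return None


def apply_checkpoint(state: dict, checkpoint: Optional[str]) -> dict:
    """Return a state update that pauses the run when a checkpoint applies."""
    if checkpoint is None:
        return {"hitl_required": False}
    return {
        "hitl_required": True,
        "hitl_checkpoint_type": checkpoint,
        "status": "awaiting_hitl",
    }
```

The approval request itself (the call to the approval service that yields hitl_approval_id) is left out of this sketch.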

Tool Definitions

The engine registers tools for LLM function calling based on agent type:

| Tool | Agent Types | Description |
| --- | --- | --- |
| spacing_calculation | rule_37 | Calculate distances to lease lines and offset wells |
| offset_well_analysis | rule_37 | Find offset wells requiring notice |
| rule37_filing_assembly | rule_37 | Assemble Form W-1 filing package |
| good_cause_narrative | rule_37 | Generate good cause justification |
| flaring_volume_calc | rule_32 | Calculate flaring volumes against R-32 thresholds |
| gas_analysis | rule_32 | Analyze gas composition and pipeline readiness |
| rule32_filing_assembly | rule_32 | Assemble Form R-32 package |
| emissions_estimate | rule_32, flaring_monitor | Calculate CO2e emissions |
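Registering tools by agent type amounts to filtering a registry. The sketch below encodes the table above as a dictionary and selects the tools for a given agent type; the data structure and the function name are illustrative assumptions, not the contents of tools.py.

```python
# Tool name -> agent types allowed to call it (taken from the table above).
TOOL_AGENT_TYPES: dict[str, tuple[str, ...]] = {
    "spacing_calculation": ("rule_37",),
    "offset_well_analysis": ("rule_37",),
    "rule37_filing_assembly": ("rule_37",),
    "good_cause_narrative": ("rule_37",),
    "flaring_volume_calc": ("rule_32",),
    "gas_analysis": ("rule_32",),
    "rule32_filing_assembly": ("rule_32",),
    "emissions_estimate": ("rule_32", "flaring_monitor"),
}


def tools_for_agent(agent_type: str) -> list[str]:
    """Return the tool names exposed to the given agent type, in table order."""
    return [
        name
        for name, agent_types in TOOL_AGENT_TYPES.items()
        if agent_type in agent_types
    ]


rule_32_tools = tools_for_agent("rule_32")
monitor_tools = tools_for_agent("flaring_monitor")
```

An agent type that has no entry in the registry simply gets an empty list, so the LLM is called without function-calling tools.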

Dependencies

Python Packages

| Package | Version | Purpose |
| --- | --- | --- |
| fastapi | ^0.115 | Web framework |
| uvicorn | ^0.34 | ASGI server |
| langgraph | ^0.4 | StateGraph agent execution engine |
| litellm | ^1.60 | LLM routing (OpenAI, Anthropic, etc.) |
| sse-starlette | ^2.0 | Server-Sent Events for streaming |
| aegis-shared | local | Shared models and DB helpers |
| langfuse | ^4.0 | LLM observability / tracing (OTEL-based, optional) |
| opentelemetry-{api,sdk,exporter-otlp} | ^1.27 | OTEL plumbing for the litellm langfuse_otel callback |

Service Dependencies

| Service | Protocol | Purpose |
| --- | --- | --- |
| Memory Service (8002) | HTTP | Working memory, episodic memory, injection ledger |
| Knowledge Graph Service (8003) | HTTP | Entity context assembly (Tier 3.5) |
| Approval Service (8004) | HTTP | HITL approval request creation |
| PostgreSQL | TCP | Skill registry, conversation persistence, compliance data |

Configuration

| Environment Variable | Default | Description |
| --- | --- | --- |
| ORCHESTRATION_HOST | 0.0.0.0 | Bind address |
| ORCHESTRATION_PORT | 8001 | Bind port |
| DATABASE_URL | postgresql://aegis:aegis_local@localhost:5432/aegis | PostgreSQL connection |
| REDIS_URL | redis://localhost:6379 | Redis connection |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka brokers |
| DEFAULT_LLM_MODEL | gpt-4o | Default LLM model for agent calls |
| DEFAULT_MAX_TOKENS_PER_EXECUTION | 100000 | Default token budget per execution |
| DEFAULT_MAX_COST_PER_EXECUTION | 5.0 | Default cost budget (USD) per execution |
| MAX_GRAPH_ITERATIONS | 20 | Max tool call loop iterations |
| MEMORY_SERVICE_URL | http://localhost:8002 | Memory service URL |
| KNOWLEDGE_GRAPH_SERVICE_URL | http://localhost:8003 | Knowledge graph service URL |
| APPROVAL_SERVICE_URL | http://localhost:8004 | Approval service URL |
| EPISODIC_TOP_K | 3 | Number of episodic memories to retrieve |
| LANGFUSE_PUBLIC_KEY | (empty) | Langfuse public key; tracing is enabled only when set |
| LANGFUSE_SECRET_KEY | (empty) | Langfuse secret key; tracing is enabled only when set |
| LANGFUSE_HOST | https://us.cloud.langfuse.com | Langfuse host/region (US default; EU/JP/HIPAA/self-hosted supported) |
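To show how these variables and defaults fit together, here is a standard-library sketch that loads a subset of them with type coercion. The Settings class and load_settings function are hypothetical; the actual config.py may be structured differently (for example, with a settings library).

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    host: str
    port: int
    default_llm_model: str
    max_tokens_per_execution: int
    max_cost_per_execution: float
    max_graph_iterations: int
    memory_service_url: str
    episodic_top_k: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from environment variables, using the documented defaults."""
    env = os.environ if env is None else env
    return Settings(
        host=env.get("ORCHESTRATION_HOST", "0.0.0.0"),
        port=int(env.get("ORCHESTRATION_PORT", "8001")),
        default_llm_model=env.get("DEFAULT_LLM_MODEL", "gpt-4o"),
        max_tokens_per_execution=int(
            env.get("DEFAULT_MAX_TOKENS_PER_EXECUTION", "100000")
        ),
        max_cost_per_execution=float(
            env.get("DEFAULT_MAX_COST_PER_EXECUTION", "5.0")
        ),
        max_graph_iterations=int(env.get("MAX_GRAPH_ITERATIONS", "20")),
        memory_service_url=env.get("MEMORY_SERVICE_URL", "http://localhost:8002"),
        episodic_top_k=int(env.get("EPISODIC_TOP_K", "3")),
    )


# Passing a mapping instead of os.environ keeps the example deterministic.
defaults = load_settings({})
overridden = load_settings({"MAX_GRAPH_ITERATIONS": "5", "EPISODIC_TOP_K": "10"})
```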

Observability (Langfuse Tracing)

The engine ships optional Langfuse tracing, wired in orchestration/observability.py. It is off by default and activates only when both LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set. Otherwise every tracing helper is a no-op and the service behaves identically.

How it works:

  • At startup (init_tracing() in the FastAPI lifespan) the engine enables the LiteLLM langfuse_otel callback for all litellm.acompletion calls and initialises the Langfuse SDK (v4, OpenTelemetry-based).
  • Each agent run is wrapped in a parent span via trace_agent_run(...). LiteLLM generation spans nest under it automatically because both layers use the global OTEL TracerProvider and share OTEL context, and Langfuse’s span processor exports gen_ai.* spans by default. The result is one trace per agent run, with each LLM call as a nested generation (model, tokens, and cost captured automatically).
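The "no-op unless both keys are set" behaviour can be illustrated without touching the Langfuse SDK. In the sketch below, a plain dictionary stands in for a span and a list stands in for the exporter; tracing_enabled and trace_agent_run_sketch are hypothetical names and deliberately do not reproduce the real trace_agent_run implementation.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Mapping, Optional


def tracing_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Tracing is on only when both Langfuse keys are non-empty."""
    env = os.environ if env is None else env
    return bool(env.get("LANGFUSE_PUBLIC_KEY")) and bool(
        env.get("LANGFUSE_SECRET_KEY")
    )


@contextmanager
def trace_agent_run_sketch(
    name: str,
    env: Optional[Mapping[str, str]] = None,
    sink: Optional[list] = None,
) -> Iterator[Optional[dict]]:
    """Wrap a block in a stand-in parent span, or do nothing if tracing is off."""
    if not tracing_enabled(env):
        yield None                 # no-op path: callers run unchanged
        return
    span = {"name": name, "children": []}
    try:
        yield span
    finally:
        if sink is not None:
            sink.append(span)      # stand-in for exporting the finished span


exported: list = []
keys = {"LANGFUSE_PUBLIC_KEY": "pk-example", "LANGFUSE_SECRET_KEY": "sk-example"}
with trace_agent_run_sketch("execute", env=keys, sink=exported) as span:
    span["children"].append("initial_llm_call")

with trace_agent_run_sketch("execute", env={}, sink=exported) as disabled_span:
    pass  # tracing off: disabled_span is None and nothing is exported
```

Because the disabled path yields None and exports nothing, callers can wrap every entry point unconditionally.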

Trace attributes set:

| Attribute | Source | Why |
| --- | --- | --- |
| session_id | conversation_id / checklist_id | Groups multi-turn runs in the Sessions view |
| user_id | metadata.user_id | User filtering and cost attribution |
| tags | agent type + phase (execute / stream / assessment / workspace-agent) | Per-agent / per-feature analytics |
| input / output | user message + final assistant message | Readable traces without leaking full state |

Instrumented entry points: POST /execute, GET /conversations/{id}/stream, workspace assessment (GET /workspaces/{id}/stream), and the scoped agent (POST /workspaces/{id}/items/{idx}/agent).

R34 entity-layer child spans (orchestration/entity_spans.py): the entity resolution, context-assembly, and HITL pipelines emit child_span observations that nest under the agent-run span, so an auto-select decision and the full HITL flow can be reconstructed from the trace alone. The spans carry the internal, precise values (intended for the audit audience, not the LLM) and are no-ops when tracing is off. All spans fire on the orchestration side: the knowledge-graph service is untraced and OTEL context doesn’t cross the HTTP boundary, so KG returns the facts (the resolution_trace block on the resolve response) and the orchestration tool wrappers plus the gate/guard emit the spans.

| Span | When | Key attributes |
| --- | --- | --- |
| entity_resolve.exact_match | every resolve | match_found, query, entity_types_scope |
| entity_resolve.trigram_match | every resolve | top_name_similarity (the calibration signal), candidates_returned |
| entity_resolve.proximity_score | resolve w/ candidates | top_proximity_score, context_entity_ids_count |
| entity_resolve.composite_score | resolve w/ candidates | name_component / proximity_component / type_hint_component (weighted) |
| entity_resolve.outcome | every resolve | outcome (auto_selected / asked_user / no_match), confidence |
| context_assemble.walk | context_assemble | entity_id, depth, entities_traversed |
| context_assemble.render | context_assemble | token_estimate, truncation_occurred, relationships_truncated_count |
| hitl.gate_fired | mutating-capability gate fires | capability_key, tool_call_id, assistant_message_stripped |
| hitl.resume_reconcile | resume_guard reconciles a verdict | verdict (approved / rejected / pending / unreadable), reviewer_note_present |
| hitl.resume_replay | approved mutation replayed | replay_succeeded, duplicate_emission_suppressed |

The entity_resolve.outcome span is a separate layer from the platform:log_entity_resolution rules-audit entry: the spans only wrap the resolve callsite and never re-fire the on_entity_resolve trigger, so each auto-select produces exactly one audit entry and one outcome span. An approved-but-unexecuted mutation shows up as an approved approval record with no accompanying hitl.resume_replay span. See docs/specs/phases/R34-phase-6-calibration-and-demo-runbook.md.
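The approved-but-unexecuted check reduces to a simple predicate over one trace. The sketch below assumes the caller has already fetched the approval verdict and the list of span names for that trace; how those are retrieved, and the function name itself, are hypothetical.

```python
def is_approved_but_unexecuted(verdict: str, span_names: list[str]) -> bool:
    """True when an approval was granted but no replay span was emitted.

    `verdict` is the approval record's outcome and `span_names` are the
    names of the spans found in the corresponding trace.
    """
    return verdict == "approved" and "hitl.resume_replay" not in span_names


# Hypothetical traces for illustration only.
replayed_trace = ["hitl.gate_fired", "hitl.resume_reconcile", "hitl.resume_replay"]
stalled_trace = ["hitl.gate_fired", "hitl.resume_reconcile"]

stalled = is_approved_but_unexecuted("approved", stalled_trace)
completed = is_approved_but_unexecuted("approved", replayed_trace)
```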

Running Locally

cd services/orchestration-engine
poetry install
poetry run uvicorn orchestration.main:app --reload --port 8001

Seeding Data

The service includes several seeding scripts:

# Seed the skill registry and agent definitions
poetry run python -m orchestration.seed_skills

# Seed checklist templates
poetry run python -m orchestration.seed_checklists

# Seed compliance rules
poetry run python -m orchestration.seed_rules

# Seed demo conversations
poetry run python -m orchestration.seed_conversations

The orchestration engine requires PostgreSQL to be running for conversation persistence and skill registry access. The memory service and knowledge graph service should be running for full pipeline functionality, but the engine degrades gracefully if they are unavailable.
