Orchestration Engine
The Orchestration Engine is the core agent execution runtime for AEGIS. It uses LangGraph to build a stateful graph pipeline that processes user messages through system prompts, memory retrieval, LLM calls, tool execution, skill injection, HITL approval checkpoints, and output formatting.
Overview
This service is the brain of AEGIS. When a user sends a message to an AI agent, the orchestration engine:
- Loads the agent’s system prompt and conversation history
- Retrieves working memory and episodic memories
- Calls the LLM (via LiteLLM) with budget enforcement
- Executes any tool calls the LLM requests
- Detects and injects relevant skills (3-tier architecture)
- Checks if HITL approval is required
- Runs a synthesis LLM call with injected skill context
- Formats the final output
Beyond the core agent pipeline, the orchestration engine also hosts the compliance dashboard API, checklist management, rule lifecycle, and workspace SSE endpoints.
Port & Language
| Property | Value |
|---|---|
| Port | 8001 |
| Language | Python 3.12 |
| Framework | FastAPI |
| Entry point | src/orchestration/main.py |
Key Endpoints
Agent Execution
| Method | Path | Description |
|---|---|---|
| POST | /execute | Synchronous full pipeline execution. Runs the entire LangGraph graph and returns the final state. |
| GET | /conversations/{conversation_id}/stream | SSE streaming endpoint. Streams node_end events as each LangGraph node completes, then a done event. |
| GET | /health | Health check. Returns service version. |
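The stream endpoint uses the standard Server-Sent Events wire format, so any SSE client can consume it. The parser below is a minimal, dependency-free sketch of what a consumer does: it splits a raw stream into (event, data) pairs. The event names node_end and done come from the table above. The JSON payloads in the example are hypothetical, because their schema is not documented here.

```python
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from raw Server-Sent Events lines.

    A blank line dispatches the buffered event. Several data: lines are joined
    with newlines. An event with no event: field is named "message". An
    unterminated event at end of stream is dropped, as the SSE spec requires.
    """
    event = "message"
    data: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive ping
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # strip the single optional leading space
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)


# Example stream (payloads are hypothetical):
# event: node_end / data: {"node": "memory_node"} / (blank) / event: done / data: {}
```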
Conversations
| Method | Path | Description |
|---|---|---|
| GET | /conversations | List conversations (filterable by user_id). |
| POST | /conversations | Create a new conversation. |
| GET | /conversations/{id} | Get conversation details with messages. |
| PUT | /conversations/{id} | Update conversation title or status. |
| DELETE | /conversations/{id} | Soft-delete a conversation. |
Compliance Dashboard (R1.2)
| Method | Path | Description |
|---|---|---|
| GET | /compliance/summary | Summary cards (total entities, domain counts, status distribution). |
| GET | /compliance/matrix | Entity x compliance domain matrix with color-coded status cells. |
| GET | /compliance/charts/* | Chart data (deadline bar, compliance trend, filing status, flaring volume). |
| POST | /compliance/assess | Trigger compliance assessment for an entity. |
Checklists (R1.3)
| Method | Path | Description |
|---|---|---|
| GET | /checklists | List filing checklists. |
| POST | /checklists | Create a new checklist from a template. |
| GET | /checklists/{id} | Get checklist with items. |
| PUT | /checklists/{id}/items/{idx} | Update a checklist item status. |
| POST | /checklists/{id}/submit | Submit a completed checklist for review. |
Artifacts
| Method | Path | Description |
|---|---|---|
| GET | /checklists/{id}/items/{idx}/artifacts | List artifacts for a checklist item. |
| POST | /checklists/{id}/items/{idx}/artifacts | Upload or create an artifact. |
Rules (R1.5)
| Method | Path | Description |
|---|---|---|
| GET | /rules | List rule versions. |
| POST | /rules | Create a new rule version (immutable). |
| GET | /rules/{identifier}/versions | Get all versions of a specific rule. |
| GET | /rules/changes | Detect rules that changed since a reference date. |
| POST | /rules/monitor/scan | Trigger a rule change monitoring scan. |
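Rule versions are immutable, so creating a rule appends a new version instead of editing the old one. The sketch below illustrates that append-only pattern in memory. RuleVersion, RuleStore, and the definition of "changed" are hypothetical stand-ins, not the actual rule_lifecycle.py model. Here, "changed" means a version whose effective_date falls after the reference date.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)  # frozen: a stored version cannot be mutated
class RuleVersion:
    identifier: str
    version: int
    text: str
    effective_date: date


class RuleStore:
    """Append-only store: creating a rule never edits an earlier version."""

    def __init__(self) -> None:
        self._versions: list[RuleVersion] = []

    def create(self, identifier: str, text: str, effective_date: date) -> RuleVersion:
        # Each new version gets the next number for this identifier.
        next_version = len(self.versions_of(identifier)) + 1
        rv = RuleVersion(identifier, next_version, text, effective_date)
        self._versions.append(rv)
        return rv

    def versions_of(self, identifier: str) -> list[RuleVersion]:
        return [v for v in self._versions if v.identifier == identifier]

    def changed_since(self, reference: date) -> set[str]:
        # Assumption: a rule "changed" if any version took effect after the reference date.
        return {v.identifier for v in self._versions if v.effective_date > reference}
```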
Workspaces (R1.4)
| Method | Path | Description |
|---|---|---|
| POST | /workspaces/{id}/assess | Start an automated compliance assessment. |
| GET | /workspaces/{id}/stream | SSE stream for workspace events. |
| POST | /workspaces/{id}/items/{idx}/agent | Send a message to the scoped agent for a checklist item. |
Architecture
LangGraph Pipeline
The core execution follows a StateGraph with 9 nodes:
```text
START -> system_prompt_node -> memory_node -> initial_llm_call
      -> [tool_node -> initial_llm_call]*  (tool loop)
      -> skill_select_node -> [skill_inject_node] -> approval_node
      -> synthesis_llm_call -> [tool_node -> synthesis_llm_call]*
      -> output_format -> END
```
Routing logic between nodes:
- After `initial_llm_call`: If there are pending tool calls, route to `tool_node`. If there is an error, route to `output_format`. Otherwise, route to `skill_select_node`.
- After `tool_node`: Check the iteration limit, then route back to the appropriate LLM call based on the current phase (`initial` or `synthesis`).
- After `skill_select_node`: If skills were detected, route to `skill_inject_node`. Otherwise, skip to `approval_node`.
- After `approval_node`: If HITL is required, route to `output_format` with `awaiting_hitl` status. Otherwise, continue to `synthesis_llm_call`.
- After `synthesis_llm_call`: If tool calls are needed, route to `tool_node`. Otherwise, route to `output_format`.
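The routing rules above map naturally onto conditional-edge functions. Each one inspects GraphState and returns the name of the next node. The sketch below is one hedged reading of those rules, and the function names are hypothetical. Two behaviours the text leaves open are assumptions, flagged in comments: an error is taken to mean `status == "failed"`, and reaching the iteration cap is taken to end the run at output_format. Functions of this shape are what LangGraph's `StateGraph.add_conditional_edges` accepts as its path argument.

```python
from typing import Any

MAX_GRAPH_ITERATIONS = 20  # mirrors the documented default

State = dict[str, Any]


def route_after_initial_llm(state: State) -> str:
    if state.get("pending_tool_calls"):
        return "tool_node"
    if state.get("status") == "failed":  # assumption: errors set status="failed"
        return "output_format"
    return "skill_select_node"


def route_after_tool(state: State) -> str:
    # Assumption: hitting the iteration cap ends the run via output_format.
    if state.get("iteration", 0) >= MAX_GRAPH_ITERATIONS:
        return "output_format"
    if state.get("llm_phase") == "synthesis":
        return "synthesis_llm_call"
    return "initial_llm_call"


def route_after_skill_select(state: State) -> str:
    return "skill_inject_node" if state.get("selected_skill_ids") else "approval_node"


def route_after_approval(state: State) -> str:
    # The approval node itself is assumed to set status="awaiting_hitl".
    return "output_format" if state.get("hitl_required") else "synthesis_llm_call"


def route_after_synthesis(state: State) -> str:
    return "tool_node" if state.get("pending_tool_calls") else "output_format"
```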
Module Breakdown
```text
src/orchestration/
├── main.py                   # FastAPI app, /execute and /stream endpoints
├── engine.py                 # LangGraph StateGraph construction and compilation
├── nodes.py                  # Node implementations (1 function per node)
├── tools.py                  # Tool definitions and execution for LLM function calling
├── state.py                  # GraphState TypedDict (flows through the graph)
├── schemas.py                # Request/response Pydantic models
├── services.py               # HTTP clients for memory, KG, approval services
├── budget.py                 # Token/cost budget enforcement
├── config.py                 # Settings from environment variables
├── conversation_routes.py    # Conversation CRUD (list, create, get, update, delete)
├── skills/                   # Checklist-driven skill implementations
│   ├── __init__.py           # Skill registry (register_skill, list_registered_skills)
│   ├── base.py               # Base skill class
│   ├── rule37/               # Rule 37 spacing exception skills (11 skills)
│   ├── rule32/               # Rule 32 flaring exception skills (10 skills)
│   ├── form_pr/              # Form PR production report skills (8 skills)
│   ├── flaring_monitor/      # Flaring monitor skills (6 skills)
│   └── field_event/          # Field event conversation skills
├── compliance/               # Compliance dashboard and workspace
│   ├── routes.py             # /compliance/summary, /matrix, /charts
│   ├── engine.py             # Compliance status computation engine
│   ├── queries.py            # Cypher query templates for compliance
│   ├── checklist_routes.py   # Checklist CRUD routes
│   ├── artifact_routes.py    # Artifact management routes
│   ├── rule_routes.py        # Rule version CRUD routes
│   ├── rule_lifecycle.py     # Immutable rule versioning logic
│   ├── rule_monitor/         # Rule change monitoring
│   └── workspace/            # Entity workspace (assessment, SSE, agent)
│       ├── routes.py         # Workspace API routes
│       ├── assessment.py     # Automated assessment logic
│       └── events.py         # WorkspaceEventType enum, SSE event formatting
├── seed_skills.py            # Skill registry seeding script
├── seed_checklists.py        # Checklist template seeding script
├── seed_rules.py             # Compliance rule data seeding script
├── seed_conversations.py     # Demo conversation seeding
└── seed_demo_data.py         # Combined demo data seeder
```
GraphState
The GraphState TypedDict carries all execution state through the graph:
```python
from typing import TypedDict


class GraphState(TypedDict, total=False):
    execution_id: str
    conversation_id: str
    agent_id: str
    agent_type: str  # "rule_37", "rule_32", etc.
    tenant_id: str
    status: str  # "pending", "running", "completed", "awaiting_hitl", "failed"
    messages: list[dict]  # Conversation messages (role/content/tool_calls)
    system_prompt: str
    model: str  # LLM model to use (default: gpt-4o)
    injected_skill_ids: list  # Skills already injected
    active_skills: dict  # skill_id -> tier level
    tokens_used: int
    cost_usd: float
    max_tokens: int
    max_cost_usd: float
    pending_tool_calls: list  # Tool calls awaiting execution
    iteration: int  # Guards against infinite loops
    memory_context: dict  # Working + episodic memory data
    selected_skill_ids: list  # Skills detected from LLM output
    llm_phase: str  # "initial" or "synthesis"
    hitl_required: bool
    hitl_checkpoint_type: str
    hitl_approval_id: str | None
```
Budget Enforcement
The budget.py module enforces per-execution limits:
- Token budget: raises `BudgetExceededError` when `tokens_used >= max_tokens`
- Cost budget: raises `BudgetExceededError` when `cost_usd >= max_cost_usd`
Both budgets are checked before every LLM call. When either is exceeded, the /execute endpoint returns HTTP 429 and the SSE stream emits an error event.
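A minimal sketch of the pre-call check follows, assuming the state keys from GraphState. The BudgetExceededError class here is a local stand-in for the one in budget.py, whose exact definition is not shown on this page. check_budget is a hypothetical name. Note the `>=` comparisons: a run that has used exactly its budget is already stopped.

```python
class BudgetExceededError(Exception):
    """Stand-in for the exception defined in budget.py."""


def check_budget(state: dict) -> None:
    """Raise BudgetExceededError if either per-execution limit is reached.

    Intended to run before every LLM call (hypothetical helper name).
    """
    tokens = state.get("tokens_used", 0)
    cost = state.get("cost_usd", 0.0)
    if tokens >= state["max_tokens"]:
        raise BudgetExceededError(
            f"token budget exhausted: {tokens} >= {state['max_tokens']}"
        )
    if cost >= state["max_cost_usd"]:
        raise BudgetExceededError(
            f"cost budget exhausted: {cost:.4f} >= {state['max_cost_usd']:.4f} USD"
        )
```

In the service, the /execute handler would translate this exception into HTTP 429 (for example, with a FastAPI exception handler). That wiring is not shown here.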
Skill Injection (3-Tier Architecture)
When the LLM output contains SKILL_SELECT:{skill_id} patterns, the skill injection pipeline activates:
- Ledger check — Query the memory service’s injection ledger to prevent duplicate injection
- Tier 2 — Load the skill definition (description, steps, requirements, output format) from the PostgreSQL skills table
- Tier 3 — Load reference artifacts (templates, examples, regulations) from the skill_artifacts table
- Tier 3.5 — Load entity context from the knowledge graph (well data, lease data, relationships)
- Mark ledger — Record the injection to prevent re-injection in subsequent turns
All injected content is appended as system messages to the conversation.
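The detection and ledger steps can be sketched as follows. The sketch assumes skill IDs are slug-like (letters, digits, hyphens, underscores). That fits the rule37-filing-assembly example on this page but is not specified. It also models the memory service's injection ledger as a plain in-memory set. detect_skills and skills_to_inject are hypothetical names, and the tier loading itself is elided.

```python
import re

# Assumption: skill IDs contain only letters, digits, hyphens and underscores.
SKILL_SELECT_RE = re.compile(r"SKILL_SELECT:([A-Za-z0-9_-]+)")


def detect_skills(llm_output: str) -> list[str]:
    """Return SKILL_SELECT skill IDs in first-seen order, without duplicates."""
    seen: list[str] = []
    for skill_id in SKILL_SELECT_RE.findall(llm_output):
        if skill_id not in seen:
            seen.append(skill_id)
    return seen


def skills_to_inject(llm_output: str, ledger: set[str]) -> list[str]:
    """Ledger check, (elided) tier loading, then ledger marking.

    `ledger` stands in for the memory service's injection ledger.
    """
    pending = [s for s in detect_skills(llm_output) if s not in ledger]
    # ... Tier 2, Tier 3 and Tier 3.5 loading would happen here ...
    ledger.update(pending)  # mark so later turns skip these skills
    return pending
```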
HITL Approval Checkpoints
The approval node auto-detects HITL requirements from:
- Injected skills with known checkpoint mappings (e.g., `rule37-filing-assembly` triggers `pre_filing`)
- Executed tools (e.g., `rule37_filing_assembly` triggers `pre_filing`)
- Metadata overrides (`hitl_checkpoint_type` in request metadata)
When HITL is required, the engine creates an approval request via the approval service and pauses execution with status: "awaiting_hitl".
Tool Definitions
The engine registers tools for LLM function calling based on agent type:
| Tool | Agent Types | Description |
|---|---|---|
| spacing_calculation | rule_37 | Calculate distances to lease lines and offset wells |
| offset_well_analysis | rule_37 | Find offset wells requiring notice |
| rule37_filing_assembly | rule_37 | Assemble Form W-1 filing package |
| good_cause_narrative | rule_37 | Generate good cause justification |
| flaring_volume_calc | rule_32 | Calculate flaring volumes against R-32 thresholds |
| gas_analysis | rule_32 | Analyze gas composition and pipeline readiness |
| rule32_filing_assembly | rule_32 | Assemble Form R-32 package |
| emissions_estimate | rule_32, flaring_monitor | Calculate CO2e emissions |
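The table amounts to a mapping from each tool to the agent types that may call it, so per-agent registration can be sketched as a filter over that mapping. TOOL_AGENT_TYPES, tools_for_agent, and to_function_schema are hypothetical names. The output uses the OpenAI-style tools format that LiteLLM accepts, with an empty parameter schema because the real argument schemas are not documented here.

```python
# Tool -> agent types allowed to call it, transcribed from the table above.
TOOL_AGENT_TYPES: dict[str, tuple[str, ...]] = {
    "spacing_calculation": ("rule_37",),
    "offset_well_analysis": ("rule_37",),
    "rule37_filing_assembly": ("rule_37",),
    "good_cause_narrative": ("rule_37",),
    "flaring_volume_calc": ("rule_32",),
    "gas_analysis": ("rule_32",),
    "rule32_filing_assembly": ("rule_32",),
    "emissions_estimate": ("rule_32", "flaring_monitor"),
}


def tools_for_agent(agent_type: str) -> list[str]:
    """Return tool names registered for an agent type, in table order."""
    return [name for name, types in TOOL_AGENT_TYPES.items() if agent_type in types]


def to_function_schema(name: str, description: str) -> dict:
    """Wrap a tool in the OpenAI-style function-calling format.

    The parameter schema is a placeholder; real schemas are not documented here.
    """
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {"type": "object", "properties": {}},
        },
    }
```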
Dependencies
Python Packages
| Package | Version | Purpose |
|---|---|---|
| fastapi | ^0.115 | Web framework |
| uvicorn | ^0.34 | ASGI server |
| langgraph | ^0.4 | StateGraph agent execution engine |
| litellm | ^1.60 | LLM routing (OpenAI, Anthropic, etc.) |
| sse-starlette | ^2.0 | Server-Sent Events for streaming |
| aegis-shared | local | Shared models and DB helpers |
| langfuse | ^4.0 | LLM observability / tracing (OTEL-based, optional) |
| opentelemetry-{api,sdk,exporter-otlp} | ^1.27 | OTEL plumbing for the litellm langfuse_otel callback |
Service Dependencies
| Service | Protocol | Purpose |
|---|---|---|
| Memory Service (8002) | HTTP | Working memory, episodic memory, injection ledger |
| Knowledge Graph Service (8003) | HTTP | Entity context assembly (Tier 3.5) |
| Approval Service (8004) | HTTP | HITL approval request creation |
| PostgreSQL | TCP | Skill registry, conversation persistence, compliance data |
Configuration
| Environment Variable | Default | Description |
|---|---|---|
| ORCHESTRATION_HOST | 0.0.0.0 | Bind address |
| ORCHESTRATION_PORT | 8001 | Bind port |
| DATABASE_URL | postgresql://aegis:aegis_local@localhost:5432/aegis | PostgreSQL connection |
| REDIS_URL | redis://localhost:6379 | Redis connection |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka brokers |
| DEFAULT_LLM_MODEL | gpt-4o | Default LLM model for agent calls |
| DEFAULT_MAX_TOKENS_PER_EXECUTION | 100000 | Default token budget per execution |
| DEFAULT_MAX_COST_PER_EXECUTION | 5.0 | Default cost budget (USD) per execution |
| MAX_GRAPH_ITERATIONS | 20 | Max tool call loop iterations |
| MEMORY_SERVICE_URL | http://localhost:8002 | Memory service URL |
| KNOWLEDGE_GRAPH_SERVICE_URL | http://localhost:8003 | Knowledge graph service URL |
| APPROVAL_SERVICE_URL | http://localhost:8004 | Approval service URL |
| EPISODIC_TOP_K | 3 | Number of episodic memories to retrieve |
| LANGFUSE_PUBLIC_KEY | (empty) | Langfuse public key; tracing is enabled only when set |
| LANGFUSE_SECRET_KEY | (empty) | Langfuse secret key; tracing is enabled only when set |
| LANGFUSE_HOST | https://us.cloud.langfuse.com | Langfuse host/region (US default; EU/JP/HIPAA/self-hosted supported) |
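config.py reads these variables at startup. The sketch below reproduces a subset of the same defaults with a stdlib dataclass. Settings and load_settings are hypothetical names, and the real module may use a different mechanism, such as pydantic-settings. The tracing_enabled property encodes the rule that tracing needs both Langfuse keys.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    port: int
    default_llm_model: str
    max_tokens_per_execution: int
    max_cost_per_execution: float
    max_graph_iterations: int
    langfuse_public_key: str
    langfuse_secret_key: str
    langfuse_host: str

    @property
    def tracing_enabled(self) -> bool:
        # Tracing turns on only when BOTH Langfuse keys are non-empty.
        return bool(self.langfuse_public_key and self.langfuse_secret_key)


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, using the documented defaults."""
    return Settings(
        port=int(env.get("ORCHESTRATION_PORT", "8001")),
        default_llm_model=env.get("DEFAULT_LLM_MODEL", "gpt-4o"),
        max_tokens_per_execution=int(env.get("DEFAULT_MAX_TOKENS_PER_EXECUTION", "100000")),
        max_cost_per_execution=float(env.get("DEFAULT_MAX_COST_PER_EXECUTION", "5.0")),
        max_graph_iterations=int(env.get("MAX_GRAPH_ITERATIONS", "20")),
        langfuse_public_key=env.get("LANGFUSE_PUBLIC_KEY", ""),
        langfuse_secret_key=env.get("LANGFUSE_SECRET_KEY", ""),
        langfuse_host=env.get("LANGFUSE_HOST", "https://us.cloud.langfuse.com"),
    )
```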
Observability (Langfuse Tracing)
The engine ships optional Langfuse tracing, wired in
orchestration/observability.py. It is off by default and activates only
when both LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set — otherwise
every tracing helper is a no-op and the service behaves identically.
How it works:
- At startup (`init_tracing()` in the FastAPI lifespan), the engine enables the LiteLLM `langfuse_otel` callback for all `litellm.acompletion` calls and initialises the Langfuse SDK (v4, OpenTelemetry-based).
- Each agent run is wrapped in a parent span via `trace_agent_run(...)`. LiteLLM generation spans nest under it automatically: both layers ride the global OTEL `TracerProvider` and share OTEL context, and Langfuse's span processor exports `gen_ai.*` spans by default. The result is one trace per agent run, with each LLM call as a nested generation (model, tokens, and cost captured automatically).
Trace attributes set:
| Attribute | Source | Why |
|---|---|---|
| session_id | conversation_id / checklist_id | Groups multi-turn runs in the Sessions view |
| user_id | metadata.user_id | User filtering and cost attribution |
| tags | agent type + phase (execute / stream / assessment / workspace-agent) | Per-agent / per-feature analytics |
| input / output | user message + final assistant message | Readable traces without leaking full state |
Instrumented entry points: POST /execute, GET /conversations/{id}/stream,
workspace assessment (GET /workspaces/{id}/stream), and the scoped agent
(POST /workspaces/{id}/items/{idx}/agent).
R34 entity-layer child spans (orchestration/entity_spans.py): the entity
resolution, context-assembly, and HITL pipelines emit child_span observations
that nest under the agent-run span. As a result, an auto-select decision and the
full HITL flow can be reconstructed from the trace alone. These spans carry the
internal, precise values (intended for auditors, not the LLM) and are no-ops
when tracing is off. All of them fire on the orchestration side. The
knowledge-graph service is untraced and OTEL context does not cross the HTTP
boundary, so the KG service returns the facts (the resolution_trace block on the
resolve response) and the orchestration tool wrappers and gate/guard emit the spans.
| Span | When | Key attributes |
|---|---|---|
| entity_resolve.exact_match | every resolve | match_found, query, entity_types_scope |
| entity_resolve.trigram_match | every resolve | top_name_similarity (the calibration signal), candidates_returned |
| entity_resolve.proximity_score | resolve w/ candidates | top_proximity_score, context_entity_ids_count |
| entity_resolve.composite_score | resolve w/ candidates | name_component / proximity_component / type_hint_component (weighted) |
| entity_resolve.outcome | every resolve | outcome (auto_selected / asked_user / no_match), confidence |
| context_assemble.walk | context_assemble | entity_id, depth, entities_traversed |
| context_assemble.render | context_assemble | token_estimate, truncation_occurred, relationships_truncated_count |
| hitl.gate_fired | mutating-capability gate fires | capability_key, tool_call_id, assistant_message_stripped |
| hitl.resume_reconcile | resume_guard reconciles a verdict | verdict (approved / rejected / pending / unreadable), reviewer_note_present |
| hitl.resume_replay | approved mutation replayed | replay_succeeded, duplicate_emission_suppressed |
The entity_resolve.outcome span is a separate layer from the
platform:log_entity_resolution rules-audit entry. The spans only wrap the
resolve callsite and never re-fire the on_entity_resolve trigger, so each
auto-select produces exactly one audit entry and one outcome span. An
approved-but-unexecuted mutation shows up as an approved approval record with no
matching hitl.resume_replay span. See docs/specs/phases/R34-phase-6-calibration-and-demo-runbook.md.
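Suppose you have exported approval records and the span names observed in each approval's trace. That audit check then reduces to a set-membership test. The ApprovalRecord shape and the spans_by_approval mapping are hypothetical export formats, not an interface this service is known to provide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ApprovalRecord:
    approval_id: str
    status: str  # e.g. "approved", "rejected", "pending"


def approved_but_unexecuted(
    approvals: list[ApprovalRecord],
    spans_by_approval: dict[str, set[str]],
) -> list[str]:
    """Return IDs of approved records whose trace has no hitl.resume_replay span."""
    return [
        a.approval_id
        for a in approvals
        if a.status == "approved"
        and "hitl.resume_replay" not in spans_by_approval.get(a.approval_id, set())
    ]
```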
Running Locally
```bash
cd services/orchestration-engine
poetry install
poetry run uvicorn orchestration.main:app --reload --port 8001
```
Seeding Data
The service includes several seeding scripts:
```bash
# Seed the skill registry and agent definitions
poetry run python -m orchestration.seed_skills
# Seed checklist templates
poetry run python -m orchestration.seed_checklists
# Seed compliance rules
poetry run python -m orchestration.seed_rules
# Seed demo conversations
poetry run python -m orchestration.seed_conversations
```
The orchestration engine requires PostgreSQL to be running for conversation persistence and skill registry access. The memory service and knowledge graph service should be running for full pipeline functionality, but the engine degrades gracefully if they are unavailable.