LangGraph Pipeline
The orchestration engine uses a LangGraph StateGraph to execute agent requests through a deterministic pipeline. Each node performs a specific function and returns a partial state update. Conditional edges route execution based on state values.
Pipeline Overview
START
  |
  v
system_prompt_node      Set the agent's system prompt
  |
  v
memory_node             Fetch working + episodic memory; hydrate pending_mutation
  |
  v
resume_guard            Reconcile a HITL-gated mutation against its approval verdict
  |
  +--[approved]------------------------> tool_node (replay the frozen call)
  +--[still pending / can't evaluate]--> output_format --> END
  |
  |  [no pending mutation / rejected]
  v
initial_llm_call        First LLM call (may request tool calls)
  |
  +--[has tool calls?]---> tool_node ----+
  |         ^                            |
  |         +------ loop back -----------+
  |   (a mutating tool trips the before_tool_call HITL gate ->
  |    awaiting_hitl -> output_format -> END)
  |
  v
skill_select_node       Scan LLM output for SKILL_SELECT:{id} patterns
  |
  +--[skills found?]---> skill_inject_node ---+
  |                                           |
  +-------------------------------------------+
  |
  v
approval_node           Check HITL requirements, pause if needed
  |
  +--[awaiting_hitl?]---> output_format --> END
  |
  v
synthesis_llm_call      Second LLM call with injected skill context
  |
  +--[has tool calls?]---> tool_node ----+
  |         ^                            |
  |         +------ loop back -----------+
  |
  v
output_format           Finalize status, increment iteration counter
  |
  v
END
State Schema
The entire pipeline shares a single GraphState TypedDict that flows through every node. Each node reads from this shared state and returns a partial update that LangGraph merges back into it.
from typing import TypedDict


class GraphState(TypedDict, total=False):
    # Identity
    execution_id: str
    conversation_id: str
    agent_id: str
    agent_type: str  # "rule_37", "rule_32", "flaring_monitor", etc.
    tenant_id: str
    # Execution control
    status: str  # "pending", "running", "completed", "awaiting_hitl", "failed"
    current_node: str | None
    iteration: int  # Guards against infinite tool-call loops
    error: str | None
    # Conversation
    messages: list[dict]  # OpenAI-format messages (role, content, tool_calls)
    system_prompt: str
    model: str  # Default: "gpt-4o"
    # Budget enforcement
    tokens_used: int
    cost_usd: float
    max_tokens: int  # Default: 100,000
    max_cost_usd: float  # Default: $5.00
    # Tool execution
    pending_tool_calls: list[dict]
    # Skill injection
    injected_skill_ids: list[str]
    injected_entity_ids: list[str]
    active_skills: dict[str, int]  # skill_id -> tier level (2 or 3)
    selected_skill_ids: list[str]
    skill_injection_done: bool
    # Two-phase LLM
    llm_phase: str  # "initial" or "synthesis"
    # Memory
    memory_context: dict  # Working memory + episodic search results
    # HITL
    hitl_required: bool
    hitl_checkpoint_type: str
    hitl_approval_id: str | None
    hitl_reviewer_id: str | None
    hitl_reviewer_strategy: str  # "named_individual" or "role_based"
    # R34 entity resolution / HITL pause-resume
    recent_entity_context: list[str]  # last-10 entity uuids (FIFO), mirrors Redis
    pending_mutation: dict | None  # a gated mutating tool_call awaiting approval
    consecutive_failed_resolves: int  # force-clarify counter (Rule 2 reads it)
    # Metadata
    metadata: dict
Node-by-Node Reference
1. system_prompt_node
File: orchestration/nodes.py
Loads the agent’s root skill (from agent_definitions.root_skill_key, R35 P3a) and then assembles the base system message (R35 P3b, design §6.4) — it no longer reads the per-agent FALLBACK_PROMPTS monolith:
- Persona (voice/identity, §6.3 precedence): explicit agent_definition.persona_key → the root skill’s default persona → PLATFORM_DEFAULT_PERSONA. Resolution keys only on the root skill; dependency-skill personas are inert.
- USER IDENTITY block: a small === User === block (display name + role from metadata, soft-falling to user_id / "user").
- Domain facts ride in separately: the loader injects each loaded skill’s context_templates as their own system messages. This node only counts them (for the prompt.assemble span); it never re-injects them.
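The persona precedence and the identity block can be sketched as two small helpers. This is a minimal illustration, not the code in orchestration/nodes.py. The following are assumptions: the function names, the dict field names (persona_key, default_persona, display_name, role), the persona_source labels, and the value of PLATFORM_DEFAULT_PERSONA. Only the resolution order and the soft fallback to user_id / "user" come from the description above.

```python
from typing import Optional

# Hypothetical key for the platform fallback persona (real value not documented here).
PLATFORM_DEFAULT_PERSONA = "platform_default"


def resolve_persona(agent_definition: dict, root_skill: Optional[dict]) -> tuple[str, str]:
    """Return (persona_key, persona_source) following the documented precedence.

    Only the root skill is consulted; personas on dependency skills are ignored.
    """
    explicit = agent_definition.get("persona_key")
    if explicit:
        return explicit, "agent_definition"
    if root_skill and root_skill.get("default_persona"):
        return root_skill["default_persona"], "root_skill"
    return PLATFORM_DEFAULT_PERSONA, "platform_default"


def user_identity_block(metadata: dict) -> str:
    """Build the === User === block, soft-falling back to user_id, then "user"."""
    name = metadata.get("display_name") or metadata.get("user_id") or "user"
    lines = ["=== User ===", f"Name: {name}"]
    if metadata.get("role"):
        lines.append(f"Role: {metadata['role']}")
    return "\n".join(lines)
```

A general agent has no root skill, so resolve_persona({}, None) falls through to the platform default.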
The base message (persona + identity) is inserted at index 0 as the stable cache prefix; memory and the skill context_templates are the variable tail. On the Anthropic path the outgoing payload carries a cache_control breakpoint at the end of the prefix (provider-gated — skipped for OpenAI), so a skill load/unload re-pays the tail but never invalidates the prefix.
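The sketch below shows the prefix/tail split with a provider-gated cache breakpoint. It assumes OpenAI-style message dicts as accepted by LiteLLM, where an Anthropic cache breakpoint is expressed as a cache_control entry on a text content block. The build_payload name and the plain provider string are hypothetical.

```python
def build_payload(base_prompt: str, tail: list[dict], provider: str) -> list[dict]:
    """Put the stable base message at index 0 and mark the end of the prefix.

    On the Anthropic path the base message content becomes a text block carrying
    cache_control, so the provider can cache everything up to that point. Other
    providers (e.g. OpenAI) get a plain string and no breakpoint.
    """
    if provider == "anthropic":
        base = {
            "role": "system",
            "content": [
                {"type": "text", "text": base_prompt, "cache_control": {"type": "ephemeral"}},
            ],
        }
    else:
        base = {"role": "system", "content": base_prompt}
    # Memory and the skill context_templates form the variable tail after index 0.
    return [base, *tail]
```

Only the index-0 message carries the breakpoint, so loading or unloading a skill changes only the entries after it.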
General mode is domain-empty by design (§6.5). A general agent (root_skill_key=NULL) loads no skill → no templates → the platform default persona, which carries no O&G/RRC/Texas content. It still refuses domain work (the Step-3 hard-stop). This is the one intentional non-preserving change in R35.
A caller-supplied state.system_prompt still overrides the assembly (back-compat). Spans emitted: agent.resolve_definition (with the resolved persona) and prompt.assemble (persona_key, persona_source, context_template_count, user_identity_injected, token_estimate).
The node also initializes execution tracking fields:
{
    "llm_phase": "initial",
    "skill_injection_done": False,
    "selected_skill_ids": [],
    "status": "running",
}
2. memory_node
File: orchestration/nodes.py
Makes two HTTP calls to the memory service:
- Working memory (GET /working-memory/{conversation_id}): Retrieves the conversation’s scratchpad and extracted entities from Redis
- Episodic memory (POST /episodic/search): Performs semantic search over past conversation summaries using the latest user message as the query (returns top 3 results by default)
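The sketch below covers the two memory-service calls and the formatting step that turns their results into the Memory Context message. It builds the requests with the standard library without sending them. MEMORY_BASE_URL, the JSON body fields (query, top_k) and the helper names are hypothetical. The paths, the latest-user-message query and the default of 3 results come from the list above.

```python
import json
from urllib.request import Request

MEMORY_BASE_URL = "http://memory-service:8000"  # hypothetical service address


def latest_user_message(messages: list[dict]) -> str:
    """Content of the most recent user message, or "" if there is none."""
    for msg in reversed(messages):
        if msg.get("role") == "user":
            return msg.get("content") or ""
    return ""


def build_memory_requests(conversation_id: str, messages: list[dict],
                          top_k: int = 3) -> tuple[Request, Request]:
    """Build (but do not send) the working-memory and episodic-search calls."""
    working = Request(f"{MEMORY_BASE_URL}/working-memory/{conversation_id}", method="GET")
    body = json.dumps({"query": latest_user_message(messages), "top_k": top_k}).encode()
    episodic = Request(f"{MEMORY_BASE_URL}/episodic/search", data=body,
                       headers={"Content-Type": "application/json"}, method="POST")
    return working, episodic


def format_memory_context(scratchpad: str, entities: list[str],
                          episodes: list[tuple[float, str]]) -> str:
    """Render the memory system message; empty sections are omitted."""
    parts = ["=== Memory Context ==="]
    if scratchpad:
        parts += ["[Working Memory -- Scratchpad]", scratchpad]
    if entities:
        parts += ["[Working Memory -- Extracted Entities]", *(f"- {e}" for e in entities)]
    if episodes:
        parts.append("[Episodic Memory -- Relevant Past Conversations]")
        parts += [f"- (similarity: {score:.2f}) {summary}" for score, summary in episodes]
    return "\n".join(parts)
```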
The retrieved memory is formatted and injected as a system message after the main system prompt:
=== Memory Context ===
[Working Memory -- Scratchpad]
{scratchpad content}
[Working Memory -- Extracted Entities]
- Well: 42-329-12345 (Mitchell Ranch 1H)
- Operator: op-permian-energy (Permian Basin Energy LLC)
[Episodic Memory -- Relevant Past Conversations]
- (similarity: 0.87) Previously filed Rule 37 exception for Mitchell Ranch 2H...
resume_guard (R34 Phase 5)
File: orchestration/nodes.py
Runs between memory_node and initial_llm_call. If there is no pending_mutation it is a pass-through. Otherwise it reconciles the gated mutation against the approval verdict (get_approval_status → approval-service GET /approvals/{id}):
- approved → reconstructs the assistant tool_calls message from the frozen args + original tool_call_id, sets pending_tool_calls, and routes to tool_node for a single replay.
- rejected → drops pending_mutation, appends a system note (with the reviewer comment), and routes to initial_llm_call so the model acknowledges the rejection.
- still pending / can’t evaluate → re-pauses (awaiting_hitl → output_format).
Fail-closed: get_approval_status soft-fails to None on a service error / 404. A gate that cannot read its verdict must deny (re-pause), never proceed — so a None verdict re-pauses rather than dispatching. This is regression-tested.
Because pending_mutation is hydrated in memory_node (warm: carried in state; cold: rehydrated from Redis), the guard sees identical state on warm and cold resume — warm/cold equivalence holds by construction.
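The reconciliation logic, including the fail-closed default, can be written as a pure function that returns the next node and a partial state update. This sketch rests on several hypothetical details: the verdict shape ({"status": ..., "comment": ...}), the pending_mutation keys tool_name and arguments, and the wording of the rejection note. The three outcomes and the rule that a None verdict re-pauses come from the text.

```python
from typing import Optional


def resume_guard(state: dict, verdict: Optional[dict]) -> tuple[str, dict]:
    """Return (next_node, partial_state_update) for a resume turn.

    verdict is what get_approval_status returned: None on a service error or 404.
    """
    pending = state.get("pending_mutation")
    if not pending:
        return "initial_llm_call", {}  # nothing gated: pass through

    status = verdict.get("status") if verdict else None
    if status == "approved":
        # Rebuild the exact call from frozen state, reusing the original id.
        tool_call = {
            "id": pending["tool_call_id"],
            "type": "function",
            "function": {"name": pending["tool_name"], "arguments": pending["arguments"]},
        }
        assistant_msg = {"role": "assistant", "content": None, "tool_calls": [tool_call]}
        return "tool_node", {
            "messages": state.get("messages", []) + [assistant_msg],
            "pending_tool_calls": [tool_call],
            "pending_mutation": {**pending, "status": "approved"},
        }
    if status == "rejected":
        note = {"role": "system",
                "content": f"The requested change was rejected: {verdict.get('comment', '')}"}
        return "initial_llm_call", {"messages": state.get("messages", []) + [note],
                                    "pending_mutation": None}

    # Still pending, unknown status, or unreadable verdict: fail closed and re-pause.
    return "output_format", {"status": "awaiting_hitl"}
```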
3. initial_llm_call / synthesis_llm_call
File: orchestration/nodes.py (shared llm_call function), orchestration/engine.py (wrappers)
Calls the LLM via LiteLLM with budget enforcement. The node:
- Calls check_budget(), which raises BudgetExceededError once the token or cost limit has been reached
- Determines the model (default: gpt-4o)
- Assembles the visible tool list (R35 P4): core capabilities + the loaded skills’ exposed tools. core_tool_schemas() always contributes entity_resolve + context_assemble; everything domain-specific arrives from the loaded skills’ llm_visible code blocks (skill_tool_schemas), deduped by name. There is no agent_type-keyed tool list: AGENT_TOOLS / get_tools_for_agent were deleted. The single exception is the R35-FENCE branch for the deferred flaring_monitor / compliance_monitor agents, which keep a hardcoded set via _fence_tools_for() until their skills land. A tools.assemble Langfuse span records the core-vs-skill split (core_capability_count, skill_exposed_tool_count, total_tools_visible, skills_loaded).
- Sends the full message history to the LLM
- Tracks token usage and cost from the response
If the LLM response includes tool calls, they are stored in pending_tool_calls for the tool node to execute.
The initial_llm_call wrapper sets llm_phase: "initial". The synthesis_llm_call wrapper sets llm_phase: "synthesis" and skips the actual LLM call if no skills were injected.
4. tool_node
File: orchestration/nodes.py
Executes all pending tool calls from the LLM response. For each tool call:
- Parses the function name and JSON arguments
- Calls the corresponding tool function via execute_tool()
- Appends a tool role message with the result
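The per-call loop is sketched below, assuming OpenAI-format tool calls whose arguments field is a JSON string. Each result becomes a tool message whose tool_call_id echoes the originating call. The run_tool_calls name is an assumption. So is reporting exceptions back to the model as an {"error": ...} payload; that is not documented behavior.

```python
import json
from typing import Callable


def run_tool_calls(tool_calls: list[dict],
                   execute_tool: Callable[[str, dict], object]) -> list[dict]:
    """Execute each tool call and return the resulting tool-role messages."""
    tool_messages = []
    for call in tool_calls:
        name = call["function"]["name"]
        try:
            args = json.loads(call["function"].get("arguments") or "{}")
            result = execute_tool(name, args)
        except Exception as exc:  # assumed: report the failure instead of crashing the loop
            result = {"error": str(exc)}
        tool_messages.append({
            "role": "tool",
            "tool_call_id": call["id"],  # must match the assistant's tool_calls entry
            "content": json.dumps(result, default=str),
        })
    return tool_messages
```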
Available tools by provenance (R35 P4 — tools derive from core capabilities + loaded skills, no longer from an agent_type map):
| Tool | Source | Description |
|---|---|---|
| entity_resolve | core | Resolve an entity by name/API/alias (state-aware) |
| context_assemble | core | Assemble an entity’s dual-view context (state-aware) |
| spacing_assessment | skill rrc_rule37 | Sandboxed spacing assessment (replaces the retired spacing_calculation) |
| offset_well_analysis | skill rrc_rule37 | Find offset wells within regulatory distance |
| rule37_filing_assembly | skill rrc_rule37 | Assemble complete Form W-1 filing package |
| good_cause_narrative | skill rrc_rule37 | Generate the good cause justification argument |
| flaring_volume_calc | skill rrc_rule32 (fence: flaring_monitor) | Calculate flaring volumes against R-32 thresholds |
| gas_analysis | skill rrc_rule32 | Analyze gas composition and pipeline readiness |
| rule32_filing_assembly | skill rrc_rule32 | Assemble Form R-32 filing package |
| emissions_estimate | skill rrc_rule32 (fence: flaring_monitor) | Calculate CO2e emissions using EPA factors |
Core capabilities (entity_resolve, context_assemble) are always visible regardless of loaded skills. The eight domain tools arrive from the loaded RRC skills as sandbox code blocks — tool_node routes a call whose name is in the per-conversation skill_code_block_tools manifest to the code-block executor (resolve → validate → jail), whose output carries a {status, message, result} wrapper. spacing_calculation stays reserved in TOOL_REGISTRY (no block may shadow the name) but is never offered — rule_37 reaches spacing via the spacing_assessment block. create_entity (the R34 mutating canary) is no longer universally offered — it rode the deleted AGENT_TOOLS["rule_37"] list; its schema + state-aware dispatch remain, but its HITL-canary role moves to skill rules in R35 P5. The deferred flaring_monitor / compliance_monitor agents load no skills, so their fence-listed flaring_volume_calc / emissions_estimate execute via the direct registry (bare result), unaffected by the code-block takeover.
State-aware tools (entity_resolve, context_assemble, create_entity) dispatch through the STATE_AWARE_TOOL_HANDLERS registry (orchestration/entity_tools.py) rather than execute_tool — they need GraphState to inject context server-side. Each handler returns a declarative StateAwareToolResult (tool output + state/working-memory updates + optional audit trigger) that the loop applies uniformly.
After executing all tools, the node clears pending_tool_calls and increments iteration.
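The sketch below shows the three execution paths and the end-of-round update. Several details are assumptions: the precedence order (state-aware handlers first, then the code-block manifest, then the direct registry), the handler signatures, and the keyword-argument calling convention for registry tools. Applying a StateAwareToolResult's state and working-memory updates is omitted.

```python
from typing import Callable


def dispatch(name: str, args: dict, state: dict,
             state_aware: dict[str, Callable], code_block_tools: set[str],
             run_code_block: Callable, registry: dict[str, Callable]) -> object:
    """Pick the execution path for one tool call (precedence is assumed)."""
    if name in state_aware:
        return state_aware[name](args, state)  # needs GraphState for server-side context
    if name in code_block_tools:
        return run_code_block(name, args)  # resolve -> validate -> jail; {status, message, result}
    return registry[name](**args)  # direct registry path; bare result


def finish_tool_round(state: dict, tool_messages: list[dict]) -> dict:
    """Partial update once every call has run: append results, clear queue, bump iteration."""
    return {
        "messages": state.get("messages", []) + tool_messages,
        "pending_tool_calls": [],
        "iteration": state.get("iteration", 0) + 1,
    }
```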
before_tool_call HITL gate (R34 Phase 5)
Before dispatching each tool, tool_node fires the before_tool_call rule trigger. A platform rule (platform:hitl_on_mutating_capability) matches any mutating capability (MUTATING_TOOLS maps tool name → capability_key; create_entity is the only invocable one in R34). When the gate sets hitl_required, the node:
- Creates the approval inline via the approval service. tool_node is upstream of approval_node, so it can’t reach that node in-pass; instead it reuses the same create_approval_request primitive.
- Persists pending_mutation to Redis working memory (frozen tool_call_id + args + hitl_approval_id, status="pending") with a generous TTL (PENDING_MUTATION_TTL_SECONDS, default 7 days).
- Strips the round: truncates messages back to before the assistant tool_calls message, so the turn never ends with a dangling/unmatched tool_call.
- Sets status="awaiting_hitl" and does not dispatch; the modified _after_tool router carries the pause to output_format → END.
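Two of the gate's steps, stripping the round and freezing the call, can be written as pure functions. The helper names and the tool_name / arguments keys are hypothetical. The other record fields and the 7-day default TTL come from the list above.

```python
PENDING_MUTATION_TTL_SECONDS = 7 * 24 * 3600  # documented default: 7 days


def strip_round(messages: list[dict], tool_call_id: str) -> list[dict]:
    """Drop the assistant message that issued tool_call_id, and everything after it."""
    for i in range(len(messages) - 1, -1, -1):
        msg = messages[i]
        calls = msg.get("tool_calls") or []
        if msg.get("role") == "assistant" and any(c.get("id") == tool_call_id for c in calls):
            return messages[:i]
    return list(messages)  # no matching assistant message: leave history as-is


def freeze_mutation(tool_call: dict, approval_id: str) -> dict:
    """Gate state to persist in working memory; never the verdict itself."""
    return {
        "tool_call_id": tool_call["id"],
        "tool_name": tool_call["function"]["name"],
        "arguments": tool_call["function"]["arguments"],
        "hitl_approval_id": approval_id,
        "status": "pending",
    }
```

With the redis-py client, persisting the record with its TTL could look like r.set(key, json.dumps(record), ex=PENDING_MUTATION_TTL_SECONDS). The actual key layout is not documented here.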
On the resume turn, resume_guard (described above) reconstructs the frozen call and routes it back through tool_node. tool_node detects the approved replay (pending_mutation.status == "approved" && matching tool_call_id), skips the gate, dispatches exactly once, and clears pending_mutation. See HITL Pause-and-Resume.
5. skill_select_node
File: orchestration/nodes.py
Scans the latest assistant message for SKILL_SELECT:{skill_id} patterns using regex. Detected skill IDs are stored in selected_skill_ids, excluding any skills already in injected_skill_ids.
The LLM is instructed (via the system prompt) to emit these patterns when it determines specialized context is needed.
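A minimal sketch of the scan, assuming skill ids consist of letters, digits, underscores and hyphens (the real regex is not shown in this document). It reads only the most recent assistant message and keeps first-seen order. It drops ids that are already injected or repeated.

```python
import re

# Assumed id alphabet; trailing punctuation such as "." or "," is not captured.
SKILL_SELECT_RE = re.compile(r"SKILL_SELECT:([A-Za-z0-9_\-]+)")


def select_skills(messages: list[dict], injected_skill_ids: list[str]) -> list[str]:
    """New skill ids requested by the latest assistant message, in order."""
    latest = next((m for m in reversed(messages) if m.get("role") == "assistant"), None)
    if latest is None:
        return []
    already = set(injected_skill_ids)
    selected: list[str] = []
    for skill_id in SKILL_SELECT_RE.findall(latest.get("content") or ""):
        if skill_id not in already and skill_id not in selected:
            selected.append(skill_id)
    return selected
```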
6. skill_inject_node
File: orchestration/nodes.py
For each selected skill, performs a multi-tier injection:
- Ledger check: Queries the injection ledger (POST /ledger/{id}/check) to skip already-injected skills
- Tier 2 (Skill definition): Loads the full skill specification from the PostgreSQL skills table (description, steps, requirements, output format)
- Tier 3 (Artifacts): Loads reference documents from the skill_artifacts table (regulatory text, templates, examples)
- Tier 3.5 (Graph context): Assembles entity context from the knowledge graph based on the working memory’s well_api or entity_id
- Ledger mark: Records the injection in the ledger to prevent future duplicates
Each injected skill becomes a system message with structured sections:
=== Skill Activated: Spacing Calculation ===
## Description
Calculates the distance from a proposed well...
## Steps
1. Retrieve the subject well location...
2. Identify the lease boundaries...
## Requirements
- Well must have surface_location coordinates...
## Output Format
Spacing Summary:
- Distance to nearest lease line: X ft...
## Entity Context
Well: Mitchell Ranch 1H (API: 42-329-12345)
Lease: Mitchell Ranch Lease...
See Skill Injection for the full tier architecture.
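A renderer along these lines can produce the injected-skill message layout shown in the example. The skill dict keys (name, description, steps, requirements, output_format) are assumptions based on the Tier 2 fields. The Tier 3 artifacts are left out for brevity.

```python
def render_skill_message(skill: dict, entity_context: list[str]) -> dict:
    """Render one injected skill as a sectioned system message."""
    lines = [f"=== Skill Activated: {skill['name']} ==="]
    if skill.get("description"):
        lines += ["## Description", skill["description"]]
    if skill.get("steps"):
        lines.append("## Steps")
        lines += [f"{i}. {step}" for i, step in enumerate(skill["steps"], start=1)]
    if skill.get("requirements"):
        lines.append("## Requirements")
        lines += [f"- {req}" for req in skill["requirements"]]
    if skill.get("output_format"):
        lines += ["## Output Format", skill["output_format"]]
    if entity_context:  # Tier 3.5 graph context, if any
        lines.append("## Entity Context")
        lines += entity_context
    return {"role": "system", "content": "\n".join(lines)}
```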
7. approval_node
File: orchestration/nodes.py
Determines whether the current execution requires HITL approval. It reads two sources:
- The hitl_required flag (with hitl_checkpoint_type) set upstream. For Rule 37 / Rule 32, this is set by a skill’s require_hitl SkillRule firing on after_tool_call in tool_node. The rule matches result.tool_name == '<block_key>' (e.g. rule37_filing_assembly → pre_filing, good_cause_narrative → good_cause_review, rule32_filing_assembly → pre_filing), and its action_parameters.checkpoint_type becomes the label. The before_tool_call mutating-capability gate sets the same flag for create_entity-style mutations.
- Metadata override: the request metadata can explicitly set hitl_checkpoint_type (a generic escape hatch).
R35 P5 (HITL cutover): the old CHECKPOINT_TRIGGERS / TOOL_HITL_TRIGGERS auto-detect maps were deleted. They keyed HITL on the injected-skill slug or executed-tool name inside this node. Under “behavior comes from skills,” HITL is now driven by the seeded require_hitl SkillRules on rrc_rule37 / rrc_rule32, so the checkpoints live with the skills, not in node code. Each fire emits a hitl.rule_fire Langfuse span carrying the skill + rule provenance. Deferred agents (flaring_monitor / compliance_monitor) have no live HITL.
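The rule-driven checkpoint works as a lookup over the seeded after_tool_call rules. The rule dict layout is an assumption, and so is the requirement that the owning skill be loaded. The block_key → checkpoint_type pairs are the ones listed above.

```python
# Hypothetical rule records mirroring the documented block_key -> checkpoint mapping.
SKILL_RULES = [
    {"skill": "rrc_rule37", "trigger": "after_tool_call", "tool_name": "rule37_filing_assembly",
     "action": "require_hitl", "action_parameters": {"checkpoint_type": "pre_filing"}},
    {"skill": "rrc_rule37", "trigger": "after_tool_call", "tool_name": "good_cause_narrative",
     "action": "require_hitl", "action_parameters": {"checkpoint_type": "good_cause_review"}},
    {"skill": "rrc_rule32", "trigger": "after_tool_call", "tool_name": "rule32_filing_assembly",
     "action": "require_hitl", "action_parameters": {"checkpoint_type": "pre_filing"}},
]


def apply_after_tool_rules(tool_name: str, loaded_skills: set[str],
                           rules: list[dict] = SKILL_RULES) -> dict:
    """HITL state update for the first matching require_hitl rule, else {}."""
    for rule in rules:
        if (rule["trigger"] == "after_tool_call"
                and rule["skill"] in loaded_skills
                and rule["tool_name"] == tool_name
                and rule["action"] == "require_hitl"):
            return {"hitl_required": True,
                    "hitl_checkpoint_type": rule["action_parameters"]["checkpoint_type"]}
    return {}
```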
When HITL is required, the node:
- Builds a state snapshot (messages, skills, entities, token usage, metadata)
- Creates an approval request via the approval service (POST /approvals)
- Sets status: "awaiting_hitl", which causes the pipeline to exit to output_format without running the synthesis phase
If the approval service is unavailable, the node logs a warning and continues execution with a HITL_WARNING system message.
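The pause path and its fail-open fallback are sketched below. create_approval_request is passed in as a callable so the example runs without the approval service. Its payload shape, the snapshot keys and the exact HITL_WARNING wording are hypothetical. So is letting a metadata checkpoint override the upstream label. This path degrades to a warning, whereas resume_guard fails closed.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def approval_node(state: dict, create_approval_request: Callable[[dict], str]) -> dict:
    """Pause for HITL when required; fall back to a warning if the service is down."""
    metadata = state.get("metadata", {})
    # Assumed precedence: an explicit metadata checkpoint overrides the upstream label.
    checkpoint = metadata.get("hitl_checkpoint_type") or state.get("hitl_checkpoint_type")
    if not (state.get("hitl_required") or metadata.get("hitl_checkpoint_type")):
        return {}  # nothing to approve; the router continues to synthesis_llm_call

    snapshot = {
        "messages": state.get("messages", []),
        "injected_skill_ids": state.get("injected_skill_ids", []),
        "injected_entity_ids": state.get("injected_entity_ids", []),
        "tokens_used": state.get("tokens_used", 0),
        "metadata": metadata,
    }
    try:
        approval_id = create_approval_request({"checkpoint_type": checkpoint, "snapshot": snapshot})
    except Exception as exc:  # e.g. connection refused or timeout
        logger.warning("approval service unavailable, continuing without HITL: %s", exc)
        warning = {"role": "system", "content": f"HITL_WARNING: approval service unavailable ({exc})"}
        return {"messages": state.get("messages", []) + [warning]}

    # The router sees awaiting_hitl and exits to output_format, skipping synthesis.
    return {"status": "awaiting_hitl", "hitl_approval_id": approval_id}
```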
8. output_format
File: orchestration/nodes.py
Terminal node that finalizes the execution:
- If status is awaiting_hitl, preserves that status
- Otherwise, sets status: "completed"
- Increments the iteration counter
Routing Logic
Conditional edges control the flow between nodes:
| After Node | Condition | Next Node |
|---|---|---|
| resume_guard | Awaiting HITL (still pending / can’t evaluate) | output_format |
| resume_guard | Approved replay (reconstructed call) | tool_node |
| resume_guard | No pending mutation / rejected | initial_llm_call |
| initial_llm_call | Has error | output_format |
| initial_llm_call | Has pending tool calls | tool_node |
| initial_llm_call | No tool calls | skill_select_node |
| tool_node | Status is awaiting_hitl (gate fired) | output_format |
| tool_node | Iteration limit reached | output_format |
| tool_node | Phase is “synthesis” | synthesis_llm_call |
| tool_node | Phase is “initial” | initial_llm_call |
| skill_select_node | Skills selected | skill_inject_node |
| skill_select_node | No skills | approval_node |
| skill_inject_node | Always | approval_node |
| approval_node | Awaiting HITL | output_format |
| approval_node | Approved/not needed | synthesis_llm_call |
| synthesis_llm_call | Has error | output_format |
| synthesis_llm_call | Has pending tool calls | tool_node |
| synthesis_llm_call | No tool calls | output_format |
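The routing table can be read as three router functions. In this sketch, conditions are evaluated in the row order above. Treating "iteration limit reached" as iteration >= 20 is an assumption. In LangGraph, such functions would be attached with StateGraph.add_conditional_edges.

```python
MAX_ITERATIONS = 20  # documented default iteration limit


def route_after_llm(state: dict) -> str:
    """Shared by initial_llm_call and synthesis_llm_call."""
    if state.get("error"):
        return "output_format"
    if state.get("pending_tool_calls"):
        return "tool_node"
    return "skill_select_node" if state.get("llm_phase") == "initial" else "output_format"


def route_after_tool(state: dict, max_iterations: int = MAX_ITERATIONS) -> str:
    if state.get("status") == "awaiting_hitl":
        return "output_format"  # the before_tool_call gate fired
    if state.get("iteration", 0) >= max_iterations:
        return "output_format"  # loop guard, independent of the budget
    return "synthesis_llm_call" if state.get("llm_phase") == "synthesis" else "initial_llm_call"


def route_after_approval(state: dict) -> str:
    return "output_format" if state.get("status") == "awaiting_hitl" else "synthesis_llm_call"
```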
Budget Enforcement
Every LLM call checks the budget before execution:
class BudgetExceededError(Exception):
    """Raised when an execution has used up its token or cost budget."""


def check_budget(state: GraphState) -> None:
    tokens_used = state.get("tokens_used", 0)
    cost_usd = state.get("cost_usd", 0.0)
    max_tokens = state.get("max_tokens", 100_000)
    max_cost = state.get("max_cost_usd", 5.0)
    # Limits are inclusive: reaching a ceiling already blocks the next LLM call.
    if tokens_used >= max_tokens:
        raise BudgetExceededError(...)
    if cost_usd >= max_cost:
        raise BudgetExceededError(...)
Per-agent budget defaults are defined in the agent YAML configs:
| Agent | Max Tokens | Max Cost |
|---|---|---|
| Rule 37 | 150,000 | $10.00 |
| Rule 32 | 100,000 | $8.00 |
| Compliance Monitor | 200,000 | $15.00 |
| Flaring Compliance | 300,000 | $20.00 |
The iteration limit (default: 20) prevents infinite tool-call loops independently of the budget.
The graph is compiled once at module load time (agent_graph = compile_graph()) and reused for all executions. Each execution gets its own copy of the state — there is no shared mutable state between concurrent requests.
API Endpoints
The orchestration engine exposes two execution endpoints:
POST /execute (Synchronous)
Runs the full pipeline and returns the final state. Used for simple request-response interactions.
GET /conversations/{id}/stream (SSE)
Streams node-by-node events as the pipeline executes. Each node completion emits an SSE event:
event: node_end
data: {"event":"node_end","node":"memory_node","data":{"status":"running","tokens_used":0}}

event: node_end
data: {"event":"node_end","node":"initial_llm_call","data":{"status":"running","tokens_used":1234,"message":{"role":"assistant","content":"..."}}}

event: done
data: {"event":"done","data":{"execution_id":"...","status":"completed","tokens_used":1234,"cost_usd":0.05}}
HITL Pause-and-Resume (R34 Phase 5)
A mutating tool call (e.g. create_entity) is paused for human approval before it runs, then replayed on approval. The mechanism turns the dormant before_tool_call trigger into a real governance gate.
Turn 1 (gate)                            Turn 2 (resume, after approval)
-------------                            -------------------------------
initial_llm_call                         memory_node (hydrates pending_mutation)
  emits create_entity tool_call              |
      |                                      v
      v                                  resume_guard
tool_node                                  reads verdict via GET /approvals/{id}
  before_tool_call gate FIRES                |
  - create approval (status=pending)         +-- approved  -> reconstruct frozen call
  - persist pending_mutation (Redis)         |                -> tool_node (replay once,
  - strip the round (no dangling tc)         |                   clear pending_mutation)
  - status=awaiting_hitl                     +-- rejected  -> drop + system note
      |                                      |                -> initial_llm_call
      v                                      +-- pending / -> re-pause
output_format -> END                             can't eval   (output_format -> END)
Source of truth. The durable approval record (approval-service) is authoritative; it survives the Redis TTL. The Redis pending_mutation holds only gate state (frozen tool_call_id, args, hitl_approval_id), never the verdict. get_approval_status (whose first consumer is resume_guard) reads the verdict fresh on each resume.
By-construction guarantees:
- No dangling tool_call across the turn boundary: the gate strips the round and resume_guard reconstructs the exact call. OpenAI/LiteLLM reject an assistant tool_calls message that has no matching tool response.
- Replay determinism: the call is replayed from the frozen args + original tool_call_id, independent of any new model output.
- Warm/cold equivalence: pending_mutation is hydrated in memory_node, so an in-memory resume and a cold (server-restarted) resume reconcile identically.
- Graph/index consistency: create_entity routes through the KG POST /entities endpoint, which mints the derived uuid and writes through to the name index in the same transaction (R34 0b).
Platform rules (seeded in agent-config-service SEED_PLATFORM_RULES, idempotent — no SQL migration):
| Rule key | Trigger | Condition | Action |
|---|---|---|---|
| platform:hitl_on_mutating_capability | before_tool_call | tool.capability_key IN [create_entity, update_entity, link_entities, batch_*] | require_hitl |
| platform:force_clarify_after_failed_resolves | after_tool_call | state.consecutive_failed_resolves >= 3 | require_hitl (tenant override → abort) |
The force-clarify counter (consecutive_failed_resolves) is orchestration-computed in entity_tools.run_entity_resolve — incremented on a non-decisive resolve, reset on a decisive auto-select.
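Both platform rule conditions and the counter update can be written as small predicates. The helper names are hypothetical. Treating batch_* as a shell-style glob (matched with fnmatch) is an assumption, and so is create_entity's capability_key being "create_entity". The capability list and the threshold of 3 come from the table.

```python
from fnmatch import fnmatchcase

# MUTATING_TOOLS maps tool name -> capability_key; only create_entity is invocable in R34.
MUTATING_TOOLS = {"create_entity": "create_entity"}
GATED_CAPABILITIES = ["create_entity", "update_entity", "link_entities", "batch_*"]
FORCE_CLARIFY_THRESHOLD = 3


def mutating_gate_fires(tool_name: str, mutating_tools: dict = MUTATING_TOOLS) -> bool:
    """Condition of platform:hitl_on_mutating_capability (before_tool_call)."""
    capability = mutating_tools.get(tool_name)
    return capability is not None and any(fnmatchcase(capability, p) for p in GATED_CAPABILITIES)


def update_failed_resolves(state: dict, decisive: bool) -> dict:
    """Reset on a decisive auto-select; increment on any non-decisive resolve."""
    count = 0 if decisive else state.get("consecutive_failed_resolves", 0) + 1
    return {"consecutive_failed_resolves": count}


def force_clarify_fires(state: dict) -> bool:
    """Condition of platform:force_clarify_after_failed_resolves (after_tool_call)."""
    return state.get("consecutive_failed_resolves", 0) >= FORCE_CLARIFY_THRESHOLD
```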