
LangGraph Pipeline

The orchestration engine uses a LangGraph StateGraph to execute agent requests through a deterministic pipeline. Each node performs a specific function and returns a partial state update. Conditional edges route execution based on state values.
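The node contract described above (each node returns a partial update that is merged into shared state, and a router picks the next node) can be illustrated with a minimal pure-Python sketch. It deliberately does not use the LangGraph API; `run_pipeline`, `greet_node`, `finish_node`, and the routing table are hypothetical stand-ins, not names from the codebase.

```python
from typing import Callable

State = dict
Node = Callable[[State], dict]    # a node returns only the keys it changes
Router = Callable[[State], str]   # a router names the next node (or "END")


def greet_node(state: State) -> dict:
    # Partial update: touches only status and messages.
    return {"status": "running", "messages": state.get("messages", []) + ["hello"]}


def finish_node(state: State) -> dict:
    return {"status": "completed"}


NODES: dict[str, Node] = {"greet": greet_node, "finish": finish_node}
ROUTERS: dict[str, Router] = {
    # Conditional edge: the route depends on a state value.
    "greet": lambda s: "finish" if s.get("messages") else "END",
    "finish": lambda s: "END",
}


def run_pipeline(state: State, start: str = "greet") -> State:
    current = start
    while current != "END":
        update = NODES[current](state)
        state = {**state, **update}      # merge the partial update
        current = ROUTERS[current](state)
    return state


final = run_pipeline({"execution_id": "demo"})
```

Keys a node does not return (here `execution_id`) pass through unchanged, which is why each node can stay focused on its own slice of the state.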

Pipeline Overview

    START
      |
      v
    system_prompt_node        Set the agent's system prompt
      |
      v
    memory_node               Fetch working + episodic memory; hydrate pending_mutation
      |
      v
    resume_guard              Reconcile a HITL-gated mutation against its approval verdict
      |
      +--[approved]---------> tool_node (replay the frozen call)
      +--[still pending /     output_format --> END
      |   can't evaluate]
      |
      v
    initial_llm_call          First LLM call (may request tool calls)
      |
      +--[has tool calls?]---> tool_node ----+
      |         ^                            |
      |         +------ loop back ----------+
      |   (a mutating tool trips the before_tool_call HITL gate ->
      |    awaiting_hitl -> output_format -> END)
      |
      v
    skill_select_node         Scan LLM output for SKILL_SELECT:{id} patterns
      |
      +--[skills found?]---> skill_inject_node ---+
      |                                           |
      +-------------------------------------------+
      |
      v
    approval_node             Check HITL requirements, pause if needed
      |
      +--[awaiting_hitl?]---> output_format --> END
      |
      v
    synthesis_llm_call        Second LLM call with injected skill context
      |
      +--[has tool calls?]---> tool_node ----+
      |         ^                            |
      |         +------ loop back ----------+
      |
      v
    output_format             Finalize status, increment iteration counter
      |
      v
    END

State Schema

The entire pipeline shares a single GraphState TypedDict that flows through every node. Each node reads from and writes to this shared state.

    from typing import TypedDict


    class GraphState(TypedDict, total=False):
        # Identity
        execution_id: str
        conversation_id: str
        agent_id: str
        agent_type: str               # "rule_37", "rule_32", "flaring_monitor", etc.
        tenant_id: str

        # Execution control
        status: str                   # "pending", "running", "completed", "awaiting_hitl", "failed"
        current_node: str | None
        iteration: int                # Guards against infinite tool-call loops
        error: str | None

        # Conversation
        messages: list[dict]          # OpenAI-format messages (role, content, tool_calls)
        system_prompt: str
        model: str                    # Default: "gpt-4o"

        # Budget enforcement
        tokens_used: int
        cost_usd: float
        max_tokens: int               # Default: 100,000
        max_cost_usd: float           # Default: $5.00

        # Tool execution
        pending_tool_calls: list[dict]

        # Skill injection
        injected_skill_ids: list[str]
        injected_entity_ids: list[str]
        active_skills: dict[str, int] # skill_id -> tier level (2 or 3)
        selected_skill_ids: list[str]
        skill_injection_done: bool

        # Two-phase LLM
        llm_phase: str                # "initial" or "synthesis"

        # Memory
        memory_context: dict          # Working memory + episodic search results

        # HITL
        hitl_required: bool
        hitl_checkpoint_type: str
        hitl_approval_id: str | None
        hitl_reviewer_id: str | None
        hitl_reviewer_strategy: str   # "named_individual" or "role_based"

        # R34 entity resolution / HITL pause-resume
        recent_entity_context: list[str]   # last-10 entity uuids (FIFO), mirrors Redis
        pending_mutation: dict | None      # a gated mutating tool_call awaiting approval
        consecutive_failed_resolves: int   # force-clarify counter (Rule 2 reads it)

        # Metadata
        metadata: dict

Node-by-Node Reference

1. system_prompt_node

File: orchestration/nodes.py

Loads the agent’s root skill (from agent_definitions.root_skill_key, R35 P3a) and then assembles the base system message (R35 P3b, design §6.4) — it no longer reads the per-agent FALLBACK_PROMPTS monolith:

  1. Persona (voice/identity, §6.3 precedence): explicit agent_definition.persona_key → the root skill’s default persona → PLATFORM_DEFAULT_PERSONA. Resolution keys only on the root skill — dependency-skill personas are inert.
  2. USER IDENTITY block: a small === User === block (display name + role from metadata, soft-falling to user_id / "user").
  3. Domain facts ride in separately: the loader injects each loaded skill’s context_templates as their own system messages — this node counts them (for the prompt.assemble span), it never re-injects.
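The persona precedence in step 1 can be sketched as a small resolver. This is an illustration only: `resolve_persona`, the `persona_source` labels, and the placeholder value of `PLATFORM_DEFAULT_PERSONA` are assumptions, since this page does not show the actual implementation or the real default key.

```python
from typing import Optional

# Placeholder value: the real platform default persona key is not given on this page.
PLATFORM_DEFAULT_PERSONA = "platform_default"


def resolve_persona(
    agent_persona_key: Optional[str],
    root_skill_default_persona: Optional[str],
) -> tuple[str, str]:
    """Return (persona_key, persona_source) following the documented precedence."""
    if agent_persona_key:                 # 1. explicit agent_definition.persona_key
        return agent_persona_key, "agent_definition"
    if root_skill_default_persona:        # 2. the root skill's default persona
        return root_skill_default_persona, "root_skill"
    # 3. platform default; dependency-skill personas are never consulted
    return PLATFORM_DEFAULT_PERSONA, "platform_default"
```

A general agent with no root skill and no explicit persona key falls through to the third branch, which matches the domain-empty general mode described in this section.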

The base message (persona + identity) is inserted at index 0 as the stable cache prefix; memory and the skill context_templates are the variable tail. On the Anthropic path the outgoing payload carries a cache_control breakpoint at the end of the prefix (provider-gated — skipped for OpenAI), so a skill load/unload re-pays the tail but never invalidates the prefix.

General mode is domain-empty by design (§6.5). A general agent (root_skill_key=NULL) loads no skill → no templates → the platform default persona, which carries no O&G/RRC/Texas content. It still refuses domain work (the Step-3 hard-stop). This is the one intentional non-preserving change in R35.

A caller-supplied state.system_prompt still overrides the assembly (back-compat). Spans emitted: agent.resolve_definition (with the resolved persona) and prompt.assemble (persona_key, persona_source, context_template_count, user_identity_injected, token_estimate).

The node also initializes execution tracking fields:

    {
        "llm_phase": "initial",
        "skill_injection_done": False,
        "selected_skill_ids": [],
        "status": "running",
    }

2. memory_node

File: orchestration/nodes.py

Makes two HTTP calls to the memory service:

  1. Working memory (GET /working-memory/{conversation_id}): Retrieves the conversation’s scratchpad and extracted entities from Redis
  2. Episodic memory (POST /episodic/search): Performs semantic search over past conversation summaries using the latest user message as the query (returns top 3 results by default)

The retrieved memory is formatted and injected as a system message after the main system prompt:

    === Memory Context ===

    [Working Memory -- Scratchpad]
    {scratchpad content}

    [Working Memory -- Extracted Entities]
    - Well: 42-329-12345 (Mitchell Ranch 1H)
    - Operator: op-permian-energy (Permian Basin Energy LLC)

    [Episodic Memory -- Relevant Past Conversations]
    - (similarity: 0.87) Previously filed Rule 37 exception for Mitchell Ranch 2H...
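A minimal sketch of how such a block might be built and placed after the main system prompt. The function names and the input field names (`type`, `id`, `name`, `similarity`, `summary`) are assumptions for illustration; the memory service's actual response shape is not documented on this page.

```python
def format_memory_context(scratchpad: str, entities: list[dict], episodes: list[dict]) -> str:
    """Render working and episodic memory in the layout shown above."""
    lines = [
        "=== Memory Context ===",
        "[Working Memory -- Scratchpad]",
        scratchpad,
        "[Working Memory -- Extracted Entities]",
    ]
    for entity in entities:
        lines.append(f"- {entity['type']}: {entity['id']} ({entity['name']})")
    lines.append("[Episodic Memory -- Relevant Past Conversations]")
    for episode in episodes:
        lines.append(f"- (similarity: {episode['similarity']:.2f}) {episode['summary']}")
    return "\n".join(lines)


def inject_memory(messages: list[dict], memory_text: str) -> list[dict]:
    """Insert the memory block right after the system prompt, assumed to sit at index 0."""
    memory_message = {"role": "system", "content": memory_text}
    return messages[:1] + [memory_message] + messages[1:]


memory_text = format_memory_context(
    "User is preparing a spacing exception.",
    [{"type": "Well", "id": "42-329-12345", "name": "Mitchell Ranch 1H"}],
    [{"similarity": 0.87, "summary": "Previously filed Rule 37 exception"}],
)
messages = inject_memory(
    [{"role": "system", "content": "base prompt"}, {"role": "user", "content": "hi"}],
    memory_text,
)
```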

resume_guard (R34 Phase 5)

File: orchestration/nodes.py

Runs between memory_node and initial_llm_call. If there is no pending_mutation it is a pass-through. Otherwise it reconciles the gated mutation against the approval verdict (get_approval_status → approval-service GET /approvals/{id}):

  • approved → reconstructs the assistant tool_calls message from the frozen args + original tool_call_id, sets pending_tool_calls, and routes to tool_node for a single replay.
  • rejected → drops pending_mutation, appends a system note (with the reviewer comment), routes to initial_llm_call so the model acknowledges the rejection.
  • still pending / can’t evaluate → re-pauses (awaiting_hitl → output_format).

Fail-closed: get_approval_status soft-fails to None on a service error / 404. A gate that cannot read its verdict must deny (re-pause), never proceed — so a None verdict re-pauses rather than dispatching. This is regression-tested.

Because pending_mutation is hydrated in memory_node (warm: carried in state; cold: rehydrated from Redis), the guard sees identical state on warm and cold resume — warm/cold equivalence holds by construction.
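The guard's routing, including the fail-closed rule, can be summarized in a few lines. This is a sketch under assumptions: `route_resume` is a hypothetical name, and the verdict strings `"approved"` and `"rejected"` are assumed spellings of the verdicts named above.

```python
from typing import Optional


def route_resume(pending_mutation: Optional[dict], verdict: Optional[str]) -> str:
    """Return the next node name for the resume turn."""
    if pending_mutation is None:
        return "initial_llm_call"     # pass-through: nothing is gated
    if verdict == "approved":
        return "tool_node"            # replay the frozen call once
    if verdict == "rejected":
        return "initial_llm_call"     # the model acknowledges the rejection
    # Still pending, None (service error / 404), or anything unrecognized:
    # fail closed and re-pause rather than dispatch.
    return "output_format"
```

Putting the re-pause in the final fallthrough branch means any verdict the guard cannot positively classify is treated as a denial, which is the property the fail-closed paragraph asks for.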

3. initial_llm_call / synthesis_llm_call

File: orchestration/nodes.py (shared llm_call function), orchestration/engine.py (wrappers)

Calls the LLM via LiteLLM with budget enforcement. The node:

  1. Calls check_budget() — raises BudgetExceededError if token or cost limits are exceeded
  2. Determines the model (default: gpt-4o)
  3. Assembles the visible tool list (R35 P4): core capabilities + the loaded skills’ exposed tools. core_tool_schemas() always contributes entity_resolve + context_assemble; everything domain-specific arrives from the loaded skills’ llm_visible code blocks (skill_tool_schemas), deduped by name. There is no agent_type-keyed tool list — AGENT_TOOLS / get_tools_for_agent were deleted. The single exception is the R35-FENCE branch for the deferred flaring_monitor / compliance_monitor agents, which keep a hardcoded set via _fence_tools_for() until their skills land. A tools.assemble Langfuse span records the core-vs-skill split (core_capability_count, skill_exposed_tool_count, total_tools_visible, skills_loaded).
  4. Sends the full message history to the LLM
  5. Tracks token usage and cost from the response

If the LLM response includes tool calls, they are stored in pending_tool_calls for the tool node to execute.

The initial_llm_call wrapper sets llm_phase: "initial". The synthesis_llm_call wrapper sets llm_phase: "synthesis" and skips the actual LLM call if no skills were injected.
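The tool-list assembly in step 3 (core capabilities first, then skill-exposed tools, deduped by name) might look roughly like the sketch below. `assemble_visible_tools` and `make_schema` are hypothetical helpers, and how the real code computes the span counts is an assumption; only the count names come from this page.

```python
def make_schema(name: str) -> dict:
    """Build a bare OpenAI-style function tool schema (illustrative helper)."""
    return {"type": "function", "function": {"name": name, "parameters": {"type": "object"}}}


def assemble_visible_tools(core_schemas: list[dict], skill_schemas: list[dict]) -> tuple[list[dict], dict]:
    """Merge core and skill tool schemas, keeping the first schema seen for each name."""
    visible: list[dict] = []
    seen: set[str] = set()
    counts = {"core": 0, "skill": 0}
    for source, schemas in (("core", core_schemas), ("skill", skill_schemas)):
        for schema in schemas:
            name = schema["function"]["name"]
            if name in seen:
                continue              # dedupe by name; core wins over skill
            seen.add(name)
            visible.append(schema)
            counts[source] += 1
    span = {
        "core_capability_count": counts["core"],
        "skill_exposed_tool_count": counts["skill"],
        "total_tools_visible": len(visible),
    }
    return visible, span


core = [make_schema("entity_resolve"), make_schema("context_assemble")]
skill = [make_schema("spacing_assessment"), make_schema("entity_resolve")]
visible, span = assemble_visible_tools(core, skill)
```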

4. tool_node

File: orchestration/nodes.py

Executes all pending tool calls from the LLM response. For each tool call:

  1. Parses the function name and JSON arguments
  2. Calls the corresponding tool function via execute_tool()
  3. Appends a tool role message with the result

Available tools by provenance (R35 P4 — tools derive from core capabilities + loaded skills, no longer from an agent_type map):

| Tool | Source | Description |
|---|---|---|
| entity_resolve | core | Resolve an entity by name/API/alias (state-aware) |
| context_assemble | core | Assemble an entity’s dual-view context (state-aware) |
| spacing_assessment | skill rrc_rule37 | Sandboxed spacing assessment (replaces the retired spacing_calculation) |
| offset_well_analysis | skill rrc_rule37 | Find offset wells within regulatory distance |
| rule37_filing_assembly | skill rrc_rule37 | Assemble complete Form W-1 filing package |
| good_cause_narrative | skill rrc_rule37 | Generate the good cause justification argument |
| flaring_volume_calc | skill rrc_rule32 (fence: flaring_monitor) | Calculate flaring volumes against R-32 thresholds |
| gas_analysis | skill rrc_rule32 | Analyze gas composition and pipeline readiness |
| rule32_filing_assembly | skill rrc_rule32 | Assemble Form R-32 filing package |
| emissions_estimate | skill rrc_rule32 (fence: flaring_monitor) | Calculate CO2e emissions using EPA factors |

Core capabilities (entity_resolve, context_assemble) are always visible regardless of loaded skills. The eight domain tools arrive from the loaded RRC skills as sandbox code blocks: tool_node routes a call whose name is in the per-conversation skill_code_block_tools manifest to the code-block executor (resolve → validate → jail), whose output carries a {status, message, result} wrapper. spacing_calculation stays reserved in TOOL_REGISTRY (no block may shadow the name) but is never offered; rule_37 reaches spacing via the spacing_assessment block. create_entity (the R34 mutating canary) is no longer universally offered, because it rode the deleted AGENT_TOOLS["rule_37"] list. Its schema and state-aware dispatch remain, but its HITL-canary role moves to skill rules in R35 P5. The deferred flaring_monitor / compliance_monitor agents load no skills, so their fence-listed flaring_volume_calc / emissions_estimate execute via the direct registry (bare result), unaffected by the code-block takeover.

State-aware tools (entity_resolve, context_assemble, create_entity) dispatch through the STATE_AWARE_TOOL_HANDLERS registry (orchestration/entity_tools.py) rather than execute_tool — they need GraphState to inject context server-side. Each handler returns a declarative StateAwareToolResult (tool output + state/working-memory updates + optional audit trigger) that the loop applies uniformly.

After executing all tools, the node clears pending_tool_calls and increments iteration.
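The basic execution loop (parse, execute, append a tool message, then clear and increment) can be sketched as follows. This omits the HITL gate, code-block routing, and state-aware handlers; the `TOOLS` registry and the body of `execute_tool` are stand-ins, and `run_pending_tool_calls` is a hypothetical name.

```python
import json

# Stand-in registry for illustration; the real tool registry is not shown on this page.
TOOLS = {"add": lambda a, b: a + b}


def execute_tool(name: str, args: dict):
    return TOOLS[name](**args)


def run_pending_tool_calls(state: dict) -> dict:
    """Execute every pending tool call and return a partial state update."""
    messages = list(state.get("messages", []))
    for call in state.get("pending_tool_calls", []):
        name = call["function"]["name"]
        # OpenAI-format tool calls carry their arguments as a JSON string.
        args = json.loads(call["function"]["arguments"] or "{}")
        result = execute_tool(name, args)
        messages.append(
            {"role": "tool", "tool_call_id": call["id"], "content": json.dumps(result)}
        )
    return {
        "messages": messages,
        "pending_tool_calls": [],
        "iteration": state.get("iteration", 0) + 1,
    }


update = run_pending_tool_calls({
    "messages": [],
    "iteration": 0,
    "pending_tool_calls": [
        {"id": "call_1", "type": "function",
         "function": {"name": "add", "arguments": '{"a": 2, "b": 3}'}},
    ],
})
```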

before_tool_call HITL gate (R34 Phase 5)

Before dispatching each tool, tool_node fires the before_tool_call rule trigger. A platform rule (platform:hitl_on_mutating_capability) matches any mutating capability (MUTATING_TOOLS maps tool name → capability_key; create_entity is the only invocable one in R34). When the gate sets hitl_required, the node:

  1. Creates the approval inline via the approval service (tool_node is upstream of approval_node, so it can’t reach that node in-pass — it reuses the same create_approval_request primitive).
  2. Persists pending_mutation to Redis working memory (frozen tool_call_id + args + hitl_approval_id, status="pending") with a generous TTL (PENDING_MUTATION_TTL_SECONDS, default 7 days).
  3. Strips the round — truncates messages back to before the assistant tool_calls message, so the turn never ends with a dangling/unmatched tool_call.
  4. Sets status="awaiting_hitl" and does not dispatch — the modified _after_tool router carries the pause to output_format → END.

On the resume turn, resume_guard (described above) reconstructs the frozen call and routes it back through tool_node, which detects the approved replay (pending_mutation.status == "approved" and a matching tool_call_id), skips the gate, dispatches exactly once, and clears pending_mutation. See HITL Pause-and-Resume.
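Step 3 of the gate ("strip the round") is the least obvious part, so here is a minimal sketch of the truncation. `strip_round` is a hypothetical helper; the real implementation may locate the assistant message differently.

```python
def strip_round(messages: list[dict], tool_call_id: str) -> list[dict]:
    """Truncate history to just before the assistant message that issued tool_call_id.

    Leaving that assistant message in place without a matching tool result
    would produce a dangling tool_call on the next LLM request.
    """
    for index in range(len(messages) - 1, -1, -1):
        message = messages[index]
        if message.get("role") != "assistant":
            continue
        tool_calls = message.get("tool_calls") or []
        if any(call.get("id") == tool_call_id for call in tool_calls):
            return messages[:index]
    return messages  # no matching assistant message: leave history untouched


history = [
    {"role": "system", "content": "base prompt"},
    {"role": "user", "content": "Create the well entity"},
    {"role": "assistant", "content": None,
     "tool_calls": [{"id": "call_1", "type": "function",
                     "function": {"name": "create_entity", "arguments": "{}"}}]},
]
stripped = strip_round(history, "call_1")
```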

5. skill_select_node

File: orchestration/nodes.py

Scans the latest assistant message for SKILL_SELECT:{skill_id} patterns using regex. Detected skill IDs are stored in selected_skill_ids, excluding any skills already in injected_skill_ids.

The LLM is instructed (via the system prompt) to emit these patterns when it determines specialized context is needed.
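A minimal sketch of the scan, assuming `{skill_id}` is a placeholder for an identifier made of letters, digits, underscores, and hyphens. The exact regex used in orchestration/nodes.py is not shown on this page, and `select_skills` is a hypothetical name.

```python
import re

# Assumed identifier alphabet; adjust to match the real skill ID format.
SKILL_SELECT_RE = re.compile(r"SKILL_SELECT:([A-Za-z0-9_\-]+)")


def select_skills(assistant_text: str, injected_skill_ids: list[str]) -> list[str]:
    """Return newly requested skill IDs, in order, skipping already-injected ones."""
    already_injected = set(injected_skill_ids)
    selected: list[str] = []
    for skill_id in SKILL_SELECT_RE.findall(assistant_text):
        if skill_id not in already_injected and skill_id not in selected:
            selected.append(skill_id)
    return selected


text = "I need SKILL_SELECT:rrc_rule37 and SKILL_SELECT:rrc_rule32. Again: SKILL_SELECT:rrc_rule37"
selected = select_skills(text, injected_skill_ids=["rrc_rule32"])
```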

6. skill_inject_node

File: orchestration/nodes.py

For each selected skill, performs a multi-tier injection:

  1. Ledger check: Queries the injection ledger (POST /ledger/{id}/check) to skip already-injected skills
  2. Tier 2 — Skill definition: Loads the full skill specification from the PostgreSQL skills table (description, steps, requirements, output format)
  3. Tier 3 — Artifacts: Loads reference documents from the skill_artifacts table (regulatory text, templates, examples)
  4. Tier 3.5 — Graph context: Assembles entity context from the knowledge graph based on the working memory’s well_api or entity_id
  5. Ledger mark: Records the injection in the ledger to prevent future duplicates

Each injected skill becomes a system message with structured sections:

=== Skill Activated: Spacing Calculation === ## Description Calculates the distance from a proposed well... ## Steps 1. Retrieve the subject well location... 2. Identify the lease boundaries... ## Requirements - Well must have surface_location coordinates... ## Output Format Spacing Summary: - Distance to nearest lease line: X ft... ## Entity Context Well: Mitchell Ranch 1H (API: 42-329-12345) Lease: Mitchell Ranch Lease...

See Skill Injection for the full tier architecture.

7. approval_node

File: orchestration/nodes.py

Determines whether the current execution requires HITL approval. It reads two sources:

  1. The hitl_required flag (with hitl_checkpoint_type) set upstream. For Rule 37 / Rule 32 this is set by a skill’s require_hitl SkillRule firing on after_tool_call in tool_node: the rule matches result.tool_name == '<block_key>' (e.g. rule37_filing_assembly → pre_filing, good_cause_narrative → good_cause_review, rule32_filing_assembly → pre_filing) and its action_parameters.checkpoint_type becomes the label. The before_tool_call mutating-capability gate sets the same flag for create_entity-style mutations.
  2. Metadata override: the request metadata can explicitly set hitl_checkpoint_type (a generic escape hatch).

R35 P5 (HITL cutover): the old CHECKPOINT_TRIGGERS / TOOL_HITL_TRIGGERS auto-detect maps — which keyed HITL on injected-skill slug or executed-tool name inside this node — were deleted. Under “behavior comes from skills,” HITL is now driven by the seeded require_hitl SkillRules on rrc_rule37 / rrc_rule32, so the checkpoints live with the skills, not in node code. Each fire emits a hitl.rule_fire Langfuse span carrying the skill + rule provenance. Deferred agents (flaring_monitor / compliance_monitor) have no live HITL.

When HITL is required, the node:

  1. Builds a state snapshot (messages, skills, entities, token usage, metadata)
  2. Creates an approval request via the approval service (POST /approvals)
  3. Sets status: "awaiting_hitl" which causes the pipeline to exit to output_format without running the synthesis phase

If the approval service is unavailable, the node logs a warning and continues execution with a HITL_WARNING system message.

8. output_format

File: orchestration/nodes.py

Terminal node that finalizes the execution:

  • If status is awaiting_hitl, preserves that status
  • Otherwise, sets status: "completed"
  • Increments the iteration counter

Routing Logic

Conditional edges control the flow between nodes:

| After Node | Condition | Next Node |
|---|---|---|
| resume_guard | Awaiting HITL (still pending / can’t evaluate) | output_format |
| resume_guard | Approved replay (reconstructed call) | tool_node |
| resume_guard | No pending mutation / rejected | initial_llm_call |
| initial_llm_call | Has error | output_format |
| initial_llm_call | Has pending tool calls | tool_node |
| initial_llm_call | No tool calls | skill_select_node |
| tool_node | Status is awaiting_hitl (gate fired) | output_format |
| tool_node | Iteration limit reached | output_format |
| tool_node | Phase is “synthesis” | synthesis_llm_call |
| tool_node | Phase is “initial” | initial_llm_call |
| skill_select_node | Skills selected | skill_inject_node |
| skill_select_node | No skills | approval_node |
| skill_inject_node | Always | approval_node |
| approval_node | Awaiting HITL | output_format |
| approval_node | Approved/not needed | synthesis_llm_call |
| synthesis_llm_call | Has error | output_format |
| synthesis_llm_call | Has pending tool calls | tool_node |
| synthesis_llm_call | No tool calls | output_format |
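As an illustration, the four tool_node rows can be written as a single router function. This is a sketch: the name `after_tool`, the order in which conditions are checked (taken here from the table order), and the `MAX_ITERATIONS` constant name are assumptions.

```python
MAX_ITERATIONS = 20  # assumed to match the documented default iteration limit


def after_tool(state: dict) -> str:
    """Pick the node that follows tool_node."""
    if state.get("status") == "awaiting_hitl":
        return "output_format"          # the HITL gate fired; carry the pause out
    if state.get("iteration", 0) >= MAX_ITERATIONS:
        return "output_format"          # stop runaway tool-call loops
    if state.get("llm_phase") == "synthesis":
        return "synthesis_llm_call"     # loop back into the phase that asked
    return "initial_llm_call"
```

Checking the pause and the iteration limit before the phase keeps a gated or runaway execution from looping back into another LLM call.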

Budget Enforcement

Every LLM call checks the budget before execution:

    def check_budget(state: GraphState) -> None:
        tokens_used = state.get("tokens_used", 0)
        cost_usd = state.get("cost_usd", 0.0)
        max_tokens = state.get("max_tokens", 100_000)
        max_cost = state.get("max_cost_usd", 5.0)

        if tokens_used >= max_tokens:
            raise BudgetExceededError(...)
        if cost_usd >= max_cost:
            raise BudgetExceededError(...)

Per-agent budget defaults are defined in the agent YAML configs:

| Agent | Max Tokens | Max Cost |
|---|---|---|
| Rule 37 | 150,000 | $10.00 |
| Rule 32 | 100,000 | $8.00 |
| Compliance Monitor | 200,000 | $15.00 |
| Flaring Compliance | 300,000 | $20.00 |

The iteration limit (default: 20) prevents infinite tool-call loops independently of the budget.
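To show how the per-agent defaults relate to the GraphState fallbacks, here is a hedged sketch that seeds the budget fields of an initial state. The dictionary stands in for the YAML configs; keying it by the display names in the table above, and the helper name `initial_budget_state`, are assumptions.

```python
# Fallbacks used by check_budget when the state carries no explicit limits.
DEFAULT_BUDGET = {"max_tokens": 100_000, "max_cost_usd": 5.0}

# Stand-in for the agent YAML configs, using the values from the table above.
AGENT_BUDGETS = {
    "Rule 37": {"max_tokens": 150_000, "max_cost_usd": 10.0},
    "Rule 32": {"max_tokens": 100_000, "max_cost_usd": 8.0},
    "Compliance Monitor": {"max_tokens": 200_000, "max_cost_usd": 15.0},
    "Flaring Compliance": {"max_tokens": 300_000, "max_cost_usd": 20.0},
}


def initial_budget_state(agent_name: str) -> dict:
    """Return the budget-related state fields for a new execution."""
    budget = AGENT_BUDGETS.get(agent_name, DEFAULT_BUDGET)
    return {"tokens_used": 0, "cost_usd": 0.0, "iteration": 0, **budget}


rule37_state = initial_budget_state("Rule 37")
unknown_state = initial_budget_state("some_other_agent")
```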

The graph is compiled once at module load time (agent_graph = compile_graph()) and reused for all executions. Each execution gets its own copy of the state — there is no shared mutable state between concurrent requests.

API Endpoints

The orchestration engine exposes two execution endpoints:

POST /execute (Synchronous)

Runs the full pipeline and returns the final state. Used for simple request-response interactions.

GET /conversations/{id}/stream (SSE)

Streams node-by-node events as the pipeline executes. Each node completion emits an SSE event:

    event: node_end
    data: {"event":"node_end","node":"memory_node","data":{"status":"running","tokens_used":0}}

    event: node_end
    data: {"event":"node_end","node":"initial_llm_call","data":{"status":"running","tokens_used":1234,"message":{"role":"assistant","content":"..."}}}

    event: done
    data: {"event":"done","data":{"execution_id":"...","status":"completed","tokens_used":1234,"cost_usd":0.05}}
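On the client side, a stream in this shape can be decoded with a small parser. The sketch below assumes events are separated by a blank line and use `\n` line endings, as in standard SSE framing; `parse_sse` is a hypothetical helper, not part of the service, and it ignores SSE features such as `id:` and `retry:` fields.

```python
import json


def parse_sse(stream_text: str) -> list[dict]:
    """Split raw SSE text into a list of {"event": ..., "data": ...} dicts."""
    events = []
    for block in stream_text.strip().split("\n\n"):
        event_type = None
        data_lines = []
        for line in block.splitlines():
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:
            events.append({"event": event_type, "data": json.loads("\n".join(data_lines))})
    return events


sample = (
    "event: node_end\n"
    'data: {"event":"node_end","node":"memory_node","data":{"status":"running","tokens_used":0}}\n'
    "\n"
    "event: done\n"
    'data: {"event":"done","data":{"status":"completed","tokens_used":1234}}\n'
    "\n"
)
events = parse_sse(sample)
```

A client would typically stop reading once it sees the `done` event and take the final status from its payload.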

HITL Pause-and-Resume (R34 Phase 5)

A mutating tool call (e.g. create_entity) is paused for human approval before it runs, then replayed on approval. The mechanism turns the dormant before_tool_call trigger into a real governance gate.

    Turn 1 (gate)                            Turn 2 (resume, after approval)
    -------------                            -------------------------------
    initial_llm_call                         memory_node (hydrates pending_mutation)
      emits create_entity tool_call            |
      |                                        v
      v                                      resume_guard
    tool_node                                  reads verdict via GET /approvals/{id}
      before_tool_call gate FIRES              |
      - create approval (status=pending)       +-- approved  -> reconstruct frozen call
      - persist pending_mutation (Redis)       |                -> tool_node (replay once,
      - strip the round (no dangling tc)       |                   clear pending_mutation)
      - status=awaiting_hitl                   +-- rejected  -> drop + system note
      |                                        |                -> initial_llm_call
      v                                        +-- pending / -> re-pause
    output_format -> END                           can't eval   (output_format -> END)

Source of truth. The durable approval record (approval-service) is authoritative — it survives the Redis TTL. The Redis pending_mutation holds only gate state (frozen tool_call_id, args, hitl_approval_id), never the verdict. get_approval_status (its first consumer is resume_guard) reads the verdict fresh each resume.

By-construction guarantees:

  • No dangling tool_call across the turn boundary: the gate strips the round and resume_guard reconstructs the exact call (OpenAI/LiteLLM reject an unmatched assistant tool_calls).
  • Replay determinism: the call is replayed from the frozen args + original tool_call_id, independent of any new model output.
  • Warm/cold equivalence: pending_mutation is hydrated in memory_node, so an in-memory resume and a cold (server-restarted) resume reconcile identically.
  • Graph/index consistency: create_entity routes through KG POST /entities, which mints the derived uuid and writes through to the name index in the same transaction (R34 0b).

Platform rules (seeded in agent-config-service SEED_PLATFORM_RULES, idempotent — no SQL migration):

| Rule key | Trigger | Condition | Action |
|---|---|---|---|
| platform:hitl_on_mutating_capability | before_tool_call | tool.capability_key IN [create_entity, update_entity, link_entities, batch_*] | require_hitl |
| platform:force_clarify_after_failed_resolves | after_tool_call | state.consecutive_failed_resolves >= 3 | require_hitl (tenant override → abort) |

The force-clarify counter (consecutive_failed_resolves) is orchestration-computed in entity_tools.run_entity_resolve — incremented on a non-decisive resolve, reset on a decisive auto-select.
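The counter behavior and the rule condition can be sketched together. The helper names and the `FORCE_CLARIFY_THRESHOLD` constant are hypothetical; the threshold value of 3 comes from the rule condition in the table above.

```python
FORCE_CLARIFY_THRESHOLD = 3  # from the rule condition: consecutive_failed_resolves >= 3


def update_failed_resolves(current: int, decisive: bool) -> int:
    """Reset on a decisive auto-select, otherwise count one more failed resolve."""
    return 0 if decisive else current + 1


def should_force_clarify(state: dict) -> bool:
    """Mirror the platform:force_clarify_after_failed_resolves condition."""
    return state.get("consecutive_failed_resolves", 0) >= FORCE_CLARIFY_THRESHOLD


# Three non-decisive resolves in a row trip the rule; a decisive one resets it.
count = 0
for decisive in (False, False, False):
    count = update_failed_resolves(count, decisive)
tripped = should_force_clarify({"consecutive_failed_resolves": count})
count_after_reset = update_failed_resolves(count, decisive=True)
```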
