SSE Streaming

AEGIS uses Server-Sent Events (SSE) for real-time streaming of agent execution progress to the frontend. This page documents the EventSource implementation, event types, the streaming protocol, and fallback behavior.

Overview

When a user sends a message on the conversations page, the frontend opens an SSE connection to the orchestration engine. As the agent processes the request through the LangGraph pipeline, the server emits an event each time a pipeline node completes. The frontend parses these events to show real-time progress indicators, tool execution status, and the final assistant response.

SSE Connection

The conversations page (src/app/(dashboard)/conversations/page.tsx) establishes SSE connections using the browser’s EventSource API:

const streamUrl =
  `${API_URLS.orchestration}/conversations/${conversationId}/stream` +
  `?message=${encodeURIComponent(currentMessage)}` +
  `&agent_id=${agent.id}` +
  `&agent_type=${agent.type}`;

const eventSource = new EventSource(streamUrl);

The connection is made directly to the orchestration engine (port 8001), not through the API Gateway. The message, agent ID, and agent type are passed as URL query parameters.

The SSE endpoint is GET /conversations/{conversation_id}/stream on the orchestration engine. It uses query parameters rather than a request body because the browser’s EventSource API can only issue GET requests and cannot send a body.
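Because every input travels in the URL, each dynamic value needs percent-encoding, not only the message. The following is a minimal sketch of a URL builder that encodes all of them; buildStreamUrl and the AgentRef interface are hypothetical names introduced for illustration, and baseUrl stands in for API_URLS.orchestration.

```typescript
// Hypothetical shape; only `id` and `type` appear in the original snippet.
interface AgentRef {
  id: string;
  type: string;
}

// Builds the stream URL, percent-encoding every dynamic segment.
// `baseUrl` stands in for API_URLS.orchestration.
function buildStreamUrl(
  baseUrl: string,
  conversationId: string,
  message: string,
  agent: AgentRef,
): string {
  const params: Array<[string, string]> = [
    ["message", message],
    ["agent_id", agent.id],
    ["agent_type", agent.type],
  ];
  const query = params
    .map(([key, value]) => `${key}=${encodeURIComponent(value)}`)
    .join("&");
  return `${baseUrl}/conversations/${encodeURIComponent(conversationId)}/stream?${query}`;
}
```

Encoding the agent fields costs nothing when they are plain identifiers and avoids a malformed URL if one ever contains a reserved character such as & or a space.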

Event Types

The server emits two named SSE event types:

node_end

Fired each time a LangGraph pipeline node completes execution. The payload contains:

{
  "node": "initial_llm_call",
  "data": {
    "message": {
      "content": "Here is the analysis...",
      "tool_calls": [
        { "function": { "name": "spacing_calculation" } }
      ]
    }
  }
}

The frontend maps node names to human-readable status labels:

| Node Name | Status Label |
| --- | --- |
| system_prompt_node | Initializing agent… |
| memory_node | Loading memory context… |
| initial_llm_call | Thinking… |
| skill_select_node | Selecting skills… |
| skill_inject_node | Injecting skill context… |
| approval_node | Checking HITL requirements… |
| synthesis_llm_call | Synthesizing response… |
| tool_node | Executing tools… |
| output_format | Finalizing… |

When a node_end event fires for an LLM call node (initial_llm_call or synthesis_llm_call):

  • If data.message.tool_calls is present, tool status indicators are added to the chat.
  • If data.message.content is present, the assistant response is displayed (or updated if already visible).
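The rules above can be sketched as a pure function that turns a node_end payload into a description of what the UI should change. This is an illustration under assumptions, not the page's actual handler: interpretNodeEnd and NodeEndUpdate are hypothetical names, the payload type is inferred from the example payload, and only three of the labels are repeated to keep the sketch short.

```typescript
// Payload shape inferred from the example above; nested fields are optional.
interface NodeEndPayload {
  node: string;
  data?: {
    message?: {
      content?: string;
      tool_calls?: Array<{ function: { name: string } }>;
    };
  };
}

// Subset of the label table, repeated so the sketch is self-contained.
const NODE_LABELS: Record<string, string> = {
  initial_llm_call: "Thinking…",
  tool_node: "Executing tools…",
  synthesis_llm_call: "Synthesizing response…",
};

const LLM_NODES = new Set(["initial_llm_call", "synthesis_llm_call"]);

// Hypothetical result type describing what the UI should change.
interface NodeEndUpdate {
  statusLabel: string | null;
  toolNames: string[];
  assistantContent: string | null;
}

function interpretNodeEnd(payload: NodeEndPayload): NodeEndUpdate {
  const update: NodeEndUpdate = {
    statusLabel: NODE_LABELS[payload.node] ?? null,
    toolNames: [],
    assistantContent: null,
  };
  // Only LLM call nodes contribute tool indicators or assistant text.
  if (!LLM_NODES.has(payload.node)) return update;

  const message = payload.data?.message;
  update.toolNames = (message?.tool_calls ?? []).map((call) => call.function.name);
  if (message?.content) update.assistantContent = message.content;
  return update;
}
```

Keeping the interpretation separate from rendering makes the branching easy to unit-test without a browser.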

done

Fired when the entire pipeline completes:

{ "data": { "status": "completed" } }

If status is "awaiting_hitl", a human-in-the-loop (HITL) review banner is displayed with a link to the filings page.
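As a small illustration of how the done payload might be interpreted, the sketch below reduces it to two facts: streaming has ended, and whether the HITL banner is needed. The names interpretDone and DoneOutcome are hypothetical, and the payload type is inferred from the example above.

```typescript
// Shape taken from the example payload above.
interface DonePayload {
  data?: { status?: string };
}

// Hypothetical summary of what the UI does once the stream ends.
interface DoneOutcome {
  streaming: false;
  showHitlBanner: boolean;
}

function interpretDone(payload: DonePayload): DoneOutcome {
  return {
    streaming: false, // the pipeline has finished either way
    showHitlBanner: payload.data?.status === "awaiting_hitl",
  };
}
```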

Tool Status Display

When tool calls are detected in an LLM response, the frontend displays animated status indicators with human-readable labels:

const TOOL_LABELS: Record<string, string> = {
  spacing_calculation: "Calculating spacing distances...",
  offset_well_analysis: "Querying knowledge graph for offset wells...",
  rule37_filing_assembly: "Assembling Form W-1 filing package...",
  good_cause_narrative: "Drafting good cause narrative...",
  flaring_volume_calc: "Calculating flaring volumes...",
  gas_analysis: "Analyzing gas composition...",
  rule32_filing_assembly: "Assembling Form R-32 filing package...",
  emissions_estimate: "Estimating CO2e emissions...",
};

Each tool status appears as a small inline indicator with a pulsing blue dot and the label text.
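A tool name that is missing from the map would otherwise produce an empty label. One way to handle that is a lookup with a generic fallback, sketched below. The function labelForTool and the fallback wording are assumptions for illustration and are not taken from the AEGIS frontend.

```typescript
// Two entries copied from TOOL_LABELS above so the sketch runs on its own.
const TOOL_LABELS: Record<string, string> = {
  spacing_calculation: "Calculating spacing distances...",
  gas_analysis: "Analyzing gas composition...",
};

// Returns the configured label, or a generic one for tools missing from the map.
// The generic wording is an assumption, not the frontend's actual behavior.
function labelForTool(name: string): string {
  return TOOL_LABELS[name] ?? `Running ${name}...`;
}
```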

Payload Parsing

The SSE data can arrive in two formats due to how sse-starlette serializes payloads:

  1. Standard JSON: data: {"node": "...", "data": {...}}
  2. Double-encoded string: data: "{\"node\": \"...\"}"

The frontend handles both:

const parsePayload = (raw: string): any => {
  let parsed = JSON.parse(raw);
  // sse-starlette may double-encode: data is a JSON string instead of object
  if (typeof parsed === "string") {
    parsed = JSON.parse(parsed);
  }
  return parsed;
};
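To make the two formats concrete, the sketch below feeds both through the same unwrapping logic and also guards against malformed input, which the version above would surface as a thrown exception. parsePayloadSafe is a hypothetical variant, not the function used by the page.

```typescript
// Hypothetical variant of parsePayload that returns null instead of throwing.
function parsePayloadSafe(raw: string): Record<string, unknown> | null {
  try {
    let parsed: unknown = JSON.parse(raw);
    // Double-encoded case: the first parse yields a string holding JSON.
    if (typeof parsed === "string") {
      parsed = JSON.parse(parsed);
    }
    // Accept only plain objects; arrays and primitives are not valid payloads here.
    if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, unknown>;
    }
    return null;
  } catch {
    return null;
  }
}

// The same payload in both wire formats.
const standard = '{"node":"tool_node","data":{}}';
const doubleEncoded = JSON.stringify(standard);

console.log(parsePayloadSafe(standard)?.node); // tool_node
console.log(parsePayloadSafe(doubleEncoded)?.node); // tool_node
console.log(parsePayloadSafe("not json")); // null
```

Returning null lets the caller skip a bad frame without tearing down the whole stream.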

Event Listener Registration

Events are handled via two mechanisms for maximum compatibility:

Named event listeners (primary path)

eventSource.addEventListener("node_end", (e) => {
  handleNodeEnd(parsePayload(e.data));
});

eventSource.addEventListener("done", (e) => {
  handleDone(parsePayload(e.data));
});

Generic message handler (fallback)

When the server sends an event without the event: field, the browser delivers it as a generic message event. The onmessage handler then inspects the event property of the parsed payload:

eventSource.onmessage = (e) => {
  const payload = parsePayload(e.data);
  if (payload.event === "node_end") {
    handleNodeEnd(payload);
  } else if (payload.event === "done") {
    handleDone(payload);
  }
};
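The difference between the two paths comes from the SSE wire format: a frame with an event: line is dispatched under that name, while a frame without one defaults to the type "message". The sketch below parses a single frame and picks a handler the same way the two listeners do together. It is a simplified illustration, not a full SSE parser: it ignores id:, retry:, and comment lines, skips the double-encoding case for brevity, and the names parseSseFrame and routeFrame are hypothetical.

```typescript
interface SseFrame {
  event: string; // "message" when the frame has no event: field
  data: string;
}

// Parses one frame (the text between blank lines) of an SSE stream.
function parseSseFrame(frame: string): SseFrame {
  let event = "message";
  const dataLines: string[] = [];
  for (const line of frame.split(/\r?\n/)) {
    if (line.startsWith("event:")) {
      // A single leading space after the colon is not part of the value.
      event = line.slice("event:".length).replace(/^ /, "");
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice("data:".length).replace(/^ /, ""));
    }
  }
  return { event, data: dataLines.join("\n") };
}

type HandlerName = "node_end" | "done" | null;

// Named frames route by frame.event; unnamed frames route by payload.event.
function routeFrame(frame: SseFrame): HandlerName {
  const name =
    frame.event !== "message"
      ? frame.event
      : (JSON.parse(frame.data) as { event?: string }).event;
  return name === "node_end" || name === "done" ? name : null;
}

const named = parseSseFrame('event: node_end\ndata: {"node":"tool_node","data":{}}');
const unnamed = parseSseFrame('data: {"event":"done","data":{"status":"completed"}}');
console.log(routeFrame(named)); // node_end
console.log(routeFrame(unnamed)); // done
```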

Streaming UI Indicators

While streaming is active, the UI shows:

  • An animated bouncing dots indicator (three blue dots with a staggered vertical animation).
  • A text label showing the current pipeline stage (e.g., “Thinking…”, “Executing tools…”).

The input field is also disabled to prevent duplicate submissions.

{streaming && (
  <div className="flex items-center gap-2 pl-2">
    <div className="flex gap-1">
      {[0, 1, 2].map((i) => (
        <motion.div
          key={i}
          className="w-1.5 h-1.5 rounded-full bg-blue-400"
          animate={{ y: [0, -4, 0] }}
          transition={{ repeat: Infinity, duration: 0.6, delay: i * 0.15 }}
        />
      ))}
    </div>
    <span className="text-xs text-muted-foreground">{streamStatus}</span>
  </div>
)}

Message Types

The conversation page handles five message types:

| Role | Description | Rendering |
| --- | --- | --- |
| user | User-sent messages | Right-aligned, blue-tinted bubble |
| assistant | Agent responses | Left-aligned, muted background |
| tool_status | Tool execution indicators | Inline with pulsing blue dot and label |
| hitl_banner | HITL approval required | Centered amber banner with “Review” button linking to /filings |
| agent_handoff | Suggestion to switch agents | Purple banner with agent details and “Continue” button |
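One way to model these five roles is a discriminated union keyed on role, so the compiler flags any role the renderer forgets. The sketch below is illustrative only: the ChatMessage type, its field names other than role, and the renderingFor function are assumptions, and the returned strings simply echo the Rendering column.

```typescript
// Hypothetical message model; only the role values come from the table above.
type ChatMessage =
  | { role: "user"; content: string }
  | { role: "assistant"; content: string }
  | { role: "tool_status"; toolName: string }
  | { role: "hitl_banner" }
  | { role: "agent_handoff"; content: string; targetAgentId: string };

// Exhaustive switch: adding a sixth role to ChatMessage without a case here
// becomes a compile-time error because the function would no longer return on every path.
function renderingFor(message: ChatMessage): string {
  switch (message.role) {
    case "user":
      return "right-aligned, blue-tinted bubble";
    case "assistant":
      return "left-aligned, muted background";
    case "tool_status":
      return "inline with pulsing blue dot and label";
    case "hitl_banner":
      return "centered amber banner";
    case "agent_handoff":
      return "purple banner";
  }
}
```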

HITL Banner

When an agent execution results in status: "awaiting_hitl", a banner is added to the chat:

<div className="border border-amber-800/40 bg-amber-950/30 px-4 py-3">
  <p>Filing package ready for review</p>
  <p>pre_filing checkpoint -- requires human review</p>
  <Link href="/filings">
    <Button>Review</Button>
  </Link>
</div>

The conversation status is also updated to "awaiting_hitl" and a badge is shown in the conversation list.

Agent Handoff

Agent handoff messages suggest switching to a different agent. They display:

  • The suggestion message content.
  • The target agent’s color indicator, name, and description.
  • A “Continue with {agent}” button that creates a new conversation with the suggested agent and pre-populates context from the handoff.

Fallback: Synchronous Execution

If the SSE connection fails (EventSource error event), the frontend falls back to synchronous execution via POST /execute:

const res = await fetch(`${API_URLS.orchestration}/execute`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    conversation_id: conv.id,
    agent_id: conv.agent.id,
    agent_type: conv.agent.type,
    message: msg,
    metadata: {},
  }),
});

This returns the complete result in a single response. Tool calls and assistant messages are extracted from the response payload and displayed in sequence.
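A detail worth handling in the fallback is that a browser EventSource retries automatically after an error, so the error handler should close the connection and make sure the synchronous request runs only once. The sketch below shows that wiring under assumptions: ClosableSource is a minimal stand-in for the parts of EventSource used here, and attachFallback and runSync are hypothetical names, with runSync representing the POST /execute call.

```typescript
// Minimal stand-in for the parts of EventSource this sketch relies on.
interface ClosableSource {
  onerror: ((event: unknown) => void) | null;
  close(): void;
}

// Wires an error handler that falls back to synchronous execution exactly once.
function attachFallback(source: ClosableSource, runSync: () => Promise<void>): void {
  let fellBack = false;
  source.onerror = () => {
    if (fellBack) return; // ignore repeated error events
    fellBack = true;
    source.close(); // stop automatic reconnection attempts
    void runSync(); // hypothetical wrapper around POST /execute
  };
}
```

Without the guard, a burst of error events could submit the same message several times.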

Auth Headers for SSE

SSE connections use the browser’s EventSource API, which does not support custom request headers. The frontend therefore obtains its authentication header separately, by reading the aegis_token cookie directly:

function getAuthHeaders(): Record<string, string> {
  if (typeof document === "undefined") return {};
  const match = document.cookie.match(/(?:^|; )aegis_token=([^;]*)/);
  return match
    ? { Authorization: `Bearer ${decodeURIComponent(match[1])}` }
    : {};
}

Since EventSource does not support custom headers, the SSE endpoint on the orchestration engine currently does not validate JWT tokens. In production, SSE connections should be routed through the API Gateway or use a token query parameter.
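The token query parameter option could look roughly like the sketch below, which reuses the cookie pattern from getAuthHeaders and appends the token to the stream URL. This is an assumption-laden illustration: the parameter name token, and the functions readTokenFromCookie and withToken, are hypothetical, and the orchestration engine would need matching validation that does not exist today.

```typescript
// Extracts the aegis_token value from a cookie string, or null if absent.
// Takes the cookie string as an argument so it can run outside a browser.
function readTokenFromCookie(cookie: string): string | null {
  const match = cookie.match(/(?:^|; )aegis_token=([^;]*)/);
  return match ? decodeURIComponent(match[1]) : null;
}

// Appends a hypothetical `token` query parameter to an existing URL.
function withToken(url: string, token: string): string {
  const separator = url.includes("?") ? "&" : "?";
  return `${url}${separator}token=${encodeURIComponent(token)}`;
}
```

In the browser, the first function would be called with document.cookie, and the stream would only be opened when a token is found.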

Conversation Persistence

Conversations and messages are persisted to the backend:

  • Load on mount: GET /api/v1/conversations through the gateway (falls back to a direct call to the orchestration engine).
  • Load messages: GET /api/v1/conversations/{id}/messages when selecting a conversation.
  • Create: POST /api/v1/conversations with agent_id and conversation_type.
  • Update title: PATCH /api/v1/conversations/{id} after the first message.