SSE Streaming
AEGIS uses Server-Sent Events (SSE) for real-time streaming of agent execution progress to the frontend. This page documents the EventSource implementation, event types, the streaming protocol, and fallback behavior.
Overview
When a user sends a message in the conversations page, the frontend opens an SSE connection to the orchestration engine. As the agent processes the request through the LangGraph pipeline, the server emits events at each pipeline node completion. The frontend parses these events to show real-time progress indicators, tool execution status, and the final assistant response.
SSE Connection
The conversations page (src/app/(dashboard)/conversations/page.tsx) establishes SSE connections using the browser’s EventSource API:
const streamUrl = `${API_URLS.orchestration}/conversations/${conversationId}/stream`
+ `?message=${encodeURIComponent(currentMessage)}`
+ `&agent_id=${agent.id}`
+ `&agent_type=${agent.type}`;
const eventSource = new EventSource(streamUrl);
The connection is made directly to the orchestration engine (port 8001), not through the API Gateway. The message, agent ID, and agent type are passed as URL query parameters.
The SSE endpoint is GET /conversations/{conversation_id}/stream on the orchestration engine. It takes query parameters rather than a request body because the browser’s EventSource API only issues GET requests and cannot send a body.
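Because every input travels in the query string, each value needs to be URL-safe. The following is a minimal sketch of a URL builder that encodes all dynamic parts, not only the message. The helper name `buildStreamUrl` and its parameter list are hypothetical and do not appear in the AEGIS codebase as documented here.

```typescript
// Hypothetical helper (not part of the documented page): builds the stream URL
// with every dynamic segment and query value percent-encoded.
function buildStreamUrl(
  baseUrl: string,
  conversationId: string,
  message: string,
  agentId: string,
  agentType: string,
): string {
  const params: Array<[string, string]> = [
    ["message", message],
    ["agent_id", agentId],
    ["agent_type", agentType],
  ];
  const query = params
    .map(([key, value]) => `${key}=${encodeURIComponent(value)}`)
    .join("&");
  return `${baseUrl}/conversations/${encodeURIComponent(conversationId)}/stream?${query}`;
}
```

The result can be passed straight to `new EventSource(...)` in place of the hand-concatenated string.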
Event Types
The server emits two named SSE event types:
node_end
Fired each time a LangGraph pipeline node completes execution. The payload contains:
{
"node": "initial_llm_call",
"data": {
"message": {
"content": "Here is the analysis...",
"tool_calls": [
{ "function": { "name": "spacing_calculation" } }
]
}
}
}
The frontend maps node names to human-readable status labels:
| Node Name | Status Label |
|---|---|
| system_prompt_node | Initializing agent… |
| memory_node | Loading memory context… |
| initial_llm_call | Thinking… |
| skill_select_node | Selecting skills… |
| skill_inject_node | Injecting skill context… |
| approval_node | Checking HITL requirements… |
| synthesis_llm_call | Synthesizing response… |
| tool_node | Executing tools… |
| output_format | Finalizing… |
When a node_end event fires for an LLM call node (initial_llm_call or synthesis_llm_call):
- If data.message.tool_calls is present, tool status indicators are added to the chat.
- If data.message.content is present, the assistant response is displayed (or updated if already visible).
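The node_end handling described above can be sketched as a pure function that turns a payload into the UI updates it implies. This is an illustration under assumptions: the names `NodeEndPayload`, `NodeEndUpdate`, and `interpretNodeEnd` are hypothetical, and the real handler presumably updates React state rather than returning a value.

```typescript
// Payload shape taken from the node_end example; nested fields are optional
// here so that nodes without a message do not throw.
interface NodeEndPayload {
  node: string;
  data?: {
    message?: {
      content?: string;
      tool_calls?: Array<{ function: { name: string } }>;
    };
  };
}

// Labels copied from the node name table.
const NODE_LABELS = new Map<string, string>([
  ["system_prompt_node", "Initializing agent…"],
  ["memory_node", "Loading memory context…"],
  ["initial_llm_call", "Thinking…"],
  ["skill_select_node", "Selecting skills…"],
  ["skill_inject_node", "Injecting skill context…"],
  ["approval_node", "Checking HITL requirements…"],
  ["synthesis_llm_call", "Synthesizing response…"],
  ["tool_node", "Executing tools…"],
  ["output_format", "Finalizing…"],
]);

const LLM_NODES = new Set(["initial_llm_call", "synthesis_llm_call"]);

// Hypothetical result type: what the UI should show after this event.
interface NodeEndUpdate {
  status: string | undefined;
  toolNames: string[];
  content: string | undefined;
}

function interpretNodeEnd(payload: NodeEndPayload): NodeEndUpdate {
  const status = NODE_LABELS.get(payload.node);
  // Only LLM call nodes contribute tool indicators or response text.
  if (!LLM_NODES.has(payload.node)) {
    return { status, toolNames: [], content: undefined };
  }
  const message = payload.data?.message;
  const toolNames = (message?.tool_calls ?? []).map((call) => call.function.name);
  const content = message?.content ? message.content : undefined;
  return { status, toolNames, content };
}
```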
done
Fired when the entire pipeline completes:
{
"data": {
"status": "completed"
}
}
If status is "awaiting_hitl", a HITL review banner is displayed with a link to the filings page.
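As a rough sketch, the done event can be reduced to two decisions: streaming has ended, and a HITL banner may be needed. The names `DonePayload`, `DoneOutcome`, and `interpretDone` are hypothetical. Only the two status values mentioned on this page are considered; any other value is treated like "completed".

```typescript
// Shape taken from the done example; fields are optional to stay defensive.
interface DonePayload {
  data?: { status?: string };
}

// Hypothetical summary of what the UI should do once the pipeline finishes.
interface DoneOutcome {
  streaming: false;
  showHitlBanner: boolean;
  reviewHref: string | null;
}

function interpretDone(payload: DonePayload): DoneOutcome {
  const awaitingHitl = payload.data?.status === "awaiting_hitl";
  return {
    streaming: false,
    showHitlBanner: awaitingHitl,
    // The banner links to the filings page, per the description above.
    reviewHref: awaitingHitl ? "/filings" : null,
  };
}
```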
Tool Status Display
When tool calls are detected in an LLM response, the frontend displays animated status indicators with human-readable labels:
const TOOL_LABELS: Record<string, string> = {
spacing_calculation: "Calculating spacing distances...",
offset_well_analysis: "Querying knowledge graph for offset wells...",
rule37_filing_assembly: "Assembling Form W-1 filing package...",
good_cause_narrative: "Drafting good cause narrative...",
flaring_volume_calc: "Calculating flaring volumes...",
gas_analysis: "Analyzing gas composition...",
rule32_filing_assembly: "Assembling Form R-32 filing package...",
emissions_estimate: "Estimating CO2e emissions...",
};
Each tool status appears as a small inline indicator with a pulsing blue dot and the label text.
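The page does not say what is shown for a tool that has no entry in TOOL_LABELS. One possible approach, sketched below, is to derive a generic label from the tool name. The function `toolLabel` and the fallback wording are assumptions, and the label map is abbreviated to two of the entries listed above.

```typescript
// Abbreviated copy of the label table; a Map avoids accidental hits on
// inherited object properties such as "constructor".
const KNOWN_TOOL_LABELS = new Map<string, string>([
  ["spacing_calculation", "Calculating spacing distances..."],
  ["emissions_estimate", "Estimating CO2e emissions..."],
]);

// Hypothetical lookup with a generic fallback for unlisted tools.
function toolLabel(toolName: string): string {
  return KNOWN_TOOL_LABELS.get(toolName) ?? `Running ${toolName.replace(/_/g, " ")}...`;
}
```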
Payload Parsing
The SSE data can arrive in two formats due to how sse-starlette serializes payloads:
- Standard JSON: data: {"node": "...", "data": {...}}
- Double-encoded string: data: "{\"node\": \"...\"}"
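To make the difference between the two formats concrete, the sketch below reproduces both with JSON.stringify and classifies a raw data string. The function `detectWireFormat` is a hypothetical diagnostic helper and is not part of the documented frontend.

```typescript
const samplePayload = { node: "initial_llm_call", data: {} };

// Serialized once: {"node":"initial_llm_call","data":{}}
const standard = JSON.stringify(samplePayload);

// Serialized twice: "{\"node\":\"initial_llm_call\",\"data\":{}}"
const doubleEncoded = JSON.stringify(standard);

type WireFormat = "json-object" | "double-encoded" | "other";

// Hypothetical helper: reports which of the two formats a raw string uses.
function detectWireFormat(raw: string): WireFormat {
  let first: unknown;
  try {
    first = JSON.parse(raw);
  } catch {
    return "other";
  }
  if (typeof first === "string") {
    // A string result means the payload was serialized a second time.
    try {
      const second: unknown = JSON.parse(first);
      return typeof second === "object" && second !== null ? "double-encoded" : "other";
    } catch {
      return "other";
    }
  }
  return typeof first === "object" && first !== null ? "json-object" : "other";
}
```

A single JSON.parse of the double-encoded form yields a string, which is why a second parse is needed.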
The frontend handles both:
const parsePayload = (raw: string): any => {
let parsed = JSON.parse(raw);
// sse-starlette may double-encode: data is a JSON string instead of object
if (typeof parsed === "string") {
parsed = JSON.parse(parsed);
}
return parsed;
};
Event Listener Registration
Events are handled via two mechanisms for maximum compatibility:
Named event listeners (primary path)
eventSource.addEventListener("node_end", (e) => {
handleNodeEnd(parsePayload(e.data));
});
eventSource.addEventListener("done", (e) => {
handleDone(parsePayload(e.data));
});
Generic message handler (fallback)
For servers that dispatch events without the event: field, the browser delivers them as generic message events, so the onmessage handler inspects the event field of the parsed payload:
eventSource.onmessage = (e) => {
const payload = parsePayload(e.data);
if (payload.event === "node_end") {
handleNodeEnd(payload);
} else if (payload.event === "done") {
handleDone(payload);
}
};
Streaming UI Indicators
While streaming is active, the UI shows:
- An animated bouncing dots indicator (three blue dots with staggered y animation).
- A text label showing the current pipeline stage (e.g., “Thinking…”, “Executing tools…”).
- A disabled input field, which prevents duplicate submissions.
{streaming && (
<div className="flex items-center gap-2 pl-2">
<div className="flex gap-1">
{[0, 1, 2].map((i) => (
<motion.div
key={i}
className="w-1.5 h-1.5 rounded-full bg-blue-400"
animate={{ y: [0, -4, 0] }}
transition={{ repeat: Infinity, duration: 0.6, delay: i * 0.15 }}
/>
))}
</div>
<span className="text-xs text-muted-foreground">{streamStatus}</span>
</div>
)}
Message Types
The conversation page handles five message types:
| Role | Description | Rendering |
|---|---|---|
| user | User-sent messages | Right-aligned, blue-tinted bubble |
| assistant | Agent responses | Left-aligned, muted background |
| tool_status | Tool execution indicators | Inline with pulsing blue dot and label |
| hitl_banner | HITL approval required | Centered amber banner with “Review” button linking to /filings |
| agent_handoff | Suggestion to switch agents | Purple banner with agent details and “Continue” button |
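One way to model these five roles is a discriminated union keyed on role, which lets the compiler check that every role has a rendering branch. The type `ChatMessage`, its fields other than role, and the function `describeRendering` are illustrative assumptions; the actual message type used by the page is not shown here.

```typescript
// Hypothetical message model: only the role values come from the table above.
type ChatMessage =
  | { role: "user"; content: string }
  | { role: "assistant"; content: string }
  | { role: "tool_status"; label: string }
  | { role: "hitl_banner"; reviewHref: string }
  | { role: "agent_handoff"; content: string; targetAgentName: string };

// Returns a short description of how each role is rendered.
function describeRendering(message: ChatMessage): string {
  switch (message.role) {
    case "user":
      return "right-aligned, blue-tinted bubble";
    case "assistant":
      return "left-aligned, muted background";
    case "tool_status":
      return `inline indicator: ${message.label}`;
    case "hitl_banner":
      return `amber banner linking to ${message.reviewHref}`;
    case "agent_handoff":
      return `purple banner suggesting ${message.targetAgentName}`;
  }
}
```

Adding a sixth role to the union without a matching case would then surface as a compile-time error when strict checks are enabled.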
HITL Banner
When an agent execution results in status: "awaiting_hitl", a banner is added to the chat:
<div className="border border-amber-800/40 bg-amber-950/30 px-4 py-3">
<p>Filing package ready for review</p>
<p>pre_filing checkpoint -- requires human review</p>
<Link href="/filings">
<Button>Review</Button>
</Link>
</div>
The conversation status is also updated to "awaiting_hitl" and a badge is shown in the conversation list.
Agent Handoff
Agent handoff messages suggest switching to a different agent. They display:
- The suggestion message content.
- The target agent’s color indicator, name, and description.
- A “Continue with {agent}” button that creates a new conversation with the suggested agent and pre-populates context from the handoff.
Fallback: Synchronous Execution
If the SSE connection fails (EventSource error event), the frontend falls back to synchronous execution via POST /execute:
const res = await fetch(`${API_URLS.orchestration}/execute`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
conversation_id: conv.id,
agent_id: conv.agent.id,
agent_type: conv.agent.type,
message: msg,
metadata: {},
}),
});
This returns the complete result in a single response. Tool calls and assistant messages are extracted from the response payload and displayed in sequence.
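EventSource reconnects automatically after an error unless it is closed, so the error event can fire more than once for a single failed request. The page does not show how the frontend guards against triggering the fallback repeatedly. The sketch below is one possible guard, with hypothetical names (`ClosableStream`, `createFallbackGuard`), that closes the stream and runs the fallback at most once, and never after done has been received.

```typescript
// Minimal structural type so the guard works with EventSource or a test double.
interface ClosableStream {
  close(): void;
}

// Hypothetical guard: ensures exactly one terminal action per stream.
function createFallbackGuard(stream: ClosableStream, runFallback: () => void) {
  let settled = false;
  return {
    // Call when the done event arrives: stop the stream, no fallback needed.
    onDone(): void {
      if (settled) return;
      settled = true;
      stream.close();
    },
    // Call from the error handler: stop reconnect attempts, then fall back once.
    onError(): void {
      if (settled) return;
      settled = true;
      stream.close();
      runFallback();
    },
  };
}

// Possible wiring (executeSync is a hypothetical wrapper around POST /execute):
//   const guard = createFallbackGuard(eventSource, () => void executeSync());
//   eventSource.addEventListener("done", () => guard.onDone());
//   eventSource.onerror = () => guard.onError();
```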
Auth Headers for SSE
SSE connections use the browser’s EventSource API, which does not support custom headers. Authentication headers for SSE connections are obtained separately by reading the cookie directly:
function getAuthHeaders(): Record<string, string> {
if (typeof document === "undefined") return {};
const match = document.cookie.match(/(?:^|; )aegis_token=([^;]*)/);
return match
? { Authorization: `Bearer ${decodeURIComponent(match[1])}` }
: {};
}
Since EventSource does not support custom headers, the SSE endpoint on the orchestration engine currently does not validate JWT tokens. In production, SSE connections should be routed through the API Gateway or use a token query parameter.
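The token query parameter option could look roughly like the sketch below on the client side. Everything here is an assumption: the parameter name `token`, the helpers `readTokenFromCookie` and `withTokenParam`, and the premise that the server would be changed to validate the parameter. Only the cookie name aegis_token comes from the code above.

```typescript
// Same cookie pattern as getAuthHeaders, but takes the cookie string as an
// argument so it can run outside the browser.
function readTokenFromCookie(cookie: string): string | null {
  const match = cookie.match(/(?:^|; )aegis_token=([^;]*)/);
  const value = match?.[1];
  return value ? decodeURIComponent(value) : null;
}

// Hypothetical: appends the token as a query parameter named "token".
// The URL is returned unchanged when no token is available.
function withTokenParam(streamUrl: string, token: string | null): string {
  if (!token) return streamUrl;
  const separator = streamUrl.includes("?") ? "&" : "?";
  return `${streamUrl}${separator}token=${encodeURIComponent(token)}`;
}
```

In the browser this would be called as `withTokenParam(streamUrl, readTokenFromCookie(document.cookie))`. Query-string tokens tend to appear in server and proxy logs, so short-lived tokens are a common choice for this pattern.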
Conversation Persistence
Conversations and messages are persisted to the backend:
- Load on mount: GET /api/v1/conversations through the gateway (falls back to direct orchestration engine call).
- Load messages: GET /api/v1/conversations/{id}/messages when selecting a conversation.
- Create: POST /api/v1/conversations with agent_id and conversation_type.
- Update title: PATCH /api/v1/conversations/{id} after the first message.