Event System
AEGIS uses two complementary event systems: Apache Kafka for asynchronous inter-service messaging and Server-Sent Events (SSE) for real-time frontend streaming. Kafka handles backend event propagation (ingestion, entity extraction), while SSE delivers live agent execution updates and workspace assessment progress to the browser.
Kafka
Infrastructure Configuration
Kafka runs as a single-node KRaft cluster (no ZooKeeper) using Confluent Platform 7.6.0, defined in docker-compose.yml:
kafka:
  image: confluentinc/cp-kafka:7.6.0
  container_name: aegis-kafka
  ports:
    - "9092:9092"
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
    KAFKA_PROCESS_ROLES: broker,controller
    KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

| Setting | Value | Notes |
|---|---|---|
| Bootstrap servers | localhost:9092 | Single node for local dev |
| Security protocol | PLAINTEXT | No TLS in local dev |
| Replication factor | 1 | Single node — no replication |
| Process roles | broker,controller | Combined mode (KRaft) |
| Data volume | ./docker-volumes/kafka | Persistent across restarts |
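A client pointed at this broker needs little more than the bootstrap address. The following is a minimal sketch, assuming the KAFKA_BOOTSTRAP_SERVERS environment variable used by the shared client and the librdkafka-style keys bootstrap.servers and security.protocol. The helper name kafka_client_config is hypothetical and is not part of the AEGIS codebase.

```python
import os


def kafka_client_config(env=None):
    """Build a librdkafka-style config dict for the local single-node broker.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    return {
        # Falls back to the advertised listener from docker-compose.yml.
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        # Local dev runs without TLS, so PLAINTEXT is sufficient.
        "security.protocol": "PLAINTEXT",
    }
```

Passing a plain dict as env makes the helper easy to exercise without touching the real process environment.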
Kafka Topics
| Topic | Producer | Consumer(s) | Purpose |
|---|---|---|---|
| entity-extraction-worker | Ingestion Service | Knowledge Graph Service | Notify downstream when new entities are extracted from scraped or imported data |
Topics are auto-created by the producer on first publish using KafkaProducerManager.ensure_topic() with a default of 3 partitions and replication factor 1.
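The create-if-missing behavior can be sketched as follows. This is not the actual ensure_topic() implementation: the TopicAdmin interface and the InMemoryAdmin stand-in are hypothetical, introduced only so the example runs without a broker. A real version would delegate to the Kafka AdminClient.

```python
from typing import Protocol


class TopicAdmin(Protocol):
    """Hypothetical admin interface; a real one would wrap the Kafka AdminClient."""

    def list_topics(self) -> set: ...

    def create_topic(self, name: str, partitions: int, replication_factor: int) -> None: ...


class InMemoryAdmin:
    """Stand-in admin used only to make the example runnable without a broker."""

    def __init__(self):
        self.topics = {}

    def list_topics(self):
        return set(self.topics)

    def create_topic(self, name, partitions, replication_factor):
        self.topics[name] = (partitions, replication_factor)


def ensure_topic(admin, name, partitions=3, replication_factor=1):
    """Create the topic only if it is missing; return True when it was created."""
    if name in admin.list_topics():
        return False
    admin.create_topic(name, partitions, replication_factor)
    return True
```

Calling ensure_topic() twice with the same name creates the topic once and leaves it untouched the second time, which is what makes it safe to call before every first publish.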
Producer / Consumer Pattern
The shared Kafka client lives in shared/src/aegis_shared/db/kafka.py and provides two classes: KafkaProducerManager and KafkaConsumerManager.
KafkaProducerManager
from aegis_shared.db.kafka import KafkaProducerManager

kp = KafkaProducerManager()  # reads KAFKA_BOOTSTRAP_SERVERS env
kp.connect()
kp.publish(
    topic="entity-extraction-worker",
    value={"event_type": "wells_scraped", "data": {...}},
    key="wells_scraped",              # optional partition key
    headers={"source": "ingestion"},  # optional headers
)
kp.flush()  # wait for delivery
kp.disconnect()

Key behaviors:
- publish() serializes the value dict to JSON and encodes it to bytes
- An optional key is encoded to bytes for Kafka partitioning
- poll(0) is called after each publish to trigger delivery callbacks
- flush(timeout=10) blocks until all buffered messages are delivered
- ensure_topic() is a static method that uses the Kafka AdminClient to create a topic if it does not already exist
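The serialization steps listed above can be sketched in isolation. The function name encode_message is hypothetical, and the header encoding (a list of name/bytes pairs, UTF-8) is an assumption, since the source only states that the value is JSON-encoded and the key is encoded to bytes.

```python
import json


def encode_message(value, key=None, headers=None):
    """Return (value_bytes, key_bytes, header_list) ready to hand to a producer.

    Hypothetical helper; the real publish() may differ in detail.
    """
    # The value dict is serialized to JSON, then encoded to bytes.
    value_bytes = json.dumps(value).encode("utf-8")
    # The key is optional; when present it is encoded for partitioning.
    key_bytes = key.encode("utf-8") if key is not None else None
    # Assumption: headers are sent as (name, bytes) pairs.
    header_list = [(name, text.encode("utf-8")) for name, text in (headers or {}).items()]
    return value_bytes, key_bytes, header_list
```

Messages that share a key land on the same partition, so using the event type as the key (as in the example above) keeps events of one type in order relative to each other.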
KafkaConsumerManager
from aegis_shared.db.kafka import KafkaConsumerManager

kc = KafkaConsumerManager(group_id="orchestration-engine")
kc.connect(topics=["entity-extraction-worker"])

def handle(event: dict):
    print(event["event_type"], event["data"])

kc.consume(handler=handle, max_messages=100, timeout=1.0)
kc.disconnect()

Key behaviors:
- Uses earliest offset reset by default (reads from the beginning if no committed offset exists)
- Auto-commit is enabled
- The consume() method polls in a loop, decoding JSON payloads and passing them to the handler
- Set max_messages=None for indefinite consumption
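The poll-decode-dispatch loop described above might look roughly like this. It is a sketch under assumptions, not the KafkaConsumerManager source: the poll argument is a hypothetical callable that returns raw message bytes, or None when nothing arrived within the timeout.

```python
import json


def consume(poll, handler, max_messages=None, timeout=1.0):
    """Poll until max_messages payloads have been handled; return the count.

    poll(timeout) is a hypothetical callable returning bytes or None.
    With max_messages=None the loop never exits on its own.
    """
    handled = 0
    while max_messages is None or handled < max_messages:
        raw = poll(timeout)
        if raw is None:
            continue  # nothing arrived within the timeout; poll again
        # Decode the JSON payload and pass the resulting dict to the handler.
        handler(json.loads(raw.decode("utf-8")))
        handled += 1
    return handled
```

Empty polls do not count toward max_messages in this sketch, so the limit applies to handled events rather than to poll attempts.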
Ingestion Service Usage
The ingestion service (services/ingestion-service/) is the primary Kafka producer. It publishes events after:
- RRC data scraping — wells, leases, permits, fields, production, flaring authorizations
- CSV import — bulk data from uploaded files
- Entity extraction — extracted entities from unstructured data
Each event has the shape:
{
  "event_type": "wells_scraped",
  "data": { ... }
}

The topic name is configured in ingestion/config.py as ENTITY_EXTRACTION_TOPIC = "entity-extraction-worker".
Kafka is optional for local development. If the broker is unavailable, the ingestion service logs a warning and continues without event publishing.
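One way to get this degrade-gracefully behavior is a thin wrapper around the producer. The sketch below is an illustration, not the ingestion service's code: the OptionalPublisher class is hypothetical, and it assumes the wrapped producer exposes connect() and publish() as shown earlier and that connect() raises when the broker cannot be reached.

```python
import logging

logger = logging.getLogger("ingestion.events")


class OptionalPublisher:
    """Wraps a producer so a missing broker disables publishing instead of failing."""

    def __init__(self, producer):
        self._producer = producer
        self.enabled = False

    def connect(self):
        try:
            self._producer.connect()
            self.enabled = True
        except Exception as exc:  # broad on purpose: any failure disables publishing
            logger.warning("Kafka unavailable, events will not be published: %s", exc)
        return self.enabled

    def publish(self, topic, value, **kwargs):
        """Publish when enabled; return False (and do nothing) otherwise."""
        if not self.enabled:
            return False
        self._producer.publish(topic=topic, value=value, **kwargs)
        return True
```

Callers can ignore the return value in normal operation; it exists mainly so tests and health checks can tell whether an event was actually handed to Kafka.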
Server-Sent Events (SSE)
SSE provides real-time, one-directional streaming from the orchestration engine to the frontend. AEGIS uses SSE in two contexts: conversation streaming (LangGraph node events) and workspace assessment streaming (structured workspace events).
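On the wire, an SSE stream is plain text: each event is a group of field: value lines terminated by a blank line, and lines starting with a colon are comments. Browsers parse this through EventSource, but for scripts and tests a small parser is handy. The sketch below handles only the event and data fields and assumes every data payload is JSON, as it is in AEGIS; the function name parse_sse is hypothetical.

```python
import json
import re


def parse_sse(stream_text):
    """Parse text/event-stream content into a list of {"event", "data"} dicts."""
    events = []
    event_name, data_lines = "message", []  # "message" is the default event type
    for line in re.split(r"\r\n|\r|\n", stream_text):
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                payload = json.loads("\n".join(data_lines))
                events.append({"event": event_name, "data": payload})
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)
    return events
```

An event that is not followed by a blank line is treated as incomplete and dropped, which matches how a streaming client should behave when the connection is cut mid-event.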
Conversation Streaming
Endpoint: GET /conversations/{conversation_id}/stream
This endpoint streams LangGraph execution events as the agent pipeline runs. The frontend connects via the EventSource API and receives events as each graph node completes.
GET /conversations/conv-1/stream?message=Hello&agent_id=rule37-agent&agent_type=rule_37

Event Flow
Connection opened
← event: node_end (system_prompt_node)
← event: node_end (memory_node)
← event: node_end (initial_llm_call) ← contains assistant message
← event: node_end (tool_node) ← if tools invoked
← event: node_end (initial_llm_call) ← tool result processed
← event: node_end (skill_select_node)
← event: node_end (skill_inject_node) ← contains skills_injected list
← event: node_end (approval_node)
← event: node_end (synthesis_llm_call) ← contains final assistant message
← event: node_end (output_format)
← event: done ← execution summary
Connection closed

Event Payload
Each node_end event contains:
{
  "event": "node_end",
  "node": "initial_llm_call",
  "data": {
    "status": "running",
    "tokens_used": 1234,
    "message": {
      "role": "assistant",
      "content": "I'll calculate the spacing..."
    },
    "skills_injected": []
  }
}

The done event contains an execution summary:
{
  "event": "done",
  "data": {
    "execution_id": "exec-abc-123",
    "status": "completed",
    "tokens_used": 5678,
    "cost_usd": 0.045,
    "skills_injected": ["spacing-calculation", "offset-well-analysis"]
  }
}

Error events are emitted if the pipeline fails or the token budget is exceeded:
{
  "event": "error",
  "data": {
    "error": "Budget exceeded: 150000 tokens"
  }
}

Implementation Details
The orchestration engine uses sse-starlette (EventSourceResponse) to wrap an async generator that iterates over agent_graph.astream(state). Each iteration yields a dict keyed by node name with partial state updates. The generator enriches these with message content and skill injection data before serializing to JSON.
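import json

from sse_starlette.sse import EventSourceResponse

async def event_generator():
    # astream() yields one dict per step, keyed by node name.
    async for event in agent_graph.astream(state):
        for node_name, node_output in event.items():
            # stream_event is the enriched event for this node; its
            # construction from node_name and node_output is omitted here.
            yield {
                "event": stream_event.event,
                "data": json.dumps(stream_event.model_dump()),
            }

# Inside the endpoint handler:
return EventSourceResponse(event_generator())

Workspace Assessment Streaming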
Endpoint: GET /workspaces/{checklist_id}/stream
This endpoint streams structured workspace events during the compliance assessment phase. Unlike conversation streaming (which sends node-level events), workspace streaming sends typed domain events that the frontend renders as specific UI components.
WorkspaceEventType Enum
Defined in orchestration/compliance/workspace/events.py:
| Event Type | Description | UI Component |
|---|---|---|
| checklist_item_update | Status change on a checklist item | Checklist panel status icon |
| artifact_generated | New document, PDF, or data export created | Artifact list / download link |
| data_table_update | Structured tabular data (rows and columns) | Data table with inline editing |
| form_field_update | Form field populated or updated | Form preview with color-coded confidence |
| validation_result | Pass/fail validation check | Validation checklist with severity |
| spatial_update | Geographic/spatial data (well positions, distances) | Map or spatial visualization |
| agent_status | Agent processing progress | Progress indicator / status bar |
| assessment_complete | Assessment phase finished | Summary card with completion stats |
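A consumer of this stream typically switches on the event type to decide what to render. The sketch below mirrors the table as a string-valued enum and routes a decoded payload to a handler. It is an assumption-laden illustration: the real enum in events.py may be declared differently, and route_event and the handlers mapping are hypothetical names.

```python
from enum import Enum


class WorkspaceEventType(str, Enum):
    """Event type values taken from the table above (declaration style assumed)."""

    CHECKLIST_ITEM_UPDATE = "checklist_item_update"
    ARTIFACT_GENERATED = "artifact_generated"
    DATA_TABLE_UPDATE = "data_table_update"
    FORM_FIELD_UPDATE = "form_field_update"
    VALIDATION_RESULT = "validation_result"
    SPATIAL_UPDATE = "spatial_update"
    AGENT_STATUS = "agent_status"
    ASSESSMENT_COMPLETE = "assessment_complete"


def route_event(payload, handlers):
    """Dispatch a decoded payload to the handler registered for its event type.

    Raises ValueError for an unknown event_type; returns None when no
    handler is registered for a known type.
    """
    event_type = WorkspaceEventType(payload["event_type"])
    handler = handlers.get(event_type)
    return handler(payload) if handler else None
```

Failing loudly on unknown types while silently skipping unhandled ones is a deliberate choice in this sketch: it surfaces protocol drift early without forcing every client to implement all eight components.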
Event Payload Examples
Checklist Item Update:
{
  "event_type": "checklist_item_update",
  "item_index": 2,
  "item_name": "Offset Well Identification",
  "status": "complete",
  "summary": "Found 4 offset wells within 1,200 ft",
  "completed_by": "agent"
}

Data Table Update:
{
  "event_type": "data_table_update",
  "item_index": 2,
  "columns": ["Well Name", "API", "Operator", "Distance (ft)"],
  "rows": [
    {"Well Name": "Smith #3", "API": "42-329-40003", "Operator": "Devon", "Distance (ft)": 890}
  ],
  "total_rows": 4
}

Form Field Update:
{
  "event_type": "form_field_update",
  "item_index": 5,
  "field_name": "operator_name",
  "field_value": "Permian Basin Energy LLC",
  "confidence": 0.98,
  "source": "knowledge_graph:P-5"
}

Validation Result:
{
  "event_type": "validation_result",
  "item_index": 10,
  "check_name": "Service list completeness",
  "passed": false,
  "message": "2 offset operators missing addresses",
  "severity": "warning"
}

Event Model Registry
Each event type maps to a Pydantic model for serialization/validation:
EVENT_MODELS = {
    WorkspaceEventType.CHECKLIST_ITEM_UPDATE: ChecklistItemUpdateEvent,
    WorkspaceEventType.ARTIFACT_GENERATED: ArtifactGeneratedEvent,
    WorkspaceEventType.DATA_TABLE_UPDATE: DataTableUpdateEvent,
    WorkspaceEventType.FORM_FIELD_UPDATE: FormFieldUpdateEvent,
    WorkspaceEventType.VALIDATION_RESULT: ValidationResultEvent,
    WorkspaceEventType.SPATIAL_UPDATE: SpatialUpdateEvent,
    WorkspaceEventType.AGENT_STATUS: AgentStatusEvent,
    WorkspaceEventType.ASSESSMENT_COMPLETE: AssessmentCompleteEvent,
}

Architecture Summary
┌─────────────────────────────────────────────────────────┐
│ Frontend (Next.js) │
│ │
│ EventSource(/conversations/{id}/stream) │
│ EventSource(/workspaces/{id}/stream) │
└────────────────────┬────────────────────────────────────┘
│ SSE (text/event-stream)
▼
┌─────────────────────────────────────────────────────────┐
│ API Gateway (Go :8000) │
│ Proxies SSE connections through │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Orchestration Engine (Python :8001) │
│ │
│ Conversation SSE ─── LangGraph astream() events │
│ Workspace SSE ────── Structured WorkspaceEventType │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Ingestion Service (Python :8005) │
│ │
│ Scrape/Import → KafkaProducerManager.publish() │
│ Topic: entity-extraction-worker │
└────────────────────┬────────────────────────────────────┘
│ Kafka
▼
┌─────────────────────────────────────────────────────────┐
│ Knowledge Graph Service (Python :8003) │
│ │
│ KafkaConsumerManager → process extracted entities │
└─────────────────────────────────────────────────────────┘

SSE connections are long-lived HTTP connections. The API gateway proxies them through to the orchestration engine without buffering. Make sure your reverse proxy or load balancer does not apply response buffering or time out SSE connections prematurely.
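A quick way to spot a buffering intermediary is to inspect the response headers seen by the client. The check below is a rough diagnostic sketch, not part of AEGIS: the function name sse_proxy_warnings is hypothetical, and it relies only on two general facts, namely that SSE responses use the text/event-stream content type and that an open-ended stream cannot carry a Content-Length.

```python
def sse_proxy_warnings(headers):
    """Return human-readable warnings for the headers of a proxied SSE response."""
    # Header names are case-insensitive, so normalize before checking.
    normalized = {name.lower(): value for name, value in headers.items()}
    warnings = []
    if not normalized.get("content-type", "").startswith("text/event-stream"):
        warnings.append("Content-Type is not text/event-stream")
    if "content-length" in normalized:
        # A known length means something collected the whole body first.
        warnings.append("Content-Length present: the response was probably buffered")
    return warnings
```

An empty list does not prove the path is buffer-free, since some proxies buffer in chunks without setting a length, so watching event arrival times during a long-running assessment remains the more reliable test.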
Key Source Files
| File | Purpose |
|---|---|
| shared/src/aegis_shared/db/kafka.py | Kafka producer and consumer managers |
| services/ingestion-service/src/ingestion/main.py | Kafka producer usage (entity extraction events) |
| services/ingestion-service/src/ingestion/config.py | Kafka topic configuration |
| services/orchestration-engine/src/orchestration/main.py | Conversation SSE streaming endpoint |
| services/orchestration-engine/src/orchestration/compliance/workspace/routes.py | Workspace SSE streaming endpoint |
| services/orchestration-engine/src/orchestration/compliance/workspace/events.py | WorkspaceEventType enum and payload models |
| services/orchestration-engine/src/orchestration/schemas.py | StreamEvent schema for conversation SSE |