Event System

AEGIS uses two complementary event systems: Apache Kafka for asynchronous inter-service messaging and Server-Sent Events (SSE) for real-time frontend streaming. Kafka handles backend event propagation (ingestion, entity extraction), while SSE delivers live agent execution updates and workspace assessment progress to the browser.

Kafka

Infrastructure Configuration

Kafka runs as a single-node KRaft cluster (no ZooKeeper) using Confluent Platform 7.6.0, defined in docker-compose.yml:

    kafka:
      image: confluentinc/cp-kafka:7.6.0
      container_name: aegis-kafka
      ports:
        - "9092:9092"
      environment:
        KAFKA_NODE_ID: 1
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
        KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
        KAFKA_PROCESS_ROLES: broker,controller
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

| Setting | Value | Notes |
| --- | --- | --- |
| Bootstrap servers | localhost:9092 | Single node for local dev |
| Security protocol | PLAINTEXT | No TLS in local dev |
| Replication factor | 1 | Single node, no replication |
| Process roles | broker,controller | Combined mode (KRaft) |
| Data volume | ./docker-volumes/kafka | Persistent across restarts |

Kafka Topics

| Topic | Producer | Consumer(s) | Purpose |
| --- | --- | --- | --- |
| entity-extraction-worker | Ingestion Service | Knowledge Graph Service | Notify downstream when new entities are extracted from scraped or imported data |

Topics are auto-created by the producer on first publish using KafkaProducerManager.ensure_topic() with a default of 3 partitions and replication factor 1.

Producer / Consumer Pattern

The shared Kafka client lives in shared/src/aegis_shared/db/kafka.py and provides two classes: KafkaProducerManager and KafkaConsumerManager.

KafkaProducerManager

    from aegis_shared.db.kafka import KafkaProducerManager

    kp = KafkaProducerManager()  # reads KAFKA_BOOTSTRAP_SERVERS env
    kp.connect()
    kp.publish(
        topic="entity-extraction-worker",
        value={"event_type": "wells_scraped", "data": {...}},
        key="wells_scraped",              # optional partition key
        headers={"source": "ingestion"},  # optional headers
    )
    kp.flush()  # wait for delivery
    kp.disconnect()

Key behaviors:

  • publish() serializes the value dict to JSON and encodes to bytes
  • An optional key is encoded to bytes for Kafka partitioning
  • poll(0) is called after each publish to trigger delivery callbacks
  • flush(timeout=10) blocks until all buffered messages are delivered
  • ensure_topic() is a static method that uses the Kafka AdminClient to create a topic if it does not already exist
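The serialization behavior listed above can be illustrated with a minimal sketch. The helper name `encode_message` and the sample payload are hypothetical, and encoding header values to bytes is an assumption; the source only states that the value and key are encoded.

```python
import json
from typing import Optional


def encode_message(
    value: dict,
    key: Optional[str] = None,
    headers: Optional[dict] = None,
) -> dict:
    """Return the byte-level pieces a Kafka client would be handed."""
    return {
        # The value dict becomes UTF-8 encoded JSON
        "value": json.dumps(value).encode("utf-8"),
        # The key stays None when omitted
        "key": key.encode("utf-8") if key is not None else None,
        # Assumption: header values are encoded to bytes as (name, bytes) pairs
        "headers": [(k, v.encode("utf-8")) for k, v in (headers or {}).items()],
    }


# Hypothetical payload for illustration
message = encode_message(
    {"event_type": "wells_scraped", "data": {"count": 3}},
    key="wells_scraped",
    headers={"source": "ingestion"},
)
```

Keeping encoding in one place means callers only ever deal with dicts and strings, which is presumably why `publish()` accepts a dict rather than raw bytes.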

KafkaConsumerManager

    from aegis_shared.db.kafka import KafkaConsumerManager

    kc = KafkaConsumerManager(group_id="orchestration-engine")
    kc.connect(topics=["entity-extraction-worker"])

    def handle(event: dict):
        print(event["event_type"], event["data"])

    kc.consume(handler=handle, max_messages=100, timeout=1.0)
    kc.disconnect()

Key behaviors:

  • Uses earliest offset reset by default (reads from the beginning if no committed offset)
  • Auto-commit is enabled
  • The consume() method polls in a loop, decoding JSON payloads and passing them to the handler
  • Set max_messages=None for indefinite consumption
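The poll-decode-dispatch loop described above might look roughly like the following sketch. It is not the actual `consume()` implementation: a plain iterable stands in for successive poll results (with `None` meaning a poll that timed out), and the `csv_imported` event type is a hypothetical example.

```python
import json
from typing import Callable, Iterable, Optional


def consume(
    raw_messages: Iterable[Optional[bytes]],
    handler: Callable[[dict], None],
    max_messages: Optional[int] = None,
) -> int:
    """Decode JSON payloads and pass them to handler; return the count handled."""
    handled = 0
    for raw in raw_messages:
        # max_messages=None means keep going until the source is exhausted
        if max_messages is not None and handled >= max_messages:
            break
        if raw is None:
            # A poll that timed out with no message; keep polling
            continue
        handler(json.loads(raw.decode("utf-8")))
        handled += 1
    return handled


# Stand-in for successive poll results, including one empty poll
polled = [
    b'{"event_type": "wells_scraped", "data": {}}',
    None,
    b'{"event_type": "csv_imported", "data": {}}',
]
seen = []
count = consume(polled, handler=seen.append, max_messages=100)
```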

Ingestion Service Usage

The ingestion service (services/ingestion-service/) is the primary Kafka producer. It publishes events after:

  1. RRC data scraping — wells, leases, permits, fields, production, flaring authorizations
  2. CSV import — bulk data from uploaded files
  3. Entity extraction — extracted entities from unstructured data

Each event has the shape:

{ "event_type": "wells_scraped", "data": { ... } }

The topic name is configured in ingestion/config.py as ENTITY_EXTRACTION_TOPIC = "entity-extraction-worker".

Kafka is optional for local development. If the broker is unavailable, the ingestion service logs a warning and continues without event publishing.
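One way to get this degrade-gracefully behavior is to wrap the producer so that a failed connection disables publishing rather than raising. The sketch below is an assumption about the pattern, not the ingestion service's actual code; `OptionalPublisher` and `DownProducer` are hypothetical names.

```python
import logging
from typing import Optional

logger = logging.getLogger("ingestion")


class OptionalPublisher:
    """Wraps a producer so a missing broker disables publishing instead of failing."""

    def __init__(self, producer) -> None:
        self._producer: Optional[object] = None
        try:
            producer.connect()
            self._producer = producer
        except Exception as exc:  # broker unreachable, bad config, etc.
            logger.warning("Kafka unavailable, events will not be published: %s", exc)

    def publish(self, topic: str, value: dict) -> bool:
        """Publish if connected; return False when publishing is disabled."""
        if self._producer is None:
            return False
        self._producer.publish(topic=topic, value=value)
        return True


class DownProducer:
    """Hypothetical producer whose broker is unreachable."""

    def connect(self) -> None:
        raise ConnectionError("localhost:9092 refused")


publisher = OptionalPublisher(DownProducer())
published = publisher.publish(
    "entity-extraction-worker", {"event_type": "wells_scraped", "data": {}}
)
```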

Server-Sent Events (SSE)

SSE provides real-time, one-directional streaming from the orchestration engine to the frontend. AEGIS uses SSE in two contexts: conversation streaming (LangGraph node events) and workspace assessment streaming (structured workspace events).

Conversation Streaming

Endpoint: GET /conversations/{conversation_id}/stream

This endpoint streams LangGraph execution events as the agent pipeline runs. The frontend connects via the EventSource API and receives events as each graph node completes.

GET /conversations/conv-1/stream?message=Hello&agent_id=rule37-agent&agent_type=rule_37
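Because the message travels in the query string, it needs URL encoding. A small sketch of building the stream URL follows; the helper name is hypothetical, and the `http://localhost:8000` base URL is an assumption for a local gateway.

```python
from urllib.parse import quote, urlencode


def conversation_stream_url(
    base_url: str,
    conversation_id: str,
    message: str,
    agent_id: str,
    agent_type: str,
) -> str:
    """Build the SSE stream URL with a percent-encoded query string."""
    query = urlencode(
        {"message": message, "agent_id": agent_id, "agent_type": agent_type}
    )
    return f"{base_url}/conversations/{quote(conversation_id, safe='')}/stream?{query}"


# Assumed local base URL; adjust to your deployment
url = conversation_stream_url(
    "http://localhost:8000", "conv-1", "Check spacing for Smith #3",
    "rule37-agent", "rule_37",
)
```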

Event Flow

    Connection opened
      ← event: node_end (system_prompt_node)
      ← event: node_end (memory_node)
      ← event: node_end (initial_llm_call)     ← contains assistant message
      ← event: node_end (tool_node)            ← if tools invoked
      ← event: node_end (initial_llm_call)     ← tool result processed
      ← event: node_end (skill_select_node)
      ← event: node_end (skill_inject_node)    ← contains skills_injected list
      ← event: node_end (approval_node)
      ← event: node_end (synthesis_llm_call)   ← contains final assistant message
      ← event: node_end (output_format)
      ← event: done                            ← execution summary
    Connection closed

Event Payload

Each node_end event contains:

    {
      "event": "node_end",
      "node": "initial_llm_call",
      "data": {
        "status": "running",
        "tokens_used": 1234,
        "message": {
          "role": "assistant",
          "content": "I'll calculate the spacing..."
        },
        "skills_injected": []
      }
    }

The done event contains execution summary:

    {
      "event": "done",
      "data": {
        "execution_id": "exec-abc-123",
        "status": "completed",
        "tokens_used": 5678,
        "cost_usd": 0.045,
        "skills_injected": ["spacing-calculation", "offset-well-analysis"]
      }
    }

Error events are emitted if the pipeline fails or budget is exceeded:

{ "event": "error", "data": { "error": "Budget exceeded: 150000 tokens" } }
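For clients outside the browser, the same events can be read by parsing the text/event-stream framing directly. The parser below is a minimal sketch covering only `event:` and `data:` fields and comment lines; it assumes each frame's `data` field carries the JSON payload shown above, and the sample frames are illustrative.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_name, decoded_data) for each frame in a text/event-stream."""
    event_name, data_lines = "message", []
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current frame
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, commonly used as a keep-alive
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].lstrip(" "))


# Illustrative frames shaped like the payloads above
stream = [
    "event: node_end",
    'data: {"event": "node_end", "node": "memory_node", "data": {"status": "running"}}',
    "",
    "event: error",
    'data: {"event": "error", "data": {"error": "Budget exceeded: 150000 tokens"}}',
    "",
]
events = list(parse_sse(stream))
```

A consumer would typically stop reading once it sees a `done` or `error` event, since the server closes the connection after either.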

Implementation Details

The orchestration engine uses sse-starlette (EventSourceResponse) to wrap an async generator that iterates over agent_graph.astream(state). Each iteration yields a dict keyed by node name with partial state updates. The generator enriches these with message content and skill injection data before serializing to JSON.

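    import json

    from sse_starlette.sse import EventSourceResponse

    # Excerpt from inside the streaming endpoint handler
    async def event_generator():
        # Each astream() iteration yields {node_name: partial_state_update}
        async for event in agent_graph.astream(state):
            for node_name, node_output in event.items():
                # stream_event is built from node_name and node_output here;
                # the enrichment step is omitted from this excerpt
                yield {
                    "event": stream_event.event,
                    "data": json.dumps(stream_event.model_dump()),
                }

    return EventSourceResponse(event_generator())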
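To experiment with this generator pattern without LangGraph or a web server, the graph stream can be replaced by a stand-in. The sketch below is an assumption-laden illustration: `fake_astream` is hypothetical, the payload fields mirror the `node_end` example above, and the trailing `done` event carries only a status.

```python
import asyncio
import json
from typing import AsyncIterator, Dict


async def fake_astream() -> AsyncIterator[Dict[str, dict]]:
    """Hypothetical stand-in for agent_graph.astream(state)."""
    yield {"memory_node": {"status": "running", "tokens_used": 0}}
    yield {"initial_llm_call": {"status": "running", "tokens_used": 1234}}


async def event_generator(
    stream: AsyncIterator[Dict[str, dict]],
) -> AsyncIterator[dict]:
    """Turn per-node state updates into SSE-ready dicts, ending with a done event."""
    async for update in stream:
        for node_name, node_output in update.items():
            payload = {"event": "node_end", "node": node_name, "data": node_output}
            yield {"event": "node_end", "data": json.dumps(payload)}
    summary = {"event": "done", "data": {"status": "completed"}}
    yield {"event": "done", "data": json.dumps(summary)}


async def collect() -> list:
    # Drain the generator the way EventSourceResponse would
    return [item async for item in event_generator(fake_astream())]


frames = asyncio.run(collect())
```

Each yielded dict has the `event` and `data` keys that the excerpt above hands to `EventSourceResponse`.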

Workspace Assessment Streaming

Endpoint: GET /workspaces/{checklist_id}/stream

This endpoint streams structured workspace events during the compliance assessment phase. Unlike conversation streaming (which sends node-level events), workspace streaming sends typed domain events that the frontend renders as specific UI components.

WorkspaceEventType Enum

Defined in orchestration/compliance/workspace/events.py:

| Event Type | Description | UI Component |
| --- | --- | --- |
| checklist_item_update | Status change on a checklist item | Checklist panel status icon |
| artifact_generated | New document, PDF, or data export created | Artifact list / download link |
| data_table_update | Structured tabular data (rows and columns) | Data table with inline editing |
| form_field_update | Form field populated or updated | Form preview with color-coded confidence |
| validation_result | Pass/fail validation check | Validation checklist with severity |
| spatial_update | Geographic/spatial data (well positions, distances) | Map or spatial visualization |
| agent_status | Agent processing progress | Progress indicator / status bar |
| assessment_complete | Assessment phase finished | Summary card with completion stats |

Event Payload Examples

Checklist Item Update:

    {
      "event_type": "checklist_item_update",
      "item_index": 2,
      "item_name": "Offset Well Identification",
      "status": "complete",
      "summary": "Found 4 offset wells within 1,200 ft",
      "completed_by": "agent"
    }

Data Table Update:

    {
      "event_type": "data_table_update",
      "item_index": 2,
      "columns": ["Well Name", "API", "Operator", "Distance (ft)"],
      "rows": [
        {"Well Name": "Smith #3", "API": "42-329-40003", "Operator": "Devon", "Distance (ft)": 890}
      ],
      "total_rows": 4
    }

Form Field Update:

    {
      "event_type": "form_field_update",
      "item_index": 5,
      "field_name": "operator_name",
      "field_value": "Permian Basin Energy LLC",
      "confidence": 0.98,
      "source": "knowledge_graph:P-5"
    }

Validation Result:

    {
      "event_type": "validation_result",
      "item_index": 10,
      "check_name": "Service list completeness",
      "passed": false,
      "message": "2 offset operators missing addresses",
      "severity": "warning"
    }

Event Model Registry

Each event type maps to a Pydantic model for serialization/validation:

    EVENT_MODELS = {
        WorkspaceEventType.CHECKLIST_ITEM_UPDATE: ChecklistItemUpdateEvent,
        WorkspaceEventType.ARTIFACT_GENERATED: ArtifactGeneratedEvent,
        WorkspaceEventType.DATA_TABLE_UPDATE: DataTableUpdateEvent,
        WorkspaceEventType.FORM_FIELD_UPDATE: FormFieldUpdateEvent,
        WorkspaceEventType.VALIDATION_RESULT: ValidationResultEvent,
        WorkspaceEventType.SPATIAL_UPDATE: SpatialUpdateEvent,
        WorkspaceEventType.AGENT_STATUS: AgentStatusEvent,
        WorkspaceEventType.ASSESSMENT_COMPLETE: AssessmentCompleteEvent,
    }
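A registry like this allows a raw payload to be turned into a typed event by looking up its `event_type`. The sketch below shows the dispatch idea only: it uses standard-library dataclasses as a stand-in for the Pydantic models, covers two of the eight event types, takes its field names from the payload examples above, and introduces a hypothetical `parse_event` helper.

```python
from dataclasses import dataclass
from enum import Enum


class WorkspaceEventType(str, Enum):
    CHECKLIST_ITEM_UPDATE = "checklist_item_update"
    VALIDATION_RESULT = "validation_result"


@dataclass
class ChecklistItemUpdateEvent:
    event_type: str
    item_index: int
    item_name: str
    status: str
    summary: str
    completed_by: str


@dataclass
class ValidationResultEvent:
    event_type: str
    item_index: int
    check_name: str
    passed: bool
    message: str
    severity: str


# Reduced registry for illustration; dataclasses replace the Pydantic models
EVENT_MODELS = {
    WorkspaceEventType.CHECKLIST_ITEM_UPDATE: ChecklistItemUpdateEvent,
    WorkspaceEventType.VALIDATION_RESULT: ValidationResultEvent,
}


def parse_event(raw: dict):
    """Look up the model for raw["event_type"] and build a typed event."""
    event_type = WorkspaceEventType(raw["event_type"])  # ValueError if unknown
    return EVENT_MODELS[event_type](**raw)  # TypeError on missing or extra fields


event = parse_event({
    "event_type": "validation_result",
    "item_index": 10,
    "check_name": "Service list completeness",
    "passed": False,
    "message": "2 offset operators missing addresses",
    "severity": "warning",
})
```

Unlike Pydantic, dataclasses do not validate field types, so this stand-in only checks that the expected field names are present.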

Architecture Summary

    ┌─────────────────────────────────────────────────────────┐
    │  Frontend (Next.js)                                     │
    │                                                         │
    │  EventSource(/conversations/{id}/stream)                │
    │  EventSource(/workspaces/{id}/stream)                   │
    └────────────────────┬────────────────────────────────────┘
                         │ SSE (text/event-stream)
    ┌─────────────────────────────────────────────────────────┐
    │  API Gateway (Go :8000)                                 │
    │  Proxies SSE connections through                        │
    └────────────────────┬────────────────────────────────────┘
    ┌─────────────────────────────────────────────────────────┐
    │  Orchestration Engine (Python :8001)                    │
    │                                                         │
    │  Conversation SSE ─── LangGraph astream() events        │
    │  Workspace SSE ────── Structured WorkspaceEventType     │
    └─────────────────────────────────────────────────────────┘

    ┌─────────────────────────────────────────────────────────┐
    │  Ingestion Service (Python :8005)                       │
    │                                                         │
    │  Scrape/Import → KafkaProducerManager.publish()         │
    │  Topic: entity-extraction-worker                        │
    └────────────────────┬────────────────────────────────────┘
                         │ Kafka
    ┌─────────────────────────────────────────────────────────┐
    │  Knowledge Graph Service (Python :8003)                 │
    │                                                         │
    │  KafkaConsumerManager → process extracted entities      │
    └─────────────────────────────────────────────────────────┘

SSE connections are long-lived HTTP connections. The API gateway proxies them through to the orchestration engine without buffering. Make sure your reverse proxy or load balancer does not buffer responses or time out SSE connections prematurely.

Key Source Files

| File | Purpose |
| --- | --- |
| shared/src/aegis_shared/db/kafka.py | Kafka producer and consumer managers |
| services/ingestion-service/src/ingestion/main.py | Kafka producer usage (entity extraction events) |
| services/ingestion-service/src/ingestion/config.py | Kafka topic configuration |
| services/orchestration-engine/src/orchestration/main.py | Conversation SSE streaming endpoint |
| services/orchestration-engine/src/orchestration/compliance/workspace/routes.py | Workspace SSE streaming endpoint |
| services/orchestration-engine/src/orchestration/compliance/workspace/events.py | WorkspaceEventType enum and payload models |
| services/orchestration-engine/src/orchestration/schemas.py | StreamEvent schema for conversation SSE |