Code Conventions
This page documents the coding standards and patterns used across all AEGIS Python services. Consistency is enforced across the monorepo so that every service follows the same structure, style, and idioms.
Language and Runtime
- Python 3.12 — pinned via
.python-versionat the repo root (managed by pyenv) - Poetry for dependency management — each service has its own
pyproject.toml - FastAPI for all HTTP services with
async/awaitthroughout - Pydantic v2 for all data models
- Go 1.21+ for the API gateway only
Async/Await
All Python services use async I/O. Every endpoint handler, database call, and inter-service HTTP request must be async:
# Correct
@app.get("/health")
async def health():
return {"status": "ok", "service": "my-service"}
@app.get("/entities/{entity_id}")
async def get_entity(entity_id: str):
row = await pg.fetchrow("SELECT * FROM entities WHERE id = $1", entity_id)
if not row:
raise HTTPException(status_code=404, detail="Not found")
return dict(row)# Incorrect -- do not use synchronous database calls
@app.get("/entities/{entity_id}")
def get_entity(entity_id: str): # Missing async
row = pg.fetchrow(...) # This will not work with asyncpgPydantic v2 Models
All data models use Pydantic v2. The shared library defines a base class that every model should inherit from:
from aegis_shared.models.common import AegisBase
class MyModel(AegisBase):
name: str
count: int = 0Base Classes
| Base Class | Purpose | Includes |
|---|---|---|
AegisBase | All models | from_attributes=True, populate_by_name=True, str_strip_whitespace=True |
AegisEntity | Persistent entities | id: UUID, created_at, updated_at |
TenantEntity | Tenant-scoped entities | Everything in AegisEntity plus tenant_id: str |
Pydantic v2 Conventions
from pydantic import Field
from aegis_shared.models.common import AegisBase, TenantEntity
class Well(TenantEntity):
# Required fields -- no default value
api_number: str
well_name: str
# Optional fields with defaults
status: WellStatus = WellStatus.ACTIVE
total_depth_ft: int | None = None
# Collections with factory defaults
production_history: list[dict] = Field(default_factory=list)
# Metadata dicts
metadata: dict[str, Any] = Field(default_factory=dict)Key conventions:
- Use
str | Noneinstead ofOptional[str](Python 3.12 union syntax) - Use
Field(default_factory=list)for mutable defaults, never= [] - Use
StrEnumfor all enumeration types (not plainEnum) - Model config is inherited from
AegisBase— do not overridemodel_configunless necessary
Type Hints
Type hints are required on all function signatures. Use modern Python 3.12 syntax:
# Correct
async def get_vertex(label: str, entity_id: str) -> dict[str, Any] | None:
...
async def list_vertices(label: str, limit: int = 50) -> list[dict[str, Any]]:
...
def score_well_risk(
well: dict[str, Any],
deadlines: list[dict] | None = None,
production_issues: list[str] | None = None,
violation_count: int = 0,
) -> dict[str, Any]:
...# Incorrect -- avoid old-style type hints
from typing import Optional, List, Dict
async def get_vertex(label: str, entity_id: str) -> Optional[Dict[str, Any]]:
...Enumerations
Use StrEnum from the standard library for all enums:
from enum import StrEnum
class AgentType(StrEnum):
RULE_37 = "rule_37"
RULE_32 = "rule_32"
COMPLIANCE_MONITOR = "compliance_monitor"
FLARING_MONITOR = "flaring_monitor"StrEnum values serialize to strings automatically in JSON responses, which is essential for API compatibility.
FastAPI Patterns
Application Structure
Every service follows the same module layout:
# src/{service}/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize connections (postgres, redis, etc.)
global pg, redis
pg = PostgresPool(dsn=settings.DATABASE_URL)
await pg.connect()
yield
# Cleanup
await pg.disconnect()
app = FastAPI(
title="AEGIS {Service Name}",
version="0.1.0",
description="...",
lifespan=lifespan,
)Health Endpoint
Every service must expose a /health endpoint:
@app.get("/health")
async def health():
return {"status": "ok", "service": "my-service-name"}Error Handling
Use FastAPI’s HTTPException for error responses:
from fastapi import HTTPException
@app.get("/entities/{entity_id}")
async def get_entity(entity_id: str):
row = await pg.fetchrow("SELECT * FROM entities WHERE id = $1", entity_id)
if not row:
raise HTTPException(status_code=404, detail="Entity not found")
return dict(row)Request/Response Models
Define Pydantic models for request bodies and use response_model for responses:
class CreateEntityRequest(AegisBase):
name: str
entity_type: str
class EntityResponse(AegisBase):
id: str
name: str
created_at: datetime
@app.post("/entities", response_model=EntityResponse)
async def create_entity(body: CreateEntityRequest):
...Function Style
Keep functions small and focused. The LangGraph node pattern exemplifies this — each node in the execution graph is a single function with one responsibility:
# Good -- single responsibility
async def system_prompt_node(state: GraphState) -> dict:
"""Load agent config and set system prompt."""
config = await load_agent_config(state["agent_id"])
return {"system_prompt": config.system_prompt}
async def memory_node(state: GraphState) -> dict:
"""Load working memory for the conversation."""
memory = await memory_client.get(state["conversation_id"])
return {"working_memory": memory}# Bad -- doing too many things in one function
async def setup_node(state: GraphState) -> dict:
config = await load_agent_config(state["agent_id"])
memory = await memory_client.get(state["conversation_id"])
context = await kg_client.get_context(state["entity_id"])
# ... more setup
return {
"system_prompt": config.system_prompt,
"working_memory": memory,
"context": context,
}Naming Conventions
| Element | Convention | Example |
|---|---|---|
| Files | snake_case | volume_tracker.py, risk_scoring.py |
| Classes | PascalCase | WorkingMemory, GraphCrud, PostgresPool |
| Functions | snake_case | assess_volume_compliance, scan_deadlines |
| Constants | UPPER_SNAKE_CASE | VERTEX_LABELS, STATUS_TRANSITIONS |
| Enums | PascalCase class, UPPER_SNAKE_CASE values | class WellStatus(StrEnum): ACTIVE = "active" |
| API routes | kebab-case | /working-memory/{id}, /api/v1/auth/token |
| Database tables | snake_case | compliance_status, filing_checklists |
| Test files | test_ prefix | test_api.py, test_crud.py |
| Test classes | Test prefix | TestCreateApproval, TestVertexCrud |
Import Ordering
Follow standard Python import ordering:
# 1. Standard library
import json
import uuid
from datetime import datetime, timezone
from typing import Any
# 2. Third-party packages
import pytest
from fastapi import FastAPI, HTTPException
from pydantic import Field
# 3. Shared library
from aegis_shared.models.common import AegisBase, TenantEntity
from aegis_shared.db.postgres import PostgresPool
from aegis_shared.db.redis import RedisManager
# 4. Local service imports
from memory.working import WorkingMemory
from memory.config import settingsDatabase Conventions
PostgreSQL Queries
Use parameterized queries with $1, $2, etc. Never use string formatting for SQL:
# Correct
row = await pg.fetchrow(
"SELECT * FROM compliance_status WHERE entity_id = $1 AND compliance_domain = $2",
entity_id, domain,
)
# Incorrect -- SQL injection risk
row = await pg.fetchrow(
f"SELECT * FROM compliance_status WHERE entity_id = '{entity_id}'"
)Apache AGE Cypher Queries
AGE queries require loading the extension and setting the search path:
async with pool.acquire() as conn:
await conn.execute("LOAD 'age'")
await conn.execute('SET search_path = ag_catalog, "$user", public')
result = await conn.fetch(
"SELECT * FROM cypher('oilgas', $$ MATCH (w:Well {entity_id: %s}) RETURN w $$) AS (v agtype)",
entity_id,
)Service Communication
Services communicate via REST (synchronous) and Kafka events (asynchronous):
# HTTP client for inter-service calls
import httpx
async def get_well_context(entity_id: str, domain_tags: list[str]) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"http://localhost:8003/context/{entity_id}",
params={"domain_tags": ",".join(domain_tags)},
)
resp.raise_for_status()
return resp.json()Backend services trust the API gateway for authentication. Inter-service calls (e.g., orchestration engine calling memory service) do not pass JWT tokens — they communicate directly on the internal port.
Configuration
Service configuration uses environment variables, typically accessed through a settings module:
# src/{service}/config.py
import os
class Settings:
HOST: str = os.getenv("HOST", "0.0.0.0")
PORT: int = int(os.getenv("PORT", "8002"))
DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql://aegis:aegis_local@localhost:5432/aegis")
REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379")
settings = Settings()Things to Avoid
- Do not create a root-level
pyproject.toml— each service owns its own - Do not use synchronous I/O in FastAPI handlers
- Do not use
Optional[X]— useX | Noneinstead - Do not use mutable default arguments — use
Field(default_factory=...) - Do not hardcode secrets — use
.envfiles locally, HashiCorp Vault in production - Do not skip HITL checkpoints — they are mandatory even in development
- Do not use
typing.Dict,typing.List— use built-indict,list