Code Conventions

This page documents the coding standards and patterns used across all AEGIS Python services. Consistency is enforced across the monorepo so that every service follows the same structure, style, and idioms.

Language and Runtime

  • Python 3.12 — pinned via .python-version at the repo root (managed by pyenv)
  • Poetry for dependency management — each service has its own pyproject.toml
  • FastAPI for all HTTP services with async/await throughout
  • Pydantic v2 for all data models
  • Go 1.21+ for the API gateway only

Async/Await

All Python services use async I/O. Every endpoint handler, database call, and inter-service HTTP request must be async:

    # Correct
    @app.get("/health")
    async def health():
        return {"status": "ok", "service": "my-service"}

    @app.get("/entities/{entity_id}")
    async def get_entity(entity_id: str):
        row = await pg.fetchrow("SELECT * FROM entities WHERE id = $1", entity_id)
        if not row:
            raise HTTPException(status_code=404, detail="Not found")
        return dict(row)

    # Incorrect -- do not use synchronous database calls
    @app.get("/entities/{entity_id}")
    def get_entity(entity_id: str):  # Missing async
        row = pg.fetchrow(...)  # Not awaited: asyncpg returns a coroutine here, not a row
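The failure mode in the incorrect handler can be reproduced with the standard library alone. The sketch below is illustrative only: `fetch_row` is a hypothetical stand-in for an async database call, not part of AEGIS or asyncpg. It shows that calling an async function without `await` produces a coroutine object and that no work happens until it is awaited.

```python
import asyncio


async def fetch_row(entity_id: str) -> dict[str, str]:
    """Hypothetical stand-in for an async database call."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return {"id": entity_id}


async def main() -> tuple[bool, dict[str, str]]:
    pending = fetch_row("well-1")  # no await: nothing has run yet
    was_coroutine = asyncio.iscoroutine(pending)
    row = await pending  # awaiting runs the call and produces the row
    return was_coroutine, row


was_coroutine, row = asyncio.run(main())
print(was_coroutine, row)  # True {'id': 'well-1'}
```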

Pydantic v2 Models

All data models use Pydantic v2. The shared library defines a base class that every model should inherit from:

    from aegis_shared.models.common import AegisBase

    class MyModel(AegisBase):
        name: str
        count: int = 0

Base Classes

| Base Class | Purpose | Includes |
|---|---|---|
| AegisBase | All models | from_attributes=True, populate_by_name=True, str_strip_whitespace=True |
| AegisEntity | Persistent entities | id: UUID, created_at, updated_at |
| TenantEntity | Tenant-scoped entities | Everything in AegisEntity plus tenant_id: str |

Pydantic v2 Conventions

    from typing import Any

    from pydantic import Field

    from aegis_shared.models.common import AegisBase, TenantEntity

    # WellStatus is assumed to be a StrEnum defined alongside this model.
    class Well(TenantEntity):
        # Required fields -- no default value
        api_number: str
        well_name: str

        # Optional fields with defaults
        status: WellStatus = WellStatus.ACTIVE
        total_depth_ft: int | None = None

        # Collections with factory defaults
        production_history: list[dict] = Field(default_factory=list)

        # Metadata dicts
        metadata: dict[str, Any] = Field(default_factory=dict)

Key conventions:

  • Use str | None instead of Optional[str] (PEP 604 union syntax, available since Python 3.10)
  • Use Field(default_factory=list) for mutable defaults, never = []
  • Use StrEnum for all enumeration types (not plain Enum)
  • Model config is inherited from AegisBase — do not override model_config unless necessary
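The reasoning behind the mutable-default rule can be seen in plain Python. This is a minimal sketch using only the standard library: `dataclasses.field(default_factory=...)` stands in for Pydantic's `Field(default_factory=...)`, and `add_reading_shared` and `ProductionLog` are hypothetical names used for illustration.

```python
from dataclasses import dataclass, field


def add_reading_shared(value: int, readings: list[int] = []) -> list[int]:
    """Anti-pattern: the default list is created once and reused by every call."""
    readings.append(value)
    return readings


@dataclass
class ProductionLog:
    """Hypothetical record; default_factory builds a fresh list per instance."""

    readings: list[int] = field(default_factory=list)


first = add_reading_shared(1)
second = add_reading_shared(2)  # same list object as `first`

log_a = ProductionLog()
log_b = ProductionLog()
log_a.readings.append(1)  # does not affect log_b

print(first is second, second)  # True [1, 2]
print(log_a.readings, log_b.readings)  # [1] []
```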

Type Hints

Type hints are required on all function signatures. Use the modern syntax supported by Python 3.12: built-in generics (dict, list) and X | None unions:

    # Correct
    from typing import Any

    async def get_vertex(label: str, entity_id: str) -> dict[str, Any] | None: ...

    async def list_vertices(label: str, limit: int = 50) -> list[dict[str, Any]]: ...

    def score_well_risk(
        well: dict[str, Any],
        deadlines: list[dict] | None = None,
        production_issues: list[str] | None = None,
        violation_count: int = 0,
    ) -> dict[str, Any]: ...

    # Incorrect -- avoid old-style type hints
    from typing import Optional, List, Dict

    async def get_vertex(label: str, entity_id: str) -> Optional[Dict[str, Any]]: ...

Enumerations

Use StrEnum from the standard library for all enums:

    from enum import StrEnum

    class AgentType(StrEnum):
        RULE_37 = "rule_37"
        RULE_32 = "rule_32"
        COMPLIANCE_MONITOR = "compliance_monitor"
        FLARING_MONITOR = "flaring_monitor"

StrEnum values serialize to strings automatically in JSON responses, which is essential for API compatibility.

FastAPI Patterns

Application Structure

Every service follows the same module layout:

    # src/{service}/main.py
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from aegis_shared.db.postgres import PostgresPool
    from memory.config import settings  # use the service's own package name

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Initialize connections (postgres, redis, etc.)
        global pg, redis  # redis is set up the same way (omitted here)
        pg = PostgresPool(dsn=settings.DATABASE_URL)
        await pg.connect()
        yield
        # Cleanup
        await pg.disconnect()

    app = FastAPI(
        title="AEGIS {Service Name}",
        version="0.1.0",
        description="...",
        lifespan=lifespan,
    )

Health Endpoint

Every service must expose a /health endpoint:

    @app.get("/health")
    async def health():
        return {"status": "ok", "service": "my-service-name"}

Error Handling

Use FastAPI’s HTTPException for error responses:

    from fastapi import HTTPException

    @app.get("/entities/{entity_id}")
    async def get_entity(entity_id: str):
        row = await pg.fetchrow("SELECT * FROM entities WHERE id = $1", entity_id)
        if not row:
            raise HTTPException(status_code=404, detail="Entity not found")
        return dict(row)

Request/Response Models

Define Pydantic models for request bodies and use response_model for responses:

    from datetime import datetime

    from aegis_shared.models.common import AegisBase

    class CreateEntityRequest(AegisBase):
        name: str
        entity_type: str

    class EntityResponse(AegisBase):
        id: str
        name: str
        created_at: datetime

    @app.post("/entities", response_model=EntityResponse)
    async def create_entity(body: CreateEntityRequest):
        ...

Function Style

Keep functions small and focused. The LangGraph node pattern exemplifies this — each node in the execution graph is a single function with one responsibility:

    # Good -- single responsibility
    async def system_prompt_node(state: GraphState) -> dict:
        """Load agent config and set system prompt."""
        config = await load_agent_config(state["agent_id"])
        return {"system_prompt": config.system_prompt}

    async def memory_node(state: GraphState) -> dict:
        """Load working memory for the conversation."""
        memory = await memory_client.get(state["conversation_id"])
        return {"working_memory": memory}

    # Bad -- doing too many things in one function
    async def setup_node(state: GraphState) -> dict:
        config = await load_agent_config(state["agent_id"])
        memory = await memory_client.get(state["conversation_id"])
        context = await kg_client.get_context(state["entity_id"])
        # ... more setup
        return {
            "system_prompt": config.system_prompt,
            "working_memory": memory,
            "context": context,
        }
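One benefit of the single-responsibility style is that small nodes compose cleanly, because each returns only the keys it owns. The following is a simplified sketch and does not use LangGraph: `run_nodes` is a hypothetical runner, `GraphState` is reduced to a plain dict, and the node bodies are stubs instead of real config or memory lookups.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

GraphState = dict[str, Any]  # simplified stand-in for the real state type
Node = Callable[[GraphState], Awaitable[dict[str, Any]]]


async def system_prompt_node(state: GraphState) -> dict[str, Any]:
    """Stub node: returns only the system prompt update."""
    return {"system_prompt": f"prompt for {state['agent_id']}"}


async def memory_node(state: GraphState) -> dict[str, Any]:
    """Stub node: returns only the working-memory update."""
    return {"working_memory": [f"history of {state['conversation_id']}"]}


async def run_nodes(state: GraphState, nodes: list[Node]) -> GraphState:
    """Hypothetical runner: merge each node's partial update into the state."""
    for node in nodes:
        update = await node(state)
        state = {**state, **update}
    return state


initial: GraphState = {"agent_id": "rule_37", "conversation_id": "c-1"}
final = asyncio.run(run_nodes(initial, [system_prompt_node, memory_node]))
print(sorted(final))
# ['agent_id', 'conversation_id', 'system_prompt', 'working_memory']
```

Because each node touches one key, a node can be tested, reordered, or replaced without reading the others.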

Naming Conventions

| Element | Convention | Example |
|---|---|---|
| Files | snake_case | volume_tracker.py, risk_scoring.py |
| Classes | PascalCase | WorkingMemory, GraphCrud, PostgresPool |
| Functions | snake_case | assess_volume_compliance, scan_deadlines |
| Constants | UPPER_SNAKE_CASE | VERTEX_LABELS, STATUS_TRANSITIONS |
| Enums | PascalCase class, UPPER_SNAKE_CASE values | class WellStatus(StrEnum): ACTIVE = "active" |
| API routes | kebab-case | /working-memory/{id}, /api/v1/auth/token |
| Database tables | snake_case | compliance_status, filing_checklists |
| Test files | test_ prefix | test_api.py, test_crud.py |
| Test classes | Test prefix | TestCreateApproval, TestVertexCrud |

Import Ordering

Follow standard Python import ordering:

    # 1. Standard library
    import json
    import uuid
    from datetime import datetime, timezone
    from typing import Any

    # 2. Third-party packages
    import pytest
    from fastapi import FastAPI, HTTPException
    from pydantic import Field

    # 3. Shared library
    from aegis_shared.models.common import AegisBase, TenantEntity
    from aegis_shared.db.postgres import PostgresPool
    from aegis_shared.db.redis import RedisManager

    # 4. Local service imports
    from memory.working import WorkingMemory
    from memory.config import settings

Database Conventions

PostgreSQL Queries

Use parameterized queries with $1, $2, etc. Never use string formatting for SQL:

    # Correct
    row = await pg.fetchrow(
        "SELECT * FROM compliance_status WHERE entity_id = $1 AND compliance_domain = $2",
        entity_id,
        domain,
    )

    # Incorrect -- SQL injection risk
    row = await pg.fetchrow(
        f"SELECT * FROM compliance_status WHERE entity_id = '{entity_id}'"
    )
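The injection risk can be demonstrated without a PostgreSQL server. This sketch substitutes the standard library sqlite3 module, which is an assumption made for portability: sqlite3 uses ? placeholders where asyncpg uses $1, $2, but the principle is the same. The table contents and the malicious input are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE compliance_status (entity_id TEXT, compliance_domain TEXT)")
conn.executemany(
    "INSERT INTO compliance_status VALUES (?, ?)",
    [("well-1", "flaring"), ("well-2", "flaring")],
)

malicious = "nope' OR '1'='1"  # attacker-controlled input

# Parameterized: the value is bound separately and compared literally.
safe_rows = conn.execute(
    "SELECT * FROM compliance_status WHERE entity_id = ?", (malicious,)
).fetchall()

# String formatting: the input is spliced into the SQL and changes its meaning.
unsafe_rows = conn.execute(
    f"SELECT * FROM compliance_status WHERE entity_id = '{malicious}'"
).fetchall()

print(len(safe_rows), len(unsafe_rows))  # 0 2
conn.close()
```

The formatted query returns every row because the injected OR clause is always true, while the parameterized query matches nothing.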

Apache AGE Cypher Queries

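AGE queries require loading the extension and setting the search path on the connection before running Cypher. Values cannot be interpolated inside the $$ ... $$ Cypher body, so pass them as a named Cypher parameter through the third argument of cypher():

    import json

    async with pool.acquire() as conn:
        await conn.execute("LOAD 'age'")
        await conn.execute('SET search_path = ag_catalog, "$user", public')
        # $entity_id is resolved from the agtype map bound to $1.
        # Assumes a text codec for agtype is registered on the connection.
        result = await conn.fetch(
            "SELECT * FROM cypher('oilgas', $$ MATCH (w:Well {entity_id: $entity_id}) RETURN w $$, $1) AS (v agtype)",
            json.dumps({"entity_id": entity_id}),
        )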

Service Communication

Services communicate via REST (synchronous) and Kafka events (asynchronous):

    # HTTP client for inter-service calls
    import httpx

    async def get_well_context(entity_id: str, domain_tags: list[str]) -> dict:
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"http://localhost:8003/context/{entity_id}",
                params={"domain_tags": ",".join(domain_tags)},
            )
            resp.raise_for_status()
            return resp.json()

Backend services trust the API gateway for authentication. Inter-service calls (e.g., orchestration engine calling memory service) do not pass JWT tokens — they communicate directly on the internal port.

Configuration

Service configuration uses environment variables, typically accessed through a settings module:

    # src/{service}/config.py
    import os

    class Settings:
        HOST: str = os.getenv("HOST", "0.0.0.0")
        PORT: int = int(os.getenv("PORT", "8002"))
        DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql://aegis:aegis_local@localhost:5432/aegis")
        REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379")

    settings = Settings()

Things to Avoid

  • Do not create a root-level pyproject.toml — each service owns its own
  • Do not use synchronous I/O in FastAPI handlers
  • Do not use Optional[X] — use X | None instead
  • Do not use mutable default arguments — use Field(default_factory=...)
  • Do not hardcode secrets — use .env files locally, HashiCorp Vault in production
  • Do not skip HITL checkpoints — they are mandatory even in development
  • Do not use typing.Dict, typing.List — use built-in dict, list