Ingestion Service
The Ingestion Service handles data import into AEGIS from external sources. It provides RRC data scrapers (currently mock implementations reading from CSV sample files), CSV file upload for operator data, SCADA telemetry ingestion, and entity extraction that converts raw records into structured knowledge graph entities.
Overview
Oil and gas regulatory data comes from multiple sources: the Railroad Commission of Texas (RRC) online system, operator-uploaded CSV files, and real-time SCADA field data. The ingestion service normalizes all of these into structured entities ready for insertion into the knowledge graph.
The service also publishes Kafka events for each ingestion operation, enabling downstream consumers (like an entity extraction worker) to process data asynchronously.
Port & Language
| Property | Value |
|---|---|
| Port | 8005 |
| Language | Python 3.12 |
| Framework | FastAPI |
| Entry point | src/ingestion/main.py |
Key Endpoints
| Method | Path | Description |
|---|---|---|
| POST | /ingest/rrc-scrape | Run mock RRC scrapers for specified data types and extract entities. |
| POST | /ingest/operator/csv | Upload a CSV file and extract entities from it. |
| POST | /ingest/scada | Ingest SCADA flaring volume data. |
| POST | /extract | Extract structured entities from raw data (generic). |
| GET | /health | Health check. Reports Kafka connectivity status. |
Architecture
Module Breakdown
src/ingestion/
├── main.py # FastAPI app, endpoint definitions, Kafka setup
├── config.py # Settings from environment variables
├── csv_import.py # CSV parsing with column normalization
├── rrc_scraper.py # Mock RRC scrapers (reads from CSV sample files)
└── entity_extraction.py # Entity extraction logic for all data types
RRC Scraper (rrc_scraper.py)
The scrapers are currently mock implementations that read from CSV files in data/rrc-samples/. In production, these would scrape the RRC Online System or use the RRC Query API.
Supported data types:
| Scraper Function | CSV File | Entity Type |
|---|---|---|
| scrape_wells() | wells.csv | Well |
| scrape_leases() | leases.csv | Lease |
| scrape_fields() | fields.csv | Field |
| scrape_permits() | permits.csv | Permit |
| scrape_flaring_authorizations() | flaring_authorizations.csv | FlaringAuthorization |
| scrape_production() | production.csv | ProductionReport |
The /ingest/rrc-scrape endpoint accepts a list of data_types to scrape. By default, it scrapes all six types.
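The mock scraping behavior described above can be sketched roughly as follows. This is a minimal illustration, not the actual contents of rrc_scraper.py: the names `SCRAPER_SOURCES`, `scrape_mock`, and `scrape_many` are hypothetical, and returning an empty list for a missing sample file is an assumption.

```python
import csv
from pathlib import Path
from typing import Optional

# Data type -> (sample CSV file, entity type), taken from the table above.
SCRAPER_SOURCES = {
    "wells": ("wells.csv", "Well"),
    "leases": ("leases.csv", "Lease"),
    "fields": ("fields.csv", "Field"),
    "permits": ("permits.csv", "Permit"),
    "flaring_authorizations": ("flaring_authorizations.csv", "FlaringAuthorization"),
    "production": ("production.csv", "ProductionReport"),
}


def scrape_mock(data_type: str, sample_dir: Path) -> list[dict[str, str]]:
    """Hypothetical helper: return the rows of the sample CSV for one data type."""
    if data_type not in SCRAPER_SOURCES:
        raise ValueError(f"unsupported data type: {data_type}")
    filename, _entity_type = SCRAPER_SOURCES[data_type]
    path = sample_dir / filename
    if not path.exists():
        # Assumed behavior: a missing sample file yields no records.
        return []
    with path.open(newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def scrape_many(data_types: Optional[list[str]], sample_dir: Path) -> dict[str, list[dict[str, str]]]:
    """Scrape the requested data types, or all six when none are given."""
    selected = data_types or list(SCRAPER_SOURCES)
    return {data_type: scrape_mock(data_type, sample_dir) for data_type in selected}
```

Calling `scrape_many(None, Path("data/rrc-samples"))` would read every sample file that exists, mirroring the endpoint's default of scraping all six types.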
CSV Import (csv_import.py)
The CSV importer normalizes column names using configurable mappings. It handles common variations in column naming:
Well columns:
api_number, api, api_no, well_api → api_number
well_name, name, well → well_name
operator_id, operator, op_id → operator_id
...
Production columns:
api_number, api → api_number
oil_production_bbl, oil_bbl, oil → oil_production_bbl
flared_mcf, flared, gas_flared → flared_mcf
disposition_code, disp_code → disposition_code
...
Numeric fields (those ending in _ft, _bbl, or _mcf) are automatically parsed to floats. For unknown data types, the raw CSV rows pass through without normalization.
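As an illustration of the normalization described above, the sketch below maps column aliases to canonical names and parses numeric suffixes. It covers only the aliases listed in this section. The names `COLUMN_ALIASES` and `normalize_row` are hypothetical, and the handling of header case, unmapped columns, and unparseable values is assumed rather than taken from csv_import.py.

```python
from typing import Any

# Alias -> canonical column name, limited to the mappings listed above.
COLUMN_ALIASES = {
    "wells": {
        "api_number": "api_number", "api": "api_number",
        "api_no": "api_number", "well_api": "api_number",
        "well_name": "well_name", "name": "well_name", "well": "well_name",
        "operator_id": "operator_id", "operator": "operator_id", "op_id": "operator_id",
    },
    "production": {
        "api_number": "api_number", "api": "api_number",
        "oil_production_bbl": "oil_production_bbl", "oil_bbl": "oil_production_bbl",
        "oil": "oil_production_bbl",
        "flared_mcf": "flared_mcf", "flared": "flared_mcf", "gas_flared": "flared_mcf",
        "disposition_code": "disposition_code", "disp_code": "disposition_code",
    },
}

NUMERIC_SUFFIXES = ("_ft", "_bbl", "_mcf")


def normalize_row(row: dict[str, Any], data_type: str) -> dict[str, Any]:
    """Rename aliased columns and parse numeric fields for one CSV row."""
    aliases = COLUMN_ALIASES.get(data_type)
    if aliases is None:
        # Unknown data type: pass the raw row through unchanged.
        return dict(row)
    normalized: dict[str, Any] = {}
    for column, value in row.items():
        # Assumption: headers are matched case-insensitively, ignoring padding.
        key = column.strip().lower()
        # Assumption: columns without a mapping keep their cleaned-up name.
        key = aliases.get(key, key)
        if key.endswith(NUMERIC_SUFFIXES) and value not in (None, ""):
            try:
                value = float(value)
            except ValueError:
                pass  # Assumption: unparseable values are left as-is.
        normalized[key] = value
    return normalized
```

For example, a production row with headers `API`, `Oil_BBL`, and `gas_flared` would come out keyed by `api_number`, `oil_production_bbl`, and `flared_mcf`, with the two volumes converted to floats.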
Entity Extraction (entity_extraction.py)
The extract_entities function converts raw records into structured entities grouped by type. Each entity includes:
- entity_type: The knowledge graph vertex label (e.g., Well, Lease, Permit)
- entity_id: A deterministic ID based on the record's primary key (e.g., well-42123456789)
- properties: A normalized dict of entity properties
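The three fields above could be modeled as in the sketch below. The `Entity` dataclass and `well_entity` helper are hypothetical names, and treating `api_number` as the well's primary key is an assumption inferred from the example ID.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Entity:
    entity_type: str                      # Knowledge graph vertex label
    entity_id: str                        # Deterministic ID derived from the primary key
    properties: dict[str, Any] = field(default_factory=dict)


def well_entity(record: dict[str, Any]) -> Entity:
    """Build a Well entity; the same record always yields the same ID."""
    # Assumption: api_number is the primary key for wells.
    api_number = str(record["api_number"]).strip()
    return Entity(entity_type="Well", entity_id=f"well-{api_number}", properties=dict(record))
```

Because the ID is derived only from the primary key, re-ingesting the same well produces the same `entity_id`, which lets a downstream consumer update an existing vertex instead of creating a duplicate.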
Extraction features by data type:
| Data Type | Extracted Entities | Additional Output |
|---|---|---|
| wells | Well entities with standardized properties | Implicit Operator references, OPERATED_BY edge definitions |
| production | ProductionReport entities | FlaringEvent entities for records with flared_mcf > 0 or vented_mcf > 0 |
| leases | Lease entities | |
| permits | Permit entities (includes is_rule37_exception flag) | |
| flaring_authorizations | FlaringAuthorization entities | |
| fields | Field entities | |
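The production row in the table above can be illustrated with a short sketch that emits a FlaringEvent only when a record reports flared or vented gas. The function name `extract_production`, the `report_month` column, and the ID formats are hypothetical, since the source does not specify how production records are keyed.

```python
from typing import Any


def _to_float(value: Any) -> float:
    """Treat missing or empty volumes as zero."""
    return float(value) if value not in (None, "") else 0.0


def extract_production(records: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group production records into ProductionReport and FlaringEvent entities."""
    entities: dict[str, list[dict[str, Any]]] = {"ProductionReport": [], "FlaringEvent": []}
    for record in records:
        # Hypothetical key: API number plus an assumed "report_month" column.
        key = f"{record['api_number']}-{record.get('report_month', 'unknown')}"
        entities["ProductionReport"].append(
            {
                "entity_type": "ProductionReport",
                "entity_id": f"production-{key}",
                "properties": dict(record),
            }
        )
        flared = _to_float(record.get("flared_mcf"))
        vented = _to_float(record.get("vented_mcf"))
        # A FlaringEvent is emitted when either volume is positive.
        if flared > 0 or vented > 0:
            entities["FlaringEvent"].append(
                {
                    "entity_type": "FlaringEvent",
                    "entity_id": f"flaring-{key}",
                    "properties": {
                        "api_number": record["api_number"],
                        "flared_mcf": flared,
                        "vented_mcf": vented,
                    },
                }
            )
    return entities
```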
Kafka Event Publishing
Every ingestion operation publishes an event to the entity-extraction-worker Kafka topic:
| Event Type | Trigger |
|---|---|
| rrc.{data_type}.scraped | RRC scraper completed for a data type |
| csv.{data_type}.uploaded | CSV file uploaded and parsed |
| scada.flaring.ingested | SCADA flaring data ingested |
| extract.{data_type} | Generic extraction completed |
Events include metadata like record counts and entity counts. Kafka publishing is best-effort: if Kafka is unavailable, the service logs a warning and continues.
Kafka is optional for the ingestion service. If the Kafka broker is not available at startup, the service runs without event publishing and logs a warning. All other functionality (CSV import, entity extraction) works normally.
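A minimal sketch of best-effort publishing, under stated assumptions: the `Producer` protocol and its `send(topic, value)` method are hypothetical stand-ins and do not describe the actual aegis-shared producer manager, and the payload layout is illustrative only.

```python
import json
import logging
from typing import Any, Optional, Protocol

logger = logging.getLogger("ingestion.events")

TOPIC = "entity-extraction-worker"


class Producer(Protocol):
    """Hypothetical producer interface used only for this sketch."""

    def send(self, topic: str, value: bytes) -> None: ...


def publish_event(producer: Optional[Producer], event_type: str, metadata: dict[str, Any]) -> bool:
    """Publish an event if possible; never let a Kafka failure break ingestion."""
    if producer is None:
        # Kafka was unavailable at startup: log and carry on.
        logger.warning("Kafka unavailable; skipping event %s", event_type)
        return False
    payload = json.dumps({"event_type": event_type, **metadata}).encode("utf-8")
    try:
        producer.send(TOPIC, payload)
    except Exception as exc:  # Best-effort: any publish error is logged, not raised.
        logger.warning("Failed to publish %s: %s", event_type, exc)
        return False
    return True
```

A caller might use `publish_event(producer, f"rrc.{data_type}.scraped", {"record_count": n, "entity_count": m})` after each scrape and ignore the return value, so the HTTP response does not depend on Kafka being reachable.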
Dependencies
Python Packages
| Package | Version | Purpose |
|---|---|---|
| fastapi | ^0.115 | Web framework |
| uvicorn | ^0.34 | ASGI server |
| httpx | ^0.28 | HTTP client (for future KG service integration) |
| python-multipart | ^0.0.20 | File upload support for CSV endpoint |
| aegis-shared | local | Shared Kafka producer manager |
Infrastructure Dependencies
| Dependency | Required | Purpose |
|---|---|---|
| Apache Kafka | Optional | Event publishing for async processing |
Configuration
| Environment Variable | Default | Description |
|---|---|---|
| INGESTION_HOST | 0.0.0.0 | Bind address |
| INGESTION_PORT | 8005 | Bind port |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker addresses |
| KNOWLEDGE_GRAPH_SERVICE_URL | http://localhost:8003 | Knowledge graph service URL |
The sample data directory is automatically resolved to {repo_root}/data/rrc-samples/.
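One way the settings above could be loaded from the environment is sketched below. The `Settings` dataclass, `load_settings`, and `sample_data_dir` are hypothetical names, not the contents of config.py, and how the service locates the repository root is not specified here, so the root is passed in explicitly.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    host: str
    port: int
    kafka_bootstrap_servers: str
    knowledge_graph_service_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read each variable from the environment, falling back to the documented default."""
    return Settings(
        host=env.get("INGESTION_HOST", "0.0.0.0"),
        port=int(env.get("INGESTION_PORT", "8005")),
        kafka_bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        knowledge_graph_service_url=env.get("KNOWLEDGE_GRAPH_SERVICE_URL", "http://localhost:8003"),
    )


def sample_data_dir(repo_root: Path) -> Path:
    """Resolve the sample data directory relative to a given repository root."""
    return repo_root / "data" / "rrc-samples"
```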
Running Locally
cd services/ingestion-service
poetry install
poetry run uvicorn ingestion.main:app --reload --port 8005
Testing the RRC Scraper
# Scrape four of the six data types
curl -X POST http://localhost:8005/ingest/rrc-scrape \
-H "Content-Type: application/json" \
-d '{"data_types": ["wells", "leases", "fields", "permits"]}'
# Upload a CSV file
curl -X POST http://localhost:8005/ingest/operator/csv \
-F "file=@data/rrc-samples/wells.csv" \
-F "data_type=wells"
The sample CSV files in data/rrc-samples/ must exist for the RRC scraper to return data. These files are part of the repository and contain representative Texas oil and gas data for development.