
Ingestion Service

The Ingestion Service handles data import into AEGIS from external sources. It provides RRC data scrapers (currently mock implementations reading from CSV sample files), CSV file upload for operator data, SCADA telemetry ingestion, and entity extraction that converts raw records into structured knowledge graph entities.

Overview

Oil and gas regulatory data comes from multiple sources: the Railroad Commission of Texas (RRC) online system, operator-uploaded CSV files, and real-time SCADA field data. The ingestion service normalizes all of these into structured entities ready for insertion into the knowledge graph.

The service also publishes Kafka events for each ingestion operation, enabling downstream consumers (like an entity extraction worker) to process data asynchronously.

Port & Language

| Property | Value |
|---|---|
| Port | 8005 |
| Language | Python 3.12 |
| Framework | FastAPI |
| Entry point | src/ingestion/main.py |

Key Endpoints

| Method | Path | Description |
|---|---|---|
| POST | /ingest/rrc-scrape | Run mock RRC scrapers for specified data types and extract entities. |
| POST | /ingest/operator/csv | Upload a CSV file and extract entities from it. |
| POST | /ingest/scada | Ingest SCADA flaring volume data. |
| POST | /extract | Extract structured entities from raw data (generic). |
| GET | /health | Health check. Reports Kafka connectivity status. |

Architecture

Module Breakdown

```
src/ingestion/
├── main.py              # FastAPI app, endpoint definitions, Kafka setup
├── config.py            # Settings from environment variables
├── csv_import.py        # CSV parsing with column normalization
├── rrc_scraper.py       # Mock RRC scrapers (reads from CSV sample files)
└── entity_extraction.py # Entity extraction logic for all data types
```

RRC Scraper (rrc_scraper.py)

The scrapers are currently mock implementations that read from CSV files in data/rrc-samples/. In production, these would scrape the RRC Online System or use the RRC Query API.

Supported data types:

| Scraper Function | CSV File | Entity Type |
|---|---|---|
| scrape_wells() | wells.csv | Well |
| scrape_leases() | leases.csv | Lease |
| scrape_fields() | fields.csv | Field |
| scrape_permits() | permits.csv | Permit |
| scrape_flaring_authorizations() | flaring_authorizations.csv | FlaringAuthorization |
| scrape_production() | production.csv | ProductionReport |

The /ingest/rrc-scrape endpoint accepts a list of data_types to scrape. By default, it scrapes all six types.

CSV Import (csv_import.py)

The CSV importer normalizes column names using configurable mappings. It handles common variations in column naming:

Well columns:

```
api_number, api, api_no, well_api → api_number
well_name, name, well             → well_name
operator_id, operator, op_id      → operator_id
...
```

Production columns:

```
api_number, api                        → api_number
oil_production_bbl, oil_bbl, oil       → oil_production_bbl
flared_mcf, flared, gas_flared         → flared_mcf
disposition_code, disp_code            → disposition_code
...
```

Numeric fields (columns ending in _ft, _bbl, or _mcf) are automatically parsed to floats. Rows for unknown data types are passed through as raw CSV rows, without column normalization.

Entity Extraction (entity_extraction.py)

The extract_entities function converts raw records into structured entities grouped by type. Each entity includes:

  • entity_type — The knowledge graph vertex label (e.g., Well, Lease, Permit)
  • entity_id — A deterministic ID based on the record’s primary key (e.g., well-42123456789)
  • properties — A normalized dict of entity properties
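The three-field entity shape and a deterministic ID can be sketched as follows. Only the well-42123456789 form is confirmed by the example above; deriving the prefix by converting the vertex label from CamelCase to kebab-case (so FlaringAuthorization becomes flaring-authorization) is an assumed convention, and Entity, make_entity_id, and to_entity are hypothetical names.

```python
import re
from dataclasses import dataclass, field
from typing import Any


def make_entity_id(entity_type: str, primary_key: str) -> str:
    """Build a stable ID such as 'well-42123456789' from a label and a primary key."""
    # Assumed convention: CamelCase label -> kebab-case prefix.
    prefix = re.sub(r"(?<!^)(?=[A-Z])", "-", entity_type).lower()
    return f"{prefix}-{primary_key.strip()}"


@dataclass
class Entity:
    entity_type: str  # knowledge graph vertex label, e.g. "Well"
    entity_id: str  # deterministic: same input record -> same ID
    properties: dict[str, Any] = field(default_factory=dict)


def to_entity(entity_type: str, primary_key: str, properties: dict[str, Any]) -> Entity:
    """Wrap a normalized record as an entity with a derived ID."""
    return Entity(entity_type, make_entity_id(entity_type, primary_key), dict(properties))
```

Deterministic IDs let repeated ingestion of the same record resolve to the same graph vertex instead of creating duplicates.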

Extraction features by data type:

| Data Type | Extracted Entities | Additional Output |
|---|---|---|
| wells | Well entities with standardized properties | Implicit Operator references, OPERATED_BY edge definitions |
| production | ProductionReport entities | FlaringEvent entities for records with flared_mcf > 0 or vented_mcf > 0 |
| leases | Lease entities | |
| permits | Permit entities (includes is_rule37_exception flag) | |
| flaring_authorizations | FlaringAuthorization entities | |
| fields | Field entities | |
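The two extraction rules with extra output can be sketched like this. It is not the code in entity_extraction.py: the function names, the operator-{operator_id} ID format, the edge dictionary layout, and the FlaringEvent property set are all hypothetical. What it does follow from the table is one ProductionReport per record, a FlaringEvent only when flared_mcf or vented_mcf is positive, and Operator references plus OPERATED_BY edges for wells.

```python
from typing import Any


def _is_positive(value: Any) -> bool:
    """True when value parses as a number greater than zero (None/blank -> False)."""
    try:
        return float(value) > 0
    except (TypeError, ValueError):
        return False


def extract_production(records: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """One ProductionReport per record, plus a FlaringEvent when gas was flared or vented."""
    reports: list[dict[str, Any]] = []
    events: list[dict[str, Any]] = []
    for rec in records:
        reports.append({"entity_type": "ProductionReport", "properties": dict(rec)})
        if _is_positive(rec.get("flared_mcf")) or _is_positive(rec.get("vented_mcf")):
            events.append({
                "entity_type": "FlaringEvent",
                "properties": {  # hypothetical property selection
                    "api_number": rec.get("api_number"),
                    "flared_mcf": rec.get("flared_mcf"),
                    "vented_mcf": rec.get("vented_mcf"),
                },
            })
    return {"ProductionReport": reports, "FlaringEvent": events}


def extract_wells(records: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Well entities plus de-duplicated Operator references and OPERATED_BY edges."""
    wells: list[dict[str, Any]] = []
    operators: dict[str, dict[str, Any]] = {}
    edges: list[dict[str, Any]] = []
    for rec in records:
        well_id = f"well-{rec['api_number']}"
        wells.append({"entity_type": "Well", "entity_id": well_id, "properties": dict(rec)})
        op = rec.get("operator_id")
        if op:
            op_id = f"operator-{op}"  # hypothetical ID format
            operators.setdefault(op_id, {
                "entity_type": "Operator",
                "entity_id": op_id,
                "properties": {"operator_id": op},
            })
            edges.append({"label": "OPERATED_BY", "from": well_id, "to": op_id})
    return {"Well": wells, "Operator": list(operators.values()), "edges": edges}
```

Operators are keyed by ID so that many wells run by one operator yield a single Operator reference but one edge per well.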

Kafka Event Publishing

Every ingestion operation publishes an event to the entity-extraction-worker Kafka topic when Kafka is available:

| Event Type | Trigger |
|---|---|
| rrc.{data_type}.scraped | RRC scraper completed for a data type |
| csv.{data_type}.uploaded | CSV file uploaded and parsed |
| scada.flaring.ingested | SCADA flaring data ingested |
| extract.{data_type} | Generic extraction completed |
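The event-type naming scheme can be captured in a small template table. The templates follow the table above; the payload fields (event_type, data_type, record_count, entity_counts, timestamp), the operation keys, and the helper names are hypothetical, since the exact event schema is only described as "metadata like record counts and entity counts."

```python
import json
from datetime import datetime, timezone

TOPIC = "entity-extraction-worker"

# Event-type templates from the table; {dt} is the data type.
EVENT_TEMPLATES = {
    "rrc": "rrc.{dt}.scraped",
    "csv": "csv.{dt}.uploaded",
    "scada": "scada.flaring.ingested",  # fixed name, no data type
    "extract": "extract.{dt}",
}


def make_event_type(operation: str, data_type: str = "flaring") -> str:
    """Render the event type string for an ingestion operation."""
    return EVENT_TEMPLATES[operation].format(dt=data_type)


def build_event(
    operation: str, data_type: str, record_count: int, entity_counts: dict[str, int]
) -> bytes:
    """Serialize a hypothetical event payload as UTF-8 JSON bytes."""
    payload = {
        "event_type": make_event_type(operation, data_type),
        "data_type": data_type,
        "record_count": record_count,
        "entity_counts": entity_counts,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(payload).encode("utf-8")
```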

Events include metadata like record counts and entity counts. Kafka publishing is best-effort: if Kafka is unavailable, the service logs a warning and continues.

Kafka is optional for the ingestion service. If the Kafka broker is not available at startup, the service runs without event publishing and logs a warning. All other functionality (CSV import, entity extraction) works normally.

Dependencies

Python Packages

| Package | Version | Purpose |
|---|---|---|
| fastapi | ^0.115 | Web framework |
| uvicorn | ^0.34 | ASGI server |
| httpx | ^0.28 | HTTP client (for future KG service integration) |
| python-multipart | ^0.0.20 | File upload support for CSV endpoint |
| aegis-shared | local | Shared Kafka producer manager |

Infrastructure Dependencies

| Dependency | Required | Purpose |
|---|---|---|
| Apache Kafka | Optional | Event publishing for async processing |

Configuration

| Environment Variable | Default | Description |
|---|---|---|
| INGESTION_HOST | 0.0.0.0 | Bind address |
| INGESTION_PORT | 8005 | Bind port |
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker addresses |
| KNOWLEDGE_GRAPH_SERVICE_URL | http://localhost:8003 | Knowledge graph service URL |

The sample data directory is automatically resolved to {repo_root}/data/rrc-samples/.

Running Locally

```bash
cd services/ingestion-service
poetry install
poetry run uvicorn ingestion.main:app --reload --port 8005
```

Testing the RRC Scraper

```bash
# Scrape all six data types
curl -X POST http://localhost:8005/ingest/rrc-scrape \
  -H "Content-Type: application/json" \
  -d '{"data_types": ["wells", "leases", "fields", "permits", "flaring_authorizations", "production"]}'

# Upload a CSV file
curl -X POST http://localhost:8005/ingest/operator/csv \
  -F "file=@data/rrc-samples/wells.csv" \
  -F "data_type=wells"
```
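The scrape request can also be issued from Python with only the standard library. This is a sketch: build_scrape_request and scrape are hypothetical helper names, and because the response schema is not documented here, the parsed JSON is returned as-is.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8005"


def build_scrape_request(
    data_types: list[str], base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build the same POST /ingest/rrc-scrape request as the curl example."""
    body = json.dumps({"data_types": data_types}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/ingest/rrc-scrape",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def scrape(data_types: list[str], base_url: str = BASE_URL) -> object:
    """Send the request to a running service and return the decoded JSON response."""
    with urllib.request.urlopen(build_scrape_request(data_types, base_url), timeout=30) as resp:
        return json.load(resp)
```

With the service running locally, scrape(["wells", "permits"]) sends the request and returns whatever JSON the endpoint responds with.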

The sample CSV files in data/rrc-samples/ must exist for the RRC scraper to return data. These files are part of the repository and contain representative Texas oil and gas data for development.
