diff --git a/electricity_price_predictor/README.md b/electricity_price_predictor/README.md new file mode 100644 index 0000000..911158f --- /dev/null +++ b/electricity_price_predictor/README.md @@ -0,0 +1,164 @@ +# Electricity Price Predictor + +Standalone module for ENTSO-E ingestion and feature-store creation in `quant_db`. + +## Documentation + +- High-level architecture: `docs/architecture.md` +- Detailed developer documentation (file-by-file + UML): `docs/developer_guide.md` + +## What it builds + +- Input columns: + - `day_ahead_price` + - `load_forecast` + - `wind_forecast` + - `solar_forecast` +- Derived columns: + - `residual_load` + - `lagged_price` (`t-1` ... `t-24`, stored as array length 24) + - `lagged_residual_load` (`t-1` ... `t-24`, stored as array length 24) + - `hour_of_day_sin`, `hour_of_day_cos` + - `weekday_sin`, `weekday_cos` + - `month_sin`, `month_cos` + +## Column units (data dictionary) + +All timestamps are hourly and stored in UTC (`delivery_start`). + +- `day_ahead_price` + - Unit: `EUR/MWh` (euros per megawatt-hour). +- `load_forecast` + - Unit: `MW` (megawatts). +- `wind_forecast` + - Unit: `MW` (megawatts). +- `solar_forecast` + - Unit: `MW` (megawatts). +- `residual_load = load_forecast - wind_forecast - solar_forecast` + - Unit: `MW` (megawatts). +- `lagged_price` (array of 24 values for `t-1..t-24`) + - Unit: `EUR/MWh`. +- `lagged_residual_load` (array of 24 values for `t-1..t-24`) + - Unit: `MW`. +- `hour_of_day_sin`, `hour_of_day_cos` + - Unit: dimensionless in `[-1, 1]`. +- `weekday_sin`, `weekday_cos` + - Unit: dimensionless in `[-1, 1]`. +- `month_sin`, `month_cos` + - Unit: dimensionless in `[-1, 1]`. + +## Missing-data semantics + +The pipeline intentionally distinguishes **missing** from **measured zero**: + +- Forecast columns (`load_forecast`, `wind_forecast`, `solar_forecast`) remain `NaN` when ENTSO-E has no value. 
+- `residual_load` is computed directly from source columns and remains `NaN` when any source component is missing. +- Feature engineering drops rows only for lag warmup requirements (`day_ahead_price` and `lagged_price_1..24`), not for every nullable forecast column. +- During DB persistence to `electricity_price_features`, rows with nulls in NOT NULL core columns are skipped (to satisfy schema constraints) while still being available in the returned in-memory DataFrame. + +## Data contracts + +### In-memory contract (`run_feature_pipeline(...)` return value) + +- Index: + - Type: timezone-aware `DatetimeIndex` + - Granularity: hourly + - Timezone: UTC + - Uniqueness: unique timestamps expected +- Columns: + - Base signals: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast` + - Derived: `residual_load` + - Lag vectors (expanded): `lagged_price_1..24`, `lagged_residual_load_1..24` + - Cyclical: `hour_of_day_sin`, `hour_of_day_cos`, `weekday_sin`, `weekday_cos`, `month_sin`, `month_cos` +- Nullability: + - `day_ahead_price`: required for returned rows. + - `lagged_price_1..24`: required for returned rows. + - `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`: nullable (`NaN` allowed). + +### Persistence contract (`electricity_price_features`) + +- Persisted row key: + - (`country_code`, `delivery_start`, `feature_version`) +- NOT NULL core columns in schema: + - `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load` + - `lagged_price`, `lagged_residual_load` + - cyclical columns (`hour_of_day_*`, `weekday_*`, `month_*`) +- Write behavior: + - The persistence layer filters out non-conforming rows (nulls in NOT NULL core columns) before UPSERT. + - Persisted lag arrays are fixed length 24 and map to `t-1..t-24`. 
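The write behavior above can be sketched as follows. This is a minimal illustration, not the code in `pipeline.persist_feature_frame(...)`: it works on plain dicts instead of a DataFrame, the names `to_persistable_rows`, `REQUIRED_SCALARS`, and `MAX_LAG` are hypothetical, the cyclical columns are left out for brevity, and treating a gap inside a lag array as disqualifying is an assumption (the contract only states that nulls in NOT NULL core columns are filtered).

```python
import math

# Hypothetical constants for illustration; the authoritative schema is
# sql/001_electricity_price_schema.sql.
MAX_LAG = 24
REQUIRED_SCALARS = (
    "day_ahead_price",
    "load_forecast",
    "wind_forecast",
    "solar_forecast",
    "residual_load",
)


def _is_missing(value):
    """Treat None and NaN as missing; a measured 0.0 is a valid value."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def to_persistable_rows(rows):
    """Pack lag columns into length-24 lists and skip rows with a missing core value."""
    persistable = []
    for row in rows:
        # Index 0 of each list corresponds to t-1, index 23 to t-24.
        lagged_price = [row.get(f"lagged_price_{k}") for k in range(1, MAX_LAG + 1)]
        lagged_residual = [
            row.get(f"lagged_residual_load_{k}") for k in range(1, MAX_LAG + 1)
        ]
        core = [row.get(name) for name in REQUIRED_SCALARS]
        # Assumption: a gap anywhere in a lag array also makes the row non-persistable.
        if any(_is_missing(v) for v in core + lagged_price + lagged_residual):
            continue
        packed = {name: row[name] for name in REQUIRED_SCALARS}
        packed["lagged_price"] = lagged_price
        packed["lagged_residual_load"] = lagged_residual
        persistable.append(packed)
    return persistable
```

A row whose `load_forecast` is `NaN` stays in the in-memory frame but is skipped here, while a row with `wind_forecast == 0.0` is kept, which matches the missing-versus-measured-zero distinction described earlier.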
+ +### Raw observations contract (`electricity_market_observations`) + +- Indexing/key semantics: + - one row per (`country_code`, `delivery_start`) +- Update semantics: + - partial refreshes do not overwrite existing non-null values with nulls (`COALESCE` merge policy) + +## Country-code and API behavior notes + +- Use ENTSO-E bidding-zone identifiers (for example `DE_LU` rather than `DE`) when querying across all endpoints. +- The pipeline includes a bidding-zone resolver for common country aliases: + - `DE -> DE_LU` + - `IT -> IT_NORD` +- Persistence uses the resolved bidding-zone code so DB keys match the queried market zone. +- Some ENTSO-E endpoints may return no matches for specific windows/countries. These are handled as empty hourly frames for that endpoint rather than hard-failing the whole fetch. +- Wind/solar responses can include duplicate semantic columns after normalization; the service coalesces duplicates by taking the first non-null value per timestamp. + +## Database objects + +`sql/001_electricity_price_schema.sql` creates: + +- `entsoe_api_cache`: generic decorator cache table (pickled payloads, TTL support) +- `electricity_market_observations`: raw hourly ENTSO-E observations +- `electricity_price_features`: model-ready feature store + +## Setup + +```bash +cd electricity_price_predictor +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +Set env vars: + +```bash +export ENTSOE_API_KEY="your_entsoe_key" +export QUANT_DB_HOST="localhost" +export QUANT_DB_PORT="5432" +export QUANT_DB_NAME="quant_db" +export QUANT_DB_USER="quant_user" +export QUANT_DB_PASSWORD="strong_password" +``` + +## Initialize schema + +```bash +PYTHONPATH=src python scripts/init_db.py +``` + +## Build feature store + +```bash +PYTHONPATH=src python scripts/build_feature_store.py \ + --country-code DE_LU \ + --start 2026-01-01T00:00:00Z \ + --end 2026-02-01T00:00:00Z \ + --cache-ttl-hours 24 +``` + +## Decorator caching behavior + +The 
`cache_to_db` decorator: + +- hashes function name + arguments into deterministic `cache_key` +- checks `entsoe_api_cache` first +- returns cached payload if key exists and not expired +- otherwise uses `electricity_market_observations` as secondary cache at timestamp level +- only calls ENTSO-E for missing hourly intervals, then upserts those rows +- stores final returned object in `entsoe_api_cache` + +This gives a two-layer cache: +1) fast function-result cache (`entsoe_api_cache`) and +2) canonical timestamp cache (`electricity_market_observations`). diff --git a/electricity_price_predictor/docs/architecture.md b/electricity_price_predictor/docs/architecture.md new file mode 100644 index 0000000..dc64b9e --- /dev/null +++ b/electricity_price_predictor/docs/architecture.md @@ -0,0 +1,73 @@ +# Architecture Notes + +This document is the quick architecture map. For full file-by-file implementation details, see `docs/developer_guide.md`. + +## End-to-end data flow + +1. `scripts/build_feature_store.py` parses CLI arguments and validates env vars. +2. It calls `pipeline.run_feature_pipeline(...)`. +3. `EntsoeDataService.fetch_inputs(...)` loads: + - `day_ahead_price` + - `load_forecast` + - `wind_forecast` + - `solar_forecast` +4. Each ENTSO-E call is wrapped by `cache_to_db(...)` and either: + - serves a hit from `entsoe_api_cache`, or + - falls back to `electricity_market_observations` for already-known timestamps, + - and performs API calls only for missing hourly intervals. +5. Missing intervals returned from API are upserted into `electricity_market_observations`. +6. The final merged result is cached in `entsoe_api_cache`. +7. Raw merged series are upserted to `electricity_market_observations`. +8. `features.build_feature_frame(...)` computes: + - `residual_load` + - lagged arrays (24 values each) + - cyclical encodings for hour/weekday/month + - preserves `NaN` for missing forecast-derived values. +9. 
`pipeline.persist_feature_frame(...)` upserts model-ready rows to `electricity_price_features`. + - filters out rows that violate feature-table NOT NULL constraints. + +## Process diagram + +```mermaid +flowchart TD + A[build_feature_store.py CLI] --> B[run_feature_pipeline] + B --> C[EntsoeDataService.fetch_inputs] + C --> D{Hit in entsoe_api_cache?} + D -->|Yes| E[Load payload from entsoe_api_cache] + D -->|No| F[Read electricity_market_observations] + F --> G{Missing hourly timestamps?} + G -->|No| H[Reuse DB observation rows] + G -->|Yes| I[Call ENTSO-E only for missing ranges] + I --> I2{NoMatchingDataError?} + I2 -->|Yes| I3[Use empty hourly frame for endpoint] + I2 -->|No| I4[Normalize payload] + I4 --> I5[Coalesce duplicate columns by first non-null] + I3 --> J[Upsert missing rows to electricity_market_observations] + I5 --> J + H --> K[Build merged input DataFrame] + J --> K + K --> L[Store payload in entsoe_api_cache] + E --> M[Use cached input DataFrame] + L --> N[Upsert electricity_market_observations] + M --> N + N --> O[build_feature_frame] + O --> P[Create lags + cyclical features] + P --> P2[Preserve NaN in forecast-derived columns] + P2 --> P3[Drop rows missing day_ahead_price or lagged_price_1..24] + P3 --> Q[Upsert persistable subset into electricity_price_features] +``` + +## Key design reasons + +- DB cache avoids repeated ENTSO-E calls during iterative model work. +- Observation-table fallback avoids re-fetching timestamps already persisted once. +- Pickled payloads preserve exact pandas object shape and index information. +- Feature table stores fixed-size lag arrays so one row corresponds to one prediction timestamp. +- Missing forecasts are kept as `NaN` in analysis outputs, avoiding misleading zero-imputation. +- Persistence layer enforces schema compatibility by skipping rows with nulls in NOT NULL feature columns. + +## Extension points + +- Add label/target tables (`t+1`, `t+24`, etc.). +- Add training metadata + model registry tables. 
+- Add partitioning strategy for multi-year production-scale data. diff --git a/electricity_price_predictor/docs/developer_guide.md b/electricity_price_predictor/docs/developer_guide.md new file mode 100644 index 0000000..57d5e3d --- /dev/null +++ b/electricity_price_predictor/docs/developer_guide.md @@ -0,0 +1,362 @@ +# Developer Guide (Deep Dive) + +This guide explains each file in the module, execution order, control flow, and data/state transitions so you can reason about behavior without reading source code. + +## 1) Directory map and responsibilities + +### Top-level + +- `requirements.txt` + - Python dependencies for ingestion and DB persistence. +- `README.md` + - Operator-focused setup and run commands. +- `sql/001_electricity_price_schema.sql` + - DDL for cache, raw observations, and feature store. +- `scripts/init_db.py` + - Applies the SQL schema to `quant_db`. +- `scripts/build_feature_store.py` + - CLI entrypoint for data fetch + feature persistence. +- `docs/architecture.md` + - High-level architecture summary. +- `docs/developer_guide.md` + - This detailed developer-facing explanation. + +### Python package (`src/electricity_price_predictor`) + +- `__init__.py` + - Public package exports (`get_engine`, `EntsoeDataService`, `build_feature_frame`). +- `db.py` + - Builds DB URL from env vars and creates SQLAlchemy `Engine`. +- `cache.py` + - Implements decorator-based DB cache with deterministic keying. +- `entsoe_api.py` + - Wraps ENTSO-E API calls, normalizes data, and writes raw observations. +- `features.py` + - Pure feature engineering logic (residual load, lags, cyclical encoding). +- `pipeline.py` + - Orchestration layer for end-to-end fetch -> raw persist -> feature build -> feature persist. + +## 2) Runtime execution path (step-by-step) + +When you run: + +```bash +PYTHONPATH=src python3 scripts/build_feature_store.py --country-code ... --start ... --end ... +``` + +Execution sequence: + +1. 
**Argument parsing** + - `build_feature_store.py` reads country code/time range/TTL. +2. **Credential/connection bootstrap** + - checks `ENTSOE_API_KEY`. + - calls `get_engine()` from `db.py`. +3. **Pipeline orchestration** + - `run_feature_pipeline(...)` in `pipeline.py` starts. +4. **API service creation** + - initializes `EntsoePandasClient`. + - creates `EntsoeDataService(client, engine, cache_ttl_hours)`. +5. **Decorator wrapping** + - in `EntsoeDataService.__post_init__`, API methods are wrapped by `cache_to_db(...)`. +6. **Data retrieval** + - `fetch_inputs(...)` calls: + - `get_day_ahead_prices(...)` + - `get_load_forecast(...)` + - `get_wind_solar_forecast(...)` + - country aliases are normalized to bidding zones before queries (currently `DE -> DE_LU`, `IT -> IT_NORD`). +7. **Cache check/compute loop (per call)** + - decorator computes hash key from function + args. + - if non-expired row exists in `entsoe_api_cache`: returns payload. + - else: reads `electricity_market_observations` for requested timestamps. + - if timestamps are missing there, only missing hourly ranges are requested from ENTSO-E. + - `NoMatchingDataError` from ENTSO-E is converted to an empty hourly frame for that endpoint/range. + - normalized responses coalesce duplicate semantic columns (for example multiple wind/solar columns) via first non-null-per-row. + - missing rows are upserted into `electricity_market_observations`. + - final merged dataset is stored in `entsoe_api_cache` and returned. +8. **Raw persistence** + - merged inputs are upserted to `electricity_market_observations`. +9. **Feature engineering** + - `build_feature_frame(...)` computes: + - `residual_load = load - wind - solar` + - `lagged_price_1..24` + - `lagged_residual_load_1..24` + - `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos` + - preserves source missingness as `NaN` (no 0.0 imputation). + - drops rows only when `day_ahead_price` / `lagged_price_1..24` are missing (lag warmup requirement). +10. 
**Feature-store persistence** + - lags are materialized into PostgreSQL arrays (`DOUBLE PRECISION[]`, length 24). + - rows violating NOT NULL core feature constraints are filtered out before upsert. + - persistable rows are upserted to `electricity_price_features`. +11. **CLI completion** + - prints persisted row count. + +## 3) UML diagrams + +## 3.1 Component diagram + +```mermaid +flowchart LR + CLI[scripts/build_feature_store.py] --> PIPE[pipeline.run_feature_pipeline] + PIPE --> DBMOD[db.get_engine] + PIPE --> SERVICE[EntsoeDataService] + SERVICE --> CACHEDEC[cache_to_db decorator] + SERVICE --> ENTSOE[EntsoePandasClient] + SERVICE --> SECONDARY[electricity_market_observations secondary cache] + PIPE --> FEAT[features.build_feature_frame NaN-preserving] + FEAT --> PERSIST[pipeline.persist_feature_frame null-filtered] + CACHEDEC --> DB[(quant_db.entsoe_api_cache)] + SECONDARY --> RAW[(quant_db.electricity_market_observations)] + PERSIST --> STORE[(quant_db.electricity_price_features)] +``` + +## 3.2 Class diagram (logical) + +```mermaid +classDiagram + class EntsoeDataService { + +client: EntsoePandasClient + +engine: Engine + +cache_ttl_hours: Optional[int] + +fetch_inputs(country_code, start, end) DataFrame + +upsert_raw_data(country_code, frame) None + -_get_day_ahead_prices_impl(country_code, start, end) Series + -_get_load_forecast_impl(country_code, start, end) Series + -_get_wind_solar_forecast_impl(country_code, start, end) DataFrame + } + + class CacheDecorator { + +cache_to_db(engine, namespace, ttl_hours) decorator + -_build_cache_key(function_name, args, kwargs) str + } + + class FeatureBuilder { + +build_feature_frame(inputs, max_lag=24) DataFrame + -_cyclical_encode(values, period, prefix) DataFrame + } + + class Pipeline { + +run_feature_pipeline(engine, entsoe_api_key, country_code, start, end, cache_ttl_hours) DataFrame + +persist_feature_frame(engine, country_code, feature_frame) None + } + + Pipeline --> EntsoeDataService : uses + Pipeline 
--> FeatureBuilder : uses + EntsoeDataService --> CacheDecorator : wraps methods +``` + +## 3.3 Sequence diagram (single API method with cache) + +```mermaid +sequenceDiagram + participant Caller as fetch_inputs() + participant Decorator as cache_to_db wrapper + participant CacheTable as entsoe_api_cache (L1) + participant ObsTable as electricity_market_observations (L2) + participant API as ENTSO-E API + + Caller->>Decorator: get_day_ahead_prices(country, start, end) + Decorator->>CacheTable: SELECT by cache_key and expires_at + alt L1 cache hit + CacheTable-->>Decorator: payload + Decorator-->>Caller: unpickled pandas object + else L1 cache miss/expired + Decorator->>ObsTable: SELECT existing timestamps + alt L2 fully covers range + ObsTable-->>Decorator: pandas-compatible rows + else L2 has gaps + Decorator->>API: query only missing ranges + alt API returns data + API-->>Decorator: missing rows + Decorator->>Decorator: normalize columns + coalesce duplicates + Decorator->>ObsTable: UPSERT missing rows + else NoMatchingDataError + Decorator->>Decorator: synthesize empty hourly frame + end + end + Decorator->>CacheTable: INSERT/UPSERT merged payload + Decorator-->>Caller: fresh result + end +``` + +## 3.4 State diagram (cache entry lifecycle) + +```mermaid +stateDiagram-v2 + [*] --> L1Missing + L1Missing --> L2Check: cache miss/expiry + L2Check --> Fresh: observation table fully covers range + L2Check --> Partial: observation table has gaps + Partial --> Fresh: fetch missing ranges, upsert L2, upsert L1 + Fresh --> Fresh: reused before expiry + Fresh --> Expired: TTL passes for L1 entry + Expired --> L2Check: next call + Fresh --> Overwritten: Same key, new payload upsert + Overwritten --> Fresh +``` + +## 3.5 ER diagram (database schema) + +```mermaid +erDiagram + entsoe_api_cache { + text cache_key PK + text namespace + text function_name + jsonb args_json + bytea payload + timestamptz created_at + timestamptz expires_at + } + + electricity_market_observations { 
+ text country_code PK + timestamptz delivery_start PK + float day_ahead_price + float load_forecast + float wind_forecast + float solar_forecast + timestamptz ingested_at + } + + electricity_price_features { + text country_code PK + timestamptz delivery_start PK + text feature_version PK + float day_ahead_price + float load_forecast + float wind_forecast + float solar_forecast + float residual_load + float[] lagged_price + float[] lagged_residual_load + float hour_of_day_sin + float hour_of_day_cos + float weekday_sin + float weekday_cos + float month_sin + float month_cos + timestamptz created_at + } +``` + +## 4) How files collaborate + +## 4.1 `db.py` + scripts + +- Scripts never hardcode DB URI; they call `get_engine()`. +- `get_engine()` centralizes environment-driven connectivity. + +## 4.2 `cache.py` + `entsoe_api.py` + +- `cache_to_db()` is generic and independent of ENTSO-E specifics. +- `EntsoeDataService.__post_init__` binds that generic decorator to each API-fetch method. +- Result: all expensive API calls automatically become cache-aware without changing call sites. + +## 4.3 `entsoe_api.py` + `features.py` + +- `entsoe_api.py` guarantees normalized timestamp index and expected source columns. +- `features.py` assumes these columns and transforms them to model features only (no DB side effects). + +## 4.4 `features.py` + `pipeline.py` + +- `build_feature_frame()` returns wide DataFrame with `lagged_*_1..24`. +- `persist_feature_frame()` converts those to PostgreSQL arrays so table rows stay compact and versioned. + +## 5) Important implementation details + +- **Cache keys are deterministic** + - Built from JSON of function name + args + kwargs with stable sorting. +- **Cache payload type** + - `pickle` stored in `BYTEA` to preserve pandas objects. +- **TTL logic** + - `expires_at IS NULL` means never expires. + - Otherwise must be greater than current UTC time to be considered valid. 
+- **Two-layer cache order** + - Layer 1: `entsoe_api_cache` (function-result cache). + - Layer 2: `electricity_market_observations` (timestamp-level raw cache). + - API calls happen only for Layer-2 gaps. +- **Upsert strategy** + - Raw and feature tables use `ON CONFLICT ... DO UPDATE` for idempotent reruns. + - Raw upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting previously stored values during partial refreshes. + - Feature upsert operates on a filtered persistable subset where core NOT NULL columns are present. +- **Missingness semantics** + - Forecast and derived residual columns preserve `NaN` in memory. + - No zero-imputation is performed for missing forecast values. +- **Bidding-zone normalization** + - `resolve_bidding_zone_code(...)` maps common country aliases to ENTSO-E zone codes. + - Pipeline persistence uses the resolved code, ensuring DB keys match actual queried zones. +- **Timezone handling** + - API index is normalized to UTC to avoid DST ambiguity in lag features. +- **Feature warmup** + - Rows missing `day_ahead_price` or any `lagged_price_1..24` are dropped because lag history is incomplete. + +## 6) Failure modes and expected behavior + +- Missing `ENTSOE_API_KEY` -> CLI raises early runtime error. +- Missing required input columns -> feature builder raises `ValueError`. +- Duplicate normalized columns from ENTSO-E payloads -> coalesced before reindexing to avoid pandas duplicate-label reindex errors. +- ENTSO-E no-data responses for an endpoint/range -> transformed to empty hourly frames and merged safely. +- Empty data frame -> raw/feature persistence functions no-op safely. +- Repeated identical request -> cache hit (no API roundtrip). +- Expired L1 cache row + full L2 coverage -> no API call required. +- Expired L1 cache row + partial L2 coverage -> API called only for missing ranges. 
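The "API called only for missing ranges" behavior can be illustrated with a small gap-finding helper. This is a sketch under stated assumptions, not the project's implementation: `missing_hourly_ranges` is a hypothetical name, the inputs are timezone-aware UTC datetimes aligned to the hour, and `known` stands in for the `delivery_start` values already present in `electricity_market_observations`.

```python
from datetime import datetime, timedelta, timezone

HOUR = timedelta(hours=1)


def missing_hourly_ranges(start, end, known):
    """Return contiguous (gap_start, gap_end) pairs, end-exclusive, for hours in
    [start, end) that are absent from `known`."""
    known_set = set(known)
    ranges = []
    gap_start = None
    current = start
    while current < end:
        if current not in known_set:
            # Open a new gap at the first missing hour.
            if gap_start is None:
                gap_start = current
        elif gap_start is not None:
            # A known hour closes the currently open gap.
            ranges.append((gap_start, current))
            gap_start = None
        current += HOUR
    if gap_start is not None:
        # The requested window ended while a gap was still open.
        ranges.append((gap_start, end))
    return ranges


if __name__ == "__main__":
    t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
    already_stored = [t0, t0 + HOUR, t0 + 4 * HOUR]
    for gap in missing_hourly_ranges(t0, t0 + 6 * HOUR, already_stored):
        print(gap[0].isoformat(), "->", gap[1].isoformat())
```

With full Layer-2 coverage the helper returns an empty list, which corresponds to the "no API call required" case; each returned pair would map to one ENTSO-E request in the partial-coverage case.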
+ +## 7) Data contracts + +### 7.1 In-memory features contract + +Producer: `run_feature_pipeline(...)` return value (`pd.DataFrame`). + +- **Index contract** + - hourly UTC `DatetimeIndex`, sorted ascending. + - unique timestamps expected after deduplication. +- **Column contract** + - base: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast` + - derived: `residual_load` + - lag columns: `lagged_price_1..24`, `lagged_residual_load_1..24` + - cyclical: `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos` +- **Nullability contract** + - required non-null in returned rows: `day_ahead_price`, `lagged_price_1..24` + - nullable: `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*` + - rationale: preserve upstream missingness semantics for analysis and QC. + +### 7.2 Feature-store persistence contract + +Consumer: `electricity_price_features` table. + +- **Primary key contract** + - (`country_code`, `delivery_start`, `feature_version`) +- **Schema constraint contract** + - core numeric columns are `NOT NULL`. + - lag arrays are `DOUBLE PRECISION[]` and expected length 24. +- **Write-time contract** + - `persist_feature_frame(...)` filters rows that violate NOT NULL core columns before UPSERT. + - retained rows are idempotently upserted via `ON CONFLICT ... DO UPDATE`. + +### 7.3 Raw-observation contract + +Consumer: `electricity_market_observations` table. + +- **Primary key contract** + - (`country_code`, `delivery_start`) +- **Merge contract** + - upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting prior known values. +- **Coverage contract** + - secondary cache guarantees fetched payloads are aligned to expected hourly index for the requested `[start, end)` range. + +## 8) Practical debugging checklist + +1. Run `scripts/init_db.py` and ensure tables exist. +2. Run one short-range fetch window (1-2 days) first. +3. 
Verify cache growth: + - `SELECT namespace, function_name, COUNT(*) FROM entsoe_api_cache GROUP BY 1,2;` +4. Verify raw persistence: + - `SELECT COUNT(*) FROM electricity_market_observations WHERE country_code = '...';` +5. Verify feature persistence: + - check lag array sizes are 24 and row count is lower than raw by about 24. + +## 9) Suggested next developer docs to add + +- Data quality rules (acceptable missingness, clipping policy, anomaly handling). +- Training-set contract (target definition, split strategy, leakage constraints). +- Backfill/replay policy for reprocessing historical periods. diff --git a/electricity_price_predictor/requirements.txt b/electricity_price_predictor/requirements.txt new file mode 100644 index 0000000..dd1171a --- /dev/null +++ b/electricity_price_predictor/requirements.txt @@ -0,0 +1,4 @@ +entsoe-py +pandas +sqlalchemy +psycopg2-binary diff --git a/electricity_price_predictor/scripts/build_feature_store.py b/electricity_price_predictor/scripts/build_feature_store.py new file mode 100644 index 0000000..fd82e95 --- /dev/null +++ b/electricity_price_predictor/scripts/build_feature_store.py @@ -0,0 +1,41 @@ +import argparse +import os + +import pandas as pd + +from electricity_price_predictor.db import get_engine +from electricity_price_predictor.pipeline import run_feature_pipeline + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Fetch ENTSO-E inputs and build feature store.") + parser.add_argument("--country-code", required=True, help="ENTSO-E bidding zone code, e.g. DE_LU") + parser.add_argument("--start", required=True, help="Inclusive start datetime, e.g. 2026-01-01T00:00:00Z") + parser.add_argument("--end", required=True, help="Exclusive end datetime, e.g. 
2026-02-01T00:00:00Z") + parser.add_argument("--cache-ttl-hours", type=int, default=24, help="Decorator cache TTL in hours") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + api_key = os.getenv("ENTSOE_API_KEY") + if not api_key: + raise RuntimeError("ENTSOE_API_KEY environment variable is required.") + + engine = get_engine() + start = pd.Timestamp(args.start, tz="UTC") + end = pd.Timestamp(args.end, tz="UTC") + + features = run_feature_pipeline( + engine=engine, + entsoe_api_key=api_key, + country_code=args.country_code, + start=start, + end=end, + cache_ttl_hours=args.cache_ttl_hours, + ) + print(f"Built {len(features)} feature rows for {args.country_code} (rows with nulls in NOT NULL columns are skipped at persistence).") + + +if __name__ == "__main__": + main() diff --git a/electricity_price_predictor/scripts/init_db.py b/electricity_price_predictor/scripts/init_db.py new file mode 100644 index 0000000..0f7869e --- /dev/null +++ b/electricity_price_predictor/scripts/init_db.py @@ -0,0 +1,23 @@ +from pathlib import Path + +from sqlalchemy import text + +from electricity_price_predictor.db import get_engine + + +def main() -> None: + engine = get_engine() + schema_path = Path(__file__).resolve().parents[1] / "sql" / "001_electricity_price_schema.sql" + sql = schema_path.read_text(encoding="utf-8") + + with engine.begin() as conn: + for statement in sql.split(";"): + stmt = statement.strip() + if stmt: + conn.execute(text(stmt)) + + print("Schema initialized for electricity price predictor.") + + +if __name__ == "__main__": + main() diff --git a/electricity_price_predictor/sql/001_electricity_price_schema.sql b/electricity_price_predictor/sql/001_electricity_price_schema.sql new file mode 100644 index 0000000..a32fe4d --- /dev/null +++ b/electricity_price_predictor/sql/001_electricity_price_schema.sql @@ -0,0 +1,55 @@ +CREATE TABLE IF NOT EXISTS entsoe_api_cache ( + cache_key TEXT PRIMARY KEY, + namespace TEXT NOT NULL, + function_name TEXT NOT NULL, + args_json JSONB NOT NULL, + payload BYTEA 
NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_namespace_fn + ON entsoe_api_cache(namespace, function_name); + +CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_expires_at + ON entsoe_api_cache(expires_at); + +CREATE TABLE IF NOT EXISTS electricity_market_observations ( + country_code TEXT NOT NULL, + delivery_start TIMESTAMPTZ NOT NULL, + day_ahead_price DOUBLE PRECISION, + load_forecast DOUBLE PRECISION, + wind_forecast DOUBLE PRECISION, + solar_forecast DOUBLE PRECISION, + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (country_code, delivery_start) +); + +CREATE INDEX IF NOT EXISTS idx_electricity_market_observations_delivery + ON electricity_market_observations(delivery_start); + +CREATE TABLE IF NOT EXISTS electricity_price_features ( + country_code TEXT NOT NULL, + delivery_start TIMESTAMPTZ NOT NULL, + day_ahead_price DOUBLE PRECISION NOT NULL, + load_forecast DOUBLE PRECISION NOT NULL, + wind_forecast DOUBLE PRECISION NOT NULL, + solar_forecast DOUBLE PRECISION NOT NULL, + residual_load DOUBLE PRECISION NOT NULL, + lagged_price DOUBLE PRECISION[] NOT NULL, + lagged_residual_load DOUBLE PRECISION[] NOT NULL, + hour_of_day_sin DOUBLE PRECISION NOT NULL, + hour_of_day_cos DOUBLE PRECISION NOT NULL, + weekday_sin DOUBLE PRECISION NOT NULL, + weekday_cos DOUBLE PRECISION NOT NULL, + month_sin DOUBLE PRECISION NOT NULL, + month_cos DOUBLE PRECISION NOT NULL, + feature_version TEXT NOT NULL DEFAULT 'v1', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (country_code, delivery_start, feature_version), + CONSTRAINT chk_lagged_price_len CHECK (CARDINALITY(lagged_price) = 24), + CONSTRAINT chk_lagged_residual_load_len CHECK (CARDINALITY(lagged_residual_load) = 24) +); + +CREATE INDEX IF NOT EXISTS idx_electricity_price_features_delivery + ON electricity_price_features(delivery_start); diff --git 
a/electricity_price_predictor/src/electricity_price_predictor/__init__.py b/electricity_price_predictor/src/electricity_price_predictor/__init__.py new file mode 100644 index 0000000..0545869 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/__init__.py @@ -0,0 +1,7 @@ +"""Electricity price forecasting data pipeline package.""" + +from .db import get_engine +from .entsoe_api import EntsoeDataService +from .features import build_feature_frame + +__all__ = ["EntsoeDataService", "build_feature_frame", "get_engine"] diff --git a/electricity_price_predictor/src/electricity_price_predictor/cache.py b/electricity_price_predictor/src/electricity_price_predictor/cache.py new file mode 100644 index 0000000..b1b35e2 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/cache.py @@ -0,0 +1,112 @@ +import functools +import hashlib +import json +import pickle +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Optional + +from sqlalchemy import text +from sqlalchemy.engine import Engine + + +def _json_fallback_serializer(value: Any) -> str: + """Serializer for values that aren't directly JSON serializable.""" + if hasattr(value, "isoformat"): + return value.isoformat() + return repr(value) + + +def _build_cache_key(function_name: str, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str: + payload = json.dumps( + {"function_name": function_name, "args": args, "kwargs": kwargs}, + sort_keys=True, + default=_json_fallback_serializer, + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def cache_to_db( + engine: Engine, + namespace: str, + ttl_hours: Optional[int] = None, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """ + Cache function output in quant_db.entsoe_api_cache table. + + Notes: + - TTL is optional. If omitted, cached values do not expire. + - Cached payload uses pickle so pandas objects can be restored losslessly. 
+ """ + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + cache_key = _build_cache_key(f"{namespace}.{func.__name__}", args, kwargs) + now = datetime.now(timezone.utc) + + with engine.begin() as conn: + result = conn.execute( + text( + """ + SELECT payload + FROM entsoe_api_cache + WHERE cache_key = :cache_key + AND (expires_at IS NULL OR expires_at > :now_utc) + """ + ), + {"cache_key": cache_key, "now_utc": now}, + ).fetchone() + + if result: + return pickle.loads(result[0]) + + data = func(*args, **kwargs) + expires_at = None + if ttl_hours is not None: + expires_at = now + timedelta(hours=ttl_hours) + + conn.execute( + text( + """ + INSERT INTO entsoe_api_cache ( + cache_key, + namespace, + function_name, + args_json, + payload, + created_at, + expires_at + ) VALUES ( + :cache_key, + :namespace, + :function_name, + CAST(:args_json AS JSONB), + :payload, + :created_at, + :expires_at + ) + ON CONFLICT (cache_key) DO UPDATE + SET payload = EXCLUDED.payload, + created_at = EXCLUDED.created_at, + expires_at = EXCLUDED.expires_at, + args_json = EXCLUDED.args_json + """ + ), + { + "cache_key": cache_key, + "namespace": namespace, + "function_name": func.__name__, + "args_json": json.dumps( + {"args": args, "kwargs": kwargs}, + default=_json_fallback_serializer, + ), + "payload": pickle.dumps(data), + "created_at": now, + "expires_at": expires_at, + }, + ) + return data + + return wrapper + + return decorator diff --git a/electricity_price_predictor/src/electricity_price_predictor/db.py b/electricity_price_predictor/src/electricity_price_predictor/db.py new file mode 100644 index 0000000..2230271 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/db.py @@ -0,0 +1,24 @@ +import os + +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine + + +def get_database_url() -> str: + """Build database URL from env or fallback defaults.""" + 
explicit_url = os.getenv("QUANT_DB_URL") + if explicit_url: + return explicit_url + + host = os.getenv("QUANT_DB_HOST", "localhost") + port = os.getenv("QUANT_DB_PORT", "5432") + database = os.getenv("QUANT_DB_NAME", "quant_db") + user = os.getenv("QUANT_DB_USER", "quant_user") + password = os.getenv("QUANT_DB_PASSWORD", "strong_password") + + return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" + + +def get_engine(echo: bool = False) -> Engine: + """Create SQLAlchemy engine for quant_db.""" + return create_engine(get_database_url(), future=True, echo=echo) diff --git a/electricity_price_predictor/src/electricity_price_predictor/entsoe_api.py b/electricity_price_predictor/src/electricity_price_predictor/entsoe_api.py new file mode 100644 index 0000000..dae5320 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/entsoe_api.py @@ -0,0 +1,393 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +import pandas as pd +from entsoe import EntsoePandasClient +from entsoe.exceptions import NoMatchingDataError +from sqlalchemy import text +from sqlalchemy.engine import Engine + +from .cache import cache_to_db + +OBSERVATION_COLUMNS = { + "day_ahead_price", + "load_forecast", + "wind_forecast", + "solar_forecast", +} + +BIDDING_ZONE_ALIASES = { + # ENTSO-E often expects bidding-zone EIC aliases instead of plain country codes. 
+ "DE": "DE_LU", + "IT": "IT_NORD", +} + + +def _as_utc_index(series_or_df: pd.Series | pd.DataFrame) -> pd.Series | pd.DataFrame: + if series_or_df.index.tz is None: + series_or_df.index = series_or_df.index.tz_localize("UTC") + else: + series_or_df.index = series_or_df.index.tz_convert("UTC") + return series_or_df.sort_index() + + +def _safe_float(value) -> Optional[float]: + if pd.isna(value): + return None + return float(value) + + +def resolve_bidding_zone_code(country_code: str) -> str: + code = str(country_code).strip().upper() + return BIDDING_ZONE_ALIASES.get(code, code) + + +def _coerce_single_column_frame( + data: pd.Series | pd.DataFrame, + target_column: str, + preferred_tokens: tuple[str, ...] = (), +) -> pd.DataFrame: + """ + Normalize ENTSO-E responses that can be either a Series or DataFrame. + """ + if isinstance(data, pd.Series): + series = _as_utc_index(data) + series.name = target_column + return series.to_frame() + + frame = _as_utc_index(data.copy()) + if target_column in frame.columns: + return frame[[target_column]] + + if len(frame.columns) == 1: + return frame.rename(columns={frame.columns[0]: target_column})[[target_column]] + + def _best_column(candidates: list) -> Optional[str]: + if not candidates: + return None + # Prefer the candidate with most available points. 
+ return max(candidates, key=lambda col: int(frame[col].notna().sum())) + + lowered = {col: str(col).lower() for col in frame.columns} + preferred_candidates = [] + for token in preferred_tokens: + preferred_candidates.extend([col for col, col_lc in lowered.items() if token in col_lc]) + + preferred_col = _best_column(preferred_candidates) + if preferred_col is not None: + return frame.rename(columns={preferred_col: target_column})[[target_column]] + + any_col = _best_column(list(frame.columns)) + if any_col is not None: + return frame.rename(columns={any_col: target_column})[[target_column]] + + first_col = frame.columns[0] + return frame.rename(columns={first_col: target_column})[[target_column]] + + +def _normalize_utc_bounds(start: pd.Timestamp, end: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]: + start_ts = pd.Timestamp(start) + end_ts = pd.Timestamp(end) + if start_ts.tz is None: + start_ts = start_ts.tz_localize("UTC") + else: + start_ts = start_ts.tz_convert("UTC") + if end_ts.tz is None: + end_ts = end_ts.tz_localize("UTC") + else: + end_ts = end_ts.tz_convert("UTC") + return start_ts, end_ts + + +def _empty_hourly_frame( + columns: list[str], start: pd.Timestamp, end: pd.Timestamp +) -> pd.DataFrame: + start_ts, end_ts = _normalize_utc_bounds(start, end) + idx = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left") + return pd.DataFrame(index=idx, columns=columns) + + +def _coalesce_duplicate_columns(frame: pd.DataFrame) -> pd.DataFrame: + """ + Collapse duplicate column labels by taking the first non-null per row. 
+ """ + if frame.columns.is_unique: + return frame + + merged: dict[str, pd.Series] = {} + for col in frame.columns.unique(): + same_name = frame.loc[:, frame.columns == col] + if isinstance(same_name, pd.Series): + merged[str(col)] = same_name + else: + merged[str(col)] = same_name.bfill(axis=1).iloc[:, 0] + return pd.DataFrame(merged, index=frame.index) + + +def _missing_ranges(missing_index: pd.DatetimeIndex) -> list[tuple[pd.Timestamp, pd.Timestamp]]: + if missing_index.empty: + return [] + missing_index = missing_index.sort_values().unique() + ranges: list[tuple[pd.Timestamp, pd.Timestamp]] = [] + current_start = missing_index[0] + prev = missing_index[0] + step = pd.Timedelta(hours=1) + + for ts in missing_index[1:]: + if ts - prev != step: + ranges.append((current_start, prev + step)) + current_start = ts + prev = ts + ranges.append((current_start, prev + step)) + return ranges + + +@dataclass +class EntsoeDataService: + client: EntsoePandasClient + engine: Engine + cache_ttl_hours: Optional[int] = 24 + + def __post_init__(self) -> None: + self.get_day_ahead_prices = cache_to_db( + self.engine, "entsoe", ttl_hours=self.cache_ttl_hours + )(self._get_day_ahead_prices_impl) + self.get_load_forecast = cache_to_db( + self.engine, "entsoe", ttl_hours=self.cache_ttl_hours + )(self._get_load_forecast_impl) + self.get_wind_solar_forecast = cache_to_db( + self.engine, "entsoe", ttl_hours=self.cache_ttl_hours + )(self._get_wind_solar_forecast_impl) + + def resolve_country_code(self, country_code: str) -> str: + return resolve_bidding_zone_code(country_code) + + def _get_day_ahead_prices_impl( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.Series: + df = self._fetch_inputs_with_secondary_cache( + country_code=country_code, + start=start, + end=end, + required_columns=["day_ahead_price"], + api_fetcher=self._query_day_ahead_prices, + ) + return df["day_ahead_price"] + + def _get_load_forecast_impl( + self, country_code: str, start: 
pd.Timestamp, end: pd.Timestamp + ) -> pd.Series: + df = self._fetch_inputs_with_secondary_cache( + country_code=country_code, + start=start, + end=end, + required_columns=["load_forecast"], + api_fetcher=self._query_load_forecast, + ) + return df["load_forecast"] + + def _get_wind_solar_forecast_impl( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.DataFrame: + return self._fetch_inputs_with_secondary_cache( + country_code=country_code, + start=start, + end=end, + required_columns=["wind_forecast", "solar_forecast"], + api_fetcher=self._query_wind_solar_forecast, + ) + + def _query_day_ahead_prices( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.DataFrame: + try: + raw = self.client.query_day_ahead_prices(country_code, start=start, end=end) + except NoMatchingDataError: + return _empty_hourly_frame(["day_ahead_price"], start, end) + return _coerce_single_column_frame( + raw, + target_column="day_ahead_price", + preferred_tokens=("price", "ahead"), + ) + + def _query_load_forecast( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.DataFrame: + try: + raw = self.client.query_load_forecast(country_code, start=start, end=end) + except NoMatchingDataError: + return _empty_hourly_frame(["load_forecast"], start, end) + return _coerce_single_column_frame( + raw, + target_column="load_forecast", + preferred_tokens=("load",), + ) + + def _query_wind_solar_forecast( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.DataFrame: + try: + df = self.client.query_wind_and_solar_forecast(country_code, start=start, end=end) + except NoMatchingDataError: + return _empty_hourly_frame(["wind_forecast", "solar_forecast"], start, end) + df = _as_utc_index(df) + if isinstance(df, pd.Series): + df = df.to_frame() + + renamed = {} + for column in df.columns: + lc = str(column).lower() + if "wind" in lc: + renamed[column] = "wind_forecast" + elif "solar" in lc: + renamed[column] = 
"solar_forecast" + df = df.rename(columns=renamed) + df = _coalesce_duplicate_columns(df) + + if "wind_forecast" not in df.columns: + df["wind_forecast"] = None + if "solar_forecast" not in df.columns: + df["solar_forecast"] = None + + return df[["wind_forecast", "solar_forecast"]] + + def _load_observations( + self, + country_code: str, + start: pd.Timestamp, + end: pd.Timestamp, + columns: list[str], + ) -> pd.DataFrame: + invalid = [col for col in columns if col not in OBSERVATION_COLUMNS] + if invalid: + raise ValueError(f"Unsupported observation columns requested: {invalid}") + + sql_columns = ", ".join(columns) + query = text( + f""" + SELECT delivery_start, {sql_columns} + FROM electricity_market_observations + WHERE country_code = :country_code + AND delivery_start >= :start_ts + AND delivery_start < :end_ts + ORDER BY delivery_start + """ + ) + + with self.engine.begin() as conn: + rows = conn.execute( + query, + {"country_code": country_code, "start_ts": start, "end_ts": end}, + ).fetchall() + + if not rows: + return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], tz="UTC")) + + db_frame = pd.DataFrame(rows, columns=["delivery_start", *columns]) + db_frame["delivery_start"] = pd.to_datetime(db_frame["delivery_start"], utc=True) + db_frame = db_frame.set_index("delivery_start").sort_index() + return db_frame + + def _fetch_inputs_with_secondary_cache( + self, + country_code: str, + start: pd.Timestamp, + end: pd.Timestamp, + required_columns: list[str], + api_fetcher, + ) -> pd.DataFrame: + start_ts, end_ts = _normalize_utc_bounds(start, end) + expected_index = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left") + from_observations = self._load_observations(country_code, start_ts, end_ts, required_columns) + + if from_observations.empty: + complete_index = pd.DatetimeIndex([], tz="UTC") + else: + complete_mask = from_observations[required_columns].notna().all(axis=1) + complete_index = from_observations.index[complete_mask] + + 
missing_index = expected_index.difference(complete_index) + + fetched_parts: list[pd.DataFrame] = [] + for missing_start, missing_end in _missing_ranges(missing_index): + fetched = api_fetcher(country_code, missing_start, missing_end) + fetched = fetched.reindex(columns=required_columns) + fetched_parts.append(fetched) + + if fetched_parts: + fetched_frame = pd.concat(fetched_parts).sort_index() + self.upsert_raw_data(country_code=country_code, frame=fetched_frame) + else: + fetched_frame = pd.DataFrame(columns=required_columns, index=pd.DatetimeIndex([], tz="UTC")) + + combined = pd.concat([from_observations, fetched_frame]).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + combined = combined.reindex(expected_index) + return combined[required_columns] + + def fetch_inputs( + self, country_code: str, start: pd.Timestamp, end: pd.Timestamp + ) -> pd.DataFrame: + resolved_country_code = self.resolve_country_code(country_code) + price = _coerce_single_column_frame( + self.get_day_ahead_prices(resolved_country_code, start, end), + target_column="day_ahead_price", + preferred_tokens=("price", "ahead"), + ) + load = _coerce_single_column_frame( + self.get_load_forecast(resolved_country_code, start, end), + target_column="load_forecast", + preferred_tokens=("load",), + ) + wind_solar = self.get_wind_solar_forecast(resolved_country_code, start, end) + df = price.join(load, how="outer").join(wind_solar, how="outer").sort_index() + return df + + def upsert_raw_data(self, country_code: str, frame: pd.DataFrame) -> None: + rows = [] + for ts, row in frame.iterrows(): + rows.append( + { + "country_code": country_code, + "delivery_start": ts.to_pydatetime(), + "day_ahead_price": _safe_float(row.get("day_ahead_price")), + "load_forecast": _safe_float(row.get("load_forecast")), + "wind_forecast": _safe_float(row.get("wind_forecast")), + "solar_forecast": _safe_float(row.get("solar_forecast")), + } + ) + + if not rows: + return + + with 
self.engine.begin() as conn: + conn.execute( + text( + """ + INSERT INTO electricity_market_observations ( + country_code, + delivery_start, + day_ahead_price, + load_forecast, + wind_forecast, + solar_forecast + ) VALUES ( + :country_code, + :delivery_start, + :day_ahead_price, + :load_forecast, + :wind_forecast, + :solar_forecast + ) + ON CONFLICT (country_code, delivery_start) DO UPDATE + SET day_ahead_price = COALESCE(EXCLUDED.day_ahead_price, electricity_market_observations.day_ahead_price), + load_forecast = COALESCE(EXCLUDED.load_forecast, electricity_market_observations.load_forecast), + wind_forecast = COALESCE(EXCLUDED.wind_forecast, electricity_market_observations.wind_forecast), + solar_forecast = COALESCE(EXCLUDED.solar_forecast, electricity_market_observations.solar_forecast), + ingested_at = NOW() + """ + ), + rows, + ) diff --git a/electricity_price_predictor/src/electricity_price_predictor/features.py b/electricity_price_predictor/src/electricity_price_predictor/features.py new file mode 100644 index 0000000..4eb5cb0 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/features.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import math + +import pandas as pd + + +def _cyclical_encode(values: pd.Series, period: int, prefix: str) -> pd.DataFrame: + angle = 2.0 * math.pi * values / period + return pd.DataFrame( + {f"{prefix}_sin": angle.apply(math.sin), f"{prefix}_cos": angle.apply(math.cos)}, + index=values.index, + ) + + +def build_feature_frame(inputs: pd.DataFrame, max_lag: int = 24) -> pd.DataFrame: + """ + Build feature set for electricity price forecasting. 
+ + Included: + - day_ahead_price, load_forecast, wind_forecast, solar_forecast + - residual_load + - lagged_price(t-1..t-24) + - lagged_residual_load(t-1..t-24) + - hour/weekday/month cyclical encodings + """ + df = inputs.copy().sort_index() + required = {"day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"} + missing = required.difference(df.columns) + if missing: + raise ValueError(f"Missing required input columns: {sorted(missing)}") + + # Preserve source missingness semantics for downstream users. + # We keep NaNs instead of imputing to 0.0 so missing data is explicit. + df["residual_load"] = df["load_forecast"] - df["wind_forecast"] - df["solar_forecast"] + + for lag in range(1, max_lag + 1): + df[f"lagged_price_{lag}"] = df["day_ahead_price"].shift(lag) + df[f"lagged_residual_load_{lag}"] = df["residual_load"].shift(lag) + + time_index = df.index + if time_index.tz is None: + time_index = time_index.tz_localize("UTC") + else: + time_index = time_index.tz_convert("UTC") + + cyclical = [ + _cyclical_encode(pd.Series(time_index.hour, index=df.index), 24, "hour_of_day"), + _cyclical_encode(pd.Series(time_index.weekday, index=df.index), 7, "weekday"), + _cyclical_encode(pd.Series(time_index.month - 1, index=df.index), 12, "month"), + ] + for cyc in cyclical: + df = df.join(cyc) + + # Only enforce warmup/history constraints for price lags. + # Other feature columns can remain NaN when source data is missing. 
+ required_for_row = ["day_ahead_price", *[f"lagged_price_{lag}" for lag in range(1, max_lag + 1)]] + return df.dropna(subset=required_for_row).sort_index() diff --git a/electricity_price_predictor/src/electricity_price_predictor/pipeline.py b/electricity_price_predictor/src/electricity_price_predictor/pipeline.py new file mode 100644 index 0000000..5deffe6 --- /dev/null +++ b/electricity_price_predictor/src/electricity_price_predictor/pipeline.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import pandas as pd +from entsoe import EntsoePandasClient +from sqlalchemy import text +from sqlalchemy.engine import Engine + +from .entsoe_api import EntsoeDataService +from .features import build_feature_frame + + +def _safe_float(value) -> float | None: + if pd.isna(value): + return None + return float(value) + + +def _validate_inputs_have_signal(inputs: pd.DataFrame, country_code: str) -> None: + required = ["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"] + non_null_counts = {col: int(inputs[col].notna().sum()) for col in required if col in inputs.columns} + if non_null_counts and all(count == 0 for count in non_null_counts.values()): + raise ValueError( + "No ENTSO-E data available for " + f"'{country_code}' in the requested time window. " + "Try another bidding zone/time range, and ensure the API key has access." + ) + + +def persist_feature_frame(engine: Engine, country_code: str, feature_frame: pd.DataFrame) -> None: + # Feature-store schema expects core numeric fields to be non-null. + # Keep NaNs in the returned DataFrame, but skip incomplete rows on DB write. 
+ persistable = feature_frame.dropna( + subset=["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast", "residual_load"] + ) + + rows = [] + for ts, row in persistable.iterrows(): + lag_price = [float(row[f"lagged_price_{lag}"]) for lag in range(1, 25)] + lag_residual = [float(row[f"lagged_residual_load_{lag}"]) for lag in range(1, 25)] + + rows.append( + { + "country_code": country_code, + "delivery_start": ts.to_pydatetime(), + "day_ahead_price": float(row["day_ahead_price"]), + "load_forecast": _safe_float(row["load_forecast"]), + "wind_forecast": _safe_float(row["wind_forecast"]), + "solar_forecast": _safe_float(row["solar_forecast"]), + "residual_load": _safe_float(row["residual_load"]), + "lagged_price": lag_price, + "lagged_residual_load": lag_residual, + "hour_of_day_sin": float(row["hour_of_day_sin"]), + "hour_of_day_cos": float(row["hour_of_day_cos"]), + "weekday_sin": float(row["weekday_sin"]), + "weekday_cos": float(row["weekday_cos"]), + "month_sin": float(row["month_sin"]), + "month_cos": float(row["month_cos"]), + "feature_version": "v1", + "created_at": datetime.now(timezone.utc), + } + ) + + if not rows: + return + + with engine.begin() as conn: + conn.execute( + text( + """ + INSERT INTO electricity_price_features ( + country_code, + delivery_start, + day_ahead_price, + load_forecast, + wind_forecast, + solar_forecast, + residual_load, + lagged_price, + lagged_residual_load, + hour_of_day_sin, + hour_of_day_cos, + weekday_sin, + weekday_cos, + month_sin, + month_cos, + feature_version, + created_at + ) VALUES ( + :country_code, + :delivery_start, + :day_ahead_price, + :load_forecast, + :wind_forecast, + :solar_forecast, + :residual_load, + :lagged_price, + :lagged_residual_load, + :hour_of_day_sin, + :hour_of_day_cos, + :weekday_sin, + :weekday_cos, + :month_sin, + :month_cos, + :feature_version, + :created_at + ) + ON CONFLICT (country_code, delivery_start, feature_version) DO UPDATE + SET day_ahead_price = 
EXCLUDED.day_ahead_price, + load_forecast = EXCLUDED.load_forecast, + wind_forecast = EXCLUDED.wind_forecast, + solar_forecast = EXCLUDED.solar_forecast, + residual_load = EXCLUDED.residual_load, + lagged_price = EXCLUDED.lagged_price, + lagged_residual_load = EXCLUDED.lagged_residual_load, + hour_of_day_sin = EXCLUDED.hour_of_day_sin, + hour_of_day_cos = EXCLUDED.hour_of_day_cos, + weekday_sin = EXCLUDED.weekday_sin, + weekday_cos = EXCLUDED.weekday_cos, + month_sin = EXCLUDED.month_sin, + month_cos = EXCLUDED.month_cos, + created_at = EXCLUDED.created_at + """ + ), + rows, + ) + + +def run_feature_pipeline( + engine: Engine, + entsoe_api_key: str, + country_code: str, + start: pd.Timestamp, + end: pd.Timestamp, + cache_ttl_hours: int = 24, +) -> pd.DataFrame: + client = EntsoePandasClient(api_key=entsoe_api_key) + service = EntsoeDataService(client=client, engine=engine, cache_ttl_hours=cache_ttl_hours) + resolved_country_code = service.resolve_country_code(country_code) + inputs = service.fetch_inputs(country_code=resolved_country_code, start=start, end=end) + _validate_inputs_have_signal(inputs, resolved_country_code) + service.upsert_raw_data(country_code=resolved_country_code, frame=inputs) + features = build_feature_frame(inputs) + persist_feature_frame(engine, country_code=resolved_country_code, feature_frame=features) + return features
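Review note: the gap-grouping step that `_fetch_inputs_with_secondary_cache` relies on (`_missing_ranges` in `entsoe_api.py`) can be exercised on its own, without a database or an ENTSO-E key. The block below is a minimal sketch of that logic, assuming hourly UTC timestamps. It uses stdlib `datetime` objects instead of a pandas `DatetimeIndex`, and the names `group_hourly_gaps` and `hours` are hypothetical helpers for illustration, not part of this diff.

```python
from datetime import datetime, timedelta, timezone

HOUR = timedelta(hours=1)


def group_hourly_gaps(missing):
    """Collapse hourly timestamps into half-open (start, end) ranges.

    Hypothetical stand-in for `_missing_ranges`: consecutive hours are
    merged into one range, and `end` is one hour past the last missing hour.
    """
    ordered = sorted(set(missing))  # de-duplicate and order, as the original does
    if not ordered:
        return []

    ranges = []
    current_start = prev = ordered[0]
    for ts in ordered[1:]:
        if ts - prev != HOUR:
            # A step other than exactly one hour closes the current range.
            ranges.append((current_start, prev + HOUR))
            current_start = ts
        prev = ts
    ranges.append((current_start, prev + HOUR))
    return ranges


def hours(*offsets):
    """Build UTC timestamps at the given hour offsets from 2024-01-01 00:00."""
    base = datetime(2024, 1, 1, tzinfo=timezone.utc)
    return [base + timedelta(hours=h) for h in offsets]


# Hours 0-2, 5, and 9-10 are missing: three contiguous gaps.
gaps = group_hourly_gaps(hours(0, 1, 2, 5, 9, 10))
for start, end in gaps:
    print(start.isoformat(), "->", end.isoformat())
```

Each range is half-open, which lines up with the `inclusive="left"` index in `_empty_hourly_frame` and the `delivery_start < :end_ts` filter in `_load_observations`. Under that reading, the service issues one API call per contiguous gap rather than one per missing hour.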