Add electricity price ingestion and feature pipeline.
Introduce ENTSO-E data retrieval with layered caching, robust bidding-zone and missing-data handling, and persist model-ready features with detailed architecture/developer documentation. Made-with: Cursor
This commit is contained in:
164
electricity_price_predictor/README.md
Normal file
164
electricity_price_predictor/README.md
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
# Electricity Price Predictor
|
||||||
|
|
||||||
|
Standalone module for ENTSO-E ingestion and feature-store creation in `quant_db`.
|
||||||
|
|
||||||
|
## Documentation
|
||||||
|
|
||||||
|
- High-level architecture: `docs/architecture.md`
|
||||||
|
- Detailed developer documentation (file-by-file + UML): `docs/developer_guide.md`
|
||||||
|
|
||||||
|
## What it builds
|
||||||
|
|
||||||
|
- Input columns:
|
||||||
|
- `day_ahead_price`
|
||||||
|
- `load_forecast`
|
||||||
|
- `wind_forecast`
|
||||||
|
- `solar_forecast`
|
||||||
|
- Derived columns:
|
||||||
|
- `residual_load`
|
||||||
|
- `lagged_price` (`t-1` ... `t-24`, stored as array length 24)
|
||||||
|
- `lagged_residual_load` (`t-1` ... `t-24`, stored as array length 24)
|
||||||
|
- `hour_of_day_sin`, `hour_of_day_cos`
|
||||||
|
- `weekday_sin`, `weekday_cos`
|
||||||
|
- `month_sin`, `month_cos`
|
||||||
|
|
||||||
|
## Column units (data dictionary)
|
||||||
|
|
||||||
|
All timestamps are hourly and stored in UTC (`delivery_start`).
|
||||||
|
|
||||||
|
- `day_ahead_price`
|
||||||
|
- Unit: `EUR/MWh` (euros per megawatt-hour).
|
||||||
|
- `load_forecast`
|
||||||
|
- Unit: `MW` (megawatts).
|
||||||
|
- `wind_forecast`
|
||||||
|
- Unit: `MW` (megawatts).
|
||||||
|
- `solar_forecast`
|
||||||
|
- Unit: `MW` (megawatts).
|
||||||
|
- `residual_load = load_forecast - wind_forecast - solar_forecast`
|
||||||
|
- Unit: `MW` (megawatts).
|
||||||
|
- `lagged_price` (array of 24 values for `t-1..t-24`)
|
||||||
|
- Unit: `EUR/MWh`.
|
||||||
|
- `lagged_residual_load` (array of 24 values for `t-1..t-24`)
|
||||||
|
- Unit: `MW`.
|
||||||
|
- `hour_of_day_sin`, `hour_of_day_cos`
|
||||||
|
- Unit: dimensionless in `[-1, 1]`.
|
||||||
|
- `weekday_sin`, `weekday_cos`
|
||||||
|
- Unit: dimensionless in `[-1, 1]`.
|
||||||
|
- `month_sin`, `month_cos`
|
||||||
|
- Unit: dimensionless in `[-1, 1]`.
|
||||||
|
|
||||||
|
## Missing-data semantics
|
||||||
|
|
||||||
|
The pipeline intentionally distinguishes **missing** from **measured zero**:
|
||||||
|
|
||||||
|
- Forecast columns (`load_forecast`, `wind_forecast`, `solar_forecast`) remain `NaN` when ENTSO-E has no value.
|
||||||
|
- `residual_load` is computed directly from source columns and remains `NaN` when any source component is missing.
|
||||||
|
- Feature engineering drops rows only for lag warmup requirements (`day_ahead_price` and `lagged_price_1..24`), not for every nullable forecast column.
|
||||||
|
- During DB persistence to `electricity_price_features`, rows with nulls in NOT NULL core columns are skipped (to satisfy schema constraints) while still being available in the returned in-memory DataFrame.
|
||||||
|
|
||||||
|
## Data contracts
|
||||||
|
|
||||||
|
### In-memory contract (`run_feature_pipeline(...)` return value)
|
||||||
|
|
||||||
|
- Index:
|
||||||
|
- Type: timezone-aware `DatetimeIndex`
|
||||||
|
- Granularity: hourly
|
||||||
|
- Timezone: UTC
|
||||||
|
- Uniqueness: unique timestamps expected
|
||||||
|
- Columns:
|
||||||
|
- Base signals: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
|
||||||
|
- Derived: `residual_load`
|
||||||
|
- Lag vectors (expanded): `lagged_price_1..24`, `lagged_residual_load_1..24`
|
||||||
|
- Cyclical: `hour_of_day_sin`, `hour_of_day_cos`, `weekday_sin`, `weekday_cos`, `month_sin`, `month_cos`
|
||||||
|
- Nullability:
|
||||||
|
- `day_ahead_price`: required for returned rows.
|
||||||
|
- `lagged_price_1..24`: required for returned rows.
|
||||||
|
- `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`: nullable (`NaN` allowed).
|
||||||
|
|
||||||
|
### Persistence contract (`electricity_price_features`)
|
||||||
|
|
||||||
|
- Persisted row key:
|
||||||
|
- (`country_code`, `delivery_start`, `feature_version`)
|
||||||
|
- NOT NULL core columns in schema:
|
||||||
|
- `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`
|
||||||
|
- `lagged_price`, `lagged_residual_load`
|
||||||
|
- cyclical columns (`hour_of_day_*`, `weekday_*`, `month_*`)
|
||||||
|
- Write behavior:
|
||||||
|
- The persistence layer filters out non-conforming rows (nulls in NOT NULL core columns) before UPSERT.
|
||||||
|
- Persisted lag arrays are fixed length 24 and map to `t-1..t-24`.
|
||||||
|
|
||||||
|
### Raw observations contract (`electricity_market_observations`)
|
||||||
|
|
||||||
|
- Indexing/key semantics:
|
||||||
|
- one row per (`country_code`, `delivery_start`)
|
||||||
|
- Update semantics:
|
||||||
|
- partial refreshes do not overwrite existing non-null values with nulls (`COALESCE` merge policy)
|
||||||
|
|
||||||
|
## Country-code and API behavior notes
|
||||||
|
|
||||||
|
- Use ENTSO-E bidding-zone identifiers (for example `DE_LU` rather than `DE`) when querying across all endpoints.
|
||||||
|
- The pipeline includes a bidding-zone resolver for common country aliases:
|
||||||
|
- `DE -> DE_LU`
|
||||||
|
- `IT -> IT_NORD`
|
||||||
|
- Persistence uses the resolved bidding-zone code so DB keys match the queried market zone.
|
||||||
|
- Some ENTSO-E endpoints may return no matches for specific windows/countries. These are handled as empty hourly frames for that endpoint rather than hard-failing the whole fetch.
|
||||||
|
- Wind/solar responses can include duplicate semantic columns after normalization; the service coalesces duplicates by taking the first non-null value per timestamp.
|
||||||
|
|
||||||
|
## Database objects
|
||||||
|
|
||||||
|
`sql/001_electricity_price_schema.sql` creates:
|
||||||
|
|
||||||
|
- `entsoe_api_cache`: generic decorator cache table (pickled payloads, TTL support)
|
||||||
|
- `electricity_market_observations`: raw hourly ENTSO-E observations
|
||||||
|
- `electricity_price_features`: model-ready feature store
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd electricity_price_predictor
|
||||||
|
python -m venv .venv
|
||||||
|
source .venv/bin/activate
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
Set env vars:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export ENTSOE_API_KEY="your_entsoe_key"
|
||||||
|
export QUANT_DB_HOST="localhost"
|
||||||
|
export QUANT_DB_PORT="5432"
|
||||||
|
export QUANT_DB_NAME="quant_db"
|
||||||
|
export QUANT_DB_USER="quant_user"
|
||||||
|
export QUANT_DB_PASSWORD="strong_password"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Initialize schema
|
||||||
|
|
||||||
|
```bash
|
||||||
|
PYTHONPATH=src python scripts/init_db.py
|
||||||
|
```
|
||||||
|
|
||||||
|
## Build feature store
|
||||||
|
|
||||||
|
```bash
|
||||||
|
PYTHONPATH=src python scripts/build_feature_store.py \
|
||||||
|
--country-code DE_LU \
|
||||||
|
--start 2026-01-01T00:00:00Z \
|
||||||
|
--end 2026-02-01T00:00:00Z \
|
||||||
|
--cache-ttl-hours 24
|
||||||
|
```
|
||||||
|
|
||||||
|
## Decorator caching behavior
|
||||||
|
|
||||||
|
The `cache_to_db` decorator:
|
||||||
|
|
||||||
|
- hashes function name + arguments into deterministic `cache_key`
|
||||||
|
- checks `entsoe_api_cache` first
|
||||||
|
- returns cached payload if key exists and not expired
|
||||||
|
- otherwise uses `electricity_market_observations` as secondary cache at timestamp level
|
||||||
|
- only calls ENTSO-E for missing hourly intervals, then upserts those rows
|
||||||
|
- stores final returned object in `entsoe_api_cache`
|
||||||
|
|
||||||
|
This gives a two-layer cache:
|
||||||
|
1) fast function-result cache (`entsoe_api_cache`) and
|
||||||
|
2) canonical timestamp cache (`electricity_market_observations`).
|
||||||
73
electricity_price_predictor/docs/architecture.md
Normal file
73
electricity_price_predictor/docs/architecture.md
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
# Architecture Notes
|
||||||
|
|
||||||
|
This document is the quick architecture map. For full file-by-file implementation details, see `docs/developer_guide.md`.
|
||||||
|
|
||||||
|
## End-to-end data flow
|
||||||
|
|
||||||
|
1. `scripts/build_feature_store.py` parses CLI arguments and validates env vars.
|
||||||
|
2. It calls `pipeline.run_feature_pipeline(...)`.
|
||||||
|
3. `EntsoeDataService.fetch_inputs(...)` loads:
|
||||||
|
- `day_ahead_price`
|
||||||
|
- `load_forecast`
|
||||||
|
- `wind_forecast`
|
||||||
|
- `solar_forecast`
|
||||||
|
4. Each ENTSO-E call is wrapped by `cache_to_db(...)` and either:
|
||||||
|
- serves a hit from `entsoe_api_cache`, or
|
||||||
|
- falls back to `electricity_market_observations` for already-known timestamps,
|
||||||
|
- and performs API calls only for missing hourly intervals.
|
||||||
|
5. Missing intervals returned from API are upserted into `electricity_market_observations`.
|
||||||
|
6. The final merged result is cached in `entsoe_api_cache`.
|
||||||
|
7. Raw merged series are upserted to `electricity_market_observations`.
|
||||||
|
8. `features.build_feature_frame(...)` computes:
|
||||||
|
- `residual_load`
|
||||||
|
- lagged arrays (24 values each)
|
||||||
|
- cyclical encodings for hour/weekday/month
|
||||||
|
- preserves `NaN` for missing forecast-derived values.
|
||||||
|
9. `pipeline.persist_feature_frame(...)` upserts model-ready rows to `electricity_price_features`.
|
||||||
|
- filters out rows that violate feature-table NOT NULL constraints.
|
||||||
|
|
||||||
|
## Process diagram
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
flowchart TD
|
||||||
|
A[build_feature_store.py CLI] --> B[run_feature_pipeline]
|
||||||
|
B --> C[EntsoeDataService.fetch_inputs]
|
||||||
|
C --> D{Hit in entsoe_api_cache?}
|
||||||
|
D -->|Yes| E[Load payload from entsoe_api_cache]
|
||||||
|
D -->|No| F[Read electricity_market_observations]
|
||||||
|
F --> G{Missing hourly timestamps?}
|
||||||
|
G -->|No| H[Reuse DB observation rows]
|
||||||
|
G -->|Yes| I[Call ENTSO-E only for missing ranges]
|
||||||
|
I --> I2{NoMatchingDataError?}
|
||||||
|
I2 -->|Yes| I3[Use empty hourly frame for endpoint]
|
||||||
|
I2 -->|No| I4[Normalize payload]
|
||||||
|
I4 --> I5[Coalesce duplicate columns by first non-null]
|
||||||
|
I3 --> J[Upsert missing rows to electricity_market_observations]
|
||||||
|
I5 --> J
|
||||||
|
H --> K[Build merged input DataFrame]
|
||||||
|
J --> K
|
||||||
|
K --> L[Store payload in entsoe_api_cache]
|
||||||
|
E --> M[Use cached input DataFrame]
|
||||||
|
L --> N[Upsert electricity_market_observations]
|
||||||
|
M --> N
|
||||||
|
N --> O[build_feature_frame]
|
||||||
|
O --> P[Create lags + cyclical features]
|
||||||
|
P --> P2[Preserve NaN in forecast-derived columns]
|
||||||
|
P2 --> P3[Drop rows missing day_ahead_price or lagged_price_1..24]
|
||||||
|
P3 --> Q[Upsert persistable subset into electricity_price_features]
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key design reasons
|
||||||
|
|
||||||
|
- DB cache avoids repeated ENTSO-E calls during iterative model work.
|
||||||
|
- Observation-table fallback avoids re-fetching timestamps already persisted once.
|
||||||
|
- Pickled payloads preserve exact pandas object shape and index information.
|
||||||
|
- Feature table stores fixed-size lag arrays so one row corresponds to one prediction timestamp.
|
||||||
|
- Missing forecasts are kept as `NaN` in analysis outputs, avoiding misleading zero-imputation.
|
||||||
|
- Persistence layer enforces schema compatibility by skipping rows with nulls in NOT NULL feature columns.
|
||||||
|
|
||||||
|
## Extension points
|
||||||
|
|
||||||
|
- Add label/target tables (`t+1`, `t+24`, etc.).
|
||||||
|
- Add training metadata + model registry tables.
|
||||||
|
- Add partitioning strategy for multi-year production-scale data.
|
||||||
362
electricity_price_predictor/docs/developer_guide.md
Normal file
362
electricity_price_predictor/docs/developer_guide.md
Normal file
@@ -0,0 +1,362 @@
|
|||||||
|
# Developer Guide (Deep Dive)
|
||||||
|
|
||||||
|
This guide explains each file in the module, execution order, control flow, and data/state transitions so you can reason about behavior without reading source code.
|
||||||
|
|
||||||
|
## 1) Directory map and responsibilities
|
||||||
|
|
||||||
|
### Top-level
|
||||||
|
|
||||||
|
- `requirements.txt`
|
||||||
|
- Python dependencies for ingestion and DB persistence.
|
||||||
|
- `README.md`
|
||||||
|
- Operator-focused setup and run commands.
|
||||||
|
- `sql/001_electricity_price_schema.sql`
|
||||||
|
- DDL for cache, raw observations, and feature store.
|
||||||
|
- `scripts/init_db.py`
|
||||||
|
- Applies the SQL schema to `quant_db`.
|
||||||
|
- `scripts/build_feature_store.py`
|
||||||
|
- CLI entrypoint for data fetch + feature persistence.
|
||||||
|
- `docs/architecture.md`
|
||||||
|
- High-level architecture summary.
|
||||||
|
- `docs/developer_guide.md`
|
||||||
|
- This detailed developer-facing explanation.
|
||||||
|
|
||||||
|
### Python package (`src/electricity_price_predictor`)
|
||||||
|
|
||||||
|
- `__init__.py`
|
||||||
|
- Public package exports (`get_engine`, `EntsoeDataService`, `build_feature_frame`).
|
||||||
|
- `db.py`
|
||||||
|
- Builds DB URL from env vars and creates SQLAlchemy `Engine`.
|
||||||
|
- `cache.py`
|
||||||
|
- Implements decorator-based DB cache with deterministic keying.
|
||||||
|
- `entsoe_api.py`
|
||||||
|
- Wraps ENTSO-E API calls, normalizes data, and writes raw observations.
|
||||||
|
- `features.py`
|
||||||
|
- Pure feature engineering logic (residual load, lags, cyclical encoding).
|
||||||
|
- `pipeline.py`
|
||||||
|
- Orchestration layer for end-to-end fetch -> raw persist -> feature build -> feature persist.
|
||||||
|
|
||||||
|
## 2) Runtime execution path (step-by-step)
|
||||||
|
|
||||||
|
When you run:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
PYTHONPATH=src python3 scripts/build_feature_store.py --country-code ... --start ... --end ...
|
||||||
|
```
|
||||||
|
|
||||||
|
Execution sequence:
|
||||||
|
|
||||||
|
1. **Argument parsing**
|
||||||
|
- `build_feature_store.py` reads country code/time range/TTL.
|
||||||
|
2. **Credential/connection bootstrap**
|
||||||
|
- checks `ENTSOE_API_KEY`.
|
||||||
|
- calls `get_engine()` from `db.py`.
|
||||||
|
3. **Pipeline orchestration**
|
||||||
|
- `run_feature_pipeline(...)` in `pipeline.py` starts.
|
||||||
|
4. **API service creation**
|
||||||
|
- initializes `EntsoePandasClient`.
|
||||||
|
- creates `EntsoeDataService(client, engine, cache_ttl_hours)`.
|
||||||
|
5. **Decorator wrapping**
|
||||||
|
- in `EntsoeDataService.__post_init__`, API methods are wrapped by `cache_to_db(...)`.
|
||||||
|
6. **Data retrieval**
|
||||||
|
- `fetch_inputs(...)` calls:
|
||||||
|
- `get_day_ahead_prices(...)`
|
||||||
|
- `get_load_forecast(...)`
|
||||||
|
- `get_wind_solar_forecast(...)`
|
||||||
|
- country aliases are normalized to bidding zones before queries (currently `DE -> DE_LU`, `IT -> IT_NORD`).
|
||||||
|
7. **Cache check/compute loop (per call)**
|
||||||
|
- decorator computes hash key from function + args.
|
||||||
|
- if non-expired row exists in `entsoe_api_cache`: returns payload.
|
||||||
|
- else: reads `electricity_market_observations` for requested timestamps.
|
||||||
|
- if timestamps are missing there, only missing hourly ranges are requested from ENTSO-E.
|
||||||
|
- `NoMatchingDataError` from ENTSO-E is converted to an empty hourly frame for that endpoint/range.
|
||||||
|
- normalized responses coalesce duplicate semantic columns (for example multiple wind/solar columns) via first non-null-per-row.
|
||||||
|
- missing rows are upserted into `electricity_market_observations`.
|
||||||
|
- final merged dataset is stored in `entsoe_api_cache` and returned.
|
||||||
|
8. **Raw persistence**
|
||||||
|
- merged inputs are upserted to `electricity_market_observations`.
|
||||||
|
9. **Feature engineering**
|
||||||
|
- `build_feature_frame(...)` computes:
|
||||||
|
- `residual_load = load - wind - solar`
|
||||||
|
- `lagged_price_1..24`
|
||||||
|
- `lagged_residual_load_1..24`
|
||||||
|
- `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
|
||||||
|
- preserves source missingness as `NaN` (no 0.0 imputation).
|
||||||
|
- drops rows only when `day_ahead_price` / `lagged_price_1..24` are missing (lag warmup requirement).
|
||||||
|
10. **Feature-store persistence**
|
||||||
|
- lags are materialized into PostgreSQL arrays (`DOUBLE PRECISION[]`, length 24).
|
||||||
|
- rows violating NOT NULL core feature constraints are filtered out before upsert.
|
||||||
|
- persistable rows are upserted to `electricity_price_features`.
|
||||||
|
11. **CLI completion**
|
||||||
|
- prints persisted row count.
|
||||||
|
|
||||||
|
## 3) UML diagrams
|
||||||
|
|
||||||
|
## 3.1 Component diagram
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
flowchart LR
|
||||||
|
CLI[scripts/build_feature_store.py] --> PIPE[pipeline.run_feature_pipeline]
|
||||||
|
PIPE --> DBMOD[db.get_engine]
|
||||||
|
PIPE --> SERVICE[EntsoeDataService]
|
||||||
|
SERVICE --> CACHEDEC[cache_to_db decorator]
|
||||||
|
SERVICE --> ENTSOE[EntsoePandasClient]
|
||||||
|
SERVICE --> SECONDARY[electricity_market_observations secondary cache]
|
||||||
|
PIPE --> FEAT[features.build_feature_frame NaN-preserving]
|
||||||
|
FEAT --> PERSIST[pipeline.persist_feature_frame null-filtered]
|
||||||
|
CACHEDEC --> DB[(quant_db.entsoe_api_cache)]
|
||||||
|
SECONDARY --> RAW[(quant_db.electricity_market_observations)]
|
||||||
|
PERSIST --> STORE[(quant_db.electricity_price_features)]
|
||||||
|
```
|
||||||
|
|
||||||
|
## 3.2 Class diagram (logical)
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
classDiagram
|
||||||
|
class EntsoeDataService {
|
||||||
|
+client: EntsoePandasClient
|
||||||
|
+engine: Engine
|
||||||
|
+cache_ttl_hours: Optional[int]
|
||||||
|
+fetch_inputs(country_code, start, end) DataFrame
|
||||||
|
+upsert_raw_data(country_code, frame) None
|
||||||
|
-_get_day_ahead_prices_impl(country_code, start, end) Series
|
||||||
|
-_get_load_forecast_impl(country_code, start, end) Series
|
||||||
|
-_get_wind_solar_forecast_impl(country_code, start, end) DataFrame
|
||||||
|
}
|
||||||
|
|
||||||
|
class CacheDecorator {
|
||||||
|
+cache_to_db(engine, namespace, ttl_hours) decorator
|
||||||
|
-_build_cache_key(function_name, args, kwargs) str
|
||||||
|
}
|
||||||
|
|
||||||
|
class FeatureBuilder {
|
||||||
|
+build_feature_frame(inputs, max_lag=24) DataFrame
|
||||||
|
-_cyclical_encode(values, period, prefix) DataFrame
|
||||||
|
}
|
||||||
|
|
||||||
|
class Pipeline {
|
||||||
|
+run_feature_pipeline(engine, entsoe_api_key, country_code, start, end, cache_ttl_hours) DataFrame
|
||||||
|
+persist_feature_frame(engine, country_code, feature_frame) None
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipeline --> EntsoeDataService : uses
|
||||||
|
Pipeline --> FeatureBuilder : uses
|
||||||
|
EntsoeDataService --> CacheDecorator : wraps methods
|
||||||
|
```
|
||||||
|
|
||||||
|
## 3.3 Sequence diagram (single API method with cache)
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
sequenceDiagram
|
||||||
|
participant Caller as fetch_inputs()
|
||||||
|
participant Decorator as cache_to_db wrapper
|
||||||
|
participant CacheTable as entsoe_api_cache (L1)
|
||||||
|
participant ObsTable as electricity_market_observations (L2)
|
||||||
|
participant API as ENTSO-E API
|
||||||
|
|
||||||
|
Caller->>Decorator: get_day_ahead_prices(country, start, end)
|
||||||
|
Decorator->>CacheTable: SELECT by cache_key and expires_at
|
||||||
|
alt L1 cache hit
|
||||||
|
CacheTable-->>Decorator: payload
|
||||||
|
Decorator-->>Caller: unpickled pandas object
|
||||||
|
else L1 cache miss/expired
|
||||||
|
Decorator->>ObsTable: SELECT existing timestamps
|
||||||
|
alt L2 fully covers range
|
||||||
|
ObsTable-->>Decorator: pandas-compatible rows
|
||||||
|
else L2 has gaps
|
||||||
|
Decorator->>API: query only missing ranges
|
||||||
|
alt API returns data
|
||||||
|
API-->>Decorator: missing rows
|
||||||
|
Decorator->>Decorator: normalize columns + coalesce duplicates
|
||||||
|
Decorator->>ObsTable: UPSERT missing rows
|
||||||
|
else NoMatchingDataError
|
||||||
|
Decorator->>Decorator: synthesize empty hourly frame
|
||||||
|
end
|
||||||
|
end
|
||||||
|
Decorator->>CacheTable: INSERT/UPSERT merged payload
|
||||||
|
Decorator-->>Caller: fresh result
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
## 3.4 State diagram (cache entry lifecycle)
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
stateDiagram-v2
|
||||||
|
[*] --> L1Missing
|
||||||
|
L1Missing --> L2Check: cache miss/expiry
|
||||||
|
L2Check --> Fresh: observation table fully covers range
|
||||||
|
L2Check --> Partial: observation table has gaps
|
||||||
|
Partial --> Fresh: fetch missing ranges, upsert L2, upsert L1
|
||||||
|
Fresh --> Fresh: reused before expiry
|
||||||
|
Fresh --> Expired: TTL passes for L1 entry
|
||||||
|
Expired --> L2Check: next call
|
||||||
|
Fresh --> Overwritten: Same key, new payload upsert
|
||||||
|
Overwritten --> Fresh
|
||||||
|
```
|
||||||
|
|
||||||
|
## 3.5 ER diagram (database schema)
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
erDiagram
|
||||||
|
entsoe_api_cache {
|
||||||
|
text cache_key PK
|
||||||
|
text namespace
|
||||||
|
text function_name
|
||||||
|
jsonb args_json
|
||||||
|
bytea payload
|
||||||
|
timestamptz created_at
|
||||||
|
timestamptz expires_at
|
||||||
|
}
|
||||||
|
|
||||||
|
electricity_market_observations {
|
||||||
|
text country_code PK
|
||||||
|
timestamptz delivery_start PK
|
||||||
|
float day_ahead_price
|
||||||
|
float load_forecast
|
||||||
|
float wind_forecast
|
||||||
|
float solar_forecast
|
||||||
|
timestamptz ingested_at
|
||||||
|
}
|
||||||
|
|
||||||
|
electricity_price_features {
|
||||||
|
text country_code PK
|
||||||
|
timestamptz delivery_start PK
|
||||||
|
text feature_version PK
|
||||||
|
float day_ahead_price
|
||||||
|
float load_forecast
|
||||||
|
float wind_forecast
|
||||||
|
float solar_forecast
|
||||||
|
float residual_load
|
||||||
|
float[] lagged_price
|
||||||
|
float[] lagged_residual_load
|
||||||
|
float hour_of_day_sin
|
||||||
|
float hour_of_day_cos
|
||||||
|
float weekday_sin
|
||||||
|
float weekday_cos
|
||||||
|
float month_sin
|
||||||
|
float month_cos
|
||||||
|
timestamptz created_at
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 4) How files collaborate
|
||||||
|
|
||||||
|
## 4.1 `db.py` + scripts
|
||||||
|
|
||||||
|
- Scripts never hardcode DB URI; they call `get_engine()`.
|
||||||
|
- `get_engine()` centralizes environment-driven connectivity.
|
||||||
|
|
||||||
|
## 4.2 `cache.py` + `entsoe_api.py`
|
||||||
|
|
||||||
|
- `cache_to_db()` is generic and independent of ENTSO-E specifics.
|
||||||
|
- `EntsoeDataService.__post_init__` binds that generic decorator to each API-fetch method.
|
||||||
|
- Result: all expensive API calls automatically become cache-aware without changing call sites.
|
||||||
|
|
||||||
|
## 4.3 `entsoe_api.py` + `features.py`
|
||||||
|
|
||||||
|
- `entsoe_api.py` guarantees normalized timestamp index and expected source columns.
|
||||||
|
- `features.py` assumes these columns and transforms them to model features only (no DB side effects).
|
||||||
|
|
||||||
|
## 4.4 `features.py` + `pipeline.py`
|
||||||
|
|
||||||
|
- `build_feature_frame()` returns wide DataFrame with `lagged_*_1..24`.
|
||||||
|
- `persist_feature_frame()` converts those to PostgreSQL arrays so table rows stay compact and versioned.
|
||||||
|
|
||||||
|
## 5) Important implementation details
|
||||||
|
|
||||||
|
- **Cache keys are deterministic**
|
||||||
|
- Built from JSON of function name + args + kwargs with stable sorting.
|
||||||
|
- **Cache payload type**
|
||||||
|
- `pickle` stored in `BYTEA` to preserve pandas objects.
|
||||||
|
- **TTL logic**
|
||||||
|
- `expires_at IS NULL` means never expires.
|
||||||
|
- Otherwise must be greater than current UTC time to be considered valid.
|
||||||
|
- **Two-layer cache order**
|
||||||
|
- Layer 1: `entsoe_api_cache` (function-result cache).
|
||||||
|
- Layer 2: `electricity_market_observations` (timestamp-level raw cache).
|
||||||
|
- API calls happen only for Layer-2 gaps.
|
||||||
|
- **Upsert strategy**
|
||||||
|
- Raw and feature tables use `ON CONFLICT ... DO UPDATE` for idempotent reruns.
|
||||||
|
- Raw upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting previously stored values during partial refreshes.
|
||||||
|
- Feature upsert operates on a filtered persistable subset where core NOT NULL columns are present.
|
||||||
|
- **Missingness semantics**
|
||||||
|
- Forecast and derived residual columns preserve `NaN` in memory.
|
||||||
|
- No zero-imputation is performed for missing forecast values.
|
||||||
|
- **Bidding-zone normalization**
|
||||||
|
- `resolve_bidding_zone_code(...)` maps common country aliases to ENTSO-E zone codes.
|
||||||
|
- Pipeline persistence uses the resolved code, ensuring DB keys match actual queried zones.
|
||||||
|
- **Timezone handling**
|
||||||
|
- API index is normalized to UTC to avoid DST ambiguity in lag features.
|
||||||
|
- **Feature warmup**
|
||||||
|
- Rows missing `day_ahead_price` or any `lagged_price_1..24` are dropped because lag history is incomplete.
|
||||||
|
|
||||||
|
## 6) Failure modes and expected behavior
|
||||||
|
|
||||||
|
- Missing `ENTSOE_API_KEY` -> CLI raises early runtime error.
|
||||||
|
- Missing required input columns -> feature builder raises `ValueError`.
|
||||||
|
- Duplicate normalized columns from ENTSO-E payloads -> coalesced before reindexing to avoid pandas duplicate-label reindex errors.
|
||||||
|
- ENTSO-E no-data responses for an endpoint/range -> transformed to empty hourly frames and merged safely.
|
||||||
|
- Empty data frame -> raw/feature persistence functions no-op safely.
|
||||||
|
- Repeated identical request -> cache hit (no API roundtrip).
|
||||||
|
- Expired L1 cache row + full L2 coverage -> no API call required.
|
||||||
|
- Expired L1 cache row + partial L2 coverage -> API called only for missing ranges.
|
||||||
|
|
||||||
|
## 7) Data contracts
|
||||||
|
|
||||||
|
### 7.1 In-memory features contract
|
||||||
|
|
||||||
|
Producer: `run_feature_pipeline(...)` return value (`pd.DataFrame`).
|
||||||
|
|
||||||
|
- **Index contract**
|
||||||
|
- hourly UTC `DatetimeIndex`, sorted ascending.
|
||||||
|
- unique timestamps expected after deduplication.
|
||||||
|
- **Column contract**
|
||||||
|
- base: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
|
||||||
|
- derived: `residual_load`
|
||||||
|
- lag columns: `lagged_price_1..24`, `lagged_residual_load_1..24`
|
||||||
|
- cyclical: `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
|
||||||
|
- **Nullability contract**
|
||||||
|
- required non-null in returned rows: `day_ahead_price`, `lagged_price_1..24`
|
||||||
|
- nullable: `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`
|
||||||
|
- rationale: preserve upstream missingness semantics for analysis and QC.
|
||||||
|
|
||||||
|
### 7.2 Feature-store persistence contract
|
||||||
|
|
||||||
|
Consumer: `electricity_price_features` table.
|
||||||
|
|
||||||
|
- **Primary key contract**
|
||||||
|
- (`country_code`, `delivery_start`, `feature_version`)
|
||||||
|
- **Schema constraint contract**
|
||||||
|
- core numeric columns are `NOT NULL`.
|
||||||
|
- lag arrays are `DOUBLE PRECISION[]` and expected length 24.
|
||||||
|
- **Write-time contract**
|
||||||
|
- `persist_feature_frame(...)` filters rows that violate NOT NULL core columns before UPSERT.
|
||||||
|
- retained rows are idempotently upserted via `ON CONFLICT ... DO UPDATE`.
|
||||||
|
|
||||||
|
### 7.3 Raw-observation contract
|
||||||
|
|
||||||
|
Consumer: `electricity_market_observations` table.
|
||||||
|
|
||||||
|
- **Primary key contract**
|
||||||
|
- (`country_code`, `delivery_start`)
|
||||||
|
- **Merge contract**
|
||||||
|
- upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting prior known values.
|
||||||
|
- **Coverage contract**
|
||||||
|
- secondary cache guarantees fetched payloads are aligned to expected hourly index for the requested `[start, end)` range.
|
||||||
|
|
||||||
|
## 8) Practical debugging checklist
|
||||||
|
|
||||||
|
1. Run `scripts/init_db.py` and ensure tables exist.
|
||||||
|
2. Run one short-range fetch window (1-2 days) first.
|
||||||
|
3. Verify cache growth:
|
||||||
|
- `SELECT namespace, function_name, COUNT(*) FROM entsoe_api_cache GROUP BY 1,2;`
|
||||||
|
4. Verify raw persistence:
|
||||||
|
- `SELECT COUNT(*) FROM electricity_market_observations WHERE country_code = '...';`
|
||||||
|
5. Verify feature persistence:
|
||||||
|
- check lag array sizes are 24 and row count is lower than raw by about 24.
|
||||||
|
|
||||||
|
## 9) Suggested next developer docs to add
|
||||||
|
|
||||||
|
- Data quality rules (acceptable missingness, clipping policy, anomaly handling).
|
||||||
|
- Training-set contract (target definition, split strategy, leakage constraints).
|
||||||
|
- Backfill/replay policy for reprocessing historical periods.
|
||||||
4
electricity_price_predictor/requirements.txt
Normal file
4
electricity_price_predictor/requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
entsoe-py
|
||||||
|
pandas
|
||||||
|
sqlalchemy
|
||||||
|
psycopg2-binary
|
||||||
41
electricity_price_predictor/scripts/build_feature_store.py
Normal file
41
electricity_price_predictor/scripts/build_feature_store.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from electricity_price_predictor.db import get_engine
|
||||||
|
from electricity_price_predictor.pipeline import run_feature_pipeline
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
parser = argparse.ArgumentParser(description="Fetch ENTSO-E inputs and build feature store.")
|
||||||
|
parser.add_argument("--country-code", required=True, help="ENTSO-E bidding zone code, e.g. DE_LU")
|
||||||
|
parser.add_argument("--start", required=True, help="Inclusive start datetime, e.g. 2026-01-01T00:00:00Z")
|
||||||
|
parser.add_argument("--end", required=True, help="Exclusive end datetime, e.g. 2026-02-01T00:00:00Z")
|
||||||
|
parser.add_argument("--cache-ttl-hours", type=int, default=24, help="Decorator cache TTL in hours")
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
args = parse_args()
|
||||||
|
api_key = os.getenv("ENTSOE_API_KEY")
|
||||||
|
if not api_key:
|
||||||
|
raise RuntimeError("ENTSOE_API_KEY environment variable is required.")
|
||||||
|
|
||||||
|
engine = get_engine()
|
||||||
|
start = pd.Timestamp(args.start, tz="UTC")
|
||||||
|
end = pd.Timestamp(args.end, tz="UTC")
|
||||||
|
|
||||||
|
features = run_feature_pipeline(
|
||||||
|
engine=engine,
|
||||||
|
entsoe_api_key=api_key,
|
||||||
|
country_code=args.country_code,
|
||||||
|
start=start,
|
||||||
|
end=end,
|
||||||
|
cache_ttl_hours=args.cache_ttl_hours,
|
||||||
|
)
|
||||||
|
print(f"Persisted {len(features)} feature rows for {args.country_code}.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
23
electricity_price_predictor/scripts/init_db.py
Normal file
23
electricity_price_predictor/scripts/init_db.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
from electricity_price_predictor.db import get_engine
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
engine = get_engine()
|
||||||
|
schema_path = Path(__file__).resolve().parents[1] / "sql" / "001_electricity_price_schema.sql"
|
||||||
|
sql = schema_path.read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
with engine.begin() as conn:
|
||||||
|
for statement in sql.split(";"):
|
||||||
|
stmt = statement.strip()
|
||||||
|
if stmt:
|
||||||
|
conn.execute(text(stmt))
|
||||||
|
|
||||||
|
print("Schema initialized for electricity price predictor.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS entsoe_api_cache (
|
||||||
|
cache_key TEXT PRIMARY KEY,
|
||||||
|
namespace TEXT NOT NULL,
|
||||||
|
function_name TEXT NOT NULL,
|
||||||
|
args_json JSONB NOT NULL,
|
||||||
|
payload BYTEA NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
expires_at TIMESTAMPTZ
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_namespace_fn
|
||||||
|
ON entsoe_api_cache(namespace, function_name);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_expires_at
|
||||||
|
ON entsoe_api_cache(expires_at);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS electricity_market_observations (
|
||||||
|
country_code TEXT NOT NULL,
|
||||||
|
delivery_start TIMESTAMPTZ NOT NULL,
|
||||||
|
day_ahead_price DOUBLE PRECISION,
|
||||||
|
load_forecast DOUBLE PRECISION,
|
||||||
|
wind_forecast DOUBLE PRECISION,
|
||||||
|
solar_forecast DOUBLE PRECISION,
|
||||||
|
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (country_code, delivery_start)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_electricity_market_observations_delivery
|
||||||
|
ON electricity_market_observations(delivery_start);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS electricity_price_features (
|
||||||
|
country_code TEXT NOT NULL,
|
||||||
|
delivery_start TIMESTAMPTZ NOT NULL,
|
||||||
|
day_ahead_price DOUBLE PRECISION NOT NULL,
|
||||||
|
load_forecast DOUBLE PRECISION NOT NULL,
|
||||||
|
wind_forecast DOUBLE PRECISION NOT NULL,
|
||||||
|
solar_forecast DOUBLE PRECISION NOT NULL,
|
||||||
|
residual_load DOUBLE PRECISION NOT NULL,
|
||||||
|
lagged_price DOUBLE PRECISION[] NOT NULL,
|
||||||
|
lagged_residual_load DOUBLE PRECISION[] NOT NULL,
|
||||||
|
hour_of_day_sin DOUBLE PRECISION NOT NULL,
|
||||||
|
hour_of_day_cos DOUBLE PRECISION NOT NULL,
|
||||||
|
weekday_sin DOUBLE PRECISION NOT NULL,
|
||||||
|
weekday_cos DOUBLE PRECISION NOT NULL,
|
||||||
|
month_sin DOUBLE PRECISION NOT NULL,
|
||||||
|
month_cos DOUBLE PRECISION NOT NULL,
|
||||||
|
feature_version TEXT NOT NULL DEFAULT 'v1',
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
PRIMARY KEY (country_code, delivery_start, feature_version),
|
||||||
|
CONSTRAINT chk_lagged_price_len CHECK (CARDINALITY(lagged_price) = 24),
|
||||||
|
CONSTRAINT chk_lagged_residual_load_len CHECK (CARDINALITY(lagged_residual_load) = 24)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_electricity_price_features_delivery
|
||||||
|
ON electricity_price_features(delivery_start);
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
"""Electricity price forecasting data pipeline package."""
|
||||||
|
|
||||||
|
from .db import get_engine
|
||||||
|
from .entsoe_api import EntsoeDataService
|
||||||
|
from .features import build_feature_frame
|
||||||
|
|
||||||
|
__all__ = ["EntsoeDataService", "build_feature_frame", "get_engine"]
|
||||||
@@ -0,0 +1,112 @@
|
|||||||
|
import functools
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import pickle
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
|
|
||||||
|
def _json_fallback_serializer(value: Any) -> str:
|
||||||
|
"""Serializer for values that aren't directly JSON serializable."""
|
||||||
|
if hasattr(value, "isoformat"):
|
||||||
|
return value.isoformat()
|
||||||
|
return repr(value)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_cache_key(function_name: str, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
|
||||||
|
payload = json.dumps(
|
||||||
|
{"function_name": function_name, "args": args, "kwargs": kwargs},
|
||||||
|
sort_keys=True,
|
||||||
|
default=_json_fallback_serializer,
|
||||||
|
)
|
||||||
|
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def cache_to_db(
|
||||||
|
engine: Engine,
|
||||||
|
namespace: str,
|
||||||
|
ttl_hours: Optional[int] = None,
|
||||||
|
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
|
||||||
|
"""
|
||||||
|
Cache function output in quant_db.entsoe_api_cache table.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
- TTL is optional. If omitted, cached values do not expire.
|
||||||
|
- Cached payload uses pickle so pandas objects can be restored losslessly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||||
|
cache_key = _build_cache_key(f"{namespace}.{func.__name__}", args, kwargs)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
with engine.begin() as conn:
|
||||||
|
result = conn.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
SELECT payload
|
||||||
|
FROM entsoe_api_cache
|
||||||
|
WHERE cache_key = :cache_key
|
||||||
|
AND (expires_at IS NULL OR expires_at > :now_utc)
|
||||||
|
"""
|
||||||
|
),
|
||||||
|
{"cache_key": cache_key, "now_utc": now},
|
||||||
|
).fetchone()
|
||||||
|
|
||||||
|
if result:
|
||||||
|
return pickle.loads(result[0])
|
||||||
|
|
||||||
|
data = func(*args, **kwargs)
|
||||||
|
expires_at = None
|
||||||
|
if ttl_hours is not None:
|
||||||
|
expires_at = now + timedelta(hours=ttl_hours)
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
INSERT INTO entsoe_api_cache (
|
||||||
|
cache_key,
|
||||||
|
namespace,
|
||||||
|
function_name,
|
||||||
|
args_json,
|
||||||
|
payload,
|
||||||
|
created_at,
|
||||||
|
expires_at
|
||||||
|
) VALUES (
|
||||||
|
:cache_key,
|
||||||
|
:namespace,
|
||||||
|
:function_name,
|
||||||
|
CAST(:args_json AS JSONB),
|
||||||
|
:payload,
|
||||||
|
:created_at,
|
||||||
|
:expires_at
|
||||||
|
)
|
||||||
|
ON CONFLICT (cache_key) DO UPDATE
|
||||||
|
SET payload = EXCLUDED.payload,
|
||||||
|
created_at = EXCLUDED.created_at,
|
||||||
|
expires_at = EXCLUDED.expires_at,
|
||||||
|
args_json = EXCLUDED.args_json
|
||||||
|
"""
|
||||||
|
),
|
||||||
|
{
|
||||||
|
"cache_key": cache_key,
|
||||||
|
"namespace": namespace,
|
||||||
|
"function_name": func.__name__,
|
||||||
|
"args_json": json.dumps(
|
||||||
|
{"args": args, "kwargs": kwargs},
|
||||||
|
default=_json_fallback_serializer,
|
||||||
|
),
|
||||||
|
"payload": pickle.dumps(data),
|
||||||
|
"created_at": now,
|
||||||
|
"expires_at": expires_at,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return data
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
return decorator
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
|
|
||||||
|
def get_database_url() -> str:
|
||||||
|
"""Build database URL from env or fallback defaults."""
|
||||||
|
explicit_url = os.getenv("QUANT_DB_URL")
|
||||||
|
if explicit_url:
|
||||||
|
return explicit_url
|
||||||
|
|
||||||
|
host = os.getenv("QUANT_DB_HOST", "localhost")
|
||||||
|
port = os.getenv("QUANT_DB_PORT", "5432")
|
||||||
|
database = os.getenv("QUANT_DB_NAME", "quant_db")
|
||||||
|
user = os.getenv("QUANT_DB_USER", "quant_user")
|
||||||
|
password = os.getenv("QUANT_DB_PASSWORD", "strong_password")
|
||||||
|
|
||||||
|
return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
|
||||||
|
|
||||||
|
|
||||||
|
def get_engine(echo: bool = False) -> Engine:
|
||||||
|
"""Create SQLAlchemy engine for quant_db."""
|
||||||
|
return create_engine(get_database_url(), future=True, echo=echo)
|
||||||
@@ -0,0 +1,393 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from entsoe import EntsoePandasClient
|
||||||
|
from entsoe.exceptions import NoMatchingDataError
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
|
from .cache import cache_to_db
|
||||||
|
|
||||||
|
OBSERVATION_COLUMNS = {
|
||||||
|
"day_ahead_price",
|
||||||
|
"load_forecast",
|
||||||
|
"wind_forecast",
|
||||||
|
"solar_forecast",
|
||||||
|
}
|
||||||
|
|
||||||
|
BIDDING_ZONE_ALIASES = {
|
||||||
|
# ENTSO-E often expects bidding-zone EIC aliases instead of plain country codes.
|
||||||
|
"DE": "DE_LU",
|
||||||
|
"IT": "IT_NORD",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _as_utc_index(series_or_df: pd.Series | pd.DataFrame) -> pd.Series | pd.DataFrame:
|
||||||
|
if series_or_df.index.tz is None:
|
||||||
|
series_or_df.index = series_or_df.index.tz_localize("UTC")
|
||||||
|
else:
|
||||||
|
series_or_df.index = series_or_df.index.tz_convert("UTC")
|
||||||
|
return series_or_df.sort_index()
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_float(value) -> Optional[float]:
|
||||||
|
if pd.isna(value):
|
||||||
|
return None
|
||||||
|
return float(value)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_bidding_zone_code(country_code: str) -> str:
|
||||||
|
code = str(country_code).strip().upper()
|
||||||
|
return BIDDING_ZONE_ALIASES.get(code, code)
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_single_column_frame(
|
||||||
|
data: pd.Series | pd.DataFrame,
|
||||||
|
target_column: str,
|
||||||
|
preferred_tokens: tuple[str, ...] = (),
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Normalize ENTSO-E responses that can be either a Series or DataFrame.
|
||||||
|
"""
|
||||||
|
if isinstance(data, pd.Series):
|
||||||
|
series = _as_utc_index(data)
|
||||||
|
series.name = target_column
|
||||||
|
return series.to_frame()
|
||||||
|
|
||||||
|
frame = _as_utc_index(data.copy())
|
||||||
|
if target_column in frame.columns:
|
||||||
|
return frame[[target_column]]
|
||||||
|
|
||||||
|
if len(frame.columns) == 1:
|
||||||
|
return frame.rename(columns={frame.columns[0]: target_column})[[target_column]]
|
||||||
|
|
||||||
|
def _best_column(candidates: list) -> Optional[str]:
|
||||||
|
if not candidates:
|
||||||
|
return None
|
||||||
|
# Prefer the candidate with most available points.
|
||||||
|
return max(candidates, key=lambda col: int(frame[col].notna().sum()))
|
||||||
|
|
||||||
|
lowered = {col: str(col).lower() for col in frame.columns}
|
||||||
|
preferred_candidates = []
|
||||||
|
for token in preferred_tokens:
|
||||||
|
preferred_candidates.extend([col for col, col_lc in lowered.items() if token in col_lc])
|
||||||
|
|
||||||
|
preferred_col = _best_column(preferred_candidates)
|
||||||
|
if preferred_col is not None:
|
||||||
|
return frame.rename(columns={preferred_col: target_column})[[target_column]]
|
||||||
|
|
||||||
|
any_col = _best_column(list(frame.columns))
|
||||||
|
if any_col is not None:
|
||||||
|
return frame.rename(columns={any_col: target_column})[[target_column]]
|
||||||
|
|
||||||
|
first_col = frame.columns[0]
|
||||||
|
return frame.rename(columns={first_col: target_column})[[target_column]]
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_utc_bounds(start: pd.Timestamp, end: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
|
||||||
|
start_ts = pd.Timestamp(start)
|
||||||
|
end_ts = pd.Timestamp(end)
|
||||||
|
if start_ts.tz is None:
|
||||||
|
start_ts = start_ts.tz_localize("UTC")
|
||||||
|
else:
|
||||||
|
start_ts = start_ts.tz_convert("UTC")
|
||||||
|
if end_ts.tz is None:
|
||||||
|
end_ts = end_ts.tz_localize("UTC")
|
||||||
|
else:
|
||||||
|
end_ts = end_ts.tz_convert("UTC")
|
||||||
|
return start_ts, end_ts
|
||||||
|
|
||||||
|
|
||||||
|
def _empty_hourly_frame(
|
||||||
|
columns: list[str], start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
start_ts, end_ts = _normalize_utc_bounds(start, end)
|
||||||
|
idx = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
|
||||||
|
return pd.DataFrame(index=idx, columns=columns)
|
||||||
|
|
||||||
|
|
||||||
|
def _coalesce_duplicate_columns(frame: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Collapse duplicate column labels by taking the first non-null per row.
|
||||||
|
"""
|
||||||
|
if frame.columns.is_unique:
|
||||||
|
return frame
|
||||||
|
|
||||||
|
merged: dict[str, pd.Series] = {}
|
||||||
|
for col in frame.columns.unique():
|
||||||
|
same_name = frame.loc[:, frame.columns == col]
|
||||||
|
if isinstance(same_name, pd.Series):
|
||||||
|
merged[str(col)] = same_name
|
||||||
|
else:
|
||||||
|
merged[str(col)] = same_name.bfill(axis=1).iloc[:, 0]
|
||||||
|
return pd.DataFrame(merged, index=frame.index)
|
||||||
|
|
||||||
|
|
||||||
|
def _missing_ranges(missing_index: pd.DatetimeIndex) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
|
||||||
|
if missing_index.empty:
|
||||||
|
return []
|
||||||
|
missing_index = missing_index.sort_values().unique()
|
||||||
|
ranges: list[tuple[pd.Timestamp, pd.Timestamp]] = []
|
||||||
|
current_start = missing_index[0]
|
||||||
|
prev = missing_index[0]
|
||||||
|
step = pd.Timedelta(hours=1)
|
||||||
|
|
||||||
|
for ts in missing_index[1:]:
|
||||||
|
if ts - prev != step:
|
||||||
|
ranges.append((current_start, prev + step))
|
||||||
|
current_start = ts
|
||||||
|
prev = ts
|
||||||
|
ranges.append((current_start, prev + step))
|
||||||
|
return ranges
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EntsoeDataService:
|
||||||
|
client: EntsoePandasClient
|
||||||
|
engine: Engine
|
||||||
|
cache_ttl_hours: Optional[int] = 24
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
self.get_day_ahead_prices = cache_to_db(
|
||||||
|
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
|
||||||
|
)(self._get_day_ahead_prices_impl)
|
||||||
|
self.get_load_forecast = cache_to_db(
|
||||||
|
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
|
||||||
|
)(self._get_load_forecast_impl)
|
||||||
|
self.get_wind_solar_forecast = cache_to_db(
|
||||||
|
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
|
||||||
|
)(self._get_wind_solar_forecast_impl)
|
||||||
|
|
||||||
|
def resolve_country_code(self, country_code: str) -> str:
|
||||||
|
return resolve_bidding_zone_code(country_code)
|
||||||
|
|
||||||
|
def _get_day_ahead_prices_impl(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.Series:
|
||||||
|
df = self._fetch_inputs_with_secondary_cache(
|
||||||
|
country_code=country_code,
|
||||||
|
start=start,
|
||||||
|
end=end,
|
||||||
|
required_columns=["day_ahead_price"],
|
||||||
|
api_fetcher=self._query_day_ahead_prices,
|
||||||
|
)
|
||||||
|
return df["day_ahead_price"]
|
||||||
|
|
||||||
|
def _get_load_forecast_impl(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.Series:
|
||||||
|
df = self._fetch_inputs_with_secondary_cache(
|
||||||
|
country_code=country_code,
|
||||||
|
start=start,
|
||||||
|
end=end,
|
||||||
|
required_columns=["load_forecast"],
|
||||||
|
api_fetcher=self._query_load_forecast,
|
||||||
|
)
|
||||||
|
return df["load_forecast"]
|
||||||
|
|
||||||
|
def _get_wind_solar_forecast_impl(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
return self._fetch_inputs_with_secondary_cache(
|
||||||
|
country_code=country_code,
|
||||||
|
start=start,
|
||||||
|
end=end,
|
||||||
|
required_columns=["wind_forecast", "solar_forecast"],
|
||||||
|
api_fetcher=self._query_wind_solar_forecast,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _query_day_ahead_prices(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
try:
|
||||||
|
raw = self.client.query_day_ahead_prices(country_code, start=start, end=end)
|
||||||
|
except NoMatchingDataError:
|
||||||
|
return _empty_hourly_frame(["day_ahead_price"], start, end)
|
||||||
|
return _coerce_single_column_frame(
|
||||||
|
raw,
|
||||||
|
target_column="day_ahead_price",
|
||||||
|
preferred_tokens=("price", "ahead"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _query_load_forecast(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
try:
|
||||||
|
raw = self.client.query_load_forecast(country_code, start=start, end=end)
|
||||||
|
except NoMatchingDataError:
|
||||||
|
return _empty_hourly_frame(["load_forecast"], start, end)
|
||||||
|
return _coerce_single_column_frame(
|
||||||
|
raw,
|
||||||
|
target_column="load_forecast",
|
||||||
|
preferred_tokens=("load",),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _query_wind_solar_forecast(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
try:
|
||||||
|
df = self.client.query_wind_and_solar_forecast(country_code, start=start, end=end)
|
||||||
|
except NoMatchingDataError:
|
||||||
|
return _empty_hourly_frame(["wind_forecast", "solar_forecast"], start, end)
|
||||||
|
df = _as_utc_index(df)
|
||||||
|
if isinstance(df, pd.Series):
|
||||||
|
df = df.to_frame()
|
||||||
|
|
||||||
|
renamed = {}
|
||||||
|
for column in df.columns:
|
||||||
|
lc = str(column).lower()
|
||||||
|
if "wind" in lc:
|
||||||
|
renamed[column] = "wind_forecast"
|
||||||
|
elif "solar" in lc:
|
||||||
|
renamed[column] = "solar_forecast"
|
||||||
|
df = df.rename(columns=renamed)
|
||||||
|
df = _coalesce_duplicate_columns(df)
|
||||||
|
|
||||||
|
if "wind_forecast" not in df.columns:
|
||||||
|
df["wind_forecast"] = None
|
||||||
|
if "solar_forecast" not in df.columns:
|
||||||
|
df["solar_forecast"] = None
|
||||||
|
|
||||||
|
return df[["wind_forecast", "solar_forecast"]]
|
||||||
|
|
||||||
|
def _load_observations(
|
||||||
|
self,
|
||||||
|
country_code: str,
|
||||||
|
start: pd.Timestamp,
|
||||||
|
end: pd.Timestamp,
|
||||||
|
columns: list[str],
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
invalid = [col for col in columns if col not in OBSERVATION_COLUMNS]
|
||||||
|
if invalid:
|
||||||
|
raise ValueError(f"Unsupported observation columns requested: {invalid}")
|
||||||
|
|
||||||
|
sql_columns = ", ".join(columns)
|
||||||
|
query = text(
|
||||||
|
f"""
|
||||||
|
SELECT delivery_start, {sql_columns}
|
||||||
|
FROM electricity_market_observations
|
||||||
|
WHERE country_code = :country_code
|
||||||
|
AND delivery_start >= :start_ts
|
||||||
|
AND delivery_start < :end_ts
|
||||||
|
ORDER BY delivery_start
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.engine.begin() as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
query,
|
||||||
|
{"country_code": country_code, "start_ts": start, "end_ts": end},
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], tz="UTC"))
|
||||||
|
|
||||||
|
db_frame = pd.DataFrame(rows, columns=["delivery_start", *columns])
|
||||||
|
db_frame["delivery_start"] = pd.to_datetime(db_frame["delivery_start"], utc=True)
|
||||||
|
db_frame = db_frame.set_index("delivery_start").sort_index()
|
||||||
|
return db_frame
|
||||||
|
|
||||||
|
def _fetch_inputs_with_secondary_cache(
|
||||||
|
self,
|
||||||
|
country_code: str,
|
||||||
|
start: pd.Timestamp,
|
||||||
|
end: pd.Timestamp,
|
||||||
|
required_columns: list[str],
|
||||||
|
api_fetcher,
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
start_ts, end_ts = _normalize_utc_bounds(start, end)
|
||||||
|
expected_index = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
|
||||||
|
from_observations = self._load_observations(country_code, start_ts, end_ts, required_columns)
|
||||||
|
|
||||||
|
if from_observations.empty:
|
||||||
|
complete_index = pd.DatetimeIndex([], tz="UTC")
|
||||||
|
else:
|
||||||
|
complete_mask = from_observations[required_columns].notna().all(axis=1)
|
||||||
|
complete_index = from_observations.index[complete_mask]
|
||||||
|
|
||||||
|
missing_index = expected_index.difference(complete_index)
|
||||||
|
|
||||||
|
fetched_parts: list[pd.DataFrame] = []
|
||||||
|
for missing_start, missing_end in _missing_ranges(missing_index):
|
||||||
|
fetched = api_fetcher(country_code, missing_start, missing_end)
|
||||||
|
fetched = fetched.reindex(columns=required_columns)
|
||||||
|
fetched_parts.append(fetched)
|
||||||
|
|
||||||
|
if fetched_parts:
|
||||||
|
fetched_frame = pd.concat(fetched_parts).sort_index()
|
||||||
|
self.upsert_raw_data(country_code=country_code, frame=fetched_frame)
|
||||||
|
else:
|
||||||
|
fetched_frame = pd.DataFrame(columns=required_columns, index=pd.DatetimeIndex([], tz="UTC"))
|
||||||
|
|
||||||
|
combined = pd.concat([from_observations, fetched_frame]).sort_index()
|
||||||
|
combined = combined[~combined.index.duplicated(keep="last")]
|
||||||
|
combined = combined.reindex(expected_index)
|
||||||
|
return combined[required_columns]
|
||||||
|
|
||||||
|
def fetch_inputs(
|
||||||
|
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
resolved_country_code = self.resolve_country_code(country_code)
|
||||||
|
price = _coerce_single_column_frame(
|
||||||
|
self.get_day_ahead_prices(resolved_country_code, start, end),
|
||||||
|
target_column="day_ahead_price",
|
||||||
|
preferred_tokens=("price", "ahead"),
|
||||||
|
)
|
||||||
|
load = _coerce_single_column_frame(
|
||||||
|
self.get_load_forecast(resolved_country_code, start, end),
|
||||||
|
target_column="load_forecast",
|
||||||
|
preferred_tokens=("load",),
|
||||||
|
)
|
||||||
|
wind_solar = self.get_wind_solar_forecast(resolved_country_code, start, end)
|
||||||
|
df = price.join(load, how="outer").join(wind_solar, how="outer").sort_index()
|
||||||
|
return df
|
||||||
|
|
||||||
|
def upsert_raw_data(self, country_code: str, frame: pd.DataFrame) -> None:
|
||||||
|
rows = []
|
||||||
|
for ts, row in frame.iterrows():
|
||||||
|
rows.append(
|
||||||
|
{
|
||||||
|
"country_code": country_code,
|
||||||
|
"delivery_start": ts.to_pydatetime(),
|
||||||
|
"day_ahead_price": _safe_float(row.get("day_ahead_price")),
|
||||||
|
"load_forecast": _safe_float(row.get("load_forecast")),
|
||||||
|
"wind_forecast": _safe_float(row.get("wind_forecast")),
|
||||||
|
"solar_forecast": _safe_float(row.get("solar_forecast")),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self.engine.begin() as conn:
|
||||||
|
conn.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
INSERT INTO electricity_market_observations (
|
||||||
|
country_code,
|
||||||
|
delivery_start,
|
||||||
|
day_ahead_price,
|
||||||
|
load_forecast,
|
||||||
|
wind_forecast,
|
||||||
|
solar_forecast
|
||||||
|
) VALUES (
|
||||||
|
:country_code,
|
||||||
|
:delivery_start,
|
||||||
|
:day_ahead_price,
|
||||||
|
:load_forecast,
|
||||||
|
:wind_forecast,
|
||||||
|
:solar_forecast
|
||||||
|
)
|
||||||
|
ON CONFLICT (country_code, delivery_start) DO UPDATE
|
||||||
|
SET day_ahead_price = COALESCE(EXCLUDED.day_ahead_price, electricity_market_observations.day_ahead_price),
|
||||||
|
load_forecast = COALESCE(EXCLUDED.load_forecast, electricity_market_observations.load_forecast),
|
||||||
|
wind_forecast = COALESCE(EXCLUDED.wind_forecast, electricity_market_observations.wind_forecast),
|
||||||
|
solar_forecast = COALESCE(EXCLUDED.solar_forecast, electricity_market_observations.solar_forecast),
|
||||||
|
ingested_at = NOW()
|
||||||
|
"""
|
||||||
|
),
|
||||||
|
rows,
|
||||||
|
)
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
def _cyclical_encode(values: pd.Series, period: int, prefix: str) -> pd.DataFrame:
|
||||||
|
angle = 2.0 * math.pi * values / period
|
||||||
|
return pd.DataFrame(
|
||||||
|
{f"{prefix}_sin": angle.apply(math.sin), f"{prefix}_cos": angle.apply(math.cos)},
|
||||||
|
index=values.index,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_feature_frame(inputs: pd.DataFrame, max_lag: int = 24) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Build feature set for electricity price forecasting.
|
||||||
|
|
||||||
|
Included:
|
||||||
|
- day_ahead_price, load_forecast, wind_forecast, solar_forecast
|
||||||
|
- residual_load
|
||||||
|
- lagged_price(t-1..t-24)
|
||||||
|
- lagged_residual_load(t-1..t-24)
|
||||||
|
- hour/week_day/month cyclical encodings
|
||||||
|
"""
|
||||||
|
df = inputs.copy().sort_index()
|
||||||
|
required = {"day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"}
|
||||||
|
missing = required.difference(df.columns)
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"Missing required input columns: {sorted(missing)}")
|
||||||
|
|
||||||
|
# Preserve source missingness semantics for downstream users.
|
||||||
|
# We keep NaNs instead of imputing to 0.0 so missing data is explicit.
|
||||||
|
df["residual_load"] = df["load_forecast"] - df["wind_forecast"] - df["solar_forecast"]
|
||||||
|
|
||||||
|
for lag in range(1, max_lag + 1):
|
||||||
|
df[f"lagged_price_{lag}"] = df["day_ahead_price"].shift(lag)
|
||||||
|
df[f"lagged_residual_load_{lag}"] = df["residual_load"].shift(lag)
|
||||||
|
|
||||||
|
time_index = df.index
|
||||||
|
if time_index.tz is None:
|
||||||
|
time_index = time_index.tz_localize("UTC")
|
||||||
|
else:
|
||||||
|
time_index = time_index.tz_convert("UTC")
|
||||||
|
|
||||||
|
cyclical = [
|
||||||
|
_cyclical_encode(pd.Series(time_index.hour, index=df.index), 24, "hour_of_day"),
|
||||||
|
_cyclical_encode(pd.Series(time_index.weekday, index=df.index), 7, "weekday"),
|
||||||
|
_cyclical_encode(pd.Series(time_index.month - 1, index=df.index), 12, "month"),
|
||||||
|
]
|
||||||
|
for cyc in cyclical:
|
||||||
|
df = df.join(cyc)
|
||||||
|
|
||||||
|
# Only enforce warmup/history constraints for price lags.
|
||||||
|
# Other feature columns can remain NaN when source data is missing.
|
||||||
|
required_for_row = ["day_ahead_price", *[f"lagged_price_{lag}" for lag in range(1, max_lag + 1)]]
|
||||||
|
return df.dropna(subset=required_for_row).sort_index()
|
||||||
@@ -0,0 +1,146 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from entsoe import EntsoePandasClient
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.engine import Engine
|
||||||
|
|
||||||
|
from .entsoe_api import EntsoeDataService
|
||||||
|
from .features import build_feature_frame
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_float(value) -> float | None:
|
||||||
|
if pd.isna(value):
|
||||||
|
return None
|
||||||
|
return float(value)
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_inputs_have_signal(inputs: pd.DataFrame, country_code: str) -> None:
|
||||||
|
required = ["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"]
|
||||||
|
non_null_counts = {col: int(inputs[col].notna().sum()) for col in required if col in inputs.columns}
|
||||||
|
if non_null_counts and all(count == 0 for count in non_null_counts.values()):
|
||||||
|
raise ValueError(
|
||||||
|
"No ENTSO-E data available for "
|
||||||
|
f"'{country_code}' in the requested time window. "
|
||||||
|
"Try another bidding zone/time range, and ensure the API key has access."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def persist_feature_frame(engine: Engine, country_code: str, feature_frame: pd.DataFrame) -> None:
|
||||||
|
# Feature-store schema expects core numeric fields to be non-null.
|
||||||
|
# Keep NaNs in the returned DataFrame, but skip incomplete rows on DB write.
|
||||||
|
persistable = feature_frame.dropna(
|
||||||
|
subset=["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast", "residual_load"]
|
||||||
|
)
|
||||||
|
|
||||||
|
rows = []
|
||||||
|
for ts, row in persistable.iterrows():
|
||||||
|
lag_price = [float(row[f"lagged_price_{lag}"]) for lag in range(1, 25)]
|
||||||
|
lag_residual = [float(row[f"lagged_residual_load_{lag}"]) for lag in range(1, 25)]
|
||||||
|
|
||||||
|
rows.append(
|
||||||
|
{
|
||||||
|
"country_code": country_code,
|
||||||
|
"delivery_start": ts.to_pydatetime(),
|
||||||
|
"day_ahead_price": float(row["day_ahead_price"]),
|
||||||
|
"load_forecast": _safe_float(row["load_forecast"]),
|
||||||
|
"wind_forecast": _safe_float(row["wind_forecast"]),
|
||||||
|
"solar_forecast": _safe_float(row["solar_forecast"]),
|
||||||
|
"residual_load": _safe_float(row["residual_load"]),
|
||||||
|
"lagged_price": lag_price,
|
||||||
|
"lagged_residual_load": lag_residual,
|
||||||
|
"hour_of_day_sin": float(row["hour_of_day_sin"]),
|
||||||
|
"hour_of_day_cos": float(row["hour_of_day_cos"]),
|
||||||
|
"weekday_sin": float(row["weekday_sin"]),
|
||||||
|
"weekday_cos": float(row["weekday_cos"]),
|
||||||
|
"month_sin": float(row["month_sin"]),
|
||||||
|
"month_cos": float(row["month_cos"]),
|
||||||
|
"feature_version": "v1",
|
||||||
|
"created_at": datetime.now(timezone.utc),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
return
|
||||||
|
|
||||||
|
with engine.begin() as conn:
|
||||||
|
conn.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
INSERT INTO electricity_price_features (
|
||||||
|
country_code,
|
||||||
|
delivery_start,
|
||||||
|
day_ahead_price,
|
||||||
|
load_forecast,
|
||||||
|
wind_forecast,
|
||||||
|
solar_forecast,
|
||||||
|
residual_load,
|
||||||
|
lagged_price,
|
||||||
|
lagged_residual_load,
|
||||||
|
hour_of_day_sin,
|
||||||
|
hour_of_day_cos,
|
||||||
|
weekday_sin,
|
||||||
|
weekday_cos,
|
||||||
|
month_sin,
|
||||||
|
month_cos,
|
||||||
|
feature_version,
|
||||||
|
created_at
|
||||||
|
) VALUES (
|
||||||
|
:country_code,
|
||||||
|
:delivery_start,
|
||||||
|
:day_ahead_price,
|
||||||
|
:load_forecast,
|
||||||
|
:wind_forecast,
|
||||||
|
:solar_forecast,
|
||||||
|
:residual_load,
|
||||||
|
:lagged_price,
|
||||||
|
:lagged_residual_load,
|
||||||
|
:hour_of_day_sin,
|
||||||
|
:hour_of_day_cos,
|
||||||
|
:weekday_sin,
|
||||||
|
:weekday_cos,
|
||||||
|
:month_sin,
|
||||||
|
:month_cos,
|
||||||
|
:feature_version,
|
||||||
|
:created_at
|
||||||
|
)
|
||||||
|
ON CONFLICT (country_code, delivery_start, feature_version) DO UPDATE
|
||||||
|
SET day_ahead_price = EXCLUDED.day_ahead_price,
|
||||||
|
load_forecast = EXCLUDED.load_forecast,
|
||||||
|
wind_forecast = EXCLUDED.wind_forecast,
|
||||||
|
solar_forecast = EXCLUDED.solar_forecast,
|
||||||
|
residual_load = EXCLUDED.residual_load,
|
||||||
|
lagged_price = EXCLUDED.lagged_price,
|
||||||
|
lagged_residual_load = EXCLUDED.lagged_residual_load,
|
||||||
|
hour_of_day_sin = EXCLUDED.hour_of_day_sin,
|
||||||
|
hour_of_day_cos = EXCLUDED.hour_of_day_cos,
|
||||||
|
weekday_sin = EXCLUDED.weekday_sin,
|
||||||
|
weekday_cos = EXCLUDED.weekday_cos,
|
||||||
|
month_sin = EXCLUDED.month_sin,
|
||||||
|
month_cos = EXCLUDED.month_cos,
|
||||||
|
created_at = EXCLUDED.created_at
|
||||||
|
"""
|
||||||
|
),
|
||||||
|
rows,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def run_feature_pipeline(
|
||||||
|
engine: Engine,
|
||||||
|
entsoe_api_key: str,
|
||||||
|
country_code: str,
|
||||||
|
start: pd.Timestamp,
|
||||||
|
end: pd.Timestamp,
|
||||||
|
cache_ttl_hours: int = 24,
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
client = EntsoePandasClient(api_key=entsoe_api_key)
|
||||||
|
service = EntsoeDataService(client=client, engine=engine, cache_ttl_hours=cache_ttl_hours)
|
||||||
|
resolved_country_code = service.resolve_country_code(country_code)
|
||||||
|
inputs = service.fetch_inputs(country_code=resolved_country_code, start=start, end=end)
|
||||||
|
_validate_inputs_have_signal(inputs, resolved_country_code)
|
||||||
|
service.upsert_raw_data(country_code=resolved_country_code, frame=inputs)
|
||||||
|
features = build_feature_frame(inputs)
|
||||||
|
persist_feature_frame(engine, country_code=resolved_country_code, feature_frame=features)
|
||||||
|
return features
|
||||||
Reference in New Issue
Block a user