Compare commits

4 Commits

Author SHA1 Message Date
ddoebel
9e9eef21a5 Merge branch 'option_pricing_structure' of http://ddoebel.de:3000/ddoebel/pricing into dev
Some checks failed
C++ CI / build (push) Has been cancelled
2026-04-15 11:48:27 +02:00
ddoebel
b2427eaf9d Notebook exploring UBS data 2026-04-15 11:43:37 +02:00
ddoebel
1dd5d8657a Ignore local Python and notebook metadata.
Prevent accidental commits of virtual environments, bytecode caches, and notebook checkpoint artifacts.

Made-with: Cursor
2026-04-15 11:40:31 +02:00
ddoebel
73641b7e5b Add electricity price ingestion and feature pipeline.
Introduce ENTSO-E data retrieval with layered caching, robust bidding-zone and missing-data handling, and persist model-ready features with detailed architecture/developer documentation.

Made-with: Cursor
2026-04-15 11:40:14 +02:00
15 changed files with 1505 additions and 8 deletions

7
.gitignore vendored

@@ -1,3 +1,10 @@
.venv/
__pycache__/
*.py[cod]
*.pyo
.ipynb_checkpoints/
# Built Python extension dropped next to qengine/__init__.py for local dev
/qengine/*.so
/qengine/*.dylib


@@ -0,0 +1,164 @@
# Electricity Price Predictor
Standalone module for ENTSO-E ingestion and feature-store creation in `quant_db`.
## Documentation
- High-level architecture: `docs/architecture.md`
- Detailed developer documentation (file-by-file + UML): `docs/developer_guide.md`
## What it builds
- Input columns:
  - `day_ahead_price`
  - `load_forecast`
  - `wind_forecast`
  - `solar_forecast`
- Derived columns:
  - `residual_load`
  - `lagged_price` (`t-1` ... `t-24`, stored as an array of length 24)
  - `lagged_residual_load` (`t-1` ... `t-24`, stored as an array of length 24)
  - `hour_of_day_sin`, `hour_of_day_cos`
  - `weekday_sin`, `weekday_cos`
  - `month_sin`, `month_cos`
## Column units (data dictionary)
All timestamps are hourly and stored in UTC (`delivery_start`).
- `day_ahead_price`
  - Unit: `EUR/MWh` (euros per megawatt-hour).
- `load_forecast`
  - Unit: `MW` (megawatts).
- `wind_forecast`
  - Unit: `MW` (megawatts).
- `solar_forecast`
  - Unit: `MW` (megawatts).
- `residual_load = load_forecast - wind_forecast - solar_forecast`
  - Unit: `MW` (megawatts).
- `lagged_price` (array of 24 values for `t-1..t-24`)
  - Unit: `EUR/MWh`.
- `lagged_residual_load` (array of 24 values for `t-1..t-24`)
  - Unit: `MW`.
- `hour_of_day_sin`, `hour_of_day_cos`
  - Unit: dimensionless in `[-1, 1]`.
- `weekday_sin`, `weekday_cos`
  - Unit: dimensionless in `[-1, 1]`.
- `month_sin`, `month_cos`
  - Unit: dimensionless in `[-1, 1]`.
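To illustrate why the cyclical columns stay within `[-1, 1]`, here is a minimal sketch of a sine/cosine encoding. The helper name `cyclical_encode` and the period of 24 for hour of day are assumptions made for this example; the module's own `_cyclical_encode` may differ in signature and details.

```python
import math


def cyclical_encode(value: float, period: float) -> tuple[float, float]:
    """Map a periodic value onto the unit circle as (sin, cos)."""
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


# Hypothetical example: hour-of-day with an assumed period of 24.
h23 = cyclical_encode(23, 24)
h0 = cyclical_encode(0, 24)

# Euclidean distance between the two encoded points on the unit circle.
distance = math.dist(h23, h0)
```

With this kind of encoding, hour 23 and hour 0 end up close together, which a raw `0..23` integer column would not express.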
## Missing-data semantics
The pipeline intentionally distinguishes **missing** from **measured zero**:
- Forecast columns (`load_forecast`, `wind_forecast`, `solar_forecast`) remain `NaN` when ENTSO-E has no value.
- `residual_load` is computed directly from source columns and remains `NaN` when any source component is missing.
- Feature engineering drops rows only for lag warmup requirements (`day_ahead_price` and `lagged_price_1..24`), not for every nullable forecast column.
- During DB persistence to `electricity_price_features`, rows with nulls in NOT NULL core columns are skipped (to satisfy schema constraints) while still being available in the returned in-memory DataFrame.
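The difference between a missing value and a measured zero can be shown with plain floats. This is only a sketch: the function name `residual_load` and the sample numbers are made up for illustration, and the real pipeline operates on pandas columns rather than scalars.

```python
import math


def residual_load(load: float, wind: float, solar: float) -> float:
    """residual_load = load_forecast - wind_forecast - solar_forecast."""
    return load - wind - solar


# Solar output is genuinely zero (for example at night): the result is a real number.
measured_zero = residual_load(50_000.0, 12_000.0, 0.0)

# Solar forecast is missing: NaN propagates instead of being treated as zero.
missing_solar = residual_load(50_000.0, 12_000.0, math.nan)
```

Imputing `0.0` for the missing forecast would make both cases look identical, which is what the NaN-preserving behavior avoids.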
## Data contracts
### In-memory contract (`run_feature_pipeline(...)` return value)
- Index:
- Type: timezone-aware `DatetimeIndex`
- Granularity: hourly
- Timezone: UTC
- Uniqueness: unique timestamps expected
- Columns:
- Base signals: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
- Derived: `residual_load`
- Lag vectors (expanded): `lagged_price_1..24`, `lagged_residual_load_1..24`
- Cyclical: `hour_of_day_sin`, `hour_of_day_cos`, `weekday_sin`, `weekday_cos`, `month_sin`, `month_cos`
- Nullability:
- `day_ahead_price`: required for returned rows.
- `lagged_price_1..24`: required for returned rows.
- `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`: nullable (`NaN` allowed).
### Persistence contract (`electricity_price_features`)
- Persisted row key:
- (`country_code`, `delivery_start`, `feature_version`)
- NOT NULL core columns in schema:
- `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`
- `lagged_price`, `lagged_residual_load`
- cyclical columns (`hour_of_day_*`, `weekday_*`, `month_*`)
- Write behavior:
- The persistence layer filters out non-conforming rows (nulls in NOT NULL core columns) before UPSERT.
- Persisted lag arrays are fixed length 24 and map to `t-1..t-24`.
### Raw observations contract (`electricity_market_observations`)
- Indexing/key semantics:
- one row per (`country_code`, `delivery_start`)
- Update semantics:
- partial refreshes do not overwrite existing non-null values with nulls (`COALESCE` merge policy)
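The `COALESCE` merge policy can be pictured in plain Python as follows. This is a sketch of the semantics only, using dictionaries instead of SQL rows; `coalesce_merge` and the sample values are hypothetical and not part of the module.

```python
from typing import Optional

Row = dict[str, Optional[float]]


def coalesce_merge(existing: Row, incoming: Row) -> Row:
    """Per column, prefer the incoming value unless it is null."""
    merged = dict(existing)
    for column, value in incoming.items():
        # Mirrors COALESCE(incoming, existing): a null never replaces a known value.
        merged[column] = value if value is not None else existing.get(column)
    return merged


existing_row = {"day_ahead_price": 80.5, "load_forecast": None}
partial_refresh = {"day_ahead_price": None, "load_forecast": 51_000.0}
merged_row = coalesce_merge(existing_row, partial_refresh)
```

Here the partial refresh fills in `load_forecast` without wiping the previously stored `day_ahead_price`.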
## Country-code and API behavior notes
- Use ENTSO-E bidding-zone identifiers (for example `DE_LU` rather than `DE`) when querying across all endpoints.
- The pipeline includes a bidding-zone resolver for common country aliases:
- `DE -> DE_LU`
- `IT -> IT_NORD`
- Persistence uses the resolved bidding-zone code so DB keys match the queried market zone.
- Some ENTSO-E endpoints may return no matches for specific windows/countries. These are handled as empty hourly frames for that endpoint rather than hard-failing the whole fetch.
- Wind/solar responses can include duplicate semantic columns after normalization; the service coalesces duplicates by taking the first non-null value per timestamp.
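The "first non-null value per timestamp" rule for duplicate columns can be sketched without pandas. The helper `first_non_null` below is an assumption for illustration; the service applies the same idea to DataFrame columns.

```python
import math
from typing import Optional, Sequence


def first_non_null(values: Sequence[Optional[float]]) -> Optional[float]:
    """Return the first value that is neither None nor NaN, else None."""
    for value in values:
        if value is not None and not math.isnan(value):
            return value
    return None


# One timestamp, three duplicate columns that all normalized to the same name.
picked = first_non_null([None, 3.0, 4.0])
all_missing = first_non_null([math.nan, None])
zero_is_data = first_non_null([0.0, 5.0])
```

Note that `0.0` counts as a real value here, consistent with the missing-versus-zero distinction described earlier.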
## Database objects
`sql/001_electricity_price_schema.sql` creates:
- `entsoe_api_cache`: generic decorator cache table (pickled payloads, TTL support)
- `electricity_market_observations`: raw hourly ENTSO-E observations
- `electricity_price_features`: model-ready feature store
## Setup
```bash
cd electricity_price_predictor
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Set env vars (if `QUANT_DB_URL` is set, `db.py` uses it instead of the individual `QUANT_DB_*` values):
```bash
export ENTSOE_API_KEY="your_entsoe_key"
export QUANT_DB_HOST="localhost"
export QUANT_DB_PORT="5432"
export QUANT_DB_NAME="quant_db"
export QUANT_DB_USER="quant_user"
export QUANT_DB_PASSWORD="strong_password"
```
## Initialize schema
```bash
PYTHONPATH=src python scripts/init_db.py
```
## Build feature store
```bash
PYTHONPATH=src python scripts/build_feature_store.py \
--country-code DE_LU \
--start 2026-01-01T00:00:00Z \
--end 2026-02-01T00:00:00Z \
--cache-ttl-hours 24
```
## Decorator caching behavior
The `cache_to_db` decorator:
- hashes function name + arguments into deterministic `cache_key`
- checks `entsoe_api_cache` first
- returns cached payload if key exists and not expired
- otherwise uses `electricity_market_observations` as secondary cache at timestamp level
- only calls ENTSO-E for missing hourly intervals, then upserts those rows
- stores final returned object in `entsoe_api_cache`
This gives a two-layer cache:
1) fast function-result cache (`entsoe_api_cache`) and
2) canonical timestamp cache (`electricity_market_observations`).
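The lookup order of the two layers can be sketched with in-memory dictionaries standing in for the two tables. Everything here is a simplified assumption: `fetch_with_two_layers`, integer hour keys, and the fake API are hypothetical, and TTL handling is left out.

```python
from typing import Callable


def fetch_with_two_layers(
    cache_key: str,
    hours: list[int],
    l1: dict[str, dict[int, float]],
    l2: dict[int, float],
    fetch_remote: Callable[[list[int]], dict[int, float]],
) -> dict[int, float]:
    # Layer 1: whole-result cache keyed by the function call.
    if cache_key in l1:
        return l1[cache_key]
    # Layer 2: per-timestamp store; only the gaps go to the remote API.
    missing = [h for h in hours if h not in l2]
    if missing:
        l2.update(fetch_remote(missing))
    result = {h: l2[h] for h in hours if h in l2}
    l1[cache_key] = result
    return result


calls: list[list[int]] = []


def fake_api(missing: list[int]) -> dict[int, float]:
    calls.append(missing)
    return {h: float(h) for h in missing}


l1_cache: dict[str, dict[int, float]] = {}
l2_store: dict[int, float] = {0: 0.0, 1: 1.0}
first = fetch_with_two_layers("k", [0, 1, 2, 3], l1_cache, l2_store, fake_api)
second = fetch_with_two_layers("k", [0, 1, 2, 3], l1_cache, l2_store, fake_api)
```

In this sketch the first call asks the fake API only for hours 2 and 3, and the second call is served entirely from layer 1.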


@@ -0,0 +1,73 @@
# Architecture Notes
This document is the quick architecture map. For full file-by-file implementation details, see `docs/developer_guide.md`.
## End-to-end data flow
1. `scripts/build_feature_store.py` parses CLI arguments and validates env vars.
2. It calls `pipeline.run_feature_pipeline(...)`.
3. `EntsoeDataService.fetch_inputs(...)` loads:
- `day_ahead_price`
- `load_forecast`
- `wind_forecast`
- `solar_forecast`
4. Each ENTSO-E call is wrapped by `cache_to_db(...)`, which:
- serves a hit from `entsoe_api_cache` when a non-expired entry exists;
- otherwise falls back to `electricity_market_observations` for already-known timestamps;
- performs API calls only for the hourly intervals that are still missing.
5. Missing intervals returned from API are upserted into `electricity_market_observations`.
6. The final merged result is cached in `entsoe_api_cache`.
7. Raw merged series are upserted to `electricity_market_observations`.
8. `features.build_feature_frame(...)` computes:
- `residual_load`
- lagged arrays (24 values each)
- cyclical encodings for hour/weekday/month
- preserves `NaN` for missing forecast-derived values.
9. `pipeline.persist_feature_frame(...)` upserts model-ready rows to `electricity_price_features`.
- filters out rows that violate feature-table NOT NULL constraints.
## Process diagram
```mermaid
flowchart TD
A[build_feature_store.py CLI] --> B[run_feature_pipeline]
B --> C[EntsoeDataService.fetch_inputs]
C --> D{Hit in entsoe_api_cache?}
D -->|Yes| E[Load payload from entsoe_api_cache]
D -->|No| F[Read electricity_market_observations]
F --> G{Missing hourly timestamps?}
G -->|No| H[Reuse DB observation rows]
G -->|Yes| I[Call ENTSO-E only for missing ranges]
I --> I2{NoMatchingDataError?}
I2 -->|Yes| I3[Use empty hourly frame for endpoint]
I2 -->|No| I4[Normalize payload]
I4 --> I5[Coalesce duplicate columns by first non-null]
I3 --> J[Upsert missing rows to electricity_market_observations]
I5 --> J
H --> K[Build merged input DataFrame]
J --> K
K --> L[Store payload in entsoe_api_cache]
E --> M[Use cached input DataFrame]
L --> N[Upsert electricity_market_observations]
M --> N
N --> O[build_feature_frame]
O --> P[Create lags + cyclical features]
P --> P2[Preserve NaN in forecast-derived columns]
P2 --> P3[Drop rows missing day_ahead_price or lagged_price_1..24]
P3 --> Q[Upsert persistable subset into electricity_price_features]
```
## Key design reasons
- DB cache avoids repeated ENTSO-E calls during iterative model work.
- Observation-table fallback avoids re-fetching timestamps already persisted once.
- Pickled payloads preserve exact pandas object shape and index information.
- Feature table stores fixed-size lag arrays so one row corresponds to one prediction timestamp.
- Missing forecasts are kept as `NaN` in analysis outputs, avoiding misleading zero-imputation.
- Persistence layer enforces schema compatibility by skipping rows with nulls in NOT NULL feature columns.
## Extension points
- Add label/target tables (`t+1`, `t+24`, etc.).
- Add training metadata + model registry tables.
- Add partitioning strategy for multi-year production-scale data.


@@ -0,0 +1,362 @@
# Developer Guide (Deep Dive)
This guide explains each file in the module, execution order, control flow, and data/state transitions so you can reason about behavior without reading source code.
## 1) Directory map and responsibilities
### Top-level
- `requirements.txt`
- Python dependencies for ingestion and DB persistence.
- `README.md`
- Operator-focused setup and run commands.
- `sql/001_electricity_price_schema.sql`
- DDL for cache, raw observations, and feature store.
- `scripts/init_db.py`
- Applies the SQL schema to `quant_db`.
- `scripts/build_feature_store.py`
- CLI entrypoint for data fetch + feature persistence.
- `docs/architecture.md`
- High-level architecture summary.
- `docs/developer_guide.md`
- This detailed developer-facing explanation.
### Python package (`src/electricity_price_predictor`)
- `__init__.py`
- Public package exports (`get_engine`, `EntsoeDataService`, `build_feature_frame`).
- `db.py`
- Builds DB URL from env vars and creates SQLAlchemy `Engine`.
- `cache.py`
- Implements decorator-based DB cache with deterministic keying.
- `entsoe_api.py`
- Wraps ENTSO-E API calls, normalizes data, and writes raw observations.
- `features.py`
- Pure feature engineering logic (residual load, lags, cyclical encoding).
- `pipeline.py`
- Orchestration layer for end-to-end fetch -> raw persist -> feature build -> feature persist.
## 2) Runtime execution path (step-by-step)
When you run:
```bash
PYTHONPATH=src python3 scripts/build_feature_store.py --country-code ... --start ... --end ...
```
Execution sequence:
1. **Argument parsing**
- `build_feature_store.py` reads country code/time range/TTL.
2. **Credential/connection bootstrap**
- checks `ENTSOE_API_KEY`.
- calls `get_engine()` from `db.py`.
3. **Pipeline orchestration**
- `run_feature_pipeline(...)` in `pipeline.py` starts.
4. **API service creation**
- initializes `EntsoePandasClient`.
- creates `EntsoeDataService(client, engine, cache_ttl_hours)`.
5. **Decorator wrapping**
- in `EntsoeDataService.__post_init__`, API methods are wrapped by `cache_to_db(...)`.
6. **Data retrieval**
- `fetch_inputs(...)` calls:
- `get_day_ahead_prices(...)`
- `get_load_forecast(...)`
- `get_wind_solar_forecast(...)`
- country aliases are normalized to bidding zones before queries (currently `DE -> DE_LU`, `IT -> IT_NORD`).
7. **Cache check/compute loop (per call)**
- decorator computes hash key from function + args.
- if non-expired row exists in `entsoe_api_cache`: returns payload.
- else: reads `electricity_market_observations` for requested timestamps.
- if timestamps are missing there, only missing hourly ranges are requested from ENTSO-E.
- `NoMatchingDataError` from ENTSO-E is converted to an empty hourly frame for that endpoint/range.
- normalized responses coalesce duplicate semantic columns (for example multiple wind/solar columns) via first non-null-per-row.
- missing rows are upserted into `electricity_market_observations`.
- final merged dataset is stored in `entsoe_api_cache` and returned.
8. **Raw persistence**
- merged inputs are upserted to `electricity_market_observations`.
9. **Feature engineering**
- `build_feature_frame(...)` computes:
- `residual_load = load - wind - solar`
- `lagged_price_1..24`
- `lagged_residual_load_1..24`
- `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
- preserves source missingness as `NaN` (no 0.0 imputation).
- drops rows only when `day_ahead_price` / `lagged_price_1..24` are missing (lag warmup requirement).
10. **Feature-store persistence**
- lags are materialized into PostgreSQL arrays (`DOUBLE PRECISION[]`, length 24).
- rows violating NOT NULL core feature constraints are filtered out before upsert.
- persistable rows are upserted to `electricity_price_features`.
11. **CLI completion**
- prints the row count of the returned feature frame (reported as "persisted"; rows skipped at write time are not subtracted).
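Step 7 relies on finding which hourly ranges are missing from the observation table. A minimal sketch of that gap detection, using only the standard library, might look like the following. The function name `missing_hourly_ranges` is hypothetical, and the sketch assumes hour-aligned UTC bounds with a half-open `[start, end)` window.

```python
from datetime import datetime, timedelta, timezone

HOUR = timedelta(hours=1)


def missing_hourly_ranges(
    start: datetime, end: datetime, known: set[datetime]
) -> list[tuple[datetime, datetime]]:
    """Return half-open [range_start, range_end) spans of hours absent from `known`."""
    ranges: list[tuple[datetime, datetime]] = []
    span_start = None
    current = start
    while current < end:
        if current not in known:
            # Open a new gap if we are not already inside one.
            if span_start is None:
                span_start = current
        elif span_start is not None:
            # A known hour closes the current gap.
            ranges.append((span_start, current))
            span_start = None
        current += HOUR
    if span_start is not None:
        ranges.append((span_start, end))
    return ranges


def utc_hour(hour: int) -> datetime:
    return datetime(2026, 1, 1, hour, tzinfo=timezone.utc)


gaps = missing_hourly_ranges(utc_hour(0), utc_hour(6), {utc_hour(0), utc_hour(1), utc_hour(4)})
```

Grouping gaps into contiguous ranges keeps the number of API requests small compared with one request per missing hour.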
## 3) UML diagrams
### 3.1 Component diagram
```mermaid
flowchart LR
CLI[scripts/build_feature_store.py] --> PIPE[pipeline.run_feature_pipeline]
PIPE --> DBMOD[db.get_engine]
PIPE --> SERVICE[EntsoeDataService]
SERVICE --> CACHEDEC[cache_to_db decorator]
SERVICE --> ENTSOE[EntsoePandasClient]
SERVICE --> SECONDARY[electricity_market_observations secondary cache]
PIPE --> FEAT[features.build_feature_frame NaN-preserving]
FEAT --> PERSIST[pipeline.persist_feature_frame null-filtered]
CACHEDEC --> DB[(quant_db.entsoe_api_cache)]
SECONDARY --> RAW[(quant_db.electricity_market_observations)]
PERSIST --> STORE[(quant_db.electricity_price_features)]
```
### 3.2 Class diagram (logical)
```mermaid
classDiagram
class EntsoeDataService {
+client: EntsoePandasClient
+engine: Engine
+cache_ttl_hours: Optional[int]
+fetch_inputs(country_code, start, end) DataFrame
+upsert_raw_data(country_code, frame) None
-_get_day_ahead_prices_impl(country_code, start, end) Series
-_get_load_forecast_impl(country_code, start, end) Series
-_get_wind_solar_forecast_impl(country_code, start, end) DataFrame
}
class CacheDecorator {
+cache_to_db(engine, namespace, ttl_hours) decorator
-_build_cache_key(function_name, args, kwargs) str
}
class FeatureBuilder {
+build_feature_frame(inputs, max_lag=24) DataFrame
-_cyclical_encode(values, period, prefix) DataFrame
}
class Pipeline {
+run_feature_pipeline(engine, entsoe_api_key, country_code, start, end, cache_ttl_hours) DataFrame
+persist_feature_frame(engine, country_code, feature_frame) None
}
Pipeline --> EntsoeDataService : uses
Pipeline --> FeatureBuilder : uses
EntsoeDataService --> CacheDecorator : wraps methods
```
### 3.3 Sequence diagram (single API method with cache)
```mermaid
sequenceDiagram
participant Caller as fetch_inputs()
participant Decorator as cache_to_db wrapper
participant CacheTable as entsoe_api_cache (L1)
participant ObsTable as electricity_market_observations (L2)
participant API as ENTSO-E API
Caller->>Decorator: get_day_ahead_prices(country, start, end)
Decorator->>CacheTable: SELECT by cache_key and expires_at
alt L1 cache hit
CacheTable-->>Decorator: payload
Decorator-->>Caller: unpickled pandas object
else L1 cache miss/expired
Decorator->>ObsTable: SELECT existing timestamps
alt L2 fully covers range
ObsTable-->>Decorator: pandas-compatible rows
else L2 has gaps
Decorator->>API: query only missing ranges
alt API returns data
API-->>Decorator: missing rows
Decorator->>Decorator: normalize columns + coalesce duplicates
Decorator->>ObsTable: UPSERT missing rows
else NoMatchingDataError
Decorator->>Decorator: synthesize empty hourly frame
end
end
Decorator->>CacheTable: INSERT/UPSERT merged payload
Decorator-->>Caller: fresh result
end
```
### 3.4 State diagram (cache entry lifecycle)
```mermaid
stateDiagram-v2
[*] --> L1Missing
L1Missing --> L2Check: cache miss/expiry
L2Check --> Fresh: observation table fully covers range
L2Check --> Partial: observation table has gaps
Partial --> Fresh: fetch missing ranges, upsert L2, upsert L1
Fresh --> Fresh: reused before expiry
Fresh --> Expired: TTL passes for L1 entry
Expired --> L2Check: next call
Fresh --> Overwritten: Same key, new payload upsert
Overwritten --> Fresh
```
### 3.5 ER diagram (database schema)
```mermaid
erDiagram
entsoe_api_cache {
text cache_key PK
text namespace
text function_name
jsonb args_json
bytea payload
timestamptz created_at
timestamptz expires_at
}
electricity_market_observations {
text country_code PK
timestamptz delivery_start PK
float day_ahead_price
float load_forecast
float wind_forecast
float solar_forecast
timestamptz ingested_at
}
electricity_price_features {
text country_code PK
timestamptz delivery_start PK
text feature_version PK
float day_ahead_price
float load_forecast
float wind_forecast
float solar_forecast
float residual_load
float[] lagged_price
float[] lagged_residual_load
float hour_of_day_sin
float hour_of_day_cos
float weekday_sin
float weekday_cos
float month_sin
float month_cos
timestamptz created_at
}
```
## 4) How files collaborate
### 4.1 `db.py` + scripts
- Scripts never hardcode DB URI; they call `get_engine()`.
- `get_engine()` centralizes environment-driven connectivity.
### 4.2 `cache.py` + `entsoe_api.py`
- `cache_to_db()` is generic and independent of ENTSO-E specifics.
- `EntsoeDataService.__post_init__` binds that generic decorator to each API-fetch method.
- Result: all expensive API calls automatically become cache-aware without changing call sites.
### 4.3 `entsoe_api.py` + `features.py`
- `entsoe_api.py` guarantees normalized timestamp index and expected source columns.
- `features.py` assumes these columns and transforms them to model features only (no DB side effects).
### 4.4 `features.py` + `pipeline.py`
- `build_feature_frame()` returns wide DataFrame with `lagged_*_1..24`.
- `persist_feature_frame()` converts those to PostgreSQL arrays so table rows stay compact and versioned.
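The wide-to-array conversion can be sketched as below. The helper `lag_columns_to_array` is hypothetical, and the ordering (index 0 holds `t-1`) is an assumption based on the documented `t-1..t-24` mapping; the actual `persist_feature_frame()` may be implemented differently.

```python
def lag_columns_to_array(row: dict[str, float], prefix: str, max_lag: int = 24) -> list[float]:
    """Collect `<prefix>_1 .. <prefix>_<max_lag>` into one ordered list."""
    return [row[f"{prefix}_{lag}"] for lag in range(1, max_lag + 1)]


# Hypothetical wide row: lagged_price_1 = 99.0, ..., lagged_price_24 = 76.0.
wide_row = {f"lagged_price_{lag}": 100.0 - lag for lag in range(1, 25)}
lag_array = lag_columns_to_array(wide_row, "lagged_price")
```

Because PostgreSQL arrays are 1-indexed by default, under this assumed ordering `lagged_price[1]` in SQL would correspond to `t-1`.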
## 5) Important implementation details
- **Cache keys are deterministic**
- Built from JSON of function name + args + kwargs with stable sorting.
- **Cache payload type**
- `pickle` stored in `BYTEA` to preserve pandas objects.
- **TTL logic**
- `expires_at IS NULL` means never expires.
- Otherwise must be greater than current UTC time to be considered valid.
- **Two-layer cache order**
- Layer 1: `entsoe_api_cache` (function-result cache).
- Layer 2: `electricity_market_observations` (timestamp-level raw cache).
- API calls happen only for Layer-2 gaps.
- **Upsert strategy**
- Raw and feature tables use `ON CONFLICT ... DO UPDATE` for idempotent reruns.
- Raw upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting previously stored values during partial refreshes.
- Feature upsert operates on a filtered persistable subset where core NOT NULL columns are present.
- **Missingness semantics**
- Forecast and derived residual columns preserve `NaN` in memory.
- No zero-imputation is performed for missing forecast values.
- **Bidding-zone normalization**
- `resolve_bidding_zone_code(...)` maps common country aliases to ENTSO-E zone codes.
- Pipeline persistence uses the resolved code, ensuring DB keys match actual queried zones.
- **Timezone handling**
- API index is normalized to UTC to avoid DST ambiguity in lag features.
- **Feature warmup**
- Rows missing `day_ahead_price` or any `lagged_price_1..24` are dropped because lag history is incomplete.
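Two of the points above, deterministic keys and the TTL rule, can be checked in isolation. The sketch below is a simplified stand-in for the module's cache logic: `build_key` and `is_cache_entry_valid` are hypothetical names, and only keyword arguments are hashed.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def build_key(function_name: str, kwargs: dict[str, Any]) -> str:
    # sort_keys makes the JSON text independent of keyword-argument order.
    payload = json.dumps({"function_name": function_name, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_cache_entry_valid(expires_at: Optional[datetime], now: datetime) -> bool:
    # A null expiry means the entry never expires.
    return expires_at is None or expires_at > now


key_a = build_key("prices", {"country_code": "DE_LU", "start": "2026-01-01"})
key_b = build_key("prices", {"start": "2026-01-01", "country_code": "DE_LU"})
key_c = build_key("prices", {"country_code": "IT_NORD", "start": "2026-01-01"})

now = datetime(2026, 1, 1, tzinfo=timezone.utc)
```

Reordering keyword arguments leaves the key unchanged, while any change in argument values produces a different key.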
## 6) Failure modes and expected behavior
- Missing `ENTSOE_API_KEY` -> CLI raises `RuntimeError` before the DB engine is created.
- Missing required input columns -> feature builder raises `ValueError`.
- Duplicate normalized columns from ENTSO-E payloads -> coalesced before reindexing to avoid pandas duplicate-label reindex errors.
- ENTSO-E no-data responses for an endpoint/range -> transformed to empty hourly frames and merged safely.
- Empty data frame -> raw/feature persistence functions no-op safely.
- Repeated identical request -> cache hit (no API roundtrip).
- Expired L1 cache row + full L2 coverage -> no API call required.
- Expired L1 cache row + partial L2 coverage -> API called only for missing ranges.
## 7) Data contracts
### 7.1 In-memory features contract
Producer: `run_feature_pipeline(...)` return value (`pd.DataFrame`).
- **Index contract**
- hourly UTC `DatetimeIndex`, sorted ascending.
- unique timestamps expected after deduplication.
- **Column contract**
- base: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
- derived: `residual_load`
- lag columns: `lagged_price_1..24`, `lagged_residual_load_1..24`
- cyclical: `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
- **Nullability contract**
- required non-null in returned rows: `day_ahead_price`, `lagged_price_1..24`
- nullable: `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`
- rationale: preserve upstream missingness semantics for analysis and QC.
### 7.2 Feature-store persistence contract
Consumer: `electricity_price_features` table.
- **Primary key contract**
- (`country_code`, `delivery_start`, `feature_version`)
- **Schema constraint contract**
- core numeric columns are `NOT NULL`.
- lag arrays are `DOUBLE PRECISION[]` and expected length 24.
- **Write-time contract**
- `persist_feature_frame(...)` filters rows that violate NOT NULL core columns before UPSERT.
- retained rows are idempotently upserted via `ON CONFLICT ... DO UPDATE`.
### 7.3 Raw-observation contract
Consumer: `electricity_market_observations` table.
- **Primary key contract**
- (`country_code`, `delivery_start`)
- **Merge contract**
- upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting prior known values.
- **Coverage contract**
- secondary cache guarantees fetched payloads are aligned to expected hourly index for the requested `[start, end)` range.
## 8) Practical debugging checklist
1. Run `scripts/init_db.py` and ensure tables exist.
2. Run one short-range fetch window (1-2 days) first.
3. Verify cache growth:
- `SELECT namespace, function_name, COUNT(*) FROM entsoe_api_cache GROUP BY 1,2;`
4. Verify raw persistence:
- `SELECT COUNT(*) FROM electricity_market_observations WHERE country_code = '...';`
5. Verify feature persistence:
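- check that every lag array has length 24 and that the feature row count is lower than the raw row count by about 24 (lag warmup), or by more if rows were skipped for nulls in NOT NULL columns.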
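The "about 24 fewer rows" expectation in step 5 follows from the lag warmup. A small sketch, assuming a gap-free price series and a hypothetical helper `build_lag_rows`, shows the arithmetic:

```python
def build_lag_rows(prices: list[float], max_lag: int = 24) -> list[dict]:
    """Build one row per hour that has a full lag history of `max_lag` values."""
    rows = []
    for t in range(max_lag, len(prices)):
        rows.append(
            {
                "price": prices[t],
                # Index 0 holds t-1, index max_lag-1 holds t-max_lag.
                "lagged_price": [prices[t - k] for k in range(1, max_lag + 1)],
            }
        )
    return rows


# 48 hourly prices produce 48 - 24 = 24 usable feature rows.
hourly_prices = [float(i) for i in range(48)]
lag_rows = build_lag_rows(hourly_prices)
```

If the observed difference is much larger than 24, missing prices or rows filtered at persistence time are the likely cause.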
## 9) Suggested next developer docs to add
- Data quality rules (acceptable missingness, clipping policy, anomaly handling).
- Training-set contract (target definition, split strategy, leakage constraints).
- Backfill/replay policy for reprocessing historical periods.


@@ -0,0 +1,4 @@
entsoe-py
pandas
sqlalchemy
psycopg2-binary


@@ -0,0 +1,41 @@
import argparse
import os

import pandas as pd

from electricity_price_predictor.db import get_engine
from electricity_price_predictor.pipeline import run_feature_pipeline


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Fetch ENTSO-E inputs and build feature store.")
    parser.add_argument("--country-code", required=True, help="ENTSO-E bidding zone code, e.g. DE_LU")
    parser.add_argument("--start", required=True, help="Inclusive start datetime, e.g. 2026-01-01T00:00:00Z")
    parser.add_argument("--end", required=True, help="Exclusive end datetime, e.g. 2026-02-01T00:00:00Z")
    parser.add_argument("--cache-ttl-hours", type=int, default=24, help="Decorator cache TTL in hours")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    api_key = os.getenv("ENTSOE_API_KEY")
    if not api_key:
        raise RuntimeError("ENTSOE_API_KEY environment variable is required.")

    engine = get_engine()
    start = pd.Timestamp(args.start, tz="UTC")
    end = pd.Timestamp(args.end, tz="UTC")
    features = run_feature_pipeline(
        engine=engine,
        entsoe_api_key=api_key,
        country_code=args.country_code,
        start=start,
        end=end,
        cache_ttl_hours=args.cache_ttl_hours,
    )
    print(f"Persisted {len(features)} feature rows for {args.country_code}.")


if __name__ == "__main__":
    main()


@@ -0,0 +1,23 @@
from pathlib import Path

from sqlalchemy import text

from electricity_price_predictor.db import get_engine


def main() -> None:
    engine = get_engine()
    schema_path = Path(__file__).resolve().parents[1] / "sql" / "001_electricity_price_schema.sql"
    sql = schema_path.read_text(encoding="utf-8")
    with engine.begin() as conn:
        for statement in sql.split(";"):
            stmt = statement.strip()
            if stmt:
                conn.execute(text(stmt))
    print("Schema initialized for electricity price predictor.")


if __name__ == "__main__":
    main()


@@ -0,0 +1,55 @@
CREATE TABLE IF NOT EXISTS entsoe_api_cache (
cache_key TEXT PRIMARY KEY,
namespace TEXT NOT NULL,
function_name TEXT NOT NULL,
args_json JSONB NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_namespace_fn
ON entsoe_api_cache(namespace, function_name);
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_expires_at
ON entsoe_api_cache(expires_at);
CREATE TABLE IF NOT EXISTS electricity_market_observations (
country_code TEXT NOT NULL,
delivery_start TIMESTAMPTZ NOT NULL,
day_ahead_price DOUBLE PRECISION,
load_forecast DOUBLE PRECISION,
wind_forecast DOUBLE PRECISION,
solar_forecast DOUBLE PRECISION,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (country_code, delivery_start)
);
CREATE INDEX IF NOT EXISTS idx_electricity_market_observations_delivery
ON electricity_market_observations(delivery_start);
CREATE TABLE IF NOT EXISTS electricity_price_features (
country_code TEXT NOT NULL,
delivery_start TIMESTAMPTZ NOT NULL,
day_ahead_price DOUBLE PRECISION NOT NULL,
load_forecast DOUBLE PRECISION NOT NULL,
wind_forecast DOUBLE PRECISION NOT NULL,
solar_forecast DOUBLE PRECISION NOT NULL,
residual_load DOUBLE PRECISION NOT NULL,
lagged_price DOUBLE PRECISION[] NOT NULL,
lagged_residual_load DOUBLE PRECISION[] NOT NULL,
hour_of_day_sin DOUBLE PRECISION NOT NULL,
hour_of_day_cos DOUBLE PRECISION NOT NULL,
weekday_sin DOUBLE PRECISION NOT NULL,
weekday_cos DOUBLE PRECISION NOT NULL,
month_sin DOUBLE PRECISION NOT NULL,
month_cos DOUBLE PRECISION NOT NULL,
feature_version TEXT NOT NULL DEFAULT 'v1',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (country_code, delivery_start, feature_version),
CONSTRAINT chk_lagged_price_len CHECK (CARDINALITY(lagged_price) = 24),
CONSTRAINT chk_lagged_residual_load_len CHECK (CARDINALITY(lagged_residual_load) = 24)
);
CREATE INDEX IF NOT EXISTS idx_electricity_price_features_delivery
ON electricity_price_features(delivery_start);


@@ -0,0 +1,7 @@
"""Electricity price forecasting data pipeline package."""
from .db import get_engine
from .entsoe_api import EntsoeDataService
from .features import build_feature_frame
__all__ = ["EntsoeDataService", "build_feature_frame", "get_engine"]


@@ -0,0 +1,112 @@
import functools
import hashlib
import json
import pickle
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine


def _json_fallback_serializer(value: Any) -> str:
    """Serializer for values that aren't directly JSON serializable."""
    if hasattr(value, "isoformat"):
        return value.isoformat()
    return repr(value)


def _build_cache_key(function_name: str, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
    payload = json.dumps(
        {"function_name": function_name, "args": args, "kwargs": kwargs},
        sort_keys=True,
        default=_json_fallback_serializer,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cache_to_db(
    engine: Engine,
    namespace: str,
    ttl_hours: Optional[int] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Cache function output in quant_db.entsoe_api_cache table.

    Notes:
    - TTL is optional. If omitted, cached values do not expire.
    - Cached payload uses pickle so pandas objects can be restored losslessly.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            cache_key = _build_cache_key(f"{namespace}.{func.__name__}", args, kwargs)
            now = datetime.now(timezone.utc)

            with engine.begin() as conn:
                result = conn.execute(
                    text(
                        """
                        SELECT payload
                        FROM entsoe_api_cache
                        WHERE cache_key = :cache_key
                          AND (expires_at IS NULL OR expires_at > :now_utc)
                        """
                    ),
                    {"cache_key": cache_key, "now_utc": now},
                ).fetchone()
                if result:
                    return pickle.loads(result[0])

                data = func(*args, **kwargs)
                expires_at = None
                if ttl_hours is not None:
                    expires_at = now + timedelta(hours=ttl_hours)

                conn.execute(
                    text(
                        """
                        INSERT INTO entsoe_api_cache (
                            cache_key,
                            namespace,
                            function_name,
                            args_json,
                            payload,
                            created_at,
                            expires_at
                        ) VALUES (
                            :cache_key,
                            :namespace,
                            :function_name,
                            CAST(:args_json AS JSONB),
                            :payload,
                            :created_at,
                            :expires_at
                        )
                        ON CONFLICT (cache_key) DO UPDATE
                        SET payload = EXCLUDED.payload,
                            created_at = EXCLUDED.created_at,
                            expires_at = EXCLUDED.expires_at,
                            args_json = EXCLUDED.args_json
                        """
                    ),
                    {
                        "cache_key": cache_key,
                        "namespace": namespace,
                        "function_name": func.__name__,
                        "args_json": json.dumps(
                            {"args": args, "kwargs": kwargs},
                            default=_json_fallback_serializer,
                        ),
                        "payload": pickle.dumps(data),
                        "created_at": now,
                        "expires_at": expires_at,
                    },
                )
                return data

        return wrapper

    return decorator


@@ -0,0 +1,24 @@
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


def get_database_url() -> str:
    """Build database URL from env or fallback defaults."""
    explicit_url = os.getenv("QUANT_DB_URL")
    if explicit_url:
        return explicit_url

    host = os.getenv("QUANT_DB_HOST", "localhost")
    port = os.getenv("QUANT_DB_PORT", "5432")
    database = os.getenv("QUANT_DB_NAME", "quant_db")
    user = os.getenv("QUANT_DB_USER", "quant_user")
    password = os.getenv("QUANT_DB_PASSWORD", "strong_password")
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"


def get_engine(echo: bool = False) -> Engine:
    """Create SQLAlchemy engine for quant_db."""
    return create_engine(get_database_url(), future=True, echo=echo)


@@ -0,0 +1,393 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from entsoe import EntsoePandasClient
from entsoe.exceptions import NoMatchingDataError
from sqlalchemy import text
from sqlalchemy.engine import Engine
from .cache import cache_to_db
OBSERVATION_COLUMNS = {
"day_ahead_price",
"load_forecast",
"wind_forecast",
"solar_forecast",
}
BIDDING_ZONE_ALIASES = {
# ENTSO-E often expects bidding-zone EIC aliases instead of plain country codes.
"DE": "DE_LU",
"IT": "IT_NORD",
}
def _as_utc_index(series_or_df: pd.Series | pd.DataFrame) -> pd.Series | pd.DataFrame:
if series_or_df.index.tz is None:
series_or_df.index = series_or_df.index.tz_localize("UTC")
else:
series_or_df.index = series_or_df.index.tz_convert("UTC")
return series_or_df.sort_index()
def _safe_float(value) -> Optional[float]:
if pd.isna(value):
return None
return float(value)
def resolve_bidding_zone_code(country_code: str) -> str:
code = str(country_code).strip().upper()
return BIDDING_ZONE_ALIASES.get(code, code)


def _coerce_single_column_frame(
    data: pd.Series | pd.DataFrame,
    target_column: str,
    preferred_tokens: tuple[str, ...] = (),
) -> pd.DataFrame:
    """
    Normalize ENTSO-E responses that can be either a Series or DataFrame.
    """
    if isinstance(data, pd.Series):
        series = _as_utc_index(data)
        series.name = target_column
        return series.to_frame()

    frame = _as_utc_index(data.copy())
    if target_column in frame.columns:
        return frame[[target_column]]
    if len(frame.columns) == 1:
        return frame.rename(columns={frame.columns[0]: target_column})[[target_column]]

    def _best_column(candidates: list) -> Optional[str]:
        if not candidates:
            return None
        # Prefer the candidate with most available points.
        return max(candidates, key=lambda col: int(frame[col].notna().sum()))

    lowered = {col: str(col).lower() for col in frame.columns}
    preferred_candidates = []
    for token in preferred_tokens:
        preferred_candidates.extend([col for col, col_lc in lowered.items() if token in col_lc])
    preferred_col = _best_column(preferred_candidates)
    if preferred_col is not None:
        return frame.rename(columns={preferred_col: target_column})[[target_column]]
    any_col = _best_column(list(frame.columns))
    if any_col is not None:
        return frame.rename(columns={any_col: target_column})[[target_column]]
    first_col = frame.columns[0]
    return frame.rename(columns={first_col: target_column})[[target_column]]


def _normalize_utc_bounds(start: pd.Timestamp, end: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
    start_ts = pd.Timestamp(start)
    end_ts = pd.Timestamp(end)
    if start_ts.tz is None:
        start_ts = start_ts.tz_localize("UTC")
    else:
        start_ts = start_ts.tz_convert("UTC")
    if end_ts.tz is None:
        end_ts = end_ts.tz_localize("UTC")
    else:
        end_ts = end_ts.tz_convert("UTC")
    return start_ts, end_ts


def _empty_hourly_frame(
    columns: list[str], start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
    start_ts, end_ts = _normalize_utc_bounds(start, end)
    idx = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
    return pd.DataFrame(index=idx, columns=columns)


def _coalesce_duplicate_columns(frame: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse duplicate column labels by taking the first non-null per row.
    """
    if frame.columns.is_unique:
        return frame
    merged: dict[str, pd.Series] = {}
    for col in frame.columns.unique():
        same_name = frame.loc[:, frame.columns == col]
        if isinstance(same_name, pd.Series):
            merged[str(col)] = same_name
        else:
            merged[str(col)] = same_name.bfill(axis=1).iloc[:, 0]
    return pd.DataFrame(merged, index=frame.index)


def _missing_ranges(missing_index: pd.DatetimeIndex) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    if missing_index.empty:
        return []
    missing_index = missing_index.sort_values().unique()
    ranges: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    current_start = missing_index[0]
    prev = missing_index[0]
    step = pd.Timedelta(hours=1)
    for ts in missing_index[1:]:
        if ts - prev != step:
            ranges.append((current_start, prev + step))
            current_start = ts
        prev = ts
    ranges.append((current_start, prev + step))
    return ranges


@dataclass
class EntsoeDataService:
    client: EntsoePandasClient
    engine: Engine
    cache_ttl_hours: Optional[int] = 24

    def __post_init__(self) -> None:
        self.get_day_ahead_prices = cache_to_db(
            self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
        )(self._get_day_ahead_prices_impl)
        self.get_load_forecast = cache_to_db(
            self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
        )(self._get_load_forecast_impl)
        self.get_wind_solar_forecast = cache_to_db(
            self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
        )(self._get_wind_solar_forecast_impl)

    def resolve_country_code(self, country_code: str) -> str:
        return resolve_bidding_zone_code(country_code)

    def _get_day_ahead_prices_impl(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.Series:
        df = self._fetch_inputs_with_secondary_cache(
            country_code=country_code,
            start=start,
            end=end,
            required_columns=["day_ahead_price"],
            api_fetcher=self._query_day_ahead_prices,
        )
        return df["day_ahead_price"]

    def _get_load_forecast_impl(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.Series:
        df = self._fetch_inputs_with_secondary_cache(
            country_code=country_code,
            start=start,
            end=end,
            required_columns=["load_forecast"],
            api_fetcher=self._query_load_forecast,
        )
        return df["load_forecast"]

    def _get_wind_solar_forecast_impl(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.DataFrame:
        return self._fetch_inputs_with_secondary_cache(
            country_code=country_code,
            start=start,
            end=end,
            required_columns=["wind_forecast", "solar_forecast"],
            api_fetcher=self._query_wind_solar_forecast,
        )

    def _query_day_ahead_prices(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.DataFrame:
        try:
            raw = self.client.query_day_ahead_prices(country_code, start=start, end=end)
        except NoMatchingDataError:
            return _empty_hourly_frame(["day_ahead_price"], start, end)
        return _coerce_single_column_frame(
            raw,
            target_column="day_ahead_price",
            preferred_tokens=("price", "ahead"),
        )

    def _query_load_forecast(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.DataFrame:
        try:
            raw = self.client.query_load_forecast(country_code, start=start, end=end)
        except NoMatchingDataError:
            return _empty_hourly_frame(["load_forecast"], start, end)
        return _coerce_single_column_frame(
            raw,
            target_column="load_forecast",
            preferred_tokens=("load",),
        )

    def _query_wind_solar_forecast(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.DataFrame:
        try:
            df = self.client.query_wind_and_solar_forecast(country_code, start=start, end=end)
        except NoMatchingDataError:
            return _empty_hourly_frame(["wind_forecast", "solar_forecast"], start, end)
        df = _as_utc_index(df)
        if isinstance(df, pd.Series):
            df = df.to_frame()
        renamed = {}
        for column in df.columns:
            lc = str(column).lower()
            if "wind" in lc:
                renamed[column] = "wind_forecast"
            elif "solar" in lc:
                renamed[column] = "solar_forecast"
        df = df.rename(columns=renamed)
        df = _coalesce_duplicate_columns(df)
        if "wind_forecast" not in df.columns:
            df["wind_forecast"] = None
        if "solar_forecast" not in df.columns:
            df["solar_forecast"] = None
        return df[["wind_forecast", "solar_forecast"]]

    def _load_observations(
        self,
        country_code: str,
        start: pd.Timestamp,
        end: pd.Timestamp,
        columns: list[str],
    ) -> pd.DataFrame:
        invalid = [col for col in columns if col not in OBSERVATION_COLUMNS]
        if invalid:
            raise ValueError(f"Unsupported observation columns requested: {invalid}")
        sql_columns = ", ".join(columns)
        query = text(
            f"""
            SELECT delivery_start, {sql_columns}
            FROM electricity_market_observations
            WHERE country_code = :country_code
              AND delivery_start >= :start_ts
              AND delivery_start < :end_ts
            ORDER BY delivery_start
            """
        )
        with self.engine.begin() as conn:
            rows = conn.execute(
                query,
                {"country_code": country_code, "start_ts": start, "end_ts": end},
            ).fetchall()
        if not rows:
            return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], tz="UTC"))
        db_frame = pd.DataFrame(rows, columns=["delivery_start", *columns])
        db_frame["delivery_start"] = pd.to_datetime(db_frame["delivery_start"], utc=True)
        db_frame = db_frame.set_index("delivery_start").sort_index()
        return db_frame

    def _fetch_inputs_with_secondary_cache(
        self,
        country_code: str,
        start: pd.Timestamp,
        end: pd.Timestamp,
        required_columns: list[str],
        api_fetcher,
    ) -> pd.DataFrame:
        start_ts, end_ts = _normalize_utc_bounds(start, end)
        expected_index = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
        from_observations = self._load_observations(country_code, start_ts, end_ts, required_columns)
        if from_observations.empty:
            complete_index = pd.DatetimeIndex([], tz="UTC")
        else:
            complete_mask = from_observations[required_columns].notna().all(axis=1)
            complete_index = from_observations.index[complete_mask]
        missing_index = expected_index.difference(complete_index)
        fetched_parts: list[pd.DataFrame] = []
        for missing_start, missing_end in _missing_ranges(missing_index):
            fetched = api_fetcher(country_code, missing_start, missing_end)
            fetched = fetched.reindex(columns=required_columns)
            fetched_parts.append(fetched)
        if fetched_parts:
            fetched_frame = pd.concat(fetched_parts).sort_index()
            self.upsert_raw_data(country_code=country_code, frame=fetched_frame)
        else:
            fetched_frame = pd.DataFrame(columns=required_columns, index=pd.DatetimeIndex([], tz="UTC"))
        combined = pd.concat([from_observations, fetched_frame]).sort_index()
        combined = combined[~combined.index.duplicated(keep="last")]
        combined = combined.reindex(expected_index)
        return combined[required_columns]

    def fetch_inputs(
        self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
    ) -> pd.DataFrame:
        resolved_country_code = self.resolve_country_code(country_code)
        price = _coerce_single_column_frame(
            self.get_day_ahead_prices(resolved_country_code, start, end),
            target_column="day_ahead_price",
            preferred_tokens=("price", "ahead"),
        )
        load = _coerce_single_column_frame(
            self.get_load_forecast(resolved_country_code, start, end),
            target_column="load_forecast",
            preferred_tokens=("load",),
        )
        wind_solar = self.get_wind_solar_forecast(resolved_country_code, start, end)
        df = price.join(load, how="outer").join(wind_solar, how="outer").sort_index()
        return df

    def upsert_raw_data(self, country_code: str, frame: pd.DataFrame) -> None:
        rows = []
        for ts, row in frame.iterrows():
            rows.append(
                {
                    "country_code": country_code,
                    "delivery_start": ts.to_pydatetime(),
                    "day_ahead_price": _safe_float(row.get("day_ahead_price")),
                    "load_forecast": _safe_float(row.get("load_forecast")),
                    "wind_forecast": _safe_float(row.get("wind_forecast")),
                    "solar_forecast": _safe_float(row.get("solar_forecast")),
                }
            )
        if not rows:
            return
        with self.engine.begin() as conn:
            conn.execute(
                text(
                    """
                    INSERT INTO electricity_market_observations (
                        country_code,
                        delivery_start,
                        day_ahead_price,
                        load_forecast,
                        wind_forecast,
                        solar_forecast
                    ) VALUES (
                        :country_code,
                        :delivery_start,
                        :day_ahead_price,
                        :load_forecast,
                        :wind_forecast,
                        :solar_forecast
                    )
                    ON CONFLICT (country_code, delivery_start) DO UPDATE
                    SET day_ahead_price = COALESCE(EXCLUDED.day_ahead_price, electricity_market_observations.day_ahead_price),
                        load_forecast = COALESCE(EXCLUDED.load_forecast, electricity_market_observations.load_forecast),
                        wind_forecast = COALESCE(EXCLUDED.wind_forecast, electricity_market_observations.wind_forecast),
                        solar_forecast = COALESCE(EXCLUDED.solar_forecast, electricity_market_observations.solar_forecast),
                        ingested_at = NOW()
                    """
                ),
                rows,
            )
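
The secondary cache only calls the API for hours that are not already complete in `electricity_market_observations`, and `_missing_ranges` is what turns those scattered hours into contiguous half-open `[start, end)` windows. The following sketch restates that grouping with the standard library only, so it can be run without pandas; `group_missing_hours` is a hypothetical name and the timestamps are made up for illustration:

```python
from datetime import datetime, timedelta, timezone


def group_missing_hours(missing_hours, step=timedelta(hours=1)):
    """Group hourly timestamps into contiguous half-open [start, end) ranges."""
    hours = sorted(set(missing_hours))
    if not hours:
        return []
    ranges = []
    current_start = prev = hours[0]
    for ts in hours[1:]:
        if ts - prev != step:
            # A gap larger than one step closes the current range.
            ranges.append((current_start, prev + step))
            current_start = ts
        prev = ts
    # Close the final range; the end bound is exclusive.
    ranges.append((current_start, prev + step))
    return ranges


base = datetime(2026, 4, 1, tzinfo=timezone.utc)
# Hours 0-2 and 7-8 are missing; hours 3-6 are already stored.
gaps = [base + timedelta(hours=h) for h in (0, 1, 2, 7, 8)]
windows = group_missing_hours(gaps)
```

Here `windows` holds two ranges, so the service would issue two API calls instead of five single-hour requests or one request spanning the hours that are already stored.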


@@ -0,0 +1,58 @@
from __future__ import annotations

import math

import pandas as pd


def _cyclical_encode(values: pd.Series, period: int, prefix: str) -> pd.DataFrame:
    angle = 2.0 * math.pi * values / period
    return pd.DataFrame(
        {f"{prefix}_sin": angle.apply(math.sin), f"{prefix}_cos": angle.apply(math.cos)},
        index=values.index,
    )


def build_feature_frame(inputs: pd.DataFrame, max_lag: int = 24) -> pd.DataFrame:
    """
    Build feature set for electricity price forecasting.

    Included:
    - day_ahead_price, load_forecast, wind_forecast, solar_forecast
    - residual_load
    - lagged_price(t-1..t-24)
    - lagged_residual_load(t-1..t-24)
    - hour/week_day/month cyclical encodings
    """
    df = inputs.copy().sort_index()
    required = {"day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"}
    missing = required.difference(df.columns)
    if missing:
        raise ValueError(f"Missing required input columns: {sorted(missing)}")

    # Preserve source missingness semantics for downstream users.
    # We keep NaNs instead of imputing to 0.0 so missing data is explicit.
    df["residual_load"] = df["load_forecast"] - df["wind_forecast"] - df["solar_forecast"]
    for lag in range(1, max_lag + 1):
        df[f"lagged_price_{lag}"] = df["day_ahead_price"].shift(lag)
        df[f"lagged_residual_load_{lag}"] = df["residual_load"].shift(lag)

    time_index = df.index
    if time_index.tz is None:
        time_index = time_index.tz_localize("UTC")
    else:
        time_index = time_index.tz_convert("UTC")
    cyclical = [
        _cyclical_encode(pd.Series(time_index.hour, index=df.index), 24, "hour_of_day"),
        _cyclical_encode(pd.Series(time_index.weekday, index=df.index), 7, "weekday"),
        _cyclical_encode(pd.Series(time_index.month - 1, index=df.index), 12, "month"),
    ]
    for cyc in cyclical:
        df = df.join(cyc)

    # Only enforce warmup/history constraints for price lags.
    # Other feature columns can remain NaN when source data is missing.
    required_for_row = ["day_ahead_price", *[f"lagged_price_{lag}" for lag in range(1, max_lag + 1)]]
    return df.dropna(subset=required_for_row).sort_index()
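
The sine/cosine pairs produced by `_cyclical_encode` place each hour, weekday, or month on a unit circle, so values at the wrap-around (hour 23 and hour 0) end up close together instead of 23 units apart. A small standard-library sketch of the same formula on scalar values; `encode` and `distance` are hypothetical helpers used only for this illustration:

```python
import math


def encode(value: float, period: int) -> tuple[float, float]:
    """Map a periodic value to (sin, cos) on the unit circle."""
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def distance(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Euclidean distance between two encoded points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


hour_23 = encode(23, 24)
hour_0 = encode(0, 24)
hour_12 = encode(12, 24)

# Adjacent hours across midnight are near each other on the circle,
# while midnight and noon are diametrically opposite.
near = distance(hour_23, hour_0)
far = distance(hour_0, hour_12)
```

The month encoding in `build_feature_frame` subtracts 1 first so that January maps to angle 0, in line with the zero-based hour and weekday values.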


@@ -0,0 +1,146 @@
from __future__ import annotations

from datetime import datetime, timezone

import pandas as pd
from entsoe import EntsoePandasClient
from sqlalchemy import text
from sqlalchemy.engine import Engine

from .entsoe_api import EntsoeDataService
from .features import build_feature_frame


def _safe_float(value) -> float | None:
    if pd.isna(value):
        return None
    return float(value)


def _validate_inputs_have_signal(inputs: pd.DataFrame, country_code: str) -> None:
    required = ["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"]
    non_null_counts = {col: int(inputs[col].notna().sum()) for col in required if col in inputs.columns}
    if non_null_counts and all(count == 0 for count in non_null_counts.values()):
        raise ValueError(
            "No ENTSO-E data available for "
            f"'{country_code}' in the requested time window. "
            "Try another bidding zone/time range, and ensure the API key has access."
        )


def persist_feature_frame(engine: Engine, country_code: str, feature_frame: pd.DataFrame) -> None:
    # Feature-store schema expects core numeric fields to be non-null.
    # Keep NaNs in the returned DataFrame, but skip incomplete rows on DB write.
    persistable = feature_frame.dropna(
        subset=["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast", "residual_load"]
    )
    rows = []
    for ts, row in persistable.iterrows():
        lag_price = [float(row[f"lagged_price_{lag}"]) for lag in range(1, 25)]
        lag_residual = [float(row[f"lagged_residual_load_{lag}"]) for lag in range(1, 25)]
        rows.append(
            {
                "country_code": country_code,
                "delivery_start": ts.to_pydatetime(),
                "day_ahead_price": float(row["day_ahead_price"]),
                "load_forecast": _safe_float(row["load_forecast"]),
                "wind_forecast": _safe_float(row["wind_forecast"]),
                "solar_forecast": _safe_float(row["solar_forecast"]),
                "residual_load": _safe_float(row["residual_load"]),
                "lagged_price": lag_price,
                "lagged_residual_load": lag_residual,
                "hour_of_day_sin": float(row["hour_of_day_sin"]),
                "hour_of_day_cos": float(row["hour_of_day_cos"]),
                "weekday_sin": float(row["weekday_sin"]),
                "weekday_cos": float(row["weekday_cos"]),
                "month_sin": float(row["month_sin"]),
                "month_cos": float(row["month_cos"]),
                "feature_version": "v1",
                "created_at": datetime.now(timezone.utc),
            }
        )
    if not rows:
        return
    with engine.begin() as conn:
        conn.execute(
            text(
                """
                INSERT INTO electricity_price_features (
                    country_code,
                    delivery_start,
                    day_ahead_price,
                    load_forecast,
                    wind_forecast,
                    solar_forecast,
                    residual_load,
                    lagged_price,
                    lagged_residual_load,
                    hour_of_day_sin,
                    hour_of_day_cos,
                    weekday_sin,
                    weekday_cos,
                    month_sin,
                    month_cos,
                    feature_version,
                    created_at
                ) VALUES (
                    :country_code,
                    :delivery_start,
                    :day_ahead_price,
                    :load_forecast,
                    :wind_forecast,
                    :solar_forecast,
                    :residual_load,
                    :lagged_price,
                    :lagged_residual_load,
                    :hour_of_day_sin,
                    :hour_of_day_cos,
                    :weekday_sin,
                    :weekday_cos,
                    :month_sin,
                    :month_cos,
                    :feature_version,
                    :created_at
                )
                ON CONFLICT (country_code, delivery_start, feature_version) DO UPDATE
                SET day_ahead_price = EXCLUDED.day_ahead_price,
                    load_forecast = EXCLUDED.load_forecast,
                    wind_forecast = EXCLUDED.wind_forecast,
                    solar_forecast = EXCLUDED.solar_forecast,
                    residual_load = EXCLUDED.residual_load,
                    lagged_price = EXCLUDED.lagged_price,
                    lagged_residual_load = EXCLUDED.lagged_residual_load,
                    hour_of_day_sin = EXCLUDED.hour_of_day_sin,
                    hour_of_day_cos = EXCLUDED.hour_of_day_cos,
                    weekday_sin = EXCLUDED.weekday_sin,
                    weekday_cos = EXCLUDED.weekday_cos,
                    month_sin = EXCLUDED.month_sin,
                    month_cos = EXCLUDED.month_cos,
                    created_at = EXCLUDED.created_at
                """
            ),
            rows,
        )


def run_feature_pipeline(
    engine: Engine,
    entsoe_api_key: str,
    country_code: str,
    start: pd.Timestamp,
    end: pd.Timestamp,
    cache_ttl_hours: int = 24,
) -> pd.DataFrame:
    client = EntsoePandasClient(api_key=entsoe_api_key)
    service = EntsoeDataService(client=client, engine=engine, cache_ttl_hours=cache_ttl_hours)
    resolved_country_code = service.resolve_country_code(country_code)
    inputs = service.fetch_inputs(country_code=resolved_country_code, start=start, end=end)
    _validate_inputs_have_signal(inputs, resolved_country_code)
    service.upsert_raw_data(country_code=resolved_country_code, frame=inputs)
    features = build_feature_frame(inputs)
    persist_feature_frame(engine, country_code=resolved_country_code, feature_frame=features)
    return features
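
`persist_feature_frame` packs the 24 wide lag columns into one array per row using a hard-coded `range(1, 25)`, while `build_feature_frame` takes `max_lag` as a parameter. If the two ever diverge, a smaller `max_lag` would raise a `KeyError` on write and a larger one would silently drop the extra lags. A minimal sketch of a shared packing helper, assuming the `lagged_price_<n>` column naming used above; `pack_lags` is a hypothetical name and the sample row is made up:

```python
def pack_lags(row: dict, prefix: str, max_lag: int = 24) -> list[float]:
    """Collect <prefix>_1 .. <prefix>_<max_lag> into one list, nearest lag first."""
    return [float(row[f"{prefix}_{lag}"]) for lag in range(1, max_lag + 1)]


# Illustrative row: lag n holds the value 50 + n.
sample_row = {f"lagged_price_{lag}": 50.0 + lag for lag in range(1, 25)}
packed = pack_lags(sample_row, "lagged_price")
```

Passing the same `max_lag` to both the feature builder and a helper like this would keep the array length in the feature store tied to a single setting.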


@@ -3,10 +3,13 @@ import pandas as pd
import yfinance as yf
from db_connect import db_engine
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import MetaData, Table
# --- CONFIG ---
TICKERS = ["UBS", "^GSPC"]
DAYS_BACK = 21 # ~3 weeks
DAYS_BACK = 31 # ~1 month
TABLE_NAME = "prices"
def fetch_data(tickers, start_date, end_date):
@@ -20,6 +23,11 @@ def fetch_data(tickers, start_date, end_date):
)
return data
def get_asset_map(engine):
query = "SELECT id, ticker FROM assets"
df = pd.read_sql(query, engine)
return dict(zip(df["ticker"], df["id"]))
def transform_data(raw_data):
frames = []
@@ -42,17 +50,37 @@ def transform_data(raw_data):
df["return"] = df["close"].pct_change()
frames.append(df)
return pd.concat(frames, ignore_index=True)
def load_to_postgres(df, engine):
df.to_sql(
TABLE_NAME,
engine,
if_exists="append",
index=False
)
asset_map = get_asset_map(engine)
df["asset_id"] = df["ticker"].map(asset_map)
df = df.drop(columns=["ticker"])
metadata = MetaData()
prices = Table(TABLE_NAME, metadata, autoload_with=engine)
with engine.begin() as conn:
for _, row in df.iterrows():
stmt = insert(prices).values({
"asset_id": row["asset_id"],
"date": row["date"],
"close": row["close"],
"volume": row["volume"],
"return": row["return"]
})
stmt = stmt.on_conflict_do_update(
index_elements=["asset_id", "date"],
set_={
"close": stmt.excluded.close,
"volume": stmt.excluded.volume,
# "return" is a Python keyword, so stmt.excluded.return would be a syntax error; use item access.
"return": stmt.excluded["return"],
}
)
conn.execute(stmt)
def main():