2 Commits

Author SHA1 Message Date
0174a9b38c Update README.md
Some checks failed
C++ CI / build (push) Has been cancelled
Add references to further analysis
2026-04-02 23:02:39 +00:00
edda985fc1 Update README.md
Some checks failed
C++ CI / build (push) Has been cancelled
Add a precise project description
2026-04-02 15:50:18 +00:00
17 changed files with 114 additions and 1835 deletions

7
.gitignore vendored

@@ -1,10 +1,3 @@
.venv/
__pycache__/
*.py[cod]
*.pyo
.ipynb_checkpoints/
# Built Python extension dropped next to qengine/__init__.py for local dev
/qengine/*.so
/qengine/*.dylib

116
README.md

@@ -1,6 +1,100 @@
# option_pricing
# Option Pricing Engine with Market Data Pipeline
## 📌 Project Description
C++/Python quantitative finance engine for option pricing, implied-volatility analysis, and market-data ingestion.
This repository implements a **production-style quantitative valuation pipeline** for equity options, combining high-performance pricing models with a full data and calibration workflow.
The system goes beyond a standalone pricer: it integrates **market data ingestion, structured storage, numerical pricing, and volatility surface calibration** into a single reproducible framework.
### The goal of this project
The goal of this project is to serve as a **modular foundation for quantitative modeling and experimentation** in option pricing and financial time series.
Rather than implementing a single model, the system is designed to support:
- benchmarking different pricing approaches (analytical, simulation-based, and data-driven),
- comparing numerical methods under realistic market data conditions,
- and extending toward more advanced workflows such as statistical learning and model calibration.
A key objective is to create an environment where **new ideas from research can be implemented, tested, and evaluated within a consistent pipeline**, rather than in isolated scripts or notebooks.
This includes:
- integrating alternative pricing methodologies into a shared framework,
- analyzing model behavior across time and market regimes,
- and building reproducible pipelines for both numerical and data-driven approaches.
Ultimately, the project aims to bridge:
- **theoretical models** (e.g. stochastic processes, volatility parameterizations),
- **numerical methods** (simulation, calibration),
- and **data-driven techniques** (time-series analysis, machine learning),
within a single, extensible system that moves closer to a production-grade pipeline.
### What the system does
The system supports the following workflow:
- Ingest listed option market data (Yahoo Finance)
- Normalize and store it in a relational database (PostgreSQL)
- Compute implied volatilities from observed prices
- Calibrate parametric volatility surfaces (SVI)
- Run pricing models (Black-Scholes, Monte Carlo)
- Expose fast pricing routines via Python for analysis and research
---
This project aims to **unify these components into a coherent system**, with clear interfaces between:
- **Data layer** (ingestion, storage, schema)
- **Model layer** (C++ pricing engines)
- **Analytics layer** (Python calibration and diagnostics)
- **Execution layer** (reproducible pipelines)
---
### Technology choices
The architecture deliberately combines multiple technologies, each chosen for a specific role:
- **C++ (C++20)**
Used for performance-critical pricing components (Monte Carlo, closed-form models) and clean domain modeling.
- **Python**
Used for orchestration, data processing, calibration (SVI), and rapid experimentation.
- **pybind11**
Bridges C++ and Python, enabling high-performance models to be used in flexible workflows.
- **PostgreSQL + SQLAlchemy**
Provides structured, queryable storage for market data and supports reproducible calibration pipelines.
---
### Key challenges addressed
This project tackles several non-trivial challenges:
- **Bridging performance and usability**
Integrating a C++ pricing engine into a Python-driven research pipeline.
- **Data consistency and reproducibility**
Designing a schema and ingestion process that supports reliable downstream calibration.
- **Implied volatility inversion and calibration**
Implementing stable numerical inversion and robust SVI fitting under noisy market data.
- **System design over isolated models**
Ensuring that data, models, and workflows interact cleanly as a unified system.
---
### Future directions
Planned improvements focus on moving further toward production-grade systems:
- Arbitrage-free implied volatility surface construction
- More robust calibration and smoothing techniques
- Performance optimization (parallel Monte Carlo, batching)
- Extension to additional data sources and APIs
- Improved testing of end-to-end data and calibration pipelines
- Comparison of classical stochastic models with data-driven approaches for pricing or volatility forecasting
## What is included
@@ -63,17 +157,19 @@ python src/data/ingestion/ingest_yahoo_options.py
`PIPELINE_SYMBOLS` in `.env` controls which symbols are ingested (comma-separated, e.g. `SPY,AAPL,QQQ`).
## Security and publication notes
- No credentials are stored in source code.
- `.env` files are git-ignored; only `.env.example` is committed.
- Before publishing, rotate any credentials that were ever committed in the past.
- Prefer least-privilege DB users for runtime ingestion jobs.
## Generating C++ API docs
```bash
cmake --build build --target docs
```
Generated output goes to `docs/html/` and is ignored in version control.
## 📚 Further Analysis
A more detailed discussion of numerical stability, implied volatility inversion, and calibration challenges is available here:
👉 [Project blog](https://notes.ddoebel.de/public-folder/Option-Pricing-Engine)
This includes deeper analysis of:
- implied volatility instability from raw market data
- calibration challenges under noisy inputs
- numerical experiments and diagnostics
(see in particular [Observations and further analysis](https://notes.ddoebel.de/public-folder/Option-Pricing-Engine#-observations-and-further-analysis))


@@ -1,164 +0,0 @@
# Electricity Price Predictor
Standalone module for ENTSO-E ingestion and feature-store creation in `quant_db`.
## Documentation
- High-level architecture: `docs/architecture.md`
- Detailed developer documentation (file-by-file + UML): `docs/developer_guide.md`
## What it builds
- Input columns:
- `day_ahead_price`
- `load_forecast`
- `wind_forecast`
- `solar_forecast`
- Derived columns:
- `residual_load`
- `lagged_price` (`t-1` ... `t-24`, stored as array length 24)
- `lagged_residual_load` (`t-1` ... `t-24`, stored as array length 24)
- `hour_of_day_sin`, `hour_of_day_cos`
- `weekday_sin`, `weekday_cos`
- `month_sin`, `month_cos`
## Column units (data dictionary)
All timestamps are hourly and stored in UTC (`delivery_start`).
- `day_ahead_price`
- Unit: `EUR/MWh` (euros per megawatt-hour).
- `load_forecast`
- Unit: `MW` (megawatts).
- `wind_forecast`
- Unit: `MW` (megawatts).
- `solar_forecast`
- Unit: `MW` (megawatts).
- `residual_load = load_forecast - wind_forecast - solar_forecast`
- Unit: `MW` (megawatts).
- `lagged_price` (array of 24 values for `t-1..t-24`)
- Unit: `EUR/MWh`.
- `lagged_residual_load` (array of 24 values for `t-1..t-24`)
- Unit: `MW`.
- `hour_of_day_sin`, `hour_of_day_cos`
- Unit: dimensionless in `[-1, 1]`.
- `weekday_sin`, `weekday_cos`
- Unit: dimensionless in `[-1, 1]`.
- `month_sin`, `month_cos`
- Unit: dimensionless in `[-1, 1]`.
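As a rough illustration of the cyclical columns above, the sketch below maps a periodic value onto the unit circle. The helper name `cyclical_encode` and the periods used (24 for hours, 7 for weekdays, 12 for months) are assumptions for illustration; the module's own `_cyclical_encode` may use a different signature or offset months differently.

```python
import math


def cyclical_encode(value: float, period: float) -> tuple[float, float]:
    """Return (sin, cos) of the angle that `value` occupies within `period`.

    Hypothetical helper: both components are dimensionless and lie in [-1, 1].
    """
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


# Hour 6 of a 24-hour day sits a quarter of the way around the circle.
hour_of_day_sin, hour_of_day_cos = cyclical_encode(6, 24)
```

Because hour 0 and hour 24 map to the same point, midnight ends up adjacent to 23:00 instead of 23 units away from it.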
## Missing-data semantics
The pipeline intentionally distinguishes **missing** from **measured zero**:
- Forecast columns (`load_forecast`, `wind_forecast`, `solar_forecast`) remain `NaN` when ENTSO-E has no value.
- `residual_load` is computed directly from source columns and remains `NaN` when any source component is missing.
- Feature engineering drops rows only for lag warmup requirements (`day_ahead_price` and `lagged_price_1..24`), not for every nullable forecast column.
- During DB persistence to `electricity_price_features`, rows with nulls in NOT NULL core columns are skipped (to satisfy schema constraints) while still being available in the returned in-memory DataFrame.
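The difference between a missing value and a measured zero can be shown with plain floats. This is a minimal sketch and not the module's code: the function name `residual_load` mirrors the column name, and `NaN` propagation relies only on standard IEEE floating-point arithmetic.

```python
import math


def residual_load(load_forecast: float, wind_forecast: float, solar_forecast: float) -> float:
    """Residual load in MW; NaN in any input yields NaN in the result."""
    return load_forecast - wind_forecast - solar_forecast


measured_zero = residual_load(50_000.0, 0.0, 5_000.0)        # wind measured as 0 MW
missing_wind = residual_load(50_000.0, math.nan, 5_000.0)    # wind not reported
```

Imputing `0.0` for the missing wind forecast would make both rows look identical, which is the outcome the pipeline is designed to avoid.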
## Data contracts
### In-memory contract (`run_feature_pipeline(...)` return value)
- Index:
- Type: timezone-aware `DatetimeIndex`
- Granularity: hourly
- Timezone: UTC
- Uniqueness: unique timestamps expected
- Columns:
- Base signals: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
- Derived: `residual_load`
- Lag vectors (expanded): `lagged_price_1..24`, `lagged_residual_load_1..24`
- Cyclical: `hour_of_day_sin`, `hour_of_day_cos`, `weekday_sin`, `weekday_cos`, `month_sin`, `month_cos`
- Nullability:
- `day_ahead_price`: required for returned rows.
- `lagged_price_1..24`: required for returned rows.
- `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`: nullable (`NaN` allowed).
### Persistence contract (`electricity_price_features`)
- Persisted row key:
- (`country_code`, `delivery_start`, `feature_version`)
- NOT NULL core columns in schema:
- `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`
- `lagged_price`, `lagged_residual_load`
- cyclical columns (`hour_of_day_*`, `weekday_*`, `month_*`)
- Write behavior:
- The persistence layer filters out non-conforming rows (nulls in NOT NULL core columns) before UPSERT.
- Persisted lag arrays are fixed length 24 and map to `t-1..t-24`.
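The mapping from an hourly series to a fixed-length lag array can be sketched as follows. The helper `lag_vector` is hypothetical and works on a plain list; the actual pipeline builds wide `lagged_*_1..24` columns first and packs them into arrays at persistence time.

```python
from typing import Optional, Sequence


def lag_vector(series: Sequence[float], index: int, max_lag: int = 24) -> Optional[list[float]]:
    """Return [t-1, t-2, ..., t-max_lag] for position `index`.

    Returns None during the warmup period, when fewer than `max_lag`
    earlier observations exist.
    """
    if index < max_lag:
        return None
    return [series[index - lag] for lag in range(1, max_lag + 1)]


prices = [float(hour) for hour in range(30)]  # illustrative values only
lags_at_24 = lag_vector(prices, 24)
```

The first element of the array is the most recent lag (`t-1`), so array position `k` (1-based, as in PostgreSQL) corresponds to `t-k`.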
### Raw observations contract (`electricity_market_observations`)
- Indexing/key semantics:
- one row per (`country_code`, `delivery_start`)
- Update semantics:
- partial refreshes do not overwrite existing non-null values with nulls (`COALESCE` merge policy)
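The `COALESCE` merge policy can be pictured as a per-column rule: a new non-null value wins, and a null never replaces something already stored. The sketch below imitates that rule on dictionaries; `coalesce_merge` is a hypothetical name, and the real behavior lives in the SQL upsert rather than in Python.

```python
from typing import Optional

Row = dict[str, Optional[float]]


def coalesce_merge(existing: Row, incoming: Row) -> Row:
    """Mimic COALESCE(EXCLUDED.col, existing.col) column by column."""
    merged = dict(existing)
    for column, new_value in incoming.items():
        if new_value is not None:
            merged[column] = new_value      # fresh value overwrites
        else:
            merged.setdefault(column, None)  # null never overwrites
    return merged


stored = {"day_ahead_price": 42.0, "load_forecast": None}
refresh = {"day_ahead_price": None, "load_forecast": 50_000.0}
merged = coalesce_merge(stored, refresh)
```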
## Country-code and API behavior notes
- Use ENTSO-E bidding-zone identifiers (for example `DE_LU` rather than `DE`) when querying across all endpoints.
- The pipeline includes a bidding-zone resolver for common country aliases:
- `DE -> DE_LU`
- `IT -> IT_NORD`
- Persistence uses the resolved bidding-zone code so DB keys match the queried market zone.
- Some ENTSO-E endpoints may return no matches for specific windows/countries. These are handled as empty hourly frames for that endpoint rather than hard-failing the whole fetch.
- Wind/solar responses can include duplicate semantic columns after normalization; the service coalesces duplicates by taking the first non-null value per timestamp.
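A minimal sketch of the alias resolution described above, assuming a simple lookup table. The two aliases come from this README; the dictionary name `ZONE_ALIASES`, the whitespace and case normalization, and the pass-through for unknown codes are assumptions, since the body of `resolve_bidding_zone_code` is not shown here.

```python
# Aliases listed in this README; any further entries would be guesses.
ZONE_ALIASES: dict[str, str] = {
    "DE": "DE_LU",
    "IT": "IT_NORD",
}


def resolve_bidding_zone_code(country_code: str) -> str:
    """Map a common country alias to an ENTSO-E bidding-zone code.

    Codes without a known alias are returned unchanged (assumed behavior).
    """
    normalized = country_code.strip().upper()
    return ZONE_ALIASES.get(normalized, normalized)
```

Resolving once at the pipeline boundary and using the resolved code for both queries and persistence keeps the DB keys aligned with the zone that was actually queried.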
## Database objects
`sql/001_electricity_price_schema.sql` creates:
- `entsoe_api_cache`: generic decorator cache table (pickled payloads, TTL support)
- `electricity_market_observations`: raw hourly ENTSO-E observations
- `electricity_price_features`: model-ready feature store
## Setup
```bash
cd electricity_price_predictor
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Set env vars:
```bash
export ENTSOE_API_KEY="your_entsoe_key"
export QUANT_DB_HOST="localhost"
export QUANT_DB_PORT="5432"
export QUANT_DB_NAME="quant_db"
export QUANT_DB_USER="quant_user"
export QUANT_DB_PASSWORD="strong_password"
```
## Initialize schema
```bash
PYTHONPATH=src python scripts/init_db.py
```
## Build feature store
```bash
PYTHONPATH=src python scripts/build_feature_store.py \
--country-code DE_LU \
--start 2026-01-01T00:00:00Z \
--end 2026-02-01T00:00:00Z \
--cache-ttl-hours 24
```
## Decorator caching behavior
The `cache_to_db` decorator:
- hashes function name + arguments into deterministic `cache_key`
- checks `entsoe_api_cache` first
- returns cached payload if key exists and not expired
- otherwise uses `electricity_market_observations` as secondary cache at timestamp level
- only calls ENTSO-E for missing hourly intervals, then upserts those rows
- stores final returned object in `entsoe_api_cache`
This gives a two-layer cache:
1) fast function-result cache (`entsoe_api_cache`) and
2) canonical timestamp cache (`electricity_market_observations`).
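The lookup order can be sketched with two in-memory dictionaries standing in for the two tables. Everything here is illustrative: `fetch_with_two_layers`, `hourly_range`, and the `fetch_missing` callback are hypothetical names, and the real decorator works against PostgreSQL and requests contiguous missing ranges instead of individual timestamps.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def hourly_range(start: datetime, end: datetime) -> list[datetime]:
    """Hourly timestamps in the half-open interval [start, end)."""
    stamps = []
    current = start
    while current < end:
        stamps.append(current)
        current += timedelta(hours=1)
    return stamps


def fetch_with_two_layers(
    key: str,
    start: datetime,
    end: datetime,
    l1: dict,  # stands in for entsoe_api_cache: key -> (payload, expires_at)
    l2: dict,  # stands in for electricity_market_observations: timestamp -> value
    fetch_missing: Callable[[list[datetime]], dict],
    now: datetime,
    ttl: Optional[timedelta] = None,
) -> dict:
    # Layer 1: return the cached result if present and not expired.
    entry = l1.get(key)
    if entry is not None:
        payload, expires_at = entry
        if expires_at is None or expires_at > now:
            return payload
    # Layer 2: reuse known timestamps, call the API only for the gaps.
    wanted = hourly_range(start, end)
    missing = [ts for ts in wanted if ts not in l2]
    if missing:
        l2.update(fetch_missing(missing))
    result = {ts: l2.get(ts) for ts in wanted}
    # Store the merged result back into layer 1.
    l1[key] = (result, None if ttl is None else now + ttl)
    return result
```

With this ordering, an expired layer-1 entry does not by itself trigger an API call: as long as layer 2 fully covers the requested range, the result is rebuilt locally.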


@@ -1,73 +0,0 @@
# Architecture Notes
This document is the quick architecture map. For full file-by-file implementation details, see `docs/developer_guide.md`.
## End-to-end data flow
1. `scripts/build_feature_store.py` parses CLI arguments and validates env vars.
2. It calls `pipeline.run_feature_pipeline(...)`.
3. `EntsoeDataService.fetch_inputs(...)` loads:
- `day_ahead_price`
- `load_forecast`
- `wind_forecast`
- `solar_forecast`
4. Each ENTSO-E call is wrapped by `cache_to_db(...)`, which:
- serves a hit from `entsoe_api_cache` when one exists,
- otherwise falls back to `electricity_market_observations` for already-known timestamps,
- and performs API calls only for missing hourly intervals.
5. Missing intervals returned from API are upserted into `electricity_market_observations`.
6. The final merged result is cached in `entsoe_api_cache`.
7. Raw merged series are upserted to `electricity_market_observations`.
8. `features.build_feature_frame(...)` computes:
- `residual_load`
- lagged arrays (24 values each)
- cyclical encodings for hour/weekday/month
- preserves `NaN` for missing forecast-derived values.
9. `pipeline.persist_feature_frame(...)` upserts model-ready rows to `electricity_price_features`.
- filters out rows that violate feature-table NOT NULL constraints.
## Process diagram
```mermaid
flowchart TD
A[build_feature_store.py CLI] --> B[run_feature_pipeline]
B --> C[EntsoeDataService.fetch_inputs]
C --> D{Hit in entsoe_api_cache?}
D -->|Yes| E[Load payload from entsoe_api_cache]
D -->|No| F[Read electricity_market_observations]
F --> G{Missing hourly timestamps?}
G -->|No| H[Reuse DB observation rows]
G -->|Yes| I[Call ENTSO-E only for missing ranges]
I --> I2{NoMatchingDataError?}
I2 -->|Yes| I3[Use empty hourly frame for endpoint]
I2 -->|No| I4[Normalize payload]
I4 --> I5[Coalesce duplicate columns by first non-null]
I3 --> J[Upsert missing rows to electricity_market_observations]
I5 --> J
H --> K[Build merged input DataFrame]
J --> K
K --> L[Store payload in entsoe_api_cache]
E --> M[Use cached input DataFrame]
L --> N[Upsert electricity_market_observations]
M --> N
N --> O[build_feature_frame]
O --> P[Create lags + cyclical features]
P --> P2[Preserve NaN in forecast-derived columns]
P2 --> P3[Drop rows missing day_ahead_price or lagged_price_1..24]
P3 --> Q[Upsert persistable subset into electricity_price_features]
```
## Key design reasons
- DB cache avoids repeated ENTSO-E calls during iterative model work.
- Observation-table fallback avoids re-fetching timestamps already persisted once.
- Pickled payloads preserve exact pandas object shape and index information.
- Feature table stores fixed-size lag arrays so one row corresponds to one prediction timestamp.
- Missing forecasts are kept as `NaN` in analysis outputs, avoiding misleading zero-imputation.
- Persistence layer enforces schema compatibility by skipping rows with nulls in NOT NULL feature columns.
## Extension points
- Add label/target tables (`t+1`, `t+24`, etc.).
- Add training metadata + model registry tables.
- Add partitioning strategy for multi-year production-scale data.


@@ -1,362 +0,0 @@
# Developer Guide (Deep Dive)
This guide explains each file in the module, execution order, control flow, and data/state transitions so you can reason about behavior without reading source code.
## 1) Directory map and responsibilities
### Top-level
- `requirements.txt`
- Python dependencies for ingestion and DB persistence.
- `README.md`
- Operator-focused setup and run commands.
- `sql/001_electricity_price_schema.sql`
- DDL for cache, raw observations, and feature store.
- `scripts/init_db.py`
- Applies the SQL schema to `quant_db`.
- `scripts/build_feature_store.py`
- CLI entrypoint for data fetch + feature persistence.
- `docs/architecture.md`
- High-level architecture summary.
- `docs/developer_guide.md`
- This detailed developer-facing explanation.
### Python package (`src/electricity_price_predictor`)
- `__init__.py`
- Public package exports (`get_engine`, `EntsoeDataService`, `build_feature_frame`).
- `db.py`
- Builds DB URL from env vars and creates SQLAlchemy `Engine`.
- `cache.py`
- Implements decorator-based DB cache with deterministic keying.
- `entsoe_api.py`
- Wraps ENTSO-E API calls, normalizes data, and writes raw observations.
- `features.py`
- Pure feature engineering logic (residual load, lags, cyclical encoding).
- `pipeline.py`
- Orchestration layer for end-to-end fetch -> raw persist -> feature build -> feature persist.
## 2) Runtime execution path (step-by-step)
When you run:
```bash
PYTHONPATH=src python3 scripts/build_feature_store.py --country-code ... --start ... --end ...
```
Execution sequence:
1. **Argument parsing**
- `build_feature_store.py` reads country code/time range/TTL.
2. **Credential/connection bootstrap**
- checks `ENTSOE_API_KEY`.
- calls `get_engine()` from `db.py`.
3. **Pipeline orchestration**
- `run_feature_pipeline(...)` in `pipeline.py` starts.
4. **API service creation**
- initializes `EntsoePandasClient`.
- creates `EntsoeDataService(client, engine, cache_ttl_hours)`.
5. **Decorator wrapping**
- in `EntsoeDataService.__post_init__`, API methods are wrapped by `cache_to_db(...)`.
6. **Data retrieval**
- `fetch_inputs(...)` calls:
- `get_day_ahead_prices(...)`
- `get_load_forecast(...)`
- `get_wind_solar_forecast(...)`
- country aliases are normalized to bidding zones before queries (currently `DE -> DE_LU`, `IT -> IT_NORD`).
7. **Cache check/compute loop (per call)**
- decorator computes hash key from function + args.
- if non-expired row exists in `entsoe_api_cache`: returns payload.
- else: reads `electricity_market_observations` for requested timestamps.
- if timestamps are missing there, only missing hourly ranges are requested from ENTSO-E.
- `NoMatchingDataError` from ENTSO-E is converted to an empty hourly frame for that endpoint/range.
- normalized responses coalesce duplicate semantic columns (for example multiple wind/solar columns) via first non-null-per-row.
- missing rows are upserted into `electricity_market_observations`.
- final merged dataset is stored in `entsoe_api_cache` and returned.
8. **Raw persistence**
- merged inputs are upserted to `electricity_market_observations`.
9. **Feature engineering**
- `build_feature_frame(...)` computes:
- `residual_load = load - wind - solar`
- `lagged_price_1..24`
- `lagged_residual_load_1..24`
- `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
- preserves source missingness as `NaN` (no 0.0 imputation).
- drops rows only when `day_ahead_price` / `lagged_price_1..24` are missing (lag warmup requirement).
10. **Feature-store persistence**
- lags are materialized into PostgreSQL arrays (`DOUBLE PRECISION[]`, length 24).
- rows violating NOT NULL core feature constraints are filtered out before upsert.
- persistable rows are upserted to `electricity_price_features`.
11. **CLI completion**
- prints persisted row count.
## 3) UML diagrams
### 3.1 Component diagram
```mermaid
flowchart LR
CLI[scripts/build_feature_store.py] --> PIPE[pipeline.run_feature_pipeline]
PIPE --> DBMOD[db.get_engine]
PIPE --> SERVICE[EntsoeDataService]
SERVICE --> CACHEDEC[cache_to_db decorator]
SERVICE --> ENTSOE[EntsoePandasClient]
SERVICE --> SECONDARY[electricity_market_observations secondary cache]
PIPE --> FEAT[features.build_feature_frame NaN-preserving]
FEAT --> PERSIST[pipeline.persist_feature_frame null-filtered]
CACHEDEC --> DB[(quant_db.entsoe_api_cache)]
SECONDARY --> RAW[(quant_db.electricity_market_observations)]
PERSIST --> STORE[(quant_db.electricity_price_features)]
```
### 3.2 Class diagram (logical)
```mermaid
classDiagram
class EntsoeDataService {
+client: EntsoePandasClient
+engine: Engine
+cache_ttl_hours: Optional[int]
+fetch_inputs(country_code, start, end) DataFrame
+upsert_raw_data(country_code, frame) None
-_get_day_ahead_prices_impl(country_code, start, end) Series
-_get_load_forecast_impl(country_code, start, end) Series
-_get_wind_solar_forecast_impl(country_code, start, end) DataFrame
}
class CacheDecorator {
+cache_to_db(engine, namespace, ttl_hours) decorator
-_build_cache_key(function_name, args, kwargs) str
}
class FeatureBuilder {
+build_feature_frame(inputs, max_lag=24) DataFrame
-_cyclical_encode(values, period, prefix) DataFrame
}
class Pipeline {
+run_feature_pipeline(engine, entsoe_api_key, country_code, start, end, cache_ttl_hours) DataFrame
+persist_feature_frame(engine, country_code, feature_frame) None
}
Pipeline --> EntsoeDataService : uses
Pipeline --> FeatureBuilder : uses
EntsoeDataService --> CacheDecorator : wraps methods
```
### 3.3 Sequence diagram (single API method with cache)
```mermaid
sequenceDiagram
participant Caller as fetch_inputs()
participant Decorator as cache_to_db wrapper
participant CacheTable as entsoe_api_cache (L1)
participant ObsTable as electricity_market_observations (L2)
participant API as ENTSO-E API
Caller->>Decorator: get_day_ahead_prices(country, start, end)
Decorator->>CacheTable: SELECT by cache_key and expires_at
alt L1 cache hit
CacheTable-->>Decorator: payload
Decorator-->>Caller: unpickled pandas object
else L1 cache miss/expired
Decorator->>ObsTable: SELECT existing timestamps
alt L2 fully covers range
ObsTable-->>Decorator: pandas-compatible rows
else L2 has gaps
Decorator->>API: query only missing ranges
alt API returns data
API-->>Decorator: missing rows
Decorator->>Decorator: normalize columns + coalesce duplicates
Decorator->>ObsTable: UPSERT missing rows
else NoMatchingDataError
Decorator->>Decorator: synthesize empty hourly frame
end
end
Decorator->>CacheTable: INSERT/UPSERT merged payload
Decorator-->>Caller: fresh result
end
```
### 3.4 State diagram (cache entry lifecycle)
```mermaid
stateDiagram-v2
[*] --> L1Missing
L1Missing --> L2Check: cache miss/expiry
L2Check --> Fresh: observation table fully covers range
L2Check --> Partial: observation table has gaps
Partial --> Fresh: fetch missing ranges, upsert L2, upsert L1
Fresh --> Fresh: reused before expiry
Fresh --> Expired: TTL passes for L1 entry
Expired --> L2Check: next call
Fresh --> Overwritten: Same key, new payload upsert
Overwritten --> Fresh
```
### 3.5 ER diagram (database schema)
```mermaid
erDiagram
entsoe_api_cache {
text cache_key PK
text namespace
text function_name
jsonb args_json
bytea payload
timestamptz created_at
timestamptz expires_at
}
electricity_market_observations {
text country_code PK
timestamptz delivery_start PK
float day_ahead_price
float load_forecast
float wind_forecast
float solar_forecast
timestamptz ingested_at
}
electricity_price_features {
text country_code PK
timestamptz delivery_start PK
text feature_version PK
float day_ahead_price
float load_forecast
float wind_forecast
float solar_forecast
float residual_load
float[] lagged_price
float[] lagged_residual_load
float hour_of_day_sin
float hour_of_day_cos
float weekday_sin
float weekday_cos
float month_sin
float month_cos
timestamptz created_at
}
```
## 4) How files collaborate
### 4.1 `db.py` + scripts
- Scripts never hardcode DB URI; they call `get_engine()`.
- `get_engine()` centralizes environment-driven connectivity.
### 4.2 `cache.py` + `entsoe_api.py`
- `cache_to_db()` is generic and independent of ENTSO-E specifics.
- `EntsoeDataService.__post_init__` binds that generic decorator to each API-fetch method.
- Result: all expensive API calls automatically become cache-aware without changing call sites.
### 4.3 `entsoe_api.py` + `features.py`
- `entsoe_api.py` guarantees normalized timestamp index and expected source columns.
- `features.py` assumes these columns and transforms them to model features only (no DB side effects).
### 4.4 `features.py` + `pipeline.py`
- `build_feature_frame()` returns wide DataFrame with `lagged_*_1..24`.
- `persist_feature_frame()` converts those to PostgreSQL arrays so table rows stay compact and versioned.
## 5) Important implementation details
- **Cache keys are deterministic**
- Built from JSON of function name + args + kwargs with stable sorting.
- **Cache payload type**
- `pickle` stored in `BYTEA` to preserve pandas objects.
- **TTL logic**
- `expires_at IS NULL` means never expires.
- Otherwise must be greater than current UTC time to be considered valid.
- **Two-layer cache order**
- Layer 1: `entsoe_api_cache` (function-result cache).
- Layer 2: `electricity_market_observations` (timestamp-level raw cache).
- API calls happen only for Layer-2 gaps.
- **Upsert strategy**
- Raw and feature tables use `ON CONFLICT ... DO UPDATE` for idempotent reruns.
- Raw upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting previously stored values during partial refreshes.
- Feature upsert operates on a filtered persistable subset where core NOT NULL columns are present.
- **Missingness semantics**
- Forecast and derived residual columns preserve `NaN` in memory.
- No zero-imputation is performed for missing forecast values.
- **Bidding-zone normalization**
- `resolve_bidding_zone_code(...)` maps common country aliases to ENTSO-E zone codes.
- Pipeline persistence uses the resolved code, ensuring DB keys match actual queried zones.
- **Timezone handling**
- API index is normalized to UTC to avoid DST ambiguity in lag features.
- **Feature warmup**
- Rows missing `day_ahead_price` or any `lagged_price_1..24` are dropped because lag history is incomplete.
## 6) Failure modes and expected behavior
- Missing `ENTSOE_API_KEY` -> CLI raises early runtime error.
- Missing required input columns -> feature builder raises `ValueError`.
- Duplicate normalized columns from ENTSO-E payloads -> coalesced before reindexing to avoid pandas duplicate-label reindex errors.
- ENTSO-E no-data responses for an endpoint/range -> transformed to empty hourly frames and merged safely.
- Empty data frame -> raw/feature persistence functions no-op safely.
- Repeated identical request -> cache hit (no API roundtrip).
- Expired L1 cache row + full L2 coverage -> no API call required.
- Expired L1 cache row + partial L2 coverage -> API called only for missing ranges.
## 7) Data contracts
### 7.1 In-memory features contract
Producer: `run_feature_pipeline(...)` return value (`pd.DataFrame`).
- **Index contract**
- hourly UTC `DatetimeIndex`, sorted ascending.
- unique timestamps expected after deduplication.
- **Column contract**
- base: `day_ahead_price`, `load_forecast`, `wind_forecast`, `solar_forecast`
- derived: `residual_load`
- lag columns: `lagged_price_1..24`, `lagged_residual_load_1..24`
- cyclical: `hour_of_day_sin/cos`, `weekday_sin/cos`, `month_sin/cos`
- **Nullability contract**
- required non-null in returned rows: `day_ahead_price`, `lagged_price_1..24`
- nullable: `load_forecast`, `wind_forecast`, `solar_forecast`, `residual_load`, and `lagged_residual_load_*`
- rationale: preserve upstream missingness semantics for analysis and QC.
### 7.2 Feature-store persistence contract
Consumer: `electricity_price_features` table.
- **Primary key contract**
- (`country_code`, `delivery_start`, `feature_version`)
- **Schema constraint contract**
- core numeric columns are `NOT NULL`.
- lag arrays are `DOUBLE PRECISION[]` and expected length 24.
- **Write-time contract**
- `persist_feature_frame(...)` filters rows that violate NOT NULL core columns before UPSERT.
- retained rows are idempotently upserted via `ON CONFLICT ... DO UPDATE`.
### 7.3 Raw-observation contract
Consumer: `electricity_market_observations` table.
- **Primary key contract**
- (`country_code`, `delivery_start`)
- **Merge contract**
- upsert uses `COALESCE(EXCLUDED.col, existing.col)` to avoid null-overwriting prior known values.
- **Coverage contract**
- secondary cache guarantees fetched payloads are aligned to expected hourly index for the requested `[start, end)` range.
## 8) Practical debugging checklist
1. Run `scripts/init_db.py` and ensure tables exist.
2. Run one short-range fetch window (1-2 days) first.
3. Verify cache growth:
- `SELECT namespace, function_name, COUNT(*) FROM entsoe_api_cache GROUP BY 1,2;`
4. Verify raw persistence:
- `SELECT COUNT(*) FROM electricity_market_observations WHERE country_code = '...';`
5. Verify feature persistence:
- check lag array sizes are 24 and row count is lower than raw by about 24.
## 9) Suggested next developer docs to add
- Data quality rules (acceptable missingness, clipping policy, anomaly handling).
- Training-set contract (target definition, split strategy, leakage constraints).
- Backfill/replay policy for reprocessing historical periods.


@@ -1,4 +0,0 @@
entsoe-py
pandas
sqlalchemy
psycopg2-binary


@@ -1,41 +0,0 @@
import argparse
import os

import pandas as pd

from electricity_price_predictor.db import get_engine
from electricity_price_predictor.pipeline import run_feature_pipeline


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Fetch ENTSO-E inputs and build feature store.")
    parser.add_argument("--country-code", required=True, help="ENTSO-E bidding zone code, e.g. DE_LU")
    parser.add_argument("--start", required=True, help="Inclusive start datetime, e.g. 2026-01-01T00:00:00Z")
    parser.add_argument("--end", required=True, help="Exclusive end datetime, e.g. 2026-02-01T00:00:00Z")
    parser.add_argument("--cache-ttl-hours", type=int, default=24, help="Decorator cache TTL in hours")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    api_key = os.getenv("ENTSOE_API_KEY")
    if not api_key:
        raise RuntimeError("ENTSOE_API_KEY environment variable is required.")
    engine = get_engine()
    start = pd.Timestamp(args.start, tz="UTC")
    end = pd.Timestamp(args.end, tz="UTC")
    features = run_feature_pipeline(
        engine=engine,
        entsoe_api_key=api_key,
        country_code=args.country_code,
        start=start,
        end=end,
        cache_ttl_hours=args.cache_ttl_hours,
    )
    print(f"Persisted {len(features)} feature rows for {args.country_code}.")


if __name__ == "__main__":
    main()


@@ -1,23 +0,0 @@
from pathlib import Path

from sqlalchemy import text

from electricity_price_predictor.db import get_engine


def main() -> None:
    engine = get_engine()
    schema_path = Path(__file__).resolve().parents[1] / "sql" / "001_electricity_price_schema.sql"
    sql = schema_path.read_text(encoding="utf-8")
    with engine.begin() as conn:
        for statement in sql.split(";"):
            stmt = statement.strip()
            if stmt:
                conn.execute(text(stmt))
    print("Schema initialized for electricity price predictor.")


if __name__ == "__main__":
    main()


@@ -1,55 +0,0 @@
CREATE TABLE IF NOT EXISTS entsoe_api_cache (
cache_key TEXT PRIMARY KEY,
namespace TEXT NOT NULL,
function_name TEXT NOT NULL,
args_json JSONB NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_namespace_fn
ON entsoe_api_cache(namespace, function_name);
CREATE INDEX IF NOT EXISTS idx_entsoe_api_cache_expires_at
ON entsoe_api_cache(expires_at);
CREATE TABLE IF NOT EXISTS electricity_market_observations (
country_code TEXT NOT NULL,
delivery_start TIMESTAMPTZ NOT NULL,
day_ahead_price DOUBLE PRECISION,
load_forecast DOUBLE PRECISION,
wind_forecast DOUBLE PRECISION,
solar_forecast DOUBLE PRECISION,
ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (country_code, delivery_start)
);
CREATE INDEX IF NOT EXISTS idx_electricity_market_observations_delivery
ON electricity_market_observations(delivery_start);
CREATE TABLE IF NOT EXISTS electricity_price_features (
country_code TEXT NOT NULL,
delivery_start TIMESTAMPTZ NOT NULL,
day_ahead_price DOUBLE PRECISION NOT NULL,
load_forecast DOUBLE PRECISION NOT NULL,
wind_forecast DOUBLE PRECISION NOT NULL,
solar_forecast DOUBLE PRECISION NOT NULL,
residual_load DOUBLE PRECISION NOT NULL,
lagged_price DOUBLE PRECISION[] NOT NULL,
lagged_residual_load DOUBLE PRECISION[] NOT NULL,
hour_of_day_sin DOUBLE PRECISION NOT NULL,
hour_of_day_cos DOUBLE PRECISION NOT NULL,
weekday_sin DOUBLE PRECISION NOT NULL,
weekday_cos DOUBLE PRECISION NOT NULL,
month_sin DOUBLE PRECISION NOT NULL,
month_cos DOUBLE PRECISION NOT NULL,
feature_version TEXT NOT NULL DEFAULT 'v1',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (country_code, delivery_start, feature_version),
CONSTRAINT chk_lagged_price_len CHECK (CARDINALITY(lagged_price) = 24),
CONSTRAINT chk_lagged_residual_load_len CHECK (CARDINALITY(lagged_residual_load) = 24)
);
CREATE INDEX IF NOT EXISTS idx_electricity_price_features_delivery
ON electricity_price_features(delivery_start);
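The composite primary key `(country_code, delivery_start)` on `electricity_market_observations` is what makes idempotent upserts possible: a second write for the same hour can fill in missing columns without erasing existing ones. The sketch below illustrates that `COALESCE` upsert pattern. It uses an in-memory SQLite database as a stand-in for PostgreSQL (an assumption made so the example runs without a server; it needs SQLite 3.24 or newer) and a reduced column set.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE electricity_market_observations (
        country_code TEXT NOT NULL,
        delivery_start TEXT NOT NULL,
        day_ahead_price REAL,
        load_forecast REAL,
        PRIMARY KEY (country_code, delivery_start)
    )
    """
)

# New non-null values win; NULLs in the incoming row keep the stored value.
UPSERT = """
INSERT INTO electricity_market_observations
    (country_code, delivery_start, day_ahead_price, load_forecast)
VALUES (?, ?, ?, ?)
ON CONFLICT (country_code, delivery_start) DO UPDATE
SET day_ahead_price = COALESCE(excluded.day_ahead_price, day_ahead_price),
    load_forecast = COALESCE(excluded.load_forecast, load_forecast)
"""

hour = "2024-01-01T00:00:00+00:00"
conn.execute(UPSERT, ("DE_LU", hour, 50.0, None))     # price arrives first
conn.execute(UPSERT, ("DE_LU", hour, None, 40000.0))  # load arrives later

rows = conn.execute(
    "SELECT day_ahead_price, load_forecast FROM electricity_market_observations"
).fetchall()
```

After both writes the table holds a single row for that hour with both the price and the load forecast populated.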

File diff suppressed because one or more lines are too long


@@ -1,7 +0,0 @@
"""Electricity price forecasting data pipeline package."""
from .db import get_engine
from .entsoe_api import EntsoeDataService
from .features import build_feature_frame
__all__ = ["EntsoeDataService", "build_feature_frame", "get_engine"]


@@ -1,112 +0,0 @@
import functools
import hashlib
import json
import pickle
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional
from sqlalchemy import text
from sqlalchemy.engine import Engine
def _json_fallback_serializer(value: Any) -> str:
"""Serializer for values that aren't directly JSON serializable."""
if hasattr(value, "isoformat"):
return value.isoformat()
return repr(value)
def _build_cache_key(function_name: str, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
payload = json.dumps(
{"function_name": function_name, "args": args, "kwargs": kwargs},
sort_keys=True,
default=_json_fallback_serializer,
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def cache_to_db(
engine: Engine,
namespace: str,
ttl_hours: Optional[int] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Cache function output in quant_db.entsoe_api_cache table.
Notes:
- TTL is optional. If omitted, cached values do not expire.
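- Cached payload uses pickle so pandas objects can be restored losslessly.
- Only load payloads from a trusted database: unpickling untrusted data can execute arbitrary code.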
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
cache_key = _build_cache_key(f"{namespace}.{func.__name__}", args, kwargs)
now = datetime.now(timezone.utc)
with engine.begin() as conn:
result = conn.execute(
text(
"""
SELECT payload
FROM entsoe_api_cache
WHERE cache_key = :cache_key
AND (expires_at IS NULL OR expires_at > :now_utc)
"""
),
{"cache_key": cache_key, "now_utc": now},
).fetchone()
if result:
return pickle.loads(result[0])
data = func(*args, **kwargs)
expires_at = None
if ttl_hours is not None:
expires_at = now + timedelta(hours=ttl_hours)
conn.execute(
text(
"""
INSERT INTO entsoe_api_cache (
cache_key,
namespace,
function_name,
args_json,
payload,
created_at,
expires_at
) VALUES (
:cache_key,
:namespace,
:function_name,
CAST(:args_json AS JSONB),
:payload,
:created_at,
:expires_at
)
ON CONFLICT (cache_key) DO UPDATE
SET payload = EXCLUDED.payload,
created_at = EXCLUDED.created_at,
expires_at = EXCLUDED.expires_at,
args_json = EXCLUDED.args_json
"""
),
{
"cache_key": cache_key,
"namespace": namespace,
"function_name": func.__name__,
"args_json": json.dumps(
{"args": args, "kwargs": kwargs},
default=_json_fallback_serializer,
),
"payload": pickle.dumps(data),
"created_at": now,
"expires_at": expires_at,
},
)
return data
return wrapper
return decorator
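The cache key is a SHA-256 hash of a JSON document built from the function name and its arguments, so it should not depend on keyword-argument order. The standalone sketch below reproduces that key construction with the standard library only. The function name `"entsoe.prices"` and the sample arguments are illustrative assumptions.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any


def _json_fallback_serializer(value: Any) -> str:
    # Datetime-like values serialize via isoformat(); everything else via repr().
    if hasattr(value, "isoformat"):
        return value.isoformat()
    return repr(value)


def build_cache_key(function_name: str, args: tuple, kwargs: dict) -> str:
    payload = json.dumps(
        {"function_name": function_name, "args": args, "kwargs": kwargs},
        sort_keys=True,  # makes the key independent of kwargs ordering
        default=_json_fallback_serializer,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)

key_a = build_cache_key("entsoe.prices", ("DE_LU",), {"start": start, "end": end})
key_b = build_cache_key("entsoe.prices", ("DE_LU",), {"end": end, "start": start})
key_c = build_cache_key("entsoe.prices", ("FR",), {"start": start, "end": end})
```

Here `key_a` and `key_b` are identical because `sort_keys=True` normalizes the dictionary order, while `key_c` differs because the positional argument changed. Note that positional and keyword forms of the same call still hash differently.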


@@ -1,24 +0,0 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
def get_database_url() -> str:
"""Build database URL from env or fallback defaults."""
explicit_url = os.getenv("QUANT_DB_URL")
if explicit_url:
return explicit_url
host = os.getenv("QUANT_DB_HOST", "localhost")
port = os.getenv("QUANT_DB_PORT", "5432")
database = os.getenv("QUANT_DB_NAME", "quant_db")
user = os.getenv("QUANT_DB_USER", "quant_user")
# Placeholder fallback for local development; set QUANT_DB_PASSWORD explicitly elsewhere.
password = os.getenv("QUANT_DB_PASSWORD", "strong_password")
return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
def get_engine(echo: bool = False) -> Engine:
"""Create SQLAlchemy engine for quant_db."""
return create_engine(get_database_url(), future=True, echo=echo)
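The URL is assembled with an f-string, so a password containing characters such as `@` or `/` would corrupt it. A possible hardening, sketched here under the assumption that percent-encoding the credentials is acceptable for the driver in use, is to escape them first. `build_database_url` is a hypothetical variant that takes the environment as a plain mapping so it can be exercised without touching `os.environ`.

```python
from urllib.parse import quote_plus


def build_database_url(env: dict) -> str:
    """Build a PostgreSQL URL from an env-like mapping, escaping credentials."""
    explicit_url = env.get("QUANT_DB_URL")
    if explicit_url:
        return explicit_url
    host = env.get("QUANT_DB_HOST", "localhost")
    port = env.get("QUANT_DB_PORT", "5432")
    database = env.get("QUANT_DB_NAME", "quant_db")
    # Percent-encode user and password so reserved characters stay literal.
    user = quote_plus(env.get("QUANT_DB_USER", "quant_user"))
    password = quote_plus(env.get("QUANT_DB_PASSWORD", ""))
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"


url = build_database_url({"QUANT_DB_PASSWORD": "p@ss/word"})
explicit = build_database_url({"QUANT_DB_URL": "postgresql://example/db"})
```

An explicit `QUANT_DB_URL` is returned unchanged, matching the precedence used by `get_database_url`.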


@@ -1,393 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from entsoe import EntsoePandasClient
from entsoe.exceptions import NoMatchingDataError
from sqlalchemy import text
from sqlalchemy.engine import Engine
from .cache import cache_to_db
OBSERVATION_COLUMNS = {
"day_ahead_price",
"load_forecast",
"wind_forecast",
"solar_forecast",
}
BIDDING_ZONE_ALIASES = {
# ENTSO-E often expects bidding-zone EIC aliases instead of plain country codes.
"DE": "DE_LU",
"IT": "IT_NORD",
}
def _as_utc_index(series_or_df: pd.Series | pd.DataFrame) -> pd.Series | pd.DataFrame:
    # Work on a shallow copy so the caller's object does not get its index replaced in place.
    result = series_or_df.copy(deep=False)
    if result.index.tz is None:
        result.index = result.index.tz_localize("UTC")
    else:
        result.index = result.index.tz_convert("UTC")
    return result.sort_index()
def _safe_float(value) -> Optional[float]:
if pd.isna(value):
return None
return float(value)
def resolve_bidding_zone_code(country_code: str) -> str:
code = str(country_code).strip().upper()
return BIDDING_ZONE_ALIASES.get(code, code)
def _coerce_single_column_frame(
data: pd.Series | pd.DataFrame,
target_column: str,
preferred_tokens: tuple[str, ...] = (),
) -> pd.DataFrame:
"""
Normalize ENTSO-E responses that can be either a Series or DataFrame.
"""
if isinstance(data, pd.Series):
series = _as_utc_index(data)
series.name = target_column
return series.to_frame()
frame = _as_utc_index(data.copy())
if target_column in frame.columns:
return frame[[target_column]]
if len(frame.columns) == 1:
return frame.rename(columns={frame.columns[0]: target_column})[[target_column]]
def _best_column(candidates: list) -> Optional[str]:
if not candidates:
return None
# Prefer the candidate with most available points.
return max(candidates, key=lambda col: int(frame[col].notna().sum()))
lowered = {col: str(col).lower() for col in frame.columns}
preferred_candidates = []
for token in preferred_tokens:
preferred_candidates.extend([col for col, col_lc in lowered.items() if token in col_lc])
preferred_col = _best_column(preferred_candidates)
if preferred_col is not None:
return frame.rename(columns={preferred_col: target_column})[[target_column]]
any_col = _best_column(list(frame.columns))
if any_col is not None:
return frame.rename(columns={any_col: target_column})[[target_column]]
first_col = frame.columns[0]
return frame.rename(columns={first_col: target_column})[[target_column]]
def _normalize_utc_bounds(start: pd.Timestamp, end: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
start_ts = pd.Timestamp(start)
end_ts = pd.Timestamp(end)
if start_ts.tz is None:
start_ts = start_ts.tz_localize("UTC")
else:
start_ts = start_ts.tz_convert("UTC")
if end_ts.tz is None:
end_ts = end_ts.tz_localize("UTC")
else:
end_ts = end_ts.tz_convert("UTC")
return start_ts, end_ts
def _empty_hourly_frame(
columns: list[str], start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
start_ts, end_ts = _normalize_utc_bounds(start, end)
idx = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
return pd.DataFrame(index=idx, columns=columns)
def _coalesce_duplicate_columns(frame: pd.DataFrame) -> pd.DataFrame:
"""
Collapse duplicate column labels by taking the first non-null per row.
"""
if frame.columns.is_unique:
return frame
merged: dict[str, pd.Series] = {}
for col in frame.columns.unique():
same_name = frame.loc[:, frame.columns == col]
if isinstance(same_name, pd.Series):
merged[str(col)] = same_name
else:
merged[str(col)] = same_name.bfill(axis=1).iloc[:, 0]
return pd.DataFrame(merged, index=frame.index)
def _missing_ranges(missing_index: pd.DatetimeIndex) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
if missing_index.empty:
return []
missing_index = missing_index.sort_values().unique()
ranges: list[tuple[pd.Timestamp, pd.Timestamp]] = []
current_start = missing_index[0]
prev = missing_index[0]
step = pd.Timedelta(hours=1)
for ts in missing_index[1:]:
if ts - prev != step:
ranges.append((current_start, prev + step))
current_start = ts
prev = ts
ranges.append((current_start, prev + step))
return ranges
@dataclass
class EntsoeDataService:
client: EntsoePandasClient
engine: Engine
cache_ttl_hours: Optional[int] = 24
def __post_init__(self) -> None:
self.get_day_ahead_prices = cache_to_db(
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
)(self._get_day_ahead_prices_impl)
self.get_load_forecast = cache_to_db(
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
)(self._get_load_forecast_impl)
self.get_wind_solar_forecast = cache_to_db(
self.engine, "entsoe", ttl_hours=self.cache_ttl_hours
)(self._get_wind_solar_forecast_impl)
def resolve_country_code(self, country_code: str) -> str:
return resolve_bidding_zone_code(country_code)
def _get_day_ahead_prices_impl(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.Series:
df = self._fetch_inputs_with_secondary_cache(
country_code=country_code,
start=start,
end=end,
required_columns=["day_ahead_price"],
api_fetcher=self._query_day_ahead_prices,
)
return df["day_ahead_price"]
def _get_load_forecast_impl(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.Series:
df = self._fetch_inputs_with_secondary_cache(
country_code=country_code,
start=start,
end=end,
required_columns=["load_forecast"],
api_fetcher=self._query_load_forecast,
)
return df["load_forecast"]
def _get_wind_solar_forecast_impl(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
return self._fetch_inputs_with_secondary_cache(
country_code=country_code,
start=start,
end=end,
required_columns=["wind_forecast", "solar_forecast"],
api_fetcher=self._query_wind_solar_forecast,
)
def _query_day_ahead_prices(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
try:
raw = self.client.query_day_ahead_prices(country_code, start=start, end=end)
except NoMatchingDataError:
return _empty_hourly_frame(["day_ahead_price"], start, end)
return _coerce_single_column_frame(
raw,
target_column="day_ahead_price",
preferred_tokens=("price", "ahead"),
)
def _query_load_forecast(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
try:
raw = self.client.query_load_forecast(country_code, start=start, end=end)
except NoMatchingDataError:
return _empty_hourly_frame(["load_forecast"], start, end)
return _coerce_single_column_frame(
raw,
target_column="load_forecast",
preferred_tokens=("load",),
)
def _query_wind_solar_forecast(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
try:
df = self.client.query_wind_and_solar_forecast(country_code, start=start, end=end)
except NoMatchingDataError:
return _empty_hourly_frame(["wind_forecast", "solar_forecast"], start, end)
df = _as_utc_index(df)
if isinstance(df, pd.Series):
df = df.to_frame()
renamed = {}
for column in df.columns:
lc = str(column).lower()
if "wind" in lc:
renamed[column] = "wind_forecast"
elif "solar" in lc:
renamed[column] = "solar_forecast"
df = df.rename(columns=renamed)
df = _coalesce_duplicate_columns(df)
if "wind_forecast" not in df.columns:
df["wind_forecast"] = None
if "solar_forecast" not in df.columns:
df["solar_forecast"] = None
return df[["wind_forecast", "solar_forecast"]]
def _load_observations(
self,
country_code: str,
start: pd.Timestamp,
end: pd.Timestamp,
columns: list[str],
) -> pd.DataFrame:
invalid = [col for col in columns if col not in OBSERVATION_COLUMNS]
if invalid:
raise ValueError(f"Unsupported observation columns requested: {invalid}")
sql_columns = ", ".join(columns)
query = text(
f"""
SELECT delivery_start, {sql_columns}
FROM electricity_market_observations
WHERE country_code = :country_code
AND delivery_start >= :start_ts
AND delivery_start < :end_ts
ORDER BY delivery_start
"""
)
with self.engine.begin() as conn:
rows = conn.execute(
query,
{"country_code": country_code, "start_ts": start, "end_ts": end},
).fetchall()
if not rows:
return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], tz="UTC"))
db_frame = pd.DataFrame(rows, columns=["delivery_start", *columns])
db_frame["delivery_start"] = pd.to_datetime(db_frame["delivery_start"], utc=True)
db_frame = db_frame.set_index("delivery_start").sort_index()
return db_frame
def _fetch_inputs_with_secondary_cache(
self,
country_code: str,
start: pd.Timestamp,
end: pd.Timestamp,
required_columns: list[str],
api_fetcher,
) -> pd.DataFrame:
start_ts, end_ts = _normalize_utc_bounds(start, end)
expected_index = pd.date_range(start=start_ts, end=end_ts, freq="h", inclusive="left")
from_observations = self._load_observations(country_code, start_ts, end_ts, required_columns)
if from_observations.empty:
complete_index = pd.DatetimeIndex([], tz="UTC")
else:
complete_mask = from_observations[required_columns].notna().all(axis=1)
complete_index = from_observations.index[complete_mask]
missing_index = expected_index.difference(complete_index)
fetched_parts: list[pd.DataFrame] = []
for missing_start, missing_end in _missing_ranges(missing_index):
fetched = api_fetcher(country_code, missing_start, missing_end)
fetched = fetched.reindex(columns=required_columns)
fetched_parts.append(fetched)
if fetched_parts:
fetched_frame = pd.concat(fetched_parts).sort_index()
self.upsert_raw_data(country_code=country_code, frame=fetched_frame)
else:
fetched_frame = pd.DataFrame(columns=required_columns, index=pd.DatetimeIndex([], tz="UTC"))
combined = pd.concat([from_observations, fetched_frame]).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
combined = combined.reindex(expected_index)
return combined[required_columns]
def fetch_inputs(
self, country_code: str, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
resolved_country_code = self.resolve_country_code(country_code)
price = _coerce_single_column_frame(
self.get_day_ahead_prices(resolved_country_code, start, end),
target_column="day_ahead_price",
preferred_tokens=("price", "ahead"),
)
load = _coerce_single_column_frame(
self.get_load_forecast(resolved_country_code, start, end),
target_column="load_forecast",
preferred_tokens=("load",),
)
wind_solar = self.get_wind_solar_forecast(resolved_country_code, start, end)
df = price.join(load, how="outer").join(wind_solar, how="outer").sort_index()
return df
def upsert_raw_data(self, country_code: str, frame: pd.DataFrame) -> None:
rows = []
for ts, row in frame.iterrows():
rows.append(
{
"country_code": country_code,
"delivery_start": ts.to_pydatetime(),
"day_ahead_price": _safe_float(row.get("day_ahead_price")),
"load_forecast": _safe_float(row.get("load_forecast")),
"wind_forecast": _safe_float(row.get("wind_forecast")),
"solar_forecast": _safe_float(row.get("solar_forecast")),
}
)
if not rows:
return
with self.engine.begin() as conn:
conn.execute(
text(
"""
INSERT INTO electricity_market_observations (
country_code,
delivery_start,
day_ahead_price,
load_forecast,
wind_forecast,
solar_forecast
) VALUES (
:country_code,
:delivery_start,
:day_ahead_price,
:load_forecast,
:wind_forecast,
:solar_forecast
)
ON CONFLICT (country_code, delivery_start) DO UPDATE
SET day_ahead_price = COALESCE(EXCLUDED.day_ahead_price, electricity_market_observations.day_ahead_price),
load_forecast = COALESCE(EXCLUDED.load_forecast, electricity_market_observations.load_forecast),
wind_forecast = COALESCE(EXCLUDED.wind_forecast, electricity_market_observations.wind_forecast),
solar_forecast = COALESCE(EXCLUDED.solar_forecast, electricity_market_observations.solar_forecast),
ingested_at = NOW()
"""
),
rows,
)
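The secondary cache only calls the API for hours that are missing from `electricity_market_observations`, and `_missing_ranges` groups those hours into contiguous half-open `[start, end)` windows. The sketch below shows the same grouping logic using only `datetime`, without pandas. `group_missing_hours` is a hypothetical name for illustration.

```python
from datetime import datetime, timedelta, timezone

HOUR = timedelta(hours=1)


def group_missing_hours(missing: list[datetime]) -> list[tuple[datetime, datetime]]:
    """Group hourly timestamps into contiguous half-open [start, end) ranges."""
    if not missing:
        return []
    ordered = sorted(set(missing))
    ranges: list[tuple[datetime, datetime]] = []
    current_start = prev = ordered[0]
    for ts in ordered[1:]:
        if ts - prev != HOUR:
            # Gap found: close the current run one hour past its last timestamp.
            ranges.append((current_start, prev + HOUR))
            current_start = ts
        prev = ts
    ranges.append((current_start, prev + HOUR))
    return ranges


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
gaps = group_missing_hours([base, base + HOUR, base + 5 * HOUR])
```

In this example hours 00:00 and 01:00 form one window ending at 02:00, and the isolated hour 05:00 forms a second window ending at 06:00, so two API requests would be issued instead of three.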


@@ -1,58 +0,0 @@
from __future__ import annotations
import math
import pandas as pd
def _cyclical_encode(values: pd.Series, period: int, prefix: str) -> pd.DataFrame:
angle = 2.0 * math.pi * values / period
return pd.DataFrame(
{f"{prefix}_sin": angle.apply(math.sin), f"{prefix}_cos": angle.apply(math.cos)},
index=values.index,
)
def build_feature_frame(inputs: pd.DataFrame, max_lag: int = 24) -> pd.DataFrame:
"""
Build feature set for electricity price forecasting.
Included:
- day_ahead_price, load_forecast, wind_forecast, solar_forecast
- residual_load
- lagged_price(t-1..t-24)
- lagged_residual_load(t-1..t-24)
- hour-of-day/weekday/month cyclical encodings
"""
df = inputs.copy().sort_index()
required = {"day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"}
missing = required.difference(df.columns)
if missing:
raise ValueError(f"Missing required input columns: {sorted(missing)}")
# Preserve source missingness semantics for downstream users.
# We keep NaNs instead of imputing to 0.0 so missing data is explicit.
df["residual_load"] = df["load_forecast"] - df["wind_forecast"] - df["solar_forecast"]
for lag in range(1, max_lag + 1):
df[f"lagged_price_{lag}"] = df["day_ahead_price"].shift(lag)
df[f"lagged_residual_load_{lag}"] = df["residual_load"].shift(lag)
time_index = df.index
if time_index.tz is None:
time_index = time_index.tz_localize("UTC")
else:
time_index = time_index.tz_convert("UTC")
cyclical = [
_cyclical_encode(pd.Series(time_index.hour, index=df.index), 24, "hour_of_day"),
_cyclical_encode(pd.Series(time_index.weekday, index=df.index), 7, "weekday"),
_cyclical_encode(pd.Series(time_index.month - 1, index=df.index), 12, "month"),
]
for cyc in cyclical:
df = df.join(cyc)
# Only enforce warmup/history constraints for price lags.
# Other feature columns can remain NaN when source data is missing.
required_for_row = ["day_ahead_price", *[f"lagged_price_{lag}" for lag in range(1, max_lag + 1)]]
return df.dropna(subset=required_for_row).sort_index()
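The cyclical encodings map a periodic value onto the unit circle, so that hour 23 ends up close to hour 0 instead of 23 units away. A scalar sketch of the same sine/cosine transform follows. `cyclical_encode` here is a simplified stand-in that works on single numbers rather than on a pandas Series.

```python
import math


def cyclical_encode(value: float, period: int) -> tuple[float, float]:
    """Return (sin, cos) of the angle that `value` occupies within `period`."""
    angle = 2.0 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


hour_0 = cyclical_encode(0, 24)
hour_6 = cyclical_encode(6, 24)
hour_12 = cyclical_encode(12, 24)
hour_23 = cyclical_encode(23, 24)

# Euclidean distance in the encoded space reflects distance around the clock.
near = math.dist(hour_23, hour_0)
far = math.dist(hour_12, hour_0)
```

In the encoded space, 23:00 is much nearer to 00:00 than 12:00 is, which is the property a raw integer hour column lacks.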


@@ -1,146 +0,0 @@
from __future__ import annotations
from datetime import datetime, timezone
import pandas as pd
from entsoe import EntsoePandasClient
from sqlalchemy import text
from sqlalchemy.engine import Engine
from .entsoe_api import EntsoeDataService
from .features import build_feature_frame
def _safe_float(value) -> float | None:
if pd.isna(value):
return None
return float(value)
def _validate_inputs_have_signal(inputs: pd.DataFrame, country_code: str) -> None:
required = ["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast"]
non_null_counts = {col: int(inputs[col].notna().sum()) for col in required if col in inputs.columns}
if non_null_counts and all(count == 0 for count in non_null_counts.values()):
raise ValueError(
"No ENTSO-E data available for "
f"'{country_code}' in the requested time window. "
"Try another bidding zone/time range, and ensure the API key has access."
)
def persist_feature_frame(engine: Engine, country_code: str, feature_frame: pd.DataFrame) -> None:
# Feature-store schema expects core numeric fields to be non-null.
# Keep NaNs in the returned DataFrame, but skip incomplete rows on DB write.
persistable = feature_frame.dropna(
subset=["day_ahead_price", "load_forecast", "wind_forecast", "solar_forecast", "residual_load"]
)
rows = []
for ts, row in persistable.iterrows():
lag_price = [float(row[f"lagged_price_{lag}"]) for lag in range(1, 25)]
lag_residual = [float(row[f"lagged_residual_load_{lag}"]) for lag in range(1, 25)]
rows.append(
{
"country_code": country_code,
"delivery_start": ts.to_pydatetime(),
"day_ahead_price": float(row["day_ahead_price"]),
"load_forecast": _safe_float(row["load_forecast"]),
"wind_forecast": _safe_float(row["wind_forecast"]),
"solar_forecast": _safe_float(row["solar_forecast"]),
"residual_load": _safe_float(row["residual_load"]),
"lagged_price": lag_price,
"lagged_residual_load": lag_residual,
"hour_of_day_sin": float(row["hour_of_day_sin"]),
"hour_of_day_cos": float(row["hour_of_day_cos"]),
"weekday_sin": float(row["weekday_sin"]),
"weekday_cos": float(row["weekday_cos"]),
"month_sin": float(row["month_sin"]),
"month_cos": float(row["month_cos"]),
"feature_version": "v1",
"created_at": datetime.now(timezone.utc),
}
)
if not rows:
return
with engine.begin() as conn:
conn.execute(
text(
"""
INSERT INTO electricity_price_features (
country_code,
delivery_start,
day_ahead_price,
load_forecast,
wind_forecast,
solar_forecast,
residual_load,
lagged_price,
lagged_residual_load,
hour_of_day_sin,
hour_of_day_cos,
weekday_sin,
weekday_cos,
month_sin,
month_cos,
feature_version,
created_at
) VALUES (
:country_code,
:delivery_start,
:day_ahead_price,
:load_forecast,
:wind_forecast,
:solar_forecast,
:residual_load,
:lagged_price,
:lagged_residual_load,
:hour_of_day_sin,
:hour_of_day_cos,
:weekday_sin,
:weekday_cos,
:month_sin,
:month_cos,
:feature_version,
:created_at
)
ON CONFLICT (country_code, delivery_start, feature_version) DO UPDATE
SET day_ahead_price = EXCLUDED.day_ahead_price,
load_forecast = EXCLUDED.load_forecast,
wind_forecast = EXCLUDED.wind_forecast,
solar_forecast = EXCLUDED.solar_forecast,
residual_load = EXCLUDED.residual_load,
lagged_price = EXCLUDED.lagged_price,
lagged_residual_load = EXCLUDED.lagged_residual_load,
hour_of_day_sin = EXCLUDED.hour_of_day_sin,
hour_of_day_cos = EXCLUDED.hour_of_day_cos,
weekday_sin = EXCLUDED.weekday_sin,
weekday_cos = EXCLUDED.weekday_cos,
month_sin = EXCLUDED.month_sin,
month_cos = EXCLUDED.month_cos,
created_at = EXCLUDED.created_at
"""
),
rows,
)
def run_feature_pipeline(
engine: Engine,
entsoe_api_key: str,
country_code: str,
start: pd.Timestamp,
end: pd.Timestamp,
cache_ttl_hours: int = 24,
) -> pd.DataFrame:
client = EntsoePandasClient(api_key=entsoe_api_key)
service = EntsoeDataService(client=client, engine=engine, cache_ttl_hours=cache_ttl_hours)
resolved_country_code = service.resolve_country_code(country_code)
inputs = service.fetch_inputs(country_code=resolved_country_code, start=start, end=end)
_validate_inputs_have_signal(inputs, resolved_country_code)
service.upsert_raw_data(country_code=resolved_country_code, frame=inputs)
features = build_feature_frame(inputs)
persist_feature_frame(engine, country_code=resolved_country_code, feature_frame=features)
return features
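When persisting features, the wide columns `lagged_price_1` … `lagged_price_24` are packed into a single array per row, and the schema rejects arrays whose length is not 24. The sketch below shows that packing step with an explicit length check. `collect_lags` and `EXPECTED_LAGS` are hypothetical names, and a plain dict stands in for a DataFrame row.

```python
EXPECTED_LAGS = 24  # mirrors the CARDINALITY(...) = 24 CHECK constraints in the schema


def collect_lags(row: dict, prefix: str, max_lag: int = EXPECTED_LAGS) -> list[float]:
    """Pack `<prefix>_1` .. `<prefix>_<max_lag>` into one list, ordered by lag."""
    values = [float(row[f"{prefix}_{lag}"]) for lag in range(1, max_lag + 1)]
    if len(values) != EXPECTED_LAGS:
        # Fail early in Python instead of on the database CHECK constraint.
        raise ValueError(
            f"{prefix}: expected {EXPECTED_LAGS} lags, got {len(values)}"
        )
    return values


# Illustrative row: lag k carries the value 40 + k.
row = {f"lagged_price_{lag}": 40.0 + lag for lag in range(1, 25)}
lagged_price = collect_lags(row, "lagged_price")
```

A check like this would surface a mismatch between `max_lag` in `build_feature_frame` and the fixed `range(1, 25)` used at write time before the insert reaches PostgreSQL.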


@@ -3,13 +3,10 @@ import pandas as pd
import yfinance as yf
from db_connect import db_engine
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import MetaData, Table
# --- CONFIG ---
TICKERS = ["UBS", "^GSPC"]
DAYS_BACK = 31 # ~3 weeks
DAYS_BACK = 21 # ~3 weeks
TABLE_NAME = "prices"
def fetch_data(tickers, start_date, end_date):
@@ -23,11 +20,6 @@ def fetch_data(tickers, start_date, end_date):
)
return data
def get_asset_map(engine):
query = "SELECT id, ticker FROM assets"
df = pd.read_sql(query, engine)
return dict(zip(df["ticker"], df["id"]))
def transform_data(raw_data):
frames = []
@@ -50,37 +42,17 @@ def transform_data(raw_data):
df["return"] = df["close"].pct_change()
frames.append(df)
return pd.concat(frames, ignore_index=True)
def load_to_postgres(df, engine):
asset_map = get_asset_map(engine)
df["asset_id"] = df["ticker"].map(asset_map)
df = df.drop(columns=["ticker"])
metadata = MetaData()
prices = Table(TABLE_NAME, metadata, autoload_with=engine)
with engine.begin() as conn:
for _, row in df.iterrows():
stmt = insert(prices).values({
"asset_id": row["asset_id"],
"date": row["date"],
"close": row["close"],
"volume": row["volume"],
"return": row["return"]
})
stmt = stmt.on_conflict_do_update(
index_elements=["asset_id", "date"],
set_={
"close": stmt.excluded.close,
"volume": stmt.excluded.volume,
"return": stmt.excluded["return"], # important change
}
)
conn.execute(stmt)
df.to_sql(
TABLE_NAME,
engine,
if_exists="append",
index=False
)
def main():