Compare commits

..

2 Commits

Author SHA1 Message Date
David Doebel
5008becd15 Write first data ingestion and SQL support
Some checks failed
C++ CI / build (push) Has been cancelled
2026-03-12 14:50:47 +01:00
David Doebel
a503514bf5 Write first data ingestion and SQL support 2026-03-12 13:43:35 +01:00
5 changed files with 367 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
DB_CONFIG = {
"host": "localhost",
"port": 5432,
"database": "options_db",
"user": "quant_user",
"password": "strong_password",
}
PIPELINE_CONFIG = {
"symbols": [
"SPY"
# Example: "SPY"
]
}

View File

@@ -0,0 +1,13 @@
import psycopg2
conn = psycopg2.connect(
dbname="options_db",
user="quant_user",
password="strong_password",
host="144.91.73.49",
port="5432"
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM underlyings;")
print(cursor.fetchall())

View File

@@ -0,0 +1,280 @@
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from config.settings import DB_CONFIG, PIPELINE_CONFIG
def build_db_url() -> str:
return (
f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
def to_python_number(value):
"""Convert pandas/numpy values to plain Python values or None."""
if pd.isna(value):
return None
return value
def compute_mid(bid, ask):
bid = to_python_number(bid)
ask = to_python_number(ask)
if bid is None or ask is None:
return None
try:
return float((bid + ask) / 2.0)
except Exception:
return None
def infer_option_style(symbol: str) -> str:
"""
Very rough default convention:
- US equities / ETFs from Yahoo are usually American style
"""
# TODO: If later you ingest index options like SPX, adapt this logic.
return "american"
def get_or_create_underlying(conn, symbol: str) -> int:
query_insert = text("""
INSERT INTO underlyings (symbol, exchange, currency)
VALUES (:symbol, :exchange, :currency)
ON CONFLICT (symbol) DO NOTHING
""")
query_select = text("""
SELECT id FROM underlyings WHERE symbol = :symbol
""")
# TODO: improve exchange/currency detection if you want richer metadata
conn.execute(query_insert, {
"symbol": symbol,
"exchange": None,
"currency": "USD",
})
result = conn.execute(query_select, {"symbol": symbol}).fetchone()
return result[0] #h
def get_or_create_contract(
conn,
underlying_id: int,
option_type: str,
strike: float,
expiration_date,
style: str,
contract_symbol: str,
) -> int:
query_insert = text("""
INSERT INTO option_contracts (
underlying_id, option_type, strike, expiration_date, style, contract_symbol
)
VALUES (
:underlying_id, :option_type, :strike, :expiration_date, :style, :contract_symbol
)
ON CONFLICT (underlying_id, option_type, strike, expiration_date)
DO NOTHING
""")
query_select = text("""
SELECT id
FROM option_contracts
WHERE underlying_id = :underlying_id
AND option_type = :option_type
AND strike = :strike
AND expiration_date = :expiration_date
""")
conn.execute(query_insert, {
"underlying_id": underlying_id,
"option_type": option_type,
"strike": strike,
"expiration_date": expiration_date,
"style": style,
"contract_symbol": contract_symbol,
})
result = conn.execute(query_select, {
"underlying_id": underlying_id,
"option_type": option_type,
"strike": strike,
"expiration_date": expiration_date,
}).fetchone()
return result[0]
def insert_underlying_price(conn, underlying_id: int, price_timestamp: datetime, price: float):
query = text("""
INSERT INTO underlying_prices (underlying_id, price_timestamp, price)
VALUES (:underlying_id, :price_timestamp, :price)
ON CONFLICT (underlying_id, price_timestamp) DO NOTHING
""")
conn.execute(query, {
"underlying_id": underlying_id,
"price_timestamp": price_timestamp,
"price": price,
})
def insert_option_quote(
conn,
contract_id: int,
quote_timestamp: datetime,
bid,
ask,
mid,
last_price,
implied_vol,
volume,
open_interest,
):
query = text("""
INSERT INTO option_quotes (
contract_id, quote_timestamp, bid, ask, mid,
last_price, implied_vol, volume, open_interest
)
VALUES (
:contract_id, :quote_timestamp, :bid, :ask, :mid,
:last_price, :implied_vol, :volume, :open_interest
)
ON CONFLICT (contract_id, quote_timestamp) DO NOTHING
""")
conn.execute(query, {
"contract_id": contract_id,
"quote_timestamp": quote_timestamp,
"bid": bid,
"ask": ask,
"mid": mid,
"last_price": last_price,
"implied_vol": implied_vol,
"volume": volume,
"open_interest": open_interest,
})
def process_option_dataframe(conn, df: pd.DataFrame, underlying_id: int, option_type: str, symbol: str, expiration_date, quote_timestamp: datetime):
style = infer_option_style(symbol)
for _, row in df.iterrows():
strike = to_python_number(row.get("strike"))
contract_symbol = to_python_number(row.get("contractSymbol"))
bid = to_python_number(row.get("bid"))
ask = to_python_number(row.get("ask"))
last_price = to_python_number(row.get("lastPrice"))
implied_vol = to_python_number(row.get("impliedVolatility"))
volume = to_python_number(row.get("volume"))
open_interest = to_python_number(row.get("openInterest"))
if strike is None:
continue
contract_id = get_or_create_contract(
conn=conn,
underlying_id=underlying_id,
option_type=option_type,
strike=float(strike),
expiration_date=expiration_date,
style=style,
contract_symbol=contract_symbol,
)
mid = compute_mid(bid, ask)
insert_option_quote(
conn=conn,
contract_id=contract_id,
quote_timestamp=quote_timestamp,
bid=bid,
ask=ask,
mid=mid,
last_price=last_price,
implied_vol=implied_vol,
volume=int(volume) if volume is not None else None,
open_interest=int(open_interest) if open_interest is not None else None,
)
def ingest_symbol(symbol: str, engine):
print(f"Starting ingestion for {symbol}...")
ticker = yf.Ticker(symbol)
expirations = ticker.options
if not expirations:
print(f"No options found for {symbol}")
return
quote_timestamp = datetime.now(timezone.utc)
# Try to get spot price
info = {}
try:
info = ticker.fast_info
except Exception:
pass
spot_price = None
if info:
spot_price = info.get("lastPrice") or info.get("last_price")
with engine.begin() as conn:
underlying_id = get_or_create_underlying(conn, symbol)
if spot_price is not None:
insert_underlying_price(
conn=conn,
underlying_id=underlying_id,
price_timestamp=quote_timestamp,
price=float(spot_price),
)
for expiry in expirations:
print(f" Fetching expiry {expiry} ...")
chain = ticker.option_chain(expiry)
expiration_date = pd.to_datetime(expiry).date()
process_option_dataframe(
conn=conn,
df=chain.calls,
underlying_id=underlying_id,
option_type="call",
symbol=symbol,
expiration_date=expiration_date,
quote_timestamp=quote_timestamp,
)
process_option_dataframe(
conn=conn,
df=chain.puts,
underlying_id=underlying_id,
option_type="put",
symbol=symbol,
expiration_date=expiration_date,
quote_timestamp=quote_timestamp,
)
print(f"Finished ingestion for {symbol}.")
def main():
db_url = build_db_url()
engine = create_engine(db_url, future=True)
for symbol in PIPELINE_CONFIG["symbols"]:
ingest_symbol(symbol, engine)
if __name__ == "__main__":
main()

49
src/data/sql/schema.sql Normal file
View File

@@ -0,0 +1,49 @@
CREATE TABLE IF NOT EXISTS underlyings (
id SERIAL PRIMARY KEY,
symbol TEXT UNIQUE NOT NULL,
exchange TEXT,
currency TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS option_contracts (
id SERIAL PRIMARY KEY,
underlying_id INTEGER NOT NULL REFERENCES underlyings(id),
option_type TEXT NOT NULL CHECK (option_type IN ('call', 'put')),
strike NUMERIC NOT NULL,
expiration_date DATE NOT NULL,
style TEXT,
contract_symbol TEXT,
UNIQUE (underlying_id, option_type, strike, expiration_date)
);
CREATE TABLE IF NOT EXISTS option_quotes (
id SERIAL PRIMARY KEY,
contract_id INTEGER NOT NULL REFERENCES option_contracts(id),
quote_timestamp TIMESTAMP NOT NULL,
bid NUMERIC,
ask NUMERIC,
mid NUMERIC,
last_price NUMERIC,
implied_vol NUMERIC,
volume INTEGER,
open_interest INTEGER,
UNIQUE (contract_id, quote_timestamp)
);
CREATE TABLE IF NOT EXISTS underlying_prices (
id SERIAL PRIMARY KEY,
underlying_id INTEGER NOT NULL REFERENCES underlyings(id),
price_timestamp TIMESTAMP NOT NULL,
price NUMERIC NOT NULL,
UNIQUE (underlying_id, price_timestamp)
);
CREATE INDEX IF NOT EXISTS idx_option_quotes_timestamp
ON option_quotes(quote_timestamp);
CREATE INDEX IF NOT EXISTS idx_option_quotes_contract_id
ON option_quotes(contract_id);
CREATE INDEX IF NOT EXISTS idx_option_contracts_underlying_expiry
ON option_contracts(underlying_id, expiration_date);

11
src/data/yfinance_pull.py Normal file
View File

@@ -0,0 +1,11 @@
import yfinance as yf
ticker = yf.Ticker("AAPL")
expirations = ticker.options
print(expirations)
chain = ticker.option_chain(expirations[0])
calls = chain.calls
puts = chain.puts