From a503514bf58389a310e41da92f1d14455c3f8f44 Mon Sep 17 00:00:00 2001 From: David Doebel Date: Thu, 12 Mar 2026 13:43:35 +0100 Subject: [PATCH] Write first data ingestion and SQL support --- src/data/config/settings.py | 14 ++ src/data/database_interaction.py | 13 + src/data/ingestion/ingest_yahoo_options.py | 280 +++++++++++++++++++++ src/data/sql/schema.sql | 49 ++++ src/data/yfinance_pull.py | 11 + 5 files changed, 367 insertions(+) create mode 100644 src/data/config/settings.py create mode 100644 src/data/database_interaction.py create mode 100644 src/data/ingestion/ingest_yahoo_options.py create mode 100644 src/data/sql/schema.sql create mode 100644 src/data/yfinance_pull.py diff --git a/src/data/config/settings.py b/src/data/config/settings.py new file mode 100644 index 0000000..49cb6a0 --- /dev/null +++ b/src/data/config/settings.py @@ -0,0 +1,14 @@ +DB_CONFIG = { + "host": "localhost", + "port": 5432, + "database": "options_db", + "user": "quant_user", + "password": "strong_password", +} + +PIPELINE_CONFIG = { + "symbols": [ + "SPY" + # Example: "SPY" + ] +} \ No newline at end of file diff --git a/src/data/database_interaction.py b/src/data/database_interaction.py new file mode 100644 index 0000000..0d0f9c5 --- /dev/null +++ b/src/data/database_interaction.py @@ -0,0 +1,13 @@ +import psycopg2 + +conn = psycopg2.connect( + dbname="options_db", + user="quant_user", + password="strong_password", + host="144.91.73.49", + port="5432" +) + +cursor = conn.cursor() +cursor.execute("SELECT * FROM underlyings;") +print(cursor.fetchall()) \ No newline at end of file diff --git a/src/data/ingestion/ingest_yahoo_options.py b/src/data/ingestion/ingest_yahoo_options.py new file mode 100644 index 0000000..e87e279 --- /dev/null +++ b/src/data/ingestion/ingest_yahoo_options.py @@ -0,0 +1,280 @@ +from datetime import datetime, timezone +from decimal import Decimal, InvalidOperation + +import pandas as pd +import yfinance as yf +from sqlalchemy import create_engine, text + +from config.settings import DB_CONFIG, PIPELINE_CONFIG + + +def build_db_url() -> str: + return ( + f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}" + f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}" + ) + + +def to_python_number(value): + """Convert pandas/numpy values to plain Python values or None.""" + if pd.isna(value): + return None + return value + + +def compute_mid(bid, ask): + bid = to_python_number(bid) + ask = to_python_number(ask) + + if bid is None or ask is None: + return None + try: + return float((bid + ask) / 2.0) + except Exception: + return None + + +def infer_option_style(symbol: str) -> str: + """ + Very rough default convention: + - US equities / ETFs from Yahoo are usually American style + """ + # TODO: If later you ingest index options like SPX, adapt this logic. + return "american" + + +def get_or_create_underlying(conn, symbol: str) -> int: + query_insert = text(""" + INSERT INTO underlyings (symbol, exchange, currency) + VALUES (:symbol, :exchange, :currency) + ON CONFLICT (symbol) DO NOTHING + """) + + query_select = text(""" + SELECT id FROM underlyings WHERE symbol = :symbol + """) + + # TODO: improve exchange/currency detection if you want richer metadata + conn.execute(query_insert, { + "symbol": symbol, + "exchange": None, + "currency": "USD", + }) + + result = conn.execute(query_select, {"symbol": symbol}).fetchone() + return result[0] + + +def get_or_create_contract( + conn, + underlying_id: int, + option_type: str, + strike: float, + expiration_date, + style: str, + contract_symbol: str, +) -> int: + query_insert = text(""" + INSERT INTO option_contracts ( + underlying_id, option_type, strike, expiration_date, style, contract_symbol + ) + VALUES ( + :underlying_id, :option_type, :strike, :expiration_date, :style, :contract_symbol + ) + ON CONFLICT (underlying_id, option_type, strike, expiration_date) + DO NOTHING + """) + + query_select = text(""" + SELECT id + FROM option_contracts + WHERE underlying_id = :underlying_id + AND option_type = :option_type + AND strike = :strike + AND expiration_date = :expiration_date + """) + + conn.execute(query_insert, { + "underlying_id": underlying_id, + "option_type": option_type, + "strike": strike, + "expiration_date": expiration_date, + "style": style, + "contract_symbol": contract_symbol, + }) + + result = conn.execute(query_select, { + "underlying_id": underlying_id, + "option_type": option_type, + "strike": strike, + "expiration_date": expiration_date, + }).fetchone() + + return result[0] + + +def insert_underlying_price(conn, underlying_id: int, price_timestamp: datetime, price: float): + query = text(""" + INSERT INTO underlying_prices (underlying_id, price_timestamp, price) + VALUES (:underlying_id, :price_timestamp, :price) + ON CONFLICT (underlying_id, price_timestamp) DO NOTHING + """) + conn.execute(query, { + "underlying_id": underlying_id, + "price_timestamp": price_timestamp, + "price": price, + }) + + +def insert_option_quote( + conn, + contract_id: int, + quote_timestamp: datetime, + bid, + ask, + mid, + last_price, + implied_vol, + volume, + open_interest, +): + query = text(""" + INSERT INTO option_quotes ( + contract_id, quote_timestamp, bid, ask, mid, + last_price, implied_vol, volume, open_interest + ) + VALUES ( + :contract_id, :quote_timestamp, :bid, :ask, :mid, + :last_price, :implied_vol, :volume, :open_interest + ) + ON CONFLICT (contract_id, quote_timestamp) DO NOTHING + """) + + conn.execute(query, { + "contract_id": contract_id, + "quote_timestamp": quote_timestamp, + "bid": bid, + "ask": ask, + "mid": mid, + "last_price": last_price, + "implied_vol": implied_vol, + "volume": volume, + "open_interest": open_interest, + }) + + +def process_option_dataframe(conn, df: pd.DataFrame, underlying_id: int, option_type: str, symbol: str, expiration_date, quote_timestamp: datetime): + style = infer_option_style(symbol) + + for _, row in df.iterrows(): + strike = to_python_number(row.get("strike")) + contract_symbol = to_python_number(row.get("contractSymbol")) + bid = to_python_number(row.get("bid")) + ask = to_python_number(row.get("ask")) + last_price = to_python_number(row.get("lastPrice")) + implied_vol = to_python_number(row.get("impliedVolatility")) + volume = to_python_number(row.get("volume")) + open_interest = to_python_number(row.get("openInterest")) + + if strike is None: + continue + + contract_id = get_or_create_contract( + conn=conn, + underlying_id=underlying_id, + option_type=option_type, + strike=float(strike), + expiration_date=expiration_date, + style=style, + contract_symbol=contract_symbol, + ) + + mid = compute_mid(bid, ask) + + insert_option_quote( + conn=conn, + contract_id=contract_id, + quote_timestamp=quote_timestamp, + bid=bid, + ask=ask, + mid=mid, + last_price=last_price, + implied_vol=implied_vol, + volume=int(volume) if volume is not None else None, + open_interest=int(open_interest) if open_interest is not None else None, + ) + + +def ingest_symbol(symbol: str, engine): + print(f"Starting ingestion for {symbol}...") + + ticker = yf.Ticker(symbol) + expirations = ticker.options + + if not expirations: + print(f"No options found for {symbol}") + return + + quote_timestamp = datetime.now(timezone.utc) + + # Try to get spot price + info = {} + try: + info = ticker.fast_info + except Exception: + pass + + spot_price = None + if info: + spot_price = info.get("lastPrice") or info.get("last_price") + + with engine.begin() as conn: + underlying_id = get_or_create_underlying(conn, symbol) + + if spot_price is not None: + insert_underlying_price( + conn=conn, + underlying_id=underlying_id, + price_timestamp=quote_timestamp, + price=float(spot_price), + ) + + for expiry in expirations: + print(f" Fetching expiry {expiry} ...") + chain = ticker.option_chain(expiry) + + expiration_date = pd.to_datetime(expiry).date() + + process_option_dataframe( + conn=conn, + df=chain.calls, + underlying_id=underlying_id, + option_type="call", + symbol=symbol, + expiration_date=expiration_date, + quote_timestamp=quote_timestamp, + ) + + process_option_dataframe( + conn=conn, + df=chain.puts, + underlying_id=underlying_id, + option_type="put", + symbol=symbol, + expiration_date=expiration_date, + quote_timestamp=quote_timestamp, + ) + + print(f"Finished ingestion for {symbol}.") + + +def main(): + db_url = build_db_url() + engine = create_engine(db_url, future=True) + + for symbol in PIPELINE_CONFIG["symbols"]: + ingest_symbol(symbol, engine) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/data/sql/schema.sql b/src/data/sql/schema.sql new file mode 100644 index 0000000..1260cb1 --- /dev/null +++ b/src/data/sql/schema.sql @@ -0,0 +1,49 @@ +CREATE TABLE IF NOT EXISTS underlyings ( + id SERIAL PRIMARY KEY, + symbol TEXT UNIQUE NOT NULL, + exchange TEXT, + currency TEXT, + created_at TIMESTAMP DEFAULT NOW() + ); + +CREATE TABLE IF NOT EXISTS option_contracts ( + id SERIAL PRIMARY KEY, + underlying_id INTEGER NOT NULL REFERENCES underlyings(id), + option_type TEXT NOT NULL CHECK (option_type IN ('call', 'put')), + strike NUMERIC NOT NULL, + expiration_date DATE NOT NULL, + style TEXT, + contract_symbol TEXT, + UNIQUE (underlying_id, option_type, strike, expiration_date) + ); + +CREATE TABLE IF NOT EXISTS option_quotes ( + id SERIAL PRIMARY KEY, + contract_id INTEGER NOT NULL REFERENCES option_contracts(id), + quote_timestamp TIMESTAMP NOT NULL, + bid NUMERIC, + ask NUMERIC, + mid NUMERIC, + last_price NUMERIC, + implied_vol NUMERIC, + volume INTEGER, + open_interest INTEGER, + UNIQUE (contract_id, quote_timestamp) + ); + +CREATE TABLE IF NOT EXISTS underlying_prices ( + id SERIAL PRIMARY KEY, + underlying_id INTEGER NOT NULL REFERENCES underlyings(id), + price_timestamp TIMESTAMP NOT NULL, + price NUMERIC NOT NULL, + UNIQUE (underlying_id, price_timestamp) + ); + +CREATE INDEX IF NOT EXISTS idx_option_quotes_timestamp + ON option_quotes(quote_timestamp); + +CREATE INDEX IF NOT EXISTS idx_option_quotes_contract_id + ON option_quotes(contract_id); + +CREATE INDEX IF NOT EXISTS idx_option_contracts_underlying_expiry + ON option_contracts(underlying_id, expiration_date); \ No newline at end of file diff --git a/src/data/yfinance_pull.py b/src/data/yfinance_pull.py new file mode 100644 index 0000000..ebd6ab3 --- /dev/null +++ b/src/data/yfinance_pull.py @@ -0,0 +1,11 @@ +import yfinance as yf + +ticker = yf.Ticker("AAPL") + +expirations = ticker.options +print(expirations) + +chain = ticker.option_chain(expirations[0]) + +calls = chain.calls +puts = chain.puts \ No newline at end of file