From e9b3a4aac3f30754ca7ea0e54db8e61d46dc2920 Mon Sep 17 00:00:00 2001 From: David Doebel Date: Thu, 2 Apr 2026 16:30:43 +0200 Subject: [PATCH] Add Python implied-volatility package and data analysis pipeline. Introduce the SVI/implied-vol modules as a package, update project metadata, and add loading/processing utilities that connect database snapshots to calibration workflows. Made-with: Cursor --- pyproject.toml | 24 + src/ImpliedVolatility/__init__.py | 0 src/ImpliedVolatility/compute_vls.py | 49 ++ src/ImpliedVolatility/setup.py | 0 src/ImpliedVolatility/svi.py | 1023 ++++++++++++++++++++++++++ src/__init__.py | 0 src/data/load_data.py | 436 +++++++++++ 7 files changed, 1532 insertions(+) create mode 100644 pyproject.toml create mode 100644 src/ImpliedVolatility/__init__.py create mode 100644 src/ImpliedVolatility/compute_vls.py create mode 100644 src/ImpliedVolatility/setup.py create mode 100644 src/ImpliedVolatility/svi.py create mode 100644 src/__init__.py create mode 100644 src/data/load_data.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..cc96816 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["scikit-build-core>=0.5", "pybind11"] +build-backend = "scikit_build_core.build" + +[project] +name = "qengine" +version = "0.1.0" +description = "Quant engine with C++ backend" +authors = [{name = "David"}] +requires-python = ">=3.10" +dependencies = [ + "numpy", + "pandas", + "sqlalchemy", + "psycopg2-binary", + "yfinance", +] + +[tool.scikit-build] +# Keep separate from a local `cmake -B build` tree (different generators: Ninja vs Makefiles). +build-dir = "skbuild-build" +cmake.version = ">=3.16" +cmake.build-type = "Release" +cmake.define.BUILD_TESTING = "OFF" diff --git a/src/ImpliedVolatility/__init__.py b/src/ImpliedVolatility/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ImpliedVolatility/compute_vls.py b/src/ImpliedVolatility/compute_vls.py new file mode 100644 index 0000000..ef68ed2 --- /dev/null +++ b/src/ImpliedVolatility/compute_vls.py @@ -0,0 +1,49 @@ +import numpy as np +import qengine +from scipy.optimize import brentq + + +def implied_vol(price, S, K, T, r, call): + """ + Implied vol for each row. Arguments may be scalars or 1-D arrays-like (same length). + """ + price = np.asarray(price, dtype=np.float64) + S = np.asarray(S, dtype=np.float64) + K = np.asarray(K, dtype=np.float64) + T = np.asarray(T, dtype=np.float64) + call = np.asarray(call, dtype=bool) + r = float(r) + + scalar_in = price.ndim == 0 + if scalar_in: + price = np.atleast_1d(price) + S = np.atleast_1d(S) + K = np.atleast_1d(K) + T = np.atleast_1d(T) + call = np.atleast_1d(call) + + n = price.shape[0] + if (S.shape[0] != n or K.shape[0] != n or T.shape[0] != n or call.shape[0] != n): + raise ValueError( + f"implied_vol: length mismatch price={n}, S={S.shape[0]}, K={K.shape[0]}, " + f"T={T.shape[0]}, call={call.shape[0]}" + ) + + out = np.full(n, np.nan, dtype=np.float64) + for i in range(n): + p, s, k, t, c = float(price[i]), float(S[i]), float(K[i]), float(T[i]), bool(call[i]) + if not np.isfinite(p) or not np.isfinite(s) or not np.isfinite(k) or not np.isfinite(t): + continue + if s <= 0 or k <= 0 or t <= 0: + continue + try: + def f(sig: float) -> float: + return qengine.bs_price(s, k, t, r, sig, c) - p + + out[i] = brentq(f, 1e-6, 5.0) + except (ValueError, RuntimeError): + out[i] = np.nan + + if scalar_in: + return float(out[0]) + return out diff --git a/src/ImpliedVolatility/setup.py b/src/ImpliedVolatility/setup.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ImpliedVolatility/svi.py b/src/ImpliedVolatility/svi.py new file mode 100644 index 0000000..dab56f5 --- /dev/null +++ b/src/ImpliedVolatility/svi.py @@ -0,0 +1,1023 @@ +import numpy as np +import pandas as pd +from scipy.optimize import least_squares, minimize + +# --------------------------------------------------------------------------- +# Core SVI math +# --------------------------------------------------------------------------- + + +def svi_total_variance( + k: np.ndarray, + a: float, + b: float, + rho: float, + m: float, + sigma: float, +) -> np.ndarray: + """Total variance w(k) = a + b * (rho * (k - m) + sqrt((k - m)^2 + sigma^2)).""" + km = k - m + root = np.sqrt(km**2 + sigma**2) + return a + b * (rho * km + root) + + +def svi_jacobian_params( + k: np.ndarray, a: float, b: float, rho: float, m: float, sigma: float +) -> np.ndarray: + """Jacobian (n, 5): columns [da, db, drho, dm, dsigma] of w(k).""" + km = k - m + root = np.maximum(np.sqrt(km**2 + sigma**2), 1e-14) + return np.column_stack( + [ + np.ones_like(k), + rho * km + root, + b * km, + -b * (rho + km / root), + b * (sigma / root), + ] + ) + + +def log_forward_moneyness( + strike: np.ndarray, + spot: np.ndarray, + T: np.ndarray, + r: float, +) -> np.ndarray: + """k = log(K / F) with F = S * exp(r * T).""" + fwd = spot * np.exp(r * np.asarray(T, dtype=np.float64)) + return np.log(np.asarray(strike, dtype=np.float64) / fwd) + + +def total_variance_from_iv(iv: np.ndarray, T: np.ndarray) -> np.ndarray: + """w = sigma^2 * T.""" + iv = np.asarray(iv, dtype=np.float64) + T = np.asarray(T, dtype=np.float64) + return iv**2 * T + + +# --------------------------------------------------------------------------- +# Data preparation (load_data.merge + compute_iv output) +# --------------------------------------------------------------------------- + + +def prepare_svi_inputs( + df: pd.DataFrame, + *, + spot_col: str = "spot", + strike_col: str = "strike", + T_col: str = "T", + iv_col: str = "iv", + r: float = 0.05, + spread_col: Optional[str] = "spread", +) -> pd.DataFrame: + """ + Add columns ``log_moneyness`` (k = log K/F) and ``total_var`` (w = iv^2 T). + If ``spread_col`` is present, adds ``svi_weight`` ~ 1 / spread^2 for weighted fits. + Drops rows with non-finite inputs or non-positive T. + """ + out = df.copy() + S = out[spot_col].to_numpy(dtype=np.float64) + K = out[strike_col].to_numpy(dtype=np.float64) + T = out[T_col].to_numpy(dtype=np.float64) + iv = out[iv_col].to_numpy(dtype=np.float64) + + valid = ( + np.isfinite(S) + & np.isfinite(K) + & np.isfinite(T) + & np.isfinite(iv) + & (S > 0) + & (K > 0) + & (T > 0) + & (iv > 0) + ) + out = out.loc[valid].copy() + S = out[spot_col].to_numpy(dtype=np.float64) + K = out[strike_col].to_numpy(dtype=np.float64) + T = out[T_col].to_numpy(dtype=np.float64) + iv = out[iv_col].to_numpy(dtype=np.float64) + + out["log_moneyness"] = log_forward_moneyness(K, S, T, r) + out["total_var"] = total_variance_from_iv(iv, T) + if spread_col and spread_col in out.columns: + sp = out[spread_col].to_numpy(dtype=np.float64) + out["svi_weight"] = 1.0 / np.maximum(sp**2, 1e-12) + return out + + +# --------------------------------------------------------------------------- +# Loss helpers (optional smooth Huber-style path via L-BFGS-B) +# --------------------------------------------------------------------------- + + +def huber_loss(residual: np.ndarray, delta: float = 0.01) -> np.ndarray: + ar = np.abs(residual) + return np.where(ar < delta, 0.5 * residual**2, delta * (ar - 0.5 * delta)) + + +def svi_huber_objective( + x: np.ndarray, + k: np.ndarray, + w_obs: np.ndarray, + sqrt_wts: np.ndarray, + lam: float, +) -> float: + a, b, rho, m, sigma = x + if b <= 0 or sigma <= 0 or abs(rho) >= 1.0: + return 1e12 + w_fit = svi_total_variance(k, a, b, rho, m, sigma) + resid = w_fit - w_obs + loss = np.mean(huber_loss(sqrt_wts * resid / (np.mean(sqrt_wts) + 1e-12))) + reg = lam * (a**2 + b**2 + m**2 + sigma**2) + return float(loss + reg) + + +# --------------------------------------------------------------------------- +# Fitting +# --------------------------------------------------------------------------- + + +@dataclass +class SVIParams: + a: float + b: float + rho: float + m: float + sigma: float + + def total_var(self, k: np.ndarray) -> np.ndarray: + return svi_total_variance(k, self.a, self.b, self.rho, self.m, self.sigma) + + +@dataclass +class SVISliceFit: + """Single-expiry calibration result.""" + + params: SVIParams + success: bool + cost: float + message: str + n_points: int + T_mean: float + group_key: str + + +@dataclass +class SVISurfaceFit: + slices: list[SVISliceFit] + meta: Mapping[str, object] + + +def _butterfly_constraints_ok(a: float, b: float, rho: float, m: float, sigma: float) -> bool: + """ + Gatheral-style no-butterfly constraints for raw SVI: + + - b > 0, sigma > 0, |rho| < 1 + - a + b*sigma*sqrt(1-rho^2) >= 0 (minimum variance >= 0) + - b*(1+|rho|) < 2 (wing slopes controlled) + """ + if not (b > 0.0 and sigma > 0.0 and abs(rho) < 1.0): + return False + if a + b * sigma * np.sqrt(max(1.0 - rho * rho, 0.0)) < 0.0: + return False + if b * (1.0 + abs(rho)) >= 2.0: + return False + return True + + +def _butterfly_violation_terms(a: float, b: float, rho: float, m: float, sigma: float) -> np.ndarray: + """ + Soft butterfly / wing violations as non-negative terms v_i such that + sum(v_i**2) is the arbitrage penalty used in the loss. + """ + # minimum variance >= 0 + min_var = a + b * sigma * np.sqrt(max(1.0 - rho * rho, 0.0)) + v_min = max(0.0, -min_var) + # wing slope constraint b(1+|rho|) < 2 + wing = b * (1.0 + abs(rho)) - 2.0 + v_wing = max(0.0, wing) + return np.array([v_min, v_wing], dtype=np.float64) + + +def _initial_guess(k: np.ndarray, w: np.ndarray) -> np.ndarray: + m0 = float(np.average(k, weights=np.clip(w, 1e-6, None))) + a0 = float(np.clip(np.percentile(w, 35), 1e-6, None)) + b0 = 0.25 + rho0 = -0.4 + sigma0 = 0.15 + return np.array([a0, b0, rho0, m0, sigma0], dtype=np.float64) + + +def _bounds(k: np.ndarray, w: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + w_max = float(np.max(w) * 1.5 + 0.1) + km = float(np.max(np.abs(k)) + 0.25) + lo = np.array([0.0, 1e-5, -0.999, -km, 1e-4], dtype=np.float64) + hi = np.array([w_max, 10.0, 0.999, km, 5.0], dtype=np.float64) + return lo, hi + + +def fit_svi_slice( + k: np.ndarray, + w_obs: np.ndarray, + *, + weights: Optional[np.ndarray] = None, + sqrt_weights: Optional[np.ndarray] = None, + huber_delta: float = 0.02, + reg_lambda: float = 1e-6, + method: str = "least_squares", + verbose: int = 0, +) -> tuple[SVIParams, object]: + """ + Calibrate one SVI slice. + + Parameters + ---------- + method + ``least_squares`` — recommended (soft_l1 + analytic Jacobian). + ``lbfgs`` — smooth Huber objective with L-BFGS-B (bounds). + """ + k = np.asarray(k, dtype=np.float64).ravel() + w_obs = np.asarray(w_obs, dtype=np.float64).ravel() + if k.shape != w_obs.shape: + raise ValueError("k and w_obs must have the same shape") + + if sqrt_weights is not None: + sw = np.asarray(sqrt_weights, dtype=np.float64).ravel() + elif weights is not None: + wts = np.asarray(weights, dtype=np.float64).ravel() + sw = np.sqrt(np.maximum(wts, 1e-12)) + else: + sw = np.ones_like(w_obs) + + mask = np.isfinite(k) & np.isfinite(w_obs) & (w_obs > 0) & np.isfinite(sw) + k, w_obs, sw = k[mask], w_obs[mask], sw[mask] + if k.size < 5: + raise ValueError("Need at least 5 valid points to fit SVI") + + x0 = _initial_guess(k, w_obs) + lo, hi = _bounds(k, w_obs) + + if method == "least_squares": + + def residuals(x: np.ndarray) -> np.ndarray: + a, b, rho, m, sig = x + wf = svi_total_variance(k, a, b, rho, m, sig) + if not np.all(np.isfinite(wf)) or np.any(wf <= 0.0): + return np.full_like(w_obs, 1e3) + return sw * (wf - w_obs) + + def jac(x: np.ndarray) -> np.ndarray: + a, b, rho, m, sig = x + jw = svi_jacobian_params(k, a, b, rho, m, sig) + return sw[:, None] * jw + + sol = least_squares( + lambda x: np.concatenate( + [ + residuals(x), + ( + np.sqrt(reg_lambda) * _butterfly_violation_terms(*x) + if reg_lambda > 0.0 + else np.zeros(2, dtype=np.float64) + ), + ] + ), + x0, + bounds=(lo, hi), + loss="soft_l1", + f_scale=huber_delta, + ftol=1e-10, + xtol=1e-10, + gtol=1e-10, + verbose=verbose, + max_nfev=2000, + ) + x = sol.x + params = SVIParams(float(x[0]), float(x[1]), float(x[2]), float(x[3]), float(x[4])) + return params, sol + + if method == "lbfgs": + + def obj_lbfgs(x: np.ndarray) -> float: + a, b, rho, m, sig = x + base = svi_huber_objective(x, k, w_obs, sw, reg_lambda) + # soft butterfly / wing penalty (same reg_lambda weight) + if reg_lambda > 0.0: + v = _butterfly_violation_terms(a, b, rho, m, sig) + base += reg_lambda * float(np.sum(v * v)) + return base + + sol = minimize( + obj_lbfgs, + x0, + method="L-BFGS-B", + bounds=list(zip(lo, hi)), + options={"ftol": 1e-12, "maxiter": 1500}, + ) + x = sol.x + params = SVIParams(float(x[0]), float(x[1]), float(x[2]), float(x[3]), float(x[4])) + return params, sol + + raise ValueError(f"Unknown method: {method}") + + +def fit_svi_surface( + df: pd.DataFrame, + *, + group_col: str = "expiration_date", + T_col: str = "T", + weight_col: Optional[str] = "svi_weight", + min_points: int = 5, + fit_kwargs: Optional[dict] = None, +) -> SVISurfaceFit: + """ + Fit one SVI slice per expiry (or other grouping column). + + Expects columns from :func:`prepare_svi_inputs`: ``log_moneyness``, ``total_var``. + """ + fit_kwargs = fit_kwargs or {} + need = {"log_moneyness", "total_var"} + missing = need - set(df.columns) + if missing: + raise KeyError(f"DataFrame missing columns {missing}; run prepare_svi_inputs first") + + slices: list[SVISliceFit] = [] + + # sort groups by average maturity (for convenience only; no calendar coupling here) + grouped: list[tuple[object, pd.DataFrame, float]] = [] + for key, g in df.groupby(group_col, sort=True): + g = g.sort_values("log_moneyness") + T_mean = float(g[T_col].mean()) if T_col in g.columns else float("nan") + grouped.append((key, g, T_mean)) + + grouped.sort(key=lambda tup: tup[2]) + + for key, g, T_mean in grouped: + k = g["log_moneyness"].to_numpy(dtype=np.float64) + w = g["total_var"].to_numpy(dtype=np.float64) + if len(g) < min_points: + continue + try: + slice_kwargs = dict(fit_kwargs) if fit_kwargs else {} + if weight_col and weight_col in g.columns: + slice_kwargs = {**slice_kwargs, "weights": g[weight_col].to_numpy(dtype=np.float64)} + params, raw = fit_svi_slice(k, w, **slice_kwargs) + if hasattr(raw, "success"): + success = bool(raw.success) # type: ignore[attr-defined] + msg = str(getattr(raw, "message", "")) + cost = float(getattr(raw, "cost", np.nan)) # type: ignore[arg-type] + else: + success = True + msg = "" + cost = float("nan") + except ValueError as e: + slices.append( + SVISliceFit( + params=SVIParams(0, 0, 0, 0, 1), + success=False, + cost=float("nan"), + message=str(e), + n_points=len(g), + T_mean=T_mean, + group_key=str(key), + ) + ) + continue + + slices.append( + SVISliceFit( + params=params, + success=success, + cost=cost, + message=msg, + n_points=len(g), + T_mean=T_mean, + group_key=str(key), + ) + ) + + meta = {"group_col": group_col, "n_slices_attempted": df[group_col].nunique()} + return SVISurfaceFit(slices=slices, meta=meta) + + +# --------------------------------------------------------------------------- +# Output tables +# --------------------------------------------------------------------------- + + +def surface_params_dataframe(fit: SVISurfaceFit) -> pd.DataFrame: + """Wide table of calibrated parameters per slice.""" + rows = [] + for s in fit.slices: + p = s.params + rows.append( + { + "group_key": s.group_key, + "T_mean": s.T_mean, + "n_points": s.n_points, + "success": s.success, + "cost": s.cost, + "a": p.a, + "b": p.b, + "rho": p.rho, + "m": p.m, + "sigma": p.sigma, + "message": s.message, + } + ) + return pd.DataFrame(rows) + + +# --------------------------------------------------------------------------- +# Parameter smoothing across maturity (for calendar consistency diagnostics) +# --------------------------------------------------------------------------- + + +@dataclass +class SmoothedSVICurves: + T_knots: np.ndarray + a_spline: "UnivariateSpline" + logb_spline: "UnivariateSpline" + u_spline: "UnivariateSpline" # atanh(rho) + m_spline: "UnivariateSpline" + logsig_spline: "UnivariateSpline" + + def params_at(self, T: np.ndarray | float) -> SVIParams | list[SVIParams]: + T_arr = np.asarray(T, dtype=np.float64).ravel() + a = self.a_spline(T_arr) + b = np.exp(self.logb_spline(T_arr)) + rho = np.tanh(self.u_spline(T_arr)) + m = self.m_spline(T_arr) + sigma = np.exp(self.logsig_spline(T_arr)) + if T_arr.size == 1: + # a, b, rho, m, sigma are 1D arrays here; index the single element + return SVIParams( + float(a[0]), + float(b[0]), + float(rho[0]), + float(m[0]), + float(sigma[0]), + ) + return [SVIParams(float(ai), float(bi), float(ri), float(mi), float(si)) for ai, bi, ri, mi, si in zip(a, b, rho, m, sigma)] + + def total_var(self, k: np.ndarray, T: np.ndarray | float) -> np.ndarray: + T_arr = np.asarray(T, dtype=np.float64) + if T_arr.ndim == 0: + p = self.params_at(float(T_arr)) + return p.total_var(np.asarray(k, dtype=np.float64)) + # broadcast over grid (T_i, k_j) + k_arr = np.asarray(k, dtype=np.float64).ravel() + out = np.empty((T_arr.size, k_arr.size), dtype=np.float64) + for i, Ti in enumerate(T_arr.ravel()): + p = self.params_at(float(Ti)) + out[i, :] = p.total_var(k_arr) + return out + + +def smooth_svi_parameters( + params_df: pd.DataFrame, + *, + T_col: str = "T_mean", + smooth_factor_a: float = 1e-4, + smooth_factor_m: float = 1e-4, + smooth_factor_others: float = 0.0, + min_T: float = 0.0, + weight_col: str = "n_points", +) -> SmoothedSVICurves: + """ + Phase 4–5 from the note: + + - take per-slice parameter table (T, a, b, rho, m, sigma) + - apply transformations (log b, atanh rho, log sigma) + - spline-smooth each vs T. + """ + from scipy.interpolate import UnivariateSpline + + df = params_df.copy() + df = df[df["success"]].copy() + df = df[np.isfinite(df[T_col])] + df = df[df[T_col] > min_T] + if df.empty: + raise ValueError("smooth_svi_parameters: no valid slices after filtering.") + + df = df.sort_values(T_col).drop_duplicates(T_col) + T = df[T_col].to_numpy(dtype=np.float64) + a = df["a"].to_numpy(dtype=np.float64) + b = df["b"].to_numpy(dtype=np.float64) + rho = df["rho"].to_numpy(dtype=np.float64) + m = df["m"].to_numpy(dtype=np.float64) + sigma = df["sigma"].to_numpy(dtype=np.float64) + + # transformed parameters + # normalize b and sigma by sqrt(T) to stabilize term-structure behaviour + sqrtT = np.sqrt(np.maximum(T, 1e-8)) + logb = np.log(np.maximum(b / sqrtT, 1e-8)) + u = np.arctanh(np.clip(rho, -0.999, 0.999)) + logsig = np.log(np.maximum(sigma / sqrtT, 1e-6)) + + w = df[weight_col].to_numpy(dtype=np.float64) if weight_col in df.columns else None + + # first-pass light smoothing per parameter + a_spl_1 = UnivariateSpline(T, a, w=w, s=smooth_factor_a) + logb_spl_1 = UnivariateSpline(T, logb, w=w, s=smooth_factor_others) + u_spl_1 = UnivariateSpline(T, u, w=w, s=smooth_factor_others) + m_spl_1 = UnivariateSpline(T, m, w=w, s=smooth_factor_m) + logsig_spl_1 = UnivariateSpline(T, logsig, w=w, s=smooth_factor_others) + + a_s = a_spl_1(T) + logb_s = logb_spl_1(T) + u_s = u_spl_1(T) + m_s = m_spl_1(T) + logsig_s = logsig_spl_1(T) + + # enforce monotone ATM total variance by adjusting a(T) only + atm_w_raw = [] + for i, (ai, lbi, ui, mi, lsi) in enumerate(zip(a_s, logb_s, u_s, m_s, logsig_s)): + Ti = float(T[i]) + scale = np.sqrt(max(1e-8, Ti)) + pi = SVIParams( + float(ai), + float(np.exp(lbi) * scale), + float(np.tanh(ui)), + float(mi), + float(np.exp(lsi) * scale), + ) + atm_w_raw.append(pi.total_var(np.array([0.0], dtype=np.float64))[0]) + atm_w_raw = np.asarray(atm_w_raw, dtype=np.float64) + atm_w_mono = np.maximum.accumulate(atm_w_raw) + delta_a = atm_w_mono - atm_w_raw + a_corr = a_s + delta_a + + # final splines built from corrected / smoothed arrays (with very small or zero extra smoothing) + a_spl = UnivariateSpline(T, a_corr, w=w, s=0.0) + logb_spl = UnivariateSpline(T, logb_s, w=w, s=0.0) + u_spl = UnivariateSpline(T, u_s, w=w, s=0.0) + m_spl = UnivariateSpline(T, m_s, w=w, s=0.0) + logsig_spl = UnivariateSpline(T, logsig_s, w=w, s=0.0) + + return SmoothedSVICurves( + T_knots=T, + a_spline=a_spl, + logb_spline=logb_spl, + u_spline=u_spl, + m_spline=m_spl, + logsig_spline=logsig_spl, + ) + + +def calendar_violation_matrix( + curves: SmoothedSVICurves, + T_grid: np.ndarray, + k_grid: np.ndarray, +) -> np.ndarray: + """ + Phase 6–7 diagnostic: + + On a (T, k) grid, compute w(T_{j+1}, k) - w(T_j, k). + Negative entries indicate calendar violations. + """ + T_grid = np.asarray(T_grid, dtype=np.float64).ravel() + k_grid = np.asarray(k_grid, dtype=np.float64).ravel() + if T_grid.size < 2: + raise ValueError("Need at least two maturities in T_grid for calendar diagnostics.") + w = curves.total_var(k_grid, T_grid) # shape (nT, nK) + diff = w[1:, :] - w[:-1, :] + return diff + + +def evaluate_surface_on_grid( + fit: SVISurfaceFit, + k_grid: np.ndarray, + *, + valid_only: bool = True, +) -> pd.DataFrame: + """ + Evaluate each successful slice on ``k_grid``. Returns long DataFrame: + ``group_key``, ``T_mean``, ``log_moneyness``, ``total_var_model``. + """ + k_grid = np.asarray(k_grid, dtype=np.float64).ravel() + parts = [] + for s in fit.slices: + if valid_only and not s.success: + continue + w = s.params.total_var(k_grid) + parts.append( + pd.DataFrame( + { + "group_key": s.group_key, + "T_mean": s.T_mean, + "log_moneyness": k_grid, + "total_var_model": w, + "iv_model": np.sqrt(np.maximum(w, 0) / np.maximum(s.T_mean, 1e-12)), + } + ) + ) + if not parts: + return pd.DataFrame( + columns=[ + "group_key", + "T_mean", + "log_moneyness", + "total_var_model", + "iv_model", + ] + ) + return pd.concat(parts, ignore_index=True) + + +# --------------------------------------------------------------------------- +# Plotting +# --------------------------------------------------------------------------- + + +def plot_svi_surface_fit( + df_prep: pd.DataFrame, + fit: SVISurfaceFit, + *, + group_col: str = "expiration_date", + n_grid: int = 120, + iv_space: bool = True, + figsize: tuple[float, float] = (10, 6), + save_path: Optional[str] = None, + show: bool = False, +) -> tuple[object, object]: + """ + Plot market total variance (or IV) vs k with SVI overlays per expiry. + + Parameters + ---------- + df_prep + Output of :func:`prepare_svi_inputs` (must include ``log_moneyness``, ``total_var``, ``T``). + """ + import matplotlib.pyplot as plt + + try: + cmap = plt.colormaps["viridis"] + except (AttributeError, KeyError): + from matplotlib import cm as _cm + + cmap = _cm.get_cmap("viridis") + + fig, ax = plt.subplots(figsize=figsize) + k_all = df_prep["log_moneyness"].to_numpy() + k_min, k_max = float(np.min(k_all)), float(np.max(k_all)) + pad = 0.05 * (k_max - k_min + 1e-6) + k_grid = np.linspace(k_min - pad, k_max + pad, n_grid) + + ok = [s for s in fit.slices if s.success] + if not ok: + ax.set_title("SVI surface fit (no successful slices)") + return fig, ax + + n_ok = max(len(ok), 1) + for i, s in enumerate(sorted(ok, key=lambda x: x.T_mean)): + color = cmap(i / max(n_ok - 1, 1)) if n_ok > 1 else cmap(0.5) + Tm = s.T_mean + if group_col in df_prep.columns: + sub = df_prep[df_prep[group_col].astype(str) == str(s.group_key)] + else: + sub = ( + df_prep[np.isclose(df_prep["T"], Tm, rtol=0.02, atol=1e-4)] + if "T" in df_prep.columns + else df_prep + ) + + k_m = sub["log_moneyness"].to_numpy() + w_m = sub["total_var"].to_numpy() + if iv_space: + iv_m = np.sqrt(np.maximum(w_m, 0) / np.maximum(Tm, 1e-12)) + ax.scatter(k_m, iv_m, s=18, alpha=0.6, color=color, marker="o", label=None) + wg = s.params.total_var(k_grid) + iv_g = np.sqrt(np.maximum(wg, 0) / np.maximum(Tm, 1e-12)) + ax.plot(k_grid, iv_g, color=color, lw=2, label=f"T≈{Tm:.3f} ({s.group_key})") + ax.set_ylabel("implied vol") + else: + ax.scatter(k_m, w_m, s=18, alpha=0.6, color=color, marker="o", label=None) + ax.plot(k_grid, s.params.total_var(k_grid), color=color, lw=2, label=f"T≈{Tm:.3f}") + ax.set_ylabel("total variance w") + + ax.set_xlabel("log moneyness log(K/F)") + ax.legend(loc="best", fontsize=8) + ax.set_title("SVI slices vs market") + ax.grid(True, alpha=0.3) + fig.tight_layout() + if save_path: + fig.savefig(save_path, bbox_inches="tight") + if show: + plt.show() + return fig, ax + + +def plot_residual_heatmap( + df_prep: pd.DataFrame, + fit: SVISurfaceFit, + *, + figsize: tuple[float, float] = (9, 4), + save_path: Optional[str] = None, + show: bool = False, +) -> tuple[object, object]: + """Simple heatmap of (w_model - w_market) / w_market by slice and moneyness bin.""" + import matplotlib.pyplot as plt + + rows = [] + for _, row in df_prep.iterrows(): + k = float(row["log_moneyness"]) + w = float(row["total_var"]) + Tm = float(row["T"]) if "T" in row.index else float("nan") + gkey = str(row.get("expiration_date", "")) + match = None + for s in fit.slices: + if s.success and (str(s.group_key) == gkey or np.isclose(s.T_mean, Tm, rtol=0.05, atol=1e-3)): + match = s + break + if match is None: + continue + w_hat = float(match.params.total_var(np.array([k]))[0]) + rel = (w_hat - w) / max(w, 1e-12) + rows.append({"T_mean": Tm, "k": k, "rel_err": rel, "group_key": gkey}) + + if not rows: + fig, ax = plt.subplots(figsize=figsize) + ax.set_title("No overlap for residual map") + return fig, ax + + rdf = pd.DataFrame(rows) + fig, ax = plt.subplots(figsize=figsize) + sc = ax.scatter(rdf["k"], rdf["T_mean"], c=rdf["rel_err"], cmap="coolwarm", s=35, vmin=-0.2, vmax=0.2) + fig.colorbar(sc, ax=ax, label="relative w error (model - mkt) / mkt") + ax.set_xlabel("log(K/F)") + ax.set_ylabel("T (years)") + ax.set_title("SVI relative variance residuals") + fig.tight_layout() + if save_path: + fig.savefig(save_path, bbox_inches="tight") + if show: + plt.show() + return fig, ax + + +# --------------------------------------------------------------------------- +# Finplot plotting (interactive) +# --------------------------------------------------------------------------- + + +def _rgba_to_hex(rgba: tuple[float, float, float, float]) -> str: + r, g, b, _a = rgba + return "#{:02x}{:02x}{:02x}".format(int(r * 255), int(g * 255), int(b * 255)) + + +def _maybe_import_finplot(): + try: + import finplot as fplt # type: ignore + + return fplt + except Exception: + return None + + +def plot_svi_surface_fit_finplot( + df_prep: pd.DataFrame, + fit: SVISurfaceFit, + *, + group_col: str = "expiration_date", + n_grid: int = 120, + iv_space: bool = True, + show: bool = True, + title: str = "SVI slices (finplot)", +): + """ + Interactive finplot rendering of per-expiry SVI curves. + + Note: finplot is primarily interactive; saving to PDF/PNG is not implemented here. + """ + fplt = _maybe_import_finplot() + if fplt is None: + raise ImportError("finplot is not installed. Use plot_backend='matplotlib' or install finplot.") + + import matplotlib.pyplot as plt + + k_all = df_prep["log_moneyness"].to_numpy(dtype=np.float64) + k_min, k_max = float(np.min(k_all)), float(np.max(k_all)) + pad = 0.05 * (k_max - k_min + 1e-6) + k_grid = np.linspace(k_min - pad, k_max + pad, n_grid) + + ok = [s for s in fit.slices if s.success] + if not ok: + ax = fplt.create_plot(title=title) + if hasattr(ax, "setTitle"): + ax.setTitle(title) + if show: + fplt.show() + return ax, None + + try: + cmap = plt.colormaps["viridis"] + except (AttributeError, KeyError): + cmap = plt.cm.get_cmap("viridis") + + # finplot uses an x/y scatter/plot model; feed numeric x arrays directly. + ax = fplt.create_plot(title=title, rows=1) + + n_ok = max(len(ok), 1) + for i, s in enumerate(sorted(ok, key=lambda x: x.T_mean)): + # pick a stable color per slice index + c = cmap(i / max(n_ok - 1, 1)) if n_ok > 1 else cmap(0.5) + color_hex = _rgba_to_hex(c) + Tm = s.T_mean + + if group_col in df_prep.columns: + sub = df_prep[df_prep[group_col].astype(str) == str(s.group_key)] + else: + # fallback to matching close maturities + sub = df_prep[ + np.isclose(df_prep["T"].to_numpy(dtype=np.float64), Tm, rtol=0.02, atol=1e-4) + ] + + if sub.empty: + continue + + k_m = sub["log_moneyness"].to_numpy(dtype=np.float64) + if iv_space: + y_m = np.sqrt(np.maximum(sub["total_var"].to_numpy(dtype=np.float64), 0) / max(Tm, 1e-12)) + y_g = np.sqrt( + np.maximum(s.params.total_var(k_grid), 0) / max(Tm, 1e-12) + ) + fplt.plot(k_m, y_m, ax=ax, style="o", color=color_hex, width=3) + try: + fplt.plot( + k_grid, + y_g, + ax=ax, + color=color_hex, + width=2, + legend=f"T≈{Tm:.3f}", + ) + except TypeError: + fplt.plot(k_grid, y_g, ax=ax, color=color_hex, width=2) + if hasattr(ax, "setLabel"): + ax.setLabel("left", "implied vol") + else: + y_m = sub["total_var"].to_numpy(dtype=np.float64) + y_g = s.params.total_var(k_grid) + fplt.plot(k_m, y_m, ax=ax, style="o", color=color_hex, width=3) + try: + fplt.plot( + k_grid, + y_g, + ax=ax, + color=color_hex, + width=2, + legend=f"T≈{Tm:.3f}", + ) + except TypeError: + fplt.plot(k_grid, y_g, ax=ax, color=color_hex, width=2) + if hasattr(ax, "setLabel"): + ax.setLabel("left", "total variance w") + + if hasattr(ax, "setLabel"): + ax.setLabel("bottom", "log moneyness log(K/F)") + if show: + fplt.show() + return ax, None + + +def plot_residual_heatmap_finplot( + df_prep: pd.DataFrame, + fit: SVISurfaceFit, + *, + show: bool = True, + title: str = "SVI residuals (finplot)", + vmin: float = -0.2, + vmax: float = 0.2, + n_bins: int = 11, + max_points: int = 8000, +): + """ + Interactive finplot residual visualization. + + Creates a color-binned scatter of relative w error (model - mkt) / mkt. + """ + fplt = _maybe_import_finplot() + if fplt is None: + raise ImportError("finplot is not installed. Use plot_backend='matplotlib' or install finplot.") + + import matplotlib.pyplot as plt + + rows = [] + for _, row in df_prep.iterrows(): + k = float(row["log_moneyness"]) + w = float(row["total_var"]) + if not np.isfinite(k) or not np.isfinite(w) or w <= 0: + continue + Tm = float(row["T"]) if "T" in row.index else float("nan") + gkey = str(row.get("expiration_date", "")) + match = None + for s in fit.slices: + if s.success and (str(s.group_key) == gkey or np.isclose(s.T_mean, Tm, rtol=0.05, atol=1e-3)): + match = s + break + if match is None: + continue + w_hat = float(match.params.total_var(np.array([k], dtype=np.float64))[0]) + rel = (w_hat - w) / max(w, 1e-12) + rows.append((k, Tm, rel)) + + if not rows: + ax = fplt.create_plot(title=title) + if show: + fplt.show() + return ax, None + + # optional downsampling for interactivity + if len(rows) > max_points: + rows = rows[:: int(np.ceil(len(rows) / max_points))] + + xs = np.array([r[0] for r in rows], dtype=np.float64) + ys = np.array([r[1] for r in rows], dtype=np.float64) + zs = np.array([r[2] for r in rows], dtype=np.float64) + + try: + cmap = plt.colormaps["coolwarm"] + except (AttributeError, KeyError): + cmap = plt.cm.get_cmap("coolwarm") + + bins = np.linspace(vmin, vmax, n_bins + 1) + b_idx = np.digitize(zs, bins) - 1 + + ax = fplt.create_plot(title=title, rows=1) + if hasattr(ax, "setLabel"): + ax.setLabel("bottom", "log(K/F)") + ax.setLabel("left", "T (years)") + + for bi in range(n_bins): + msk = b_idx == bi + if not np.any(msk): + continue + z_mid = (bins[bi] + bins[bi + 1]) / 2.0 + # map z_mid into [0,1] + t = (z_mid - vmin) / max(vmax - vmin, 1e-12) + rgba = cmap(np.clip(t, 0.0, 1.0)) + color_hex = _rgba_to_hex(rgba) + fplt.plot(xs[msk], ys[msk], ax=ax, style="o", color=color_hex, width=4) + + if show: + fplt.show() + return ax, None + + +# --------------------------------------------------------------------------- +# Pipeline entrypoint for load_data-style frames +# --------------------------------------------------------------------------- + + +def calibrate_from_option_frame( + option_quotes_contracts: pd.DataFrame, + *, + r: float = 0.05, + group_col: str = "expiration_date", + fit_method: str = "least_squares", + plot: bool = True, + plot_backend: str = "matplotlib", + plot_path: Optional[str] = "svi_surface_fit.pdf", + residual_path: Optional[str] = "svi_residuals.pdf", + finplot_show: bool = True, +) -> tuple[pd.DataFrame, SVISurfaceFit, pd.DataFrame]: + """ + Full pipeline: prepare inputs, fit SVI surface, return (prepared_df, fit, params_table). + + ``option_quotes_contracts`` should be the merged quotes frame after + :func:`compute_iv` (columns ``spot``, ``strike``, ``T``, ``iv``, ``expiration_date``, …). + """ + prep = prepare_svi_inputs(option_quotes_contracts, r=r) + fit = fit_svi_surface( + prep, + group_col=group_col, + fit_kwargs={"method": fit_method, "reg_lambda": 0.01}, + ) + params_df = surface_params_dataframe(fit) + if plot and len(fit.slices) > 0: + backend = plot_backend.lower().strip() + fplt = _maybe_import_finplot() + use_finplot = backend in {"finplot", "auto"} and fplt is not None + if backend in {"finplot"} and fplt is None: + raise ImportError("plot_backend='finplot' requested but finplot is not installed.") + + if use_finplot: + # interactive (finplot) + plot_svi_surface_fit_finplot( + prep, fit, group_col=group_col, show=finplot_show + ) + plot_residual_heatmap_finplot(prep, fit, show=finplot_show) + + # still save PDFs if requested (matplotlib backend) + if plot_path: + plot_svi_surface_fit(prep, fit, group_col=group_col, save_path=plot_path, show=False) + if residual_path: + plot_residual_heatmap(prep, fit, save_path=residual_path, show=False) + else: + # file-based (matplotlib) + plot_svi_surface_fit(prep, fit, group_col=group_col, save_path=plot_path, show=False) + plot_residual_heatmap(prep, fit, save_path=residual_path, show=False) + return prep, fit, params_df + diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data/load_data.py b/src/data/load_data.py new file mode 100644 index 0000000..9f04dfe --- /dev/null +++ b/src/data/load_data.py @@ -0,0 +1,436 @@ +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt + +from option_pricing.src.data.ingestion.db_connect import db_engine +from option_pricing.src.ImpliedVolatility.compute_vls import implied_vol + + +def _normalize_quote_timestamp(df: pd.DataFrame) -> pd.DataFrame: + if "timestamp" not in df.columns and "quote_timestamp" in df.columns: + return df.rename(columns={"quote_timestamp": "timestamp"}) + return df + + +def _normalize_price_timestamp(df: pd.DataFrame) -> pd.DataFrame: + if "timestamp" not in df.columns and "price_timestamp" in df.columns: + return df.rename(columns={"price_timestamp": "timestamp"}) + return df + + +def load_data(): + engine = db_engine() + underlyings = pd.read_sql("SELECT * FROM underlyings;", engine) + underlying_prices = _normalize_price_timestamp( + pd.read_sql("SELECT * FROM underlying_prices;", engine) + ) + option_quotes = _normalize_quote_timestamp(pd.read_sql("SELECT * FROM option_quotes;", engine)) + option_contracts = pd.read_sql("SELECT * FROM option_contracts;", engine) + return underlyings, underlying_prices, option_quotes, option_contracts + + +def clean_data(data: pd.DataFrame): + data.dropna(inplace=True) + data = data[data["volume"] > 0] + data = data[data["open_interest"] > 10] + data["spread"] = data["ask"] - data["bid"] + #data = data[data["spread"] / data["mid"] < 1] + return data + + +def merge_quotes_contracts(option_quotes: pd.DataFrame, option_contracts: pd.DataFrame): + if "timestamp" not in option_quotes.columns: + raise KeyError("option_quotes needs a quote time column ('timestamp' or 'quote_timestamp')") + + option_quotes = option_quotes.groupby(["contract_id", "timestamp"], as_index=False).agg( + { + "bid": "mean", + "ask": "mean", + "mid": "mean", + "last_price": "mean", + "implied_vol": "mean", + "volume": "sum", + "open_interest": "sum", + } + ) + option_quotes = option_quotes.merge( + option_contracts, left_on="contract_id", right_on="id", how="left" + ) + option_quotes["timestamp"] = pd.to_datetime(option_quotes["timestamp"]) + option_quotes["expiration_date"] = pd.to_datetime(option_quotes["expiration_date"]) + option_quotes["T"] = ( + option_quotes["expiration_date"] - option_quotes["timestamp"] + ).dt.total_seconds() / (365 * 24 * 3600) + return option_quotes + + +def compute_iv(option_quotes_contracts, underlying_prices): + df = option_quotes_contracts.copy() + up = _normalize_price_timestamp(underlying_prices.copy()) + + up["timestamp"] = pd.to_datetime(up["timestamp"]) + up = up.sort_values("timestamp").drop_duplicates( + ["underlying_id", "timestamp"], keep="last" + ) + + mask = df["T"] > 0 + if not mask.any(): + df["iv"] = np.nan + return df + + sub = df.loc[mask].copy() + sub["_idx"] = sub.index + + merged = sub.merge( + up[["underlying_id", "timestamp", "price"]], + on=["underlying_id", "timestamp"], + how="left", + validate="many_to_one", + ) + + # assign back using explicit index + df["spot"] = np.nan + df.loc[merged["_idx"], "spot"] = merged["price"].to_numpy() + + price = merged["mid"].to_numpy(dtype=np.float64) + S = merged["price"].to_numpy(dtype=np.float64) + K = merged["strike"].to_numpy(dtype=np.float64) + T = merged["T"].to_numpy(dtype=np.float64) + call = (merged["option_type"] == "call").to_numpy() + + + df["iv"] = np.nan + df.loc[sub.index, "iv"] = implied_vol(price, S, K, T, 0.05, call) + return df + +def fit_ivsimle(option_quotes_contracts): + from scipy.interpolate import UnivariateSpline + sort = option_quotes_contracts.sort_values("log_moneyness").dropna() + x = sort["log_moneyness"] + y = sort["iv"] + y_yahoo = sort["implied_vol"] + print(x,y,y_yahoo) + f = UnivariateSpline(x, y, s=None) + f_yahoo = UnivariateSpline(x, y_yahoo, s=None) + # plot the smile + x_lin = np.linspace(x.min(), x.max(), 200) + plt.plot(x_lin, f(x_lin), label="iv smile") + plt.plot(x_lin, f_yahoo(x_lin), label="yahoo iv smile") + plt.legend() + plt.savefig("iv_smile_fit.pdf") + + + return f + +def calibrate_svi_surface(option_quotes_contracts: pd.DataFrame, r: float = 0.05, **kwargs): + """ + Fit SVI per expiry on ``iv`` from :func:`compute_iv` and plot diagnostics. + + See :func:`option_pricing.src.ImpliedVolatility.svi.calibrate_from_option_frame`. + """ + from option_pricing.src.ImpliedVolatility.svi import calibrate_from_option_frame + + return calibrate_from_option_frame(option_quotes_contracts, r=r, **kwargs) + +def clean_before_svi(option_quotes_contracts: pd.DataFrame): + option_quotes_contracts = option_quotes_contracts[option_quotes_contracts["T"] > 0.05] + return option_quotes_contracts + + +def plot_smoothed_svi_surface(prep: pd.DataFrame, params: pd.DataFrame, r: float = 0.05): + """ + Plot independent slice fits after maturity smoothing. + + Outputs: + - svi_smoothed_surface.pdf + - svi_calendar_violation_heatmap.pdf + """ + from option_pricing.src.ImpliedVolatility.svi import ( + calendar_violation_matrix, + smooth_svi_parameters, + ) + + # Build smoothed maturity-parameter curves from calibrated slice parameters + curves = smooth_svi_parameters( + params, + T_col="T_mean", + smooth_factor_a=1e-4, + smooth_factor_m=1e-4, + smooth_factor_others=0.0, + min_T=0.05, + weight_col="n_points", + ) + + # Overlay market points and smoothed model by maturity + plot_df = prep.copy() + if "T" not in plot_df.columns or "total_var" not in plot_df.columns: + raise KeyError("prep must include columns 'T' and 'total_var'") + + T_grid = np.sort(params.loc[params["success"], "T_mean"].to_numpy(dtype=np.float64)) + if T_grid.size < 2: + return + k_grid = np.linspace( + float(plot_df["log_moneyness"].quantile(0.02)), + float(plot_df["log_moneyness"].quantile(0.98)), + 180, + ) + + plt.figure(figsize=(11, 7)) + cmap = plt.colormaps["viridis"] + nT = max(len(T_grid), 1) + for i, Ti in enumerate(T_grid): + color = cmap(i / max(nT - 1, 1)) if nT > 1 else cmap(0.5) + near = np.isclose(plot_df["T"].to_numpy(dtype=np.float64), Ti, rtol=0.03, atol=2e-3) + sub = plot_df.loc[near] + if sub.empty: + continue + # market IV points + iv_mkt = np.sqrt( + np.maximum(sub["total_var"].to_numpy(dtype=np.float64), 0.0) + / np.maximum(Ti, 1e-12) + ) + plt.scatter( + sub["log_moneyness"].to_numpy(dtype=np.float64), + iv_mkt, + s=10, + alpha=0.35, + color=color, + ) + # smoothed curve IV + w_model = curves.total_var(k_grid, np.array([Ti], dtype=np.float64))[0] + iv_model = np.sqrt(np.maximum(w_model, 0.0) / np.maximum(Ti, 1e-12)) + plt.plot(k_grid, iv_model, color=color, lw=2, label=f"T={Ti:.3f}") + + plt.xlabel("log moneyness log(K/F)") + plt.ylabel("implied vol") + plt.title("SVI surface: market points vs smoothed maturity curves") + plt.grid(alpha=0.3) + plt.legend(fontsize=8, ncol=2) + plt.tight_layout() + plt.savefig("svi_smoothed_surface.pdf", bbox_inches="tight") + plt.clf() + + # Calendar diagnostics from smoothed surface + cal_diff = calendar_violation_matrix(curves, T_grid, k_grid) + # diff shape: (len(T_grid)-1, len(k_grid)) where negative is violation + plt.figure(figsize=(11, 4)) + im = plt.imshow( + cal_diff, + aspect="auto", + origin="lower", + cmap="coolwarm", + vmin=-0.02, + vmax=0.02, + extent=[k_grid.min(), k_grid.max(), 0, cal_diff.shape[0]], + ) + plt.colorbar(im, label="w(T_{j+1},k)-w(T_j,k)") + plt.xlabel("log moneyness") + plt.ylabel("maturity step j") + plt.title("Calendar diagnostic heatmap (negative = violation)") + plt.tight_layout() + plt.savefig("svi_calendar_violation_heatmap.pdf", bbox_inches="tight") + plt.clf() + + +def _fit_slice_with_svi_py_model( + model: object, + model_name: str, + k: np.ndarray, + w: np.ndarray, + T: float, + *, + theta_ref: float, + prev_params: dict | None, + k_eval: np.ndarray, +) -> tuple[np.ndarray, dict]: + """Fit one slice with a specific pysvi model and evaluate total variance on k_eval.""" + T = float(T) + k = np.asarray(k, dtype=np.float64) + w = np.asarray(w, dtype=np.float64) + k_eval = np.asarray(k_eval, dtype=np.float64) + + # ATM total variance proxy for models requiring theta + theta = float(np.interp(0.0, np.sort(k), w[np.argsort(k)])) + theta = max(theta, 1e-8) + + kwargs: dict = {} + if model_name == "ssvi": + kwargs["theta"] = theta + elif model_name == "essvi": + kwargs["theta"] = theta + kwargs["theta_ref"] = max(float(theta_ref), 1e-8) + elif model_name in {"jumpwings", "jw"}: + kwargs["T"] = max(T, 1e-8) + + # Option B: calendar penalty uses pysvi internal 200-point grid per current slice. + # Build w_prev on that exact grid to avoid shape mismatch. + if prev_params is not None: + k_cal = np.linspace(float(k.min()) - 0.5, float(k.max()) + 0.5, 200) + kwargs["w_prev"] = np.asarray(model.total_variance(k_cal, prev_params), dtype=np.float64) + + params = model.calibrate(k, w, **kwargs) + if params is None: + raise RuntimeError(f"pysvi {model_name} calibration failed") + w_eval = model.total_variance(k_eval, params) + return np.asarray(w_eval, dtype=np.float64), params + + +def compare_vs_svi_py(prep: pd.DataFrame, params: pd.DataFrame): + """ + Compare in-house SVI fit against pysvi models with explicit no-arbitrage flags. + + Outputs: + - svi_vs_pysvi__comparison.pdf for model in {svi, ssvi, essvi, jumpwings} + - svi_vs_pysvi_metrics.csv + """ + from option_pricing.src.ImpliedVolatility.svi import SVIParams + from pysvi import ArbitrageFreedom, get_model + + ok_params = params[params["success"]].copy() + if ok_params.empty: + print("compare_vs_svi_py: no successful in-house slices; skipping.") + return + + k_min = float(prep["log_moneyness"].quantile(0.02)) + k_max = float(prep["log_moneyness"].quantile(0.98)) + k_grid = np.linspace(k_min, k_max, 180) + + models = ["svi", "ssvi", "essvi", "jumpwings"] + rows: list[dict] = [] + + # reference theta for eSSVI from in-house successful slices + theta_ref = float(np.median(ok_params["T_mean"].to_numpy(dtype=np.float64) * 0 + 1.0)) + # Better theta_ref proxy from observed market ATM if available + theta_vals = [] + for _, row in ok_params.iterrows(): + Ti = float(row["T_mean"]) + near = np.isclose(prep["T"].to_numpy(dtype=np.float64), Ti, rtol=0.03, atol=2e-3) + sub = prep.loc[near].sort_values("log_moneyness") + if len(sub) < 10: + continue + ks = sub["log_moneyness"].to_numpy(dtype=np.float64) + ws = sub["total_var"].to_numpy(dtype=np.float64) + theta_vals.append(float(np.interp(0.0, np.sort(ks), ws[np.argsort(ks)]))) + if theta_vals: + theta_ref = float(np.median(theta_vals)) + + sorted_rows = list(ok_params.sort_values("T_mean").iterrows()) + for model_name in models: + flags = ArbitrageFreedom.NO_BUTTERFLY | ArbitrageFreedom.NO_CALENDAR + model = get_model(model_name, flags) + plt.figure(figsize=(11, 7)) + cmap = plt.colormaps["tab20"] + prev_params = None + n_used = 0 + for _, row in sorted_rows: + Ti = float(row["T_mean"]) + near = np.isclose(prep["T"].to_numpy(dtype=np.float64), Ti, rtol=0.03, atol=2e-3) + sub = prep.loc[near].sort_values("log_moneyness") + if len(sub) < 10: + continue + k = sub["log_moneyness"].to_numpy(dtype=np.float64) + w = sub["total_var"].to_numpy(dtype=np.float64) + + p_ours = SVIParams( + float(row["a"]), float(row["b"]), float(row["rho"]), float(row["m"]), float(row["sigma"]) + ) + w_ours = p_ours.total_var(k_grid) + rmse_ours = float(np.sqrt(np.mean((p_ours.total_var(k) - w) ** 2))) + + try: + w_ext, ext_params = _fit_slice_with_svi_py_model( + model, + model_name, + k, + w, + Ti, + theta_ref=theta_ref, + prev_params=prev_params, + k_eval=k_grid, + ) + rmse_ext = float(np.sqrt(np.mean((np.interp(k, k_grid, w_ext) - w) ** 2))) + rows.append( + { + "model": model_name, + "T_mean": Ti, + "rmse_ours": rmse_ours, + "rmse_pysvi": rmse_ext, + "delta_rmse": rmse_ext - rmse_ours, + "ext_params": str(ext_params), + } + ) + color = cmap(n_used % 20) + n_used += 1 + plt.plot(k_grid, np.sqrt(np.maximum(w_ours, 0) / max(Ti, 1e-12)), color=color, lw=2, alpha=0.9) + plt.plot(k_grid, np.sqrt(np.maximum(w_ext, 0) / max(Ti, 1e-12)), color=color, lw=1.5, ls="--", alpha=0.9) + prev_params = ext_params + except Exception as exc: + print(f"compare_vs_svi_py[{model_name}]: skipping T={Ti:.4f}, reason: {exc}") + continue + + if n_used == 0: + plt.close() + continue + + plt.xlabel("log moneyness") + plt.ylabel("implied vol") + plt.title(f"In-house SVI (solid) vs pysvi {model_name} (dashed)") + plt.grid(alpha=0.3) + plt.tight_layout() + plt.savefig(f"svi_vs_pysvi_{model_name}_comparison.pdf", bbox_inches="tight") + plt.clf() + + out = pd.DataFrame(rows) + if out.empty: + print("compare_vs_svi_py: no slices compared (pysvi unavailable or incompatible).") + return + out = out.sort_values(["model", "T_mean"]) + out.to_csv("svi_vs_pysvi_metrics.csv", index=False) + print(out.groupby("model")[["rmse_ours", "rmse_pysvi", "delta_rmse"]].mean()) + + +def plot_ivsmile(option_quotes_contracts): + option_quotes_contracts = option_quotes_contracts.sort_values("strike") + option_quotes_contracts["log_moneyness"] = np.log( + option_quotes_contracts["spot"] * np.exp(0.05 * option_quotes_contracts["T"])/option_quotes_contracts["strike"] + ) + option_quotes_contracts = option_quotes_contracts[option_quotes_contracts["log_moneyness"].abs() < 0.2] + #option_quotes_contracts = option_quotes_contracts[option_quotes_contracts["mid"] > 0.2] + plt.plot(option_quotes_contracts["strike"], option_quotes_contracts["iv"], label="iv smile") + plt.plot(option_quotes_contracts["strike"], option_quotes_contracts["implied_vol"], label="i. vol") + plt.legend() + plt.savefig("iv_smile.pdf") + plt.xlabel("iv") + plt.ylabel("strike price") + plt.clf() + return option_quotes_contracts + + + + + +if __name__ == "__main__": + underlyings, underlying_prices, option_quotes, option_contracts = load_data() + option_quotes_contracts = merge_quotes_contracts(option_quotes, option_contracts) + option_quotes_contracts = clean_data(option_quotes_contracts) + option_quotes_contracts = compute_iv(option_quotes_contracts, underlying_prices) + mask = option_quotes_contracts["iv"].notna() + print(option_quotes_contracts) + print(option_quotes_contracts.columns) + #plt.plot(option_quotes_contracts["contract_id"][mask], option_quotes_contracts["implied_vol"][mask], label="i. iv") + #plt.plot(option_quotes_contracts["contract_id"][mask],option_quotes_contracts["iv"][mask], label="comp. iv") + #plt.legend() + #plt.show() + option_quotes_contracts = plot_ivsmile(option_quotes_contracts) + fit_ivsimle(option_quotes_contracts) + prep, svi_fit, params = calibrate_svi_surface( + clean_before_svi(option_quotes_contracts), + r=0.05, + plot_backend="matplotlib", + finplot_show=True, + # optionally: plot_path=None to avoid matplotlib PDF output + ) + print(svi_fit) + plot_smoothed_svi_surface(prep, params, r=0.05) + compare_vs_svi_py(prep, params) +