Centralize DB settings in ingestion config, remove embedded secrets from ingestion helpers, and add an idempotent PostgreSQL bootstrap script to create role/database and apply schema safely. Made-with: Cursor
109 lines
3.3 KiB
Python
109 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Idempotent PostgreSQL bootstrap script for the option_pricing project.
|
|
|
|
What it does:
|
|
1) Creates the project role if it does not exist.
|
|
2) Creates the project database if it does not exist.
|
|
3) Grants ownership/privileges.
|
|
4) Applies src/data/sql/schema.sql to the project database.
|
|
|
|
Configuration comes from environment variables (see .env.example).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
from psycopg2 import sql
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
SCHEMA_PATH = ROOT / "src" / "data" / "sql" / "schema.sql"
|
|
|
|
|
|
def _env(name: str, default: str | None = None) -> str:
|
|
value = os.getenv(name, default)
|
|
if value is None:
|
|
raise RuntimeError(f"Missing required environment variable: {name}")
|
|
return value
|
|
|
|
|
|
def admin_connect(dbname: str):
|
|
return psycopg2.connect(
|
|
dbname=dbname,
|
|
user=_env("POSTGRES_ADMIN_USER", "postgres"),
|
|
password=_env("POSTGRES_ADMIN_PASSWORD", "postgres"),
|
|
host=_env("POSTGRES_ADMIN_HOST", "localhost"),
|
|
port=_env("POSTGRES_ADMIN_PORT", "5432"),
|
|
)
|
|
|
|
|
|
def ensure_role_and_database() -> None:
|
|
db_user = _env("DB_USER", "quant_user")
|
|
db_password = _env("DB_PASSWORD", "")
|
|
db_name = _env("DB_NAME", "options_db")
|
|
|
|
admin_db = _env("POSTGRES_ADMIN_DB", "postgres")
|
|
with admin_connect(admin_db) as conn:
|
|
conn.autocommit = True
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT 1 FROM pg_roles WHERE rolname = %s", (db_user,))
|
|
role_exists = cur.fetchone() is not None
|
|
if not role_exists:
|
|
cur.execute(
|
|
sql.SQL("CREATE ROLE {} WITH LOGIN PASSWORD %s").format(
|
|
sql.Identifier(db_user)
|
|
),
|
|
(db_password,),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
sql.SQL("ALTER ROLE {} WITH LOGIN PASSWORD %s").format(
|
|
sql.Identifier(db_user)
|
|
),
|
|
(db_password,),
|
|
)
|
|
|
|
cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db_name,))
|
|
db_exists = cur.fetchone() is not None
|
|
if not db_exists:
|
|
cur.execute(
|
|
sql.SQL("CREATE DATABASE {} OWNER {}").format(
|
|
sql.Identifier(db_name),
|
|
sql.Identifier(db_user),
|
|
)
|
|
)
|
|
else:
|
|
cur.execute(
|
|
sql.SQL("ALTER DATABASE {} OWNER TO {}").format(
|
|
sql.Identifier(db_name),
|
|
sql.Identifier(db_user),
|
|
)
|
|
)
|
|
|
|
|
|
def apply_schema() -> None:
|
|
if not SCHEMA_PATH.exists():
|
|
raise FileNotFoundError(f"Schema file not found: {SCHEMA_PATH}")
|
|
|
|
schema_sql = SCHEMA_PATH.read_text(encoding="utf-8")
|
|
with admin_connect(_env("DB_NAME", "options_db")) as conn:
|
|
conn.autocommit = True
|
|
with conn.cursor() as cur:
|
|
cur.execute(schema_sql)
|
|
|
|
|
|
def main() -> None:
|
|
print("Ensuring role/database exist...")
|
|
ensure_role_and_database()
|
|
print("Applying schema...")
|
|
apply_schema()
|
|
print("Database setup complete.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|