diff --git a/pages/ETF_Analyzer.py b/pages/ETF_Analyzer.py
index 5a59781..e50d523 100644
--- a/pages/ETF_Analyzer.py
+++ b/pages/ETF_Analyzer.py
@@ -1,3 +1,13 @@
+import streamlit as st
+
+# Set page config first, before any other Streamlit commands
+st.set_page_config(
+    page_title="ETF Analyzer",
+    page_icon="📊",
+    layout="wide",
+    initial_sidebar_state="expanded"
+)
+
"""
ETF Analyzer - Comprehensive ETF Analysis Tool
@@ -21,7 +31,47 @@ import time
from typing import Dict, List, Tuple, Any, Optional, Union
import sys
import yfinance as yf
+import requests
+from dotenv import load_dotenv
+import logging
+# Load environment variables
+load_dotenv()
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# FMP API configuration
+FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', ''))
+FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
+
+def test_fmp_connection():
+    """Test the FMP API connection and display status."""
+    try:
+        if not FMP_API_KEY:
+            return False, "No API key found"
+
+        # A plain GET with a timeout is enough for a one-off connectivity check
+        test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}"
+        response = requests.get(test_url, timeout=10)
+
+        if response.status_code == 200:
+            data = response.json()
+            if data and isinstance(data, list) and len(data) > 0:
+                return True, "Connected"
+        return False, f"Error: {response.status_code}"
+    except Exception as e:
+        return False, f"Error: {str(e)}"
+
+# Add FMP connection status to the navigation bar
+st.sidebar.markdown("---")
+st.sidebar.subheader("FMP API Status")
+connection_status, message = test_fmp_connection()
+if connection_status:
+    st.sidebar.success(f"✅ FMP API: {message}")
+else:
+    st.sidebar.error(f"❌ FMP API: {message}")
# --- Constants and Settings ---
CACHE_DIR = Path("cache")
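The sidebar status check added above runs on every Streamlit rerun, so the FMP profile endpoint is hit each time any widget changes. A minimal sketch of a cached variant, assuming the same `FMP_API_KEY`/`FMP_BASE_URL` globals; the `cached_fmp_status` name is invented here for illustration:

```python
import requests
import streamlit as st
from typing import Tuple

@st.cache_data(ttl=300, show_spinner=False)   # re-test at most every 5 minutes
def cached_fmp_status(api_key: str, base_url: str) -> Tuple[bool, str]:
    """Return (ok, message) for a lightweight FMP connectivity probe."""
    if not api_key:
        return False, "No API key found"
    try:
        resp = requests.get(f"{base_url}/profile/AAPL",
                            params={"apikey": api_key}, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            if isinstance(data, list) and data:
                return True, "Connected"
        return False, f"Error: {resp.status_code}"
    except requests.RequestException as exc:
        return False, f"Error: {exc}"

# Usage mirrors the sidebar block above:
# ok, message = cached_fmp_status(FMP_API_KEY, FMP_BASE_URL)
```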
diff --git a/pages/ETF_Portfolio_Builder.py b/pages/ETF_Portfolio_Builder.py
index f452012..52647fa 100644
--- a/pages/ETF_Portfolio_Builder.py
+++ b/pages/ETF_Portfolio_Builder.py
@@ -1,2527 +1,782 @@
import streamlit as st
-st.set_page_config(
- page_title="ETF Dividend Portfolio Builder",
- page_icon="💼",
- layout="wide",
- initial_sidebar_state="expanded"
-)
-
-# Add navigation in sidebar
-with st.sidebar:
- st.markdown("### Navigation")
- if st.button("🏠 ETF Suite Launcher", key="launcher_portfolio"):
- st.switch_page("ETF_Suite_Launcher.py")
- if st.button("📈 ETF Analyzer", key="analyzer_portfolio"):
- st.switch_page("ETF_Analyzer.py")
-
-# --- Imports & Settings ---
import pandas as pd
+import numpy as np
import plotly.express as px
import plotly.graph_objects as go
-import yfinance as yf
-import time
-import re
-from typing import Tuple, List, Dict, Any, Optional, TypeVar, Callable
-from io import StringIO, BytesIO
-from openai import OpenAI
-from functools import lru_cache
-from datetime import datetime, timezone, timedelta
-import asyncio
-import concurrent.futures
-import pdfkit
-import base64
-import tempfile
-import os
-import requests
-import json
-import hashlib
from pathlib import Path
-import numpy as np
+import json
+from datetime import datetime
+from typing import List, Dict, Tuple, Optional, Any, Callable
+import time
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import yfinance as yf
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+import os
+import sys
+import logging
+import traceback
+from dotenv import load_dotenv
-# --- Settings ---
-REQUESTS_PER_MINUTE = 5 # yfinance rate limit
-MAX_TICKERS = 10
-RETRY_ATTEMPTS = 5
-RETRY_DELAY = 2 # Seconds between retries
-MAX_YIELD_THRESHOLD = 50 # Warn if yield exceeds 50%
-MAX_WORKERS = 5 # Maximum number of parallel workers for API requests
-DRIP_FORECAST_MONTHS = 12 # Number of months to forecast DRIP compounding
-USE_FMP_API = True # Whether to use FMP API as an additional data source
-CACHE_EXPIRATION_DAYS = 7 # Number of days before cache expires
+# Load environment variables
+load_dotenv()
-# --- Cache Setup ---
-def setup_cache_dir():
- """Set up cache directory if it doesn't exist"""
- cache_dir = Path("cache")
- cache_dir.mkdir(exist_ok=True)
- return cache_dir
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
-CACHE_DIR = setup_cache_dir()
+# FMP API configuration
+FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', ''))
+FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
-# Expected yield ranges for validation (based on 2024 data)
-EXPECTED_YIELDS = {
- "JEPI": (7, 9), # 7-9%
- "FEPI": (10, 12), # 10-12%
- "CONY": (20, 30), # 20-30%
- "SMCY": (20, 30),
- "ULTY": (20, 30),
- "MSTY": (20, 30)
-}
+def get_fmp_session():
+    """Create a session with retry logic for FMP API calls."""
+    session = requests.Session()
+    retries = Retry(total=3, backoff_factor=0.5)
+    session.mount('https://', HTTPAdapter(max_retries=retries))
+    return session
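By default, `Retry(total=3, backoff_factor=0.5)` only retries connection-level failures; FMP's rate-limit (429) and transient 5xx responses are not retried unless a `status_forcelist` is given. A hedged variant for comparison (the `get_fmp_session_strict` name and status list are illustrative; `allowed_methods` assumes urllib3 ≥ 1.26):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_fmp_session_strict() -> requests.Session:
    """Session that also retries rate-limited (429) and transient 5xx responses."""
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,                        # sleeps of roughly 0.5s, 1s, 2s
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],                   # urllib3 >= 1.26
    )
    session.mount("https://", HTTPAdapter(max_retries=retries))
    return session
```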
-# Reference dictionary with accurate ETF yields (2024 data)
-ETF_REFERENCE_DB = {
- "SPY": 1.35, "VOO": 1.40, "VTI": 1.34, "QQQ": 0.70,
- "SCHD": 3.50, "VYM": 2.95, "HDV": 3.80, "SPYD": 4.20,
- "SPHD": 4.65, "SDIV": 8.30, "JEPI": 7.80, "DGRO": 2.15,
- "VIG": 1.85, "BND": 2.80, "AGG": 2.65, "TLT": 3.10,
- "GLD": 0.00, "VNQ": 3.90, "XLF": 1.75, "XLV": 1.40, "XLE": 3.20,
- "PFF": 6.20, "SDY": 2.65, "DVY": 3.95, "IDV": 5.10, "NOBL": 2.30,
- "DGRW": 1.90, "DIV": 6.80, "VGT": 0.70, "VDC": 2.40
-}
-
-# High-yield ETFs that benefit from FMP API verification (2024 data)
-HIGH_YIELD_ETFS = {
- "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"},
- "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"},
- "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"},
- "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"},
- "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"},
- "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"},
- "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"},
- "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
- "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
- "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"}
-}
-
-# --- Helper Functions ---
-T = TypeVar('T')
-
-def generate_cache_key(source: str, ticker: str, endpoint: str = None) -> str:
- """Generate a unique cache key for a data request.
+def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]:
+ """
+ Fetch ETF data from Financial Modeling Prep API.
Args:
- source: Data source (e.g., 'yfinance', 'fmp')
- ticker: Ticker symbol
- endpoint: API endpoint or data type
+        ticker: ETF ticker symbol
Returns:
- A string hash key
+        Dictionary with ETF data or None if failed
"""
- components = [source, ticker.upper()]
- if endpoint:
- components.append(endpoint)
- key_string = "_".join(components)
- return hashlib.md5(key_string.encode()).hexdigest()
-
-def get_cache_path(cache_key: str) -> Path:
- """Get the file path for a cache key."""
- return CACHE_DIR / f"{cache_key}.json"
-
-def save_to_cache(cache_key: str, data: Any) -> None:
- """Save data to cache with timestamp."""
- cache_file = get_cache_path(cache_key)
-
- # Process data to ensure it's JSON serializable
- processed_data = convert_to_serializable(data)
-
- cache_data = {
- "data": processed_data,
- "timestamp": datetime.now().isoformat()
- }
-
try:
- with open(cache_file, 'w') as f:
- json.dump(cache_data, f)
- except Exception as e:
- print(f"Error saving to cache: {str(e)}")
- # If caching fails, we continue without raising an exception
- # This allows the app to work even if caching doesn't
-
-def convert_to_serializable(obj: Any) -> Any:
- """Convert an object to a JSON serializable format."""
- if obj is None:
- return None
-
- # Handle pandas Series
- if isinstance(obj, pd.Series):
- try:
- # Handle special case of pandas Series with non-serializable index
- return {
- "__pandas_series__": True,
- "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index),
- "values": obj.values.tolist() if hasattr(obj.values, 'tolist') else list(obj.values),
- "name": obj.name
- }
- except Exception as e:
- # If all else fails, convert to list
- return list(obj)
-
- # Handle pandas DataFrame
- elif isinstance(obj, pd.DataFrame):
- try:
- return {
- "__pandas_dataframe__": True,
- "columns": obj.columns.tolist() if hasattr(obj.columns, 'tolist') else list(obj.columns),
- "data": obj.values.tolist() if hasattr(obj.values, 'tolist') else obj.values.tolist(),
- "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index)
- }
- except Exception as e:
- # Fall back to records format
- return obj.to_dict(orient='records')
-
- # Handle numpy arrays
- elif isinstance(obj, np.ndarray):
- try:
- return obj.tolist()
- except Exception as e:
- return list(obj)
-
- # Handle dictionaries with non-serializable values
- elif isinstance(obj, dict):
- return {str(k): convert_to_serializable(v) for k, v in obj.items()}
-
- # Handle lists with non-serializable items
- elif isinstance(obj, (list, tuple)):
- return [convert_to_serializable(item) for item in obj]
-
- # Handle datetime objects
- elif isinstance(obj, datetime):
- return obj.isoformat()
-
- # Handle other objects by converting to string if needed
- try:
- json.dumps(obj)
- return obj
- except (TypeError, OverflowError):
- return str(obj)
-
-def load_from_cache(cache_key: str) -> Tuple[Any, bool]:
- """Load data from cache if it exists and is not expired.
-
- Returns:
- Tuple of (data, is_valid)
- """
- cache_file = get_cache_path(cache_key)
- if not cache_file.exists():
- return None, False
-
- try:
- with open(cache_file, 'r') as f:
- cache_data = json.load(f)
+        if not FMP_API_KEY:
+            logger.warning("FMP API key not configured, skipping FMP data fetch")
+            return None
- # Check if cache is expired
- timestamp = datetime.fromisoformat(cache_data["timestamp"])
- if datetime.now() - timestamp > timedelta(days=CACHE_EXPIRATION_DAYS):
- return cache_data["data"], False # Expired but usable as fallback
+        session = get_fmp_session()
- # Restore any special data structures
- data = restore_from_serializable(cache_data["data"])
- return data, True # Valid cache
- except Exception as e:
- print(f"Error loading from cache: {str(e)}")
- return None, False
-
-def restore_from_serializable(obj):
- """Restore special data structures from serialized format."""
- if obj is None:
- return None
+        # Get profile data for current price
+        profile_url = f"{FMP_BASE_URL}/profile/{ticker}?apikey={FMP_API_KEY}"
+        logger.info(f"Fetching FMP profile data for {ticker}")
+        profile_response = session.get(profile_url)
- # Restore pandas Series
- if isinstance(obj, dict) and obj.get("__pandas_series__"):
- return pd.Series(
- data=obj["values"],
- index=obj["index"],
- name=obj["name"]
- )
-
- # Restore pandas DataFrame
- elif isinstance(obj, dict) and obj.get("__pandas_dataframe__"):
- return pd.DataFrame(
- data=obj["data"],
- columns=obj["columns"],
- index=obj["index"]
- )
-
- # Restore nested dictionaries
- elif isinstance(obj, dict):
- return {k: restore_from_serializable(v) for k, v in obj.items()}
-
- # Restore nested lists
- elif isinstance(obj, list):
- return [restore_from_serializable(item) for item in obj]
-
- # Return original object
- return obj
-
-def get_cache_stats() -> Dict:
- """Get cache statistics."""
- cache_files = list(CACHE_DIR.glob("*.json"))
- stats = {
- "file_count": len(cache_files),
- "total_size_kb": sum(f.stat().st_size for f in cache_files) / 1024,
- "sources": {},
- "tickers": set()
- }
-
- for file in cache_files:
- # Extract info from filename
- name = file.stem
- if "_" in name:
- parts = name.split("_")
- if len(parts) >= 2:
- source = parts[0]
- ticker = parts[1]
-
- if source not in stats["sources"]:
- stats["sources"][source] = 0
- stats["sources"][source] += 1
- stats["tickers"].add(ticker)
-
- stats["tickers"] = list(stats["tickers"])
- stats["ticker_count"] = len(stats["tickers"])
-
- return stats
-
-def clear_cache(ticker: str = None) -> None:
- """Clear cache files.
-
- Args:
- ticker: If provided, only clear cache for this ticker
- """
- if ticker:
- # Clear only files for this ticker
- for file in CACHE_DIR.glob(f"*_{ticker.upper()}_*.json"):
- file.unlink()
- else:
- # Clear all cache files
- for file in CACHE_DIR.glob("*.json"):
- file.unlink()
-
-def fetch_with_retry(fetch_func: Callable[[], T], attempts: int = RETRY_ATTEMPTS, delay: int = RETRY_DELAY) -> Tuple[Optional[T], str]:
- """Generic retry function for API calls that might fail temporarily.
-
- Args:
- fetch_func: Function to execute
- attempts: Number of retry attempts
- delay: Delay between retries in seconds
-
- Returns:
- Tuple of (result, debug_info)
- """
- debug_info = ""
- result = None
-
- for attempt in range(attempts):
- try:
- result = fetch_func()
- return result, debug_info
- except Exception as e:
- debug_info += f"Attempt {attempt+1} failed: {str(e)}\n"
- if attempt < attempts - 1:
- time.sleep(delay)
-
- return None, debug_info
-
-def fetch_fmp_data(ticker: str) -> Tuple[Dict, str]:
- """Fetch ETF data from FMP API with caching.
-
- Args:
- ticker: The ETF ticker symbol
-
- Returns:
- Tuple of (data_dict, debug_info)
- """
- debug_info = ""
- result = {
- "profile": None,
- "quote": None,
- "dividend_history": None
- }
-
- # Get API key
- API_KEY = os.environ.get("FMP_API_KEY")
- if not API_KEY:
- API_KEY = st.session_state.get("fmp_api_key")
- if not API_KEY:
- return result, "FMP API key not found"
-
- # Check if we should use force refresh
- force_refresh = st.session_state.get("force_refresh_data", False)
-
- try:
- # Fetch profile data with cache
- profile_cache_key = generate_cache_key("fmp", ticker, "profile")
- profile_data, is_valid = load_from_cache(profile_cache_key) if not force_refresh else (None, False)
-
- if not is_valid:
- # Need to fetch from API
- profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}"
- profile_response = requests.get(profile_url)
- if profile_response.status_code == 200:
- profile_data = profile_response.json()
- save_to_cache(profile_cache_key, profile_data)
- debug_info += f"Profile data fetched from API and cached\n"
- # Track API call
- if "api_calls" in st.session_state:
- st.session_state.api_calls += 1
- else:
- debug_info += f"Profile request failed with status {profile_response.status_code}\n"
- profile_data = None
- else:
- debug_info += f"Profile data loaded from cache\n"
+        if profile_response.status_code != 200:
+            logger.error(f"FMP API error for {ticker}: {profile_response.status_code}")
+            return None
- result["profile"] = profile_data
+        profile_data = profile_response.json()
+
+        if not profile_data or not isinstance(profile_data, list) or len(profile_data) == 0:
+            logger.warning(f"No profile data found for {ticker} in FMP")
+            return None
- # Fetch quote data with cache
- quote_cache_key = generate_cache_key("fmp", ticker, "quote")
- quote_data, is_valid = load_from_cache(quote_cache_key) if not force_refresh else (None, False)
-
- if not is_valid:
- # Need to fetch from API
- quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}"
- quote_response = requests.get(quote_url)
- if quote_response.status_code == 200:
- quote_data = quote_response.json()
- save_to_cache(quote_cache_key, quote_data)
- debug_info += f"Quote data fetched from API and cached\n"
- # Track API call
- if "api_calls" in st.session_state:
- st.session_state.api_calls += 1
- else:
- debug_info += f"Quote request failed with status {quote_response.status_code}\n"
- quote_data = None
- else:
- debug_info += f"Quote data loaded from cache\n"
+        profile = profile_data[0]
+        current_price = float(profile.get('price', 0))
+        if current_price <= 0:
+            logger.error(f"Invalid price for {ticker}: {current_price}")
+            return None
- result["quote"] = quote_data
+        # Get dividend history
+        dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}"
+        logger.info(f"Fetching FMP dividend data for {ticker}")
+        dividend_response = session.get(dividend_url)
+
+        if dividend_response.status_code != 200:
+            logger.error(f"FMP API error for dividend data: {dividend_response.status_code}")
+            return None
- # Fetch dividend history with cache
- dividend_cache_key = generate_cache_key("fmp", ticker, "dividend_history")
- dividend_data, is_valid = load_from_cache(dividend_cache_key) if not force_refresh else (None, False)
+        dividend_data = dividend_response.json()
- if not is_valid:
- # Need to fetch from API
- dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}"
- dividend_response = requests.get(dividend_url)
- if dividend_response.status_code == 200:
- dividend_data = dividend_response.json()
- save_to_cache(dividend_cache_key, dividend_data)
- debug_info += f"Dividend history fetched from API and cached\n"
- # Track API call
- if "api_calls" in st.session_state:
- st.session_state.api_calls += 1
- else:
- debug_info += f"Dividend history request failed with status {dividend_response.status_code}\n"
- dividend_data = None
- else:
- debug_info += f"Dividend history loaded from cache\n"
+        if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]:
+            logger.warning(f"No dividend history found for {ticker}")
+            return None
- result["dividend_history"] = dividend_data
+        # Calculate TTM dividend
+        dividends = pd.DataFrame(dividend_data["historical"])
+        dividends["date"] = pd.to_datetime(dividends["date"])
+        dividends = dividends.sort_values("date")
- return result, debug_info
- except Exception as e:
- debug_info += f"FMP API request failed: {str(e)}\n"
- return result, debug_info
-
-def yf_fetch_with_cache(ticker: str, data_type: str, fetch_func: Callable) -> Tuple[Any, str]:
- """Fetch data from yfinance with caching.
-
- Args:
- ticker: Ticker symbol
- data_type: Type of data (info, dividends, history)
- fetch_func: Function to execute for fetching data
+        # Get dividends in the last 12 months
+        one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
+        recent_dividends = dividends[dividends["date"] >= one_year_ago]
- Returns:
- Tuple of (data, debug_info)
- """
- debug_info = ""
-
- # Generate cache key for this request
- cache_key = generate_cache_key("yf", ticker, data_type)
-
- # Check if we should force refresh
- force_refresh = st.session_state.get("force_refresh_data", False)
-
- # Try to get data from cache first
- data, is_valid = load_from_cache(cache_key) if not force_refresh else (None, False)
-
- if is_valid:
- # We have valid cached data
- debug_info = f"Data for {ticker} ({data_type}) loaded from cache"
- return data, debug_info
-
- # Need to fetch data from API
- data, api_debug = fetch_with_retry(fetch_func)
- debug_info += api_debug
-
- # If successful, cache the data
- if data is not None:
- save_to_cache(cache_key, data)
- debug_info += f"\nData for {ticker} ({data_type}) saved to cache"
-
- return data, debug_info
-
-def process_ticker_data(ticker: str, debug: bool = False) -> Tuple[Optional[Dict], Tuple[str, str, str], List[str]]:
- """Process a single ticker to get all relevant data.
-
- Args:
- ticker: The ticker symbol
- debug: Whether to include debug information
-
- Returns:
- Tuple of (ticker_data, error_info, warnings) where:
- - ticker_data is the processed data or None
- - error_info is (ticker, reason, debug_info) or None
- - warnings is a list of warning messages
- """
- debug_info = ""
- warnings = []
-
- # Check if this is a high-yield ETF that would benefit from FMP API verification
- is_high_yield = ticker in HIGH_YIELD_ETFS
-
- try:
- # First try yfinance as primary data source
- yf_ticker = yf.Ticker(ticker)
-
- # Fetch price and inception data with caching
- info, price_debug = yf_fetch_with_cache(
- ticker,
- "info",
- lambda: yf_ticker.info
- )
- debug_info += price_debug
-
- if not info or not info.get("previousClose"):
- if not USE_FMP_API or not is_high_yield:
- return None, (ticker, "No price data available", debug_info), []
+        if recent_dividends.empty:
+            logger.warning(f"No recent dividends found for {ticker}")
+            return None
- # Default values from yfinance
- price = info.get("previousClose", 0) if info else 0
- inception_date = info.get("fundInceptionDate") if info else None
+        # Calculate TTM dividend
+        ttm_dividend = recent_dividends["dividend"].sum()
- # Get NAV data
- nav = info.get("navPrice", None) if info else None
- if nav is None and info:
- # For some ETFs, this might be stored under a different key
- nav = info.get("regularMarketPrice", price)
+        # Calculate yield
+        yield_pct = (ttm_dividend / current_price) * 100
- # Calculate premium/discount to NAV
- nav_premium = 0
- if nav and nav > 0:
- nav_premium = ((price / nav) - 1) * 100 # as percentage
+ logger.info(f"Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})")
- if debug:
- debug_info += f"\nYFinance - Price: {price}\nInception Date: {inception_date}\nNAV: {nav}\nPremium/Discount: {nav_premium:.2f}%\n"
-
- # If this is a high-yield ETF and FMP API is enabled, get additional data from FMP API
- fmp_yield_calculated = None
- fmp_price = None
- fmp_dist_period = None
-
- if USE_FMP_API and is_high_yield:
- fmp_data, fmp_debug = fetch_fmp_data(ticker)
- debug_info += f"\nFMP API Debug: {fmp_debug}\n"
-
- # Extract data from FMP response
- if fmp_data["profile"] and len(fmp_data["profile"]) > 0:
- profile = fmp_data["profile"][0]
- fmp_price = profile.get("price", price) # Default to yfinance price if not available
- last_div = profile.get("lastDiv", 0)
-
- if fmp_price > 0 and last_div > 0:
- # Calculate yield from profile data
- fmp_profile_yield = (last_div / fmp_price) * 100
- debug_info += f"FMP Profile Yield: {fmp_profile_yield:.2f}%\n"
-
- # Extract yield from quote data
- if fmp_data["quote"] and len(fmp_data["quote"]) > 0:
- quote = fmp_data["quote"][0]
- if "dividendYield" in quote:
- div_yield = quote["dividendYield"]
- fmp_quote_yield = div_yield * 100 if div_yield < 1 else div_yield
- debug_info += f"FMP Quote Yield: {fmp_quote_yield:.2f}%\n"
-
- # Update price if available
- if "price" in quote and not fmp_price:
- fmp_price = quote["price"]
-
- # Calculate yield from dividend history
- if fmp_data["dividend_history"] and "historical" in fmp_data["dividend_history"] and fmp_data["dividend_history"]["historical"]:
- recent_divs = fmp_data["dividend_history"]["historical"][:3] # Get last 3 dividends
-
- if recent_divs and "dividend" in recent_divs[0] and fmp_price:
- # Try to figure out payment frequency
- if len(recent_divs) >= 2:
- try:
- date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d")
- date2 = datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d")
- days_between = abs((date1 - date2).days)
-
- if days_between < 45: # Monthly
- frequency = 12
- fmp_dist_period = "Monthly"
- elif days_between < 100: # Quarterly
- frequency = 4
- fmp_dist_period = "Quarterly"
- elif days_between < 200: # Semi-annual
- frequency = 2
- fmp_dist_period = "Semi-Annually"
- else: # Annual
- frequency = 1
- fmp_dist_period = "Annually"
-
- annual_div = recent_divs[0]["dividend"] * frequency
- fmp_yield_calculated = (annual_div / fmp_price) * 100
-
- debug_info += f"FMP Calculated Yield: {fmp_yield_calculated:.2f}% (Distribution: {fmp_dist_period})\n"
- except Exception as e:
- debug_info += f"Error calculating FMP yield: {str(e)}\n"
-
- # Decide which data source to use - for high-yield ETFs, prefer FMP if available
- use_fmp = USE_FMP_API and is_high_yield and fmp_yield_calculated is not None
-
- # If we're using FMP data for high-yield ETFs
- if use_fmp:
- if debug:
- debug_info += f"Using FMP data for {ticker} (high-yield ETF)\n"
-
- # For high-yield ETFs, FMP data is typically more accurate
- yield_pct = fmp_yield_calculated
- final_price = fmp_price if fmp_price else price
- dist_period = fmp_dist_period if fmp_dist_period else HIGH_YIELD_ETFS[ticker]["frequency"]
- income_per_1k = (1000 / final_price) * (yield_pct * final_price) / 100
-
- # Add a note that we're using validated data
- debug_info += f"Using validated FMP yield data: {yield_pct:.2f}%\n"
-
- else:
- # For normal ETFs, proceed with yfinance data
- # Fetch dividend data from yfinance with caching
- dividends, div_debug = yf_fetch_with_cache(
- ticker,
- "dividends",
- lambda: yf_ticker.dividends
- )
- debug_info += div_debug
-
- if dividends is None or dividends.empty:
- return None, (ticker, "No dividend data available", debug_info), []
-
- dividends = dividends.reset_index()
- dividends.columns = ["date", "amount"]
- dividends["date"] = pd.to_datetime(dividends["date"])
- last_year = dividends[dividends["date"] >= pd.Timestamp.now(tz='America/New_York') - pd.Timedelta(days=365)]
- ttm_dividend = last_year["amount"].sum()
-
- if not ttm_dividend and not last_year.empty:
- ttm_dividend = dividends["amount"].mean() * 12
- debug_info += f"\nFallback: Estimated TTM dividend = {ttm_dividend:.2f}"
-
- if not ttm_dividend or not price:
- return None, (ticker, f"Missing data: Price={price}, Dividend={ttm_dividend}", debug_info), []
-
- yield_pct = (ttm_dividend / price) * 100
- income_per_1k = (1000 / price) * ttm_dividend
-
- # Check for unrealistic yields
- if yield_pct > MAX_YIELD_THRESHOLD and ticker not in HIGH_YIELD_ETFS:
- warnings.append(f"Unrealistic yield for {ticker}: {yield_pct:.2f}%. Verify data accuracy.")
- debug_info += f"Warning: Yield {yield_pct:.2f}% exceeds {MAX_YIELD_THRESHOLD}% threshold\n"
-
- # Use reference yield if available
- if ticker in ETF_REFERENCE_DB:
- yield_pct = ETF_REFERENCE_DB[ticker]
- debug_info += f"Corrected to reference yield: {yield_pct:.2f}%\n"
- income_per_1k = (1000 / price) * (yield_pct * price) / 100
-
- # Calculate distribution period
- if len(last_year) >= 2:
- intervals = (last_year["date"].diff().dt.days).dropna()
- avg_interval = intervals.mean()
- if avg_interval <= 45:
- dist_period = "Monthly"
- elif avg_interval <= 100:
- dist_period = "Quarterly"
- elif avg_interval <= 200:
- dist_period = "Semi-Annually"
- else:
- dist_period = "Annually"
- else:
- dist_period = "Unknown"
-
- # Convert inception date to datetime
- inception_date_str = None
- if inception_date:
- try:
- # Ensure timestamp is valid and convert to UTC
- inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True)
- inception_date_str = inception_date_dt.strftime("%Y-%m-%d")
- except Exception as e:
- debug_info += f"Invalid inception date format: {inception_date}, error: {str(e)}\n"
- inception_date_str = None
-
- # Final data with validated yield info
- final_price = fmp_price if use_fmp and fmp_price else price
-
- return {
+        etf_data = {
"Ticker": ticker,
- "Price": round(final_price, 2),
- "NAV": round(nav, 2) if nav else None,
- "Premium/Discount (%)": round(nav_premium, 2) if nav else None,
- "Dividend Rate": round((yield_pct * final_price) / 100, 2),
- "Yield (%)": round(yield_pct, 2),
- "Income per $1K": round(income_per_1k, 2),
- "Distribution Period": dist_period,
- "Inception Date": inception_date_str,
- "Data Source": "FMP API" if use_fmp else "YFinance"
- }, None, warnings
-
- except Exception as e:
- return None, (ticker, f"Error processing ticker: {str(e)}", debug_info), []
-
-# --- Validate ETF Input ---
-def validate_etf_input(etf_allocations: List[Dict]) -> List[str]:
- """Validate ETF tickers from session state."""
- if not etf_allocations:
- st.error("Please add at least one ETF.")
- return []
- tickers = [etf["ticker"] for etf in etf_allocations]
- valid_tickers = []
- for t in tickers:
- if re.match(r'^[A-Z]{1,7}$', t):
- try:
- yf_ticker = yf.Ticker(t)
- info, _ = fetch_with_retry(lambda: yf_ticker.info)
- if info and info.get("previousClose"):
- valid_tickers.append(t)
- else:
- st.warning(f"Skipping {t}: No price data available.")
- except Exception as e:
- st.warning(f"Skipping {t}: Failed to fetch data ({str(e)}).")
- else:
- st.warning(f"Invalid ticker: {t}. Must be 1-7 uppercase letters.")
- if not valid_tickers:
- st.error("No valid tickers found. Please check tickers and try again.")
- return valid_tickers
-
-# --- Fetch ETF Data ---
-@st.cache_data(show_spinner=False)
-def fetch_etfs(tickers: str, debug: bool, use_parallel: bool = True) -> Tuple[pd.DataFrame, List[Tuple[str, str, str]]]:
- """Fetch ETF data from yfinance and FMP API with retries."""
- tickers_list = tickers.split(",")
- valid, skipped = [], []
- all_warnings = []
- progress = st.progress(0)
- status = st.empty()
-
- # Initialize API call counter if not in session state
- if "api_calls" not in st.session_state:
- st.session_state.api_calls = 0
-
- # Check if we need FMP API key
- if USE_FMP_API and any(t in HIGH_YIELD_ETFS for t in tickers_list):
- # Get API key - either from environment or input
- API_KEY = os.environ.get("FMP_API_KEY")
- if not API_KEY and "fmp_api_key" not in st.session_state:
- API_KEY = st.text_input("Enter FMP API Key for more accurate yield data:", type="password")
- st.session_state.fmp_api_key = API_KEY
- if not API_KEY:
- st.warning("Without FMP API key, high-yield ETF data may be less accurate.")
-
- # Define sequential processing function
- def process_sequentially():
- for idx, ticker in enumerate(tickers_list):
- status.text(f"Fetching {ticker} ({idx+1}/{len(tickers_list)})...")
- progress.progress((idx + 1) / len(tickers_list))
-
- ticker_data, error, warnings = process_ticker_data(ticker, debug)
-
- if ticker_data:
- valid.append(ticker_data)
- all_warnings.extend(warnings)
- elif error:
- skipped.append(error)
-
- # Rate limit
- if idx < len(tickers_list) - 1:
- time.sleep(60 / REQUESTS_PER_MINUTE)
-
- # Define parallel processing function
- def process_parallel():
- def process_with_status(ticker):
- # No Streamlit operations in this thread
- return process_ticker_data(ticker, debug)
-
- # Create a list to collect results
- results = []
-
- # Show processing status
- status.text(f"Fetching {len(tickers_list)} ETFs in parallel...")
-
- # Use ThreadPoolExecutor for parallel processing
- with concurrent.futures.ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(tickers_list))) as executor:
- # Submit all tasks
- future_to_ticker = {executor.submit(process_with_status, ticker): ticker for ticker in tickers_list}
-
- # Process results as they complete
- for i, future in enumerate(concurrent.futures.as_completed(future_to_ticker)):
- progress.progress((i + 1) / len(tickers_list))
- ticker = future_to_ticker[future]
- try:
- ticker_data, error, warnings = future.result()
- if ticker_data:
- valid.append(ticker_data)
- all_warnings.extend(warnings)
- elif error:
- skipped.append(error)
- except Exception as e:
- skipped.append((ticker, f"Thread error: {str(e)}", ""))
-
- # Choose processing method based on setting
- if use_parallel and len(tickers_list) > 1:
- try:
- process_parallel()
- except Exception as e:
- st.error(f"Error in parallel processing: {str(e)}. Falling back to sequential processing.")
- valid, skipped = [], []
- process_sequentially()
- else:
- process_sequentially()
-
- # Display warnings collected from all threads
- for warning in all_warnings:
- st.warning(warning)
-
- if debug and skipped:
- st.subheader("🛑 Skipped Tickers (Debug)")
- st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]), use_container_width=True)
-
- progress.empty()
- status.empty()
-
- # Check if we have data source information and add it to the display
- df = pd.DataFrame(valid)
-
- # Debug info about data sources
- if debug and not df.empty and "Data Source" in df.columns:
- source_counts = df["Data Source"].value_counts()
- st.info(f"Data sources used: {dict(source_counts)}")
-
- return df, skipped
-
-# --- Test FMP API Function (for debugging) ---
-def test_fmp_api():
- """Test function to verify FMP API responses for ETF yield data."""
- st.subheader("FMP API Test Results")
-
- # Get API key from environment or input
- API_KEY = os.environ.get("FMP_API_KEY")
- if not API_KEY and "fmp_api_key" not in st.session_state:
- API_KEY = st.text_input("Enter your FMP API Key:", type="password")
- st.session_state.fmp_api_key = API_KEY
- else:
- API_KEY = st.session_state.get("fmp_api_key", API_KEY)
-
- if not API_KEY:
- st.warning("Please enter your FMP API key to continue")
- return
-
- # List of ETFs to test (including high-yield ETFs)
- test_tickers_default = "MSTY,SCHD,JEPI,SMCY,SPY"
- test_tickers_input = st.text_input("Enter ETF tickers to test (comma separated):", test_tickers_default)
- test_tickers = [ticker.strip() for ticker in test_tickers_input.split(",") if ticker.strip()]
-
- if st.button("Run FMP API Test"):
- results = []
-
- for ticker in test_tickers:
- st.write(f"### Testing {ticker}")
-
- # Try profile endpoint
- profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}"
- response = requests.get(profile_url)
-
- with st.expander(f"{ticker} Profile Response (Status: {response.status_code})"):
- if response.status_code == 200:
- data = response.json()
- if data and len(data) > 0:
- # Check if there's yield info
- if "lastDiv" in data[0]:
- price = data[0].get("price", 0)
- last_div = data[0].get("lastDiv", 0)
- if price > 0 and last_div > 0:
- div_yield = (last_div / price) * 100
- st.write(f"- lastDiv: {last_div}")
- st.write(f"- price: {price}")
- st.write(f"- calculated yield: {div_yield:.2f}%")
- else:
- st.write(f"- lastDiv: {last_div}")
- st.write(f"- price: {price}")
- st.write("- Cannot calculate yield (price or lastDiv is zero)")
- else:
- st.write("- No 'lastDiv' found in response")
-
- # Save other useful fields
- for field in ["companyName", "symbol", "industry", "sector"]:
- if field in data[0]:
- st.write(f"- {field}: {data[0][field]}")
- else:
- st.write("- Empty response data")
- else:
- st.write(f"- Error response: {response.text}")
-
- # Also try quote endpoint
- quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}"
- response = requests.get(quote_url)
-
- with st.expander(f"{ticker} Quote Response (Status: {response.status_code})"):
- if response.status_code == 200:
- data = response.json()
- if data and len(data) > 0:
- # Check if there's yield info
- if "dividendYield" in data[0]:
- div_yield = data[0]["dividendYield"] * 100 if data[0]["dividendYield"] < 1 else data[0]["dividendYield"]
- st.write(f"- dividendYield: {data[0]['dividendYield']}")
- st.write(f"- formatted yield: {div_yield:.2f}%")
- else:
- st.write("- No 'dividendYield' found in response")
-
- # Save other useful fields
- for field in ["name", "price", "exchange", "marketCap"]:
- if field in data[0]:
- st.write(f"- {field}: {data[0][field]}")
- else:
- st.write("- Empty response data")
- else:
- st.write(f"- Error response: {response.text}")
-
- # Also try historical dividends endpoint
- dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}"
- response = requests.get(dividend_url)
-
- with st.expander(f"{ticker} Dividend History Response (Status: {response.status_code})"):
- if response.status_code == 200:
- data = response.json()
- if "historical" in data and data["historical"]:
- recent_divs = data["historical"][:3] # Show last 3 dividends
- for div in recent_divs:
- st.write(f"- Date: {div.get('date')}, Dividend: {div.get('dividend')}")
-
- # Calculate annualized yield if possible
- if recent_divs and "dividend" in recent_divs[0]:
- # Try to get current price from previous quote response
- current_price = None
- quote_response = requests.get(quote_url)
- if quote_response.status_code == 200:
- quote_data = quote_response.json()
- if quote_data and len(quote_data) > 0 and "price" in quote_data[0]:
- current_price = quote_data[0]["price"]
-
- if current_price:
- # Use most recent dividend payment and estimate annual yield
- if len(recent_divs) >= 2:
- # Try to figure out payment frequency
- try:
- date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d")
- date2 = datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d")
- days_between = abs((date1 - date2).days)
-
- if days_between < 45: # Monthly
- frequency = 12
- freq_text = "monthly"
- elif days_between < 100: # Quarterly
- frequency = 4
- freq_text = "quarterly"
- elif days_between < 200: # Semi-annual
- frequency = 2
- freq_text = "semi-annual"
- else: # Annual
- frequency = 1
- freq_text = "annual"
-
- annual_div = recent_divs[0]["dividend"] * frequency
- estimated_yield = (annual_div / current_price) * 100
- st.write(f"- Estimated {freq_text} yield: {estimated_yield:.2f}% (price: ${current_price})")
- except Exception as e:
- st.write(f"- Error calculating estimated yield: {str(e)}")
- else:
- st.write("- Not enough dividend history to determine frequency")
- else:
- st.write("- Cannot calculate yield (price not available)")
- else:
- st.write("- No dividend history found")
- else:
- st.write(f"- Error response: {response.text}")
-
- st.write("---")
-
- st.write("Note: Add the 'test_fmp_api' function to the sidebar to use it for debugging.")
-
-# --- PDF Export Function ---
-def create_pdf_report(final_alloc, df_data, chat_summary=None):
- """Generate a PDF report of the ETF portfolio.
-
- Args:
- final_alloc: DataFrame of final allocations
- df_data: DataFrame of raw ETF data
- chat_summary: Optional text summary from ChatGPT
-
- Returns:
- Base64 encoded PDF file
- """
- # Calculate summary metrics
- total_capital = final_alloc["Capital Allocated ($)"].sum()
- total_income = final_alloc["Income Contributed ($)"].sum()
- monthly_income = total_income / 12
- weighted_yield = (final_alloc["Income Contributed ($)"] * final_alloc["Yield (%)"]).sum() / total_income if total_income else 0
-
- # Calculate DRIP forecast if needed
- if st.session_state.drip_enabled:
- drip_forecast = calculate_drip_growth(final_alloc)
-
- # Get key metrics for DRIP
- initial_value = drip_forecast["Total Value ($)"].iloc[0]
- final_value = drip_forecast["Total Value ($)"].iloc[-1]
- value_growth = final_value - initial_value
- value_growth_pct = (value_growth / initial_value) * 100
-
- initial_income = drip_forecast["Monthly Income ($)"].iloc[0] * 12
- final_income = drip_forecast["Monthly Income ($)"].iloc[-1] * 12
- income_growth = final_income - initial_income
- income_growth_pct = (income_growth / initial_income) * 100
-
- total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1]
- years_to_recover = 100 * initial_value / final_income # 100% recovery
-
- # Create HTML report
- html = f"""
-    <!-- report styles, "Portfolio Summary" cards (Total Capital, Annual Income,
-         Monthly Income, Weighted Yield) and the "ETF Allocation Details" table
-         header (Ticker, Capital ($), Income ($), Allocation (%), Yield (%),
-         Risk Level, Distribution); full markup omitted -->
- """
-
- # Add rows for each ETF
- for _, row in final_alloc.iterrows():
- risk_class = ""
- if "High" in str(row.get("Risk Level", "")):
- risk_class = "risk-high"
- elif "Medium" in str(row.get("Risk Level", "")):
- risk_class = "risk-medium"
- elif "Average" in str(row.get("Risk Level", "")):
- risk_class = "risk-average"
-
- html += f"""
-        <!-- table row (class set from risk_class): Ticker, Capital Allocated ($),
-             Income Contributed ($), Allocation (%), Yield (%), Risk Level,
-             Distribution Period; full markup omitted -->
- """
-
- html += """
-
- """
-
- # Add DRIP forecast if enabled
- if st.session_state.drip_enabled:
- html += f"""
-        <!-- "Dividend Reinvestment (DRIP) Forecast" cards: 1-Year Value Growth,
-             1-Year Income Growth, Total Dividends Earned, Years to Recover Capital;
-             note that the forecast assumes ETF prices remain constant and all
-             dividends are reinvested proportionally to original allocations;
-             monthly table header: Month, Portfolio Value ($), Monthly Income ($),
-             Cumulative Income ($); full markup omitted -->
- """
-
- # Add rows for each month
- for _, row in drip_forecast.iterrows():
- month = row["Month"]
- value = row["Total Value ($)"]
- monthly = row["Monthly Income ($)"]
- cumulative = row["Cumulative Income ($)"]
-
- html += f"""
-            <!-- table row: month, portfolio value, monthly income, cumulative
-                 income; full markup omitted -->
- """
-
- html += """
-
- """
-
- # Add ChatGPT summary if available
- if chat_summary:
- html += f"""
-        <!-- "ETF Analysis" section rendering chat_summary with newlines converted
-             to HTML line breaks; full markup omitted -->
- """
-
- # Add footer
- html += """
-
-
-
- """
-
- # Create PDF from HTML
- try:
- # Create temporary file for the PDF
- with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
- pdf_path = tmp.name
-
- # Convert HTML to PDF
- pdfkit_options = {
- 'quiet': '',
- 'enable-local-file-access': None,
- 'page-size': 'Letter',
- 'margin-top': '0.75in',
- 'margin-right': '0.75in',
- 'margin-bottom': '0.75in',
- 'margin-left': '0.75in',
+ "Price": current_price,
+ "Yield (%)": yield_pct,
+ "Risk Level": "High" # Default for high-yield ETFs
}
+ logger.info(f"FMP data for {ticker}: {etf_data}")
+ return etf_data
- pdfkit.from_string(html, pdf_path, options=pdfkit_options)
-
- # Read the PDF file
- with open(pdf_path, 'rb') as pdf_file:
- pdf_data = pdf_file.read()
-
- # Delete the temporary file
- os.unlink(pdf_path)
-
- # Encode the PDF as base64
- return base64.b64encode(pdf_data).decode()
-
except Exception as e:
- st.error(f"Failed to create PDF: {str(e)}")
- if "wkhtmltopdf" in str(e).lower():
- st.error("Please install wkhtmltopdf: https://wkhtmltopdf.org/downloads.html")
+ logger.error(f"Error fetching FMP data for {ticker}: {str(e)}")
return None
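The FMP path above reduces to one formula: trailing-twelve-month (TTM) dividends divided by the current price. A self-contained sketch of that arithmetic; the `ttm_yield` helper and the $0.45/$54.20 figures are made up for illustration:

```python
from datetime import datetime, timedelta
from typing import List, Tuple

def ttm_yield(dividends: List[Tuple[datetime, float]], price: float) -> float:
    """Sum dividends paid in the last 365 days and express them as a % of price."""
    cutoff = datetime.now() - timedelta(days=365)
    ttm = sum(amount for paid_on, amount in dividends if paid_on >= cutoff)
    return (ttm / price) * 100 if price > 0 else 0.0

# Hypothetical monthly payer: $0.45/share for the last 12 months at a $54.20 price
payments = [(datetime.now() - timedelta(days=30 * i), 0.45) for i in range(12)]
print(f"{ttm_yield(payments, 54.20):.2f}%")   # ~9.96%
```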
-# --- ChatGPT Summary ---
-@st.cache_data(show_spinner=False)
-def get_chatgpt_summary(tickers: str, api_key: str) -> str:
- """Generate ETF summary using ChatGPT."""
- tickers_list = tickers.split(",")
- if not api_key:
- return "Please enter a valid OpenAI API key."
+def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]:
+ """
+ Fetch ETF data from yfinance as fallback.
+
+ Args:
+ ticker: ETF ticker symbol
+
+ Returns:
+ Dictionary with ETF data or None if failed
+ """
try:
- client = OpenAI(api_key=api_key)
- prompt = f"""
- Act as a financial analyst. Provide a concise summary (150-200 words) of these ETFs: {', '.join(tickers_list)}.
- Include for each:
- - Key characteristics (yield, sector exposure, strategy).
- - Investment suitability (risk level, investor type).
- - Recent performance trends (if available).
- Highlight risks and benefits. Use public data or your knowledge.
- """
- response = client.chat.completions.create(
- model="gpt-4o-mini",
- messages=[
- {"role": "system", "content": "You are a financial analyst specializing in ETFs."},
- {"role": "user", "content": prompt}
- ],
- max_tokens=300,
- temperature=0.7
- )
- return response.choices[0].message.content
- except Exception as e:
- return f"ChatGPT error: {str(e)}. Check API key or try again."
-
-# --- Assign Risk Level ---
-def assign_risk_level(df: pd.DataFrame, allocations: List[Dict]) -> pd.DataFrame:
- """Assign risk level based on yield, allocation, and ETF age."""
- df = df.copy()
- df["Risk Level"] = "Unknown"
- current_date = pd.Timestamp.now(tz='UTC')
-
- # Calculate total allocation to high-yield, new ETFs
- high_risk_alloc = 0
- for alloc in allocations:
- ticker = alloc["ticker"]
- alloc_pct = alloc["allocation"]
- row = df[df["Ticker"] == ticker]
- if row.empty:
- continue
- yield_pct = row["Yield (%)"].iloc[0]
- inception_date = row["Inception Date"].iloc[0]
- if pd.isna(inception_date):
- continue
- inception_date = pd.to_datetime(inception_date, utc=True)
- age_years = (current_date - inception_date).days / 365.25
- if yield_pct > 20 and age_years < 2:
- high_risk_alloc += alloc_pct
-
- for idx, row in df.iterrows():
- ticker = row["Ticker"]
- yield_pct = row["Yield (%)"]
- inception_date = row["Inception Date"]
- alloc_pct = next((a["allocation"] for a in allocations if a["ticker"] == ticker), 0)
-
- if pd.isna(inception_date):
- df.at[idx, "Risk Level"] = "Unknown (Missing Inception)"
- continue
-
- inception_date = pd.to_datetime(inception_date, utc=True)
- age_years = (current_date - inception_date).days / 365.25
-
- # Risk criteria
- is_new = age_years < 2
- is_mid_age = 2 <= age_years <= 5
- is_old = age_years > 5
- is_high_yield = yield_pct > 20
- is_mid_yield = 12 <= yield_pct <= 20
- is_low_yield = yield_pct < 12
- is_high_alloc = alloc_pct > 30
- is_mid_alloc = 20 <= alloc_pct <= 30
- is_portfolio_risky = high_risk_alloc > 50
-
- if (is_new or is_high_yield) and (is_high_alloc or is_portfolio_risky):
- df.at[idx, "Risk Level"] = "High Risk"
- elif is_mid_age or is_mid_yield or is_mid_alloc:
- df.at[idx, "Risk Level"] = "Medium Risk"
- elif is_old and is_low_yield and alloc_pct < 20:
- df.at[idx, "Risk Level"] = "Average Risk"
+ logger.info(f"Fetching yfinance data for {ticker}")
+ etf = yf.Ticker(ticker)
+ info = etf.info
+
+ # Get the most recent dividend yield
+ if 'dividendYield' in info and info['dividendYield'] is not None:
+ yield_pct = info['dividendYield'] * 100
+ logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%")
else:
- df.at[idx, "Risk Level"] = "Medium Risk"
+            # Try to calculate from dividend history
+            hist = etf.history(period="1y")
+            if not hist.empty and 'Dividends' in hist.columns:
+                annual_dividend = hist['Dividends'].sum()
+                current_price = info.get('regularMarketPrice', 0)
+                yield_pct = (annual_dividend / current_price) * 100 if current_price > 0 else 0
+                logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%")
+            else:
+                yield_pct = 0
+                logger.warning(f"No yield data found for {ticker} in yfinance")
+
+        # Get current price
+        current_price = info.get('regularMarketPrice', 0)
+        if current_price <= 0:
+            current_price = info.get('regularMarketPreviousClose', 0)
+            logger.warning(f"Using previous close price for {ticker}: {current_price}")
+
+        etf_data = {
+            "Ticker": ticker,
+            "Price": current_price,
+            "Yield (%)": yield_pct,
+            "Risk Level": "High"  # Default for high-yield ETFs
+        }
+        logger.info(f"yfinance data for {ticker}: {etf_data}")
+        return etf_data
+
+    except Exception as e:
+        logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
+        return None
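As a cross-check on `info['dividendYield']`, the same trailing yield can be derived from `Ticker.dividends`, which yfinance exposes as a date-indexed Series of cash payments. A sketch under that assumption; the helper name is illustrative:

```python
import pandas as pd
import yfinance as yf

def trailing_yield_from_dividends(ticker: str) -> float:
    """TTM yield computed from the raw dividend history rather than .info."""
    t = yf.Ticker(ticker)
    divs = t.dividends                      # date-indexed Series of cash dividends
    hist = t.history(period="5d")
    if divs.empty or hist.empty:
        return 0.0
    cutoff = pd.Timestamp.now(tz=divs.index.tz) - pd.Timedelta(days=365)
    ttm = divs[divs.index >= cutoff].sum()
    price = hist["Close"].iloc[-1]
    return (ttm / price) * 100 if price > 0 else 0.0
```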
- return df
-
-# --- AI Suggestion ---
-def ai_suggestion(df: pd.DataFrame, target: float, user_allocations: List[Dict]) -> pd.DataFrame:
- """Generate AI-suggested portfolio with risk-mitigated allocations."""
- adjusted_df = df.copy()
- adjustments = []
- current_date = pd.Timestamp.now(tz='UTC')
-
- # Validate and adjust yields
- for idx, row in adjusted_df.iterrows():
- ticker = row["Ticker"]
- yield_pct = row["Yield (%)"]
- if ticker in EXPECTED_YIELDS:
- min_yield, max_yield = EXPECTED_YIELDS[ticker]
- if yield_pct > max_yield:
- adjusted_yield = max_yield
- adjustments.append(f"Adjusted {ticker} yield from {yield_pct:.2f}% to {max_yield:.2f}% (max expected).")
- adjusted_df.at[idx, "Yield (%)"] = adjusted_yield
- adjusted_df.at[idx, "Dividend Rate"] = (adjusted_yield / 100) * row["Price"]
- adjusted_df.at[idx, "Income per $1K"] = (1000 / row["Price"]) * adjusted_df.at[idx, "Dividend Rate"]
-
- # Optimize allocations: prioritize older, stable ETFs, limit high-yield exposure
- sorted_df = adjusted_df.sort_values(by=["Inception Date", "Yield (%)"], ascending=[True, False])
- ai_allocations = []
- total_alloc = 0
- high_yield_new_alloc = 0
-
- for idx, row in sorted_df.iterrows():
- ticker = row["Ticker"]
- yield_pct = row["Yield (%)"]
- inception_date = row["Inception Date"]
- if pd.isna(inception_date):
- continue
- inception_date = pd.to_datetime(inception_date, utc=True)
- age_years = (current_date - inception_date).days / 365.25
-
- # Allocate based on age and yield
- if age_years > 5 and yield_pct < 12: # Stable, older ETFs (e.g., JEPI)
- alloc = 20
- elif age_years >= 2 and yield_pct <= 20: # Mid-age, moderate yield (e.g., FEPI)
- alloc = 15
- else: # Newer, high-yield ETFs (e.g., CONY, MSTY)
- alloc = 10 # Limit to reduce risk
- if yield_pct > 20 and age_years < 2:
- high_yield_new_alloc += alloc
-
- # Cap high-yield, new ETF allocation
- if high_yield_new_alloc > 40:
- alloc = 0
-
- if total_alloc + alloc > 100:
- alloc = 100 - total_alloc
- if alloc <= 0:
- continue
-
- ai_allocations.append({"ticker": ticker, "allocation": alloc})
- total_alloc += alloc
-
- # Adjust to sum to 100%
- if total_alloc < 100:
- remaining = 100 - total_alloc
- for alloc in ai_allocations:
- if adjusted_df[adjusted_df["Ticker"] == alloc["ticker"]]["Yield (%)"].iloc[0] < 12:
- alloc["allocation"] += remaining
- break
- elif total_alloc > 100:
- excess = total_alloc - 100
- ai_allocations[-1]["allocation"] -= excess
-
- results = []
- weighted_yield = 0
- for _, row in adjusted_df.iterrows():
- ticker = row["Ticker"]
- alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0)
- if alloc_pct == 0:
- continue
- weighted_yield += alloc_pct * (row["Yield (%)"] / 100)
-
- if weighted_yield <= 0:
- st.error("AI Suggestion: Weighted yield is zero or negative. Check ETF data.")
+def fetch_etf_data(tickers: List[str]) -> pd.DataFrame:
+    """
+    Fetch ETF data using FMP API with yfinance fallback.
+
+    Args:
+        tickers: List of ETF tickers
+
+    Returns:
+        DataFrame with ETF data
+    """
+    try:
+        data = {}
+        for ticker in tickers:
+            if not ticker:  # Skip empty tickers
+                continue
+
+            logger.info(f"Processing {ticker}")
+
+            # Try FMP first
+            etf_data = fetch_etf_data_fmp(ticker)
+
+            # If FMP fails, try yfinance
+            if etf_data is None:
+                logger.info(f"Falling back to yfinance for {ticker}")
+                etf_data = fetch_etf_data_yfinance(ticker)
+
+            if etf_data is not None:
+                # Validate and cap yield at a reasonable maximum (e.g., 30%)
+                etf_data["Yield (%)"] = min(etf_data["Yield (%)"], 30.0)
+                data[ticker] = etf_data
+                logger.info(f"Final data for {ticker}: {etf_data}")
+            else:
+                logger.error(f"Failed to fetch data for {ticker} from both sources")
+
+        if not data:
+            st.error("No ETF data could be fetched")
+            return pd.DataFrame()
+
+        df = pd.DataFrame(data.values())
+
+        # Validate the data
+        if df.empty:
+            st.error("No ETF data could be fetched")
+            return pd.DataFrame()
+
+        if (df["Price"] <= 0).any():
+            st.error("Some ETFs have invalid prices")
+            return pd.DataFrame()
+
+        if (df["Yield (%)"] <= 0).any():
+            st.warning("Some ETFs have zero or negative yields")
+
+        logger.info(f"Final DataFrame:\n{df}")
+        return df
+
+    except Exception as e:
+        st.error(f"Error fetching ETF data: {str(e)}")
+        logger.error(f"Error in fetch_etf_data: {str(e)}")
+        logger.error(traceback.format_exc())
return pd.DataFrame()
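`ThreadPoolExecutor` and `as_completed` are imported at the top of this file but the loop above fetches tickers one at a time. A sketch of a concurrent variant of the same FMP-then-yfinance fallback; the `fetch_etf_data_parallel` name and worker count are illustrative, and it assumes the two fetch helpers defined above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd

def fetch_etf_data_parallel(tickers, max_workers: int = 5) -> pd.DataFrame:
    """Concurrent version of the FMP-then-yfinance loop above (sketch only)."""
    def fetch_one(ticker):
        return ticker, (fetch_etf_data_fmp(ticker) or fetch_etf_data_yfinance(ticker))

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch_one, t): t for t in tickers if t}
        for future in as_completed(futures):
            ticker, data = future.result()
            if data is not None:
                data["Yield (%)"] = min(data["Yield (%)"], 30.0)   # same 30% cap as above
                results[ticker] = data
    return pd.DataFrame(list(results.values()))
```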
- total_capital = target / weighted_yield
-
- for _, row in adjusted_df.iterrows():
- ticker = row["Ticker"]
- alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0)
- if alloc_pct == 0:
- continue
- capital = total_capital * alloc_pct
- shares = capital / row["Price"]
- income = shares * row["Dividend Rate"]
+def run_portfolio_simulation(
+    mode: str,
+    target: float,
+    risk_tolerance: str,
+    etf_inputs: List[Dict[str, str]],
+    enable_drip: bool,
+    enable_erosion: bool
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Run the portfolio simulation.
+
+    Args:
+        mode: Simulation mode ("income_target" or "capital_target")
+        target: Target value (monthly income or initial capital)
+        risk_tolerance: Risk tolerance level
+        etf_inputs: List of ETF inputs
+        enable_drip: Whether to enable dividend reinvestment
+        enable_erosion: Whether to enable NAV & yield erosion
- # Create result dictionary with all available data
- result = {
- "Ticker": ticker,
- "Yield (%)": row["Yield (%)"],
- "Dividend Rate": row["Dividend Rate"],
- "Capital Allocated ($)": round(capital, 2),
- "Income Contributed ($)": round(income, 2),
- "Allocation (%)": round(alloc_pct * 100, 2),
- "Inception Date": row["Inception Date"],
- "Distribution Period": row.get("Distribution Period", "Unknown"),
- "Price": row["Price"]
- }
+    Returns:
+        Tuple of (ETF data DataFrame, Final allocation DataFrame)
+    """
+    try:
+        # Fetch real ETF data
+        tickers = [etf_input["ticker"] for etf_input in etf_inputs]
+        etf_data = fetch_etf_data(tickers)
- # Add NAV data if available
- if "NAV" in row and row["NAV"] is not None:
- result["NAV"] = row["NAV"]
- if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None:
- result["Premium/Discount (%)"] = row["Premium/Discount (%)"]
+        if etf_data is None or etf_data.empty:
+            st.error("Failed to fetch ETF data")
+            return pd.DataFrame(), pd.DataFrame()
+
+        # Calculate allocations based on risk tolerance (assumes three ETFs)
+        if risk_tolerance == "Conservative":
+            # Higher allocation to lower yield ETFs
+            etf_data = etf_data.sort_values("Yield (%)").reset_index(drop=True)
+            allocations = [40.0, 40.0, 20.0]  # More to lower yield
+        elif risk_tolerance == "Moderate":
+            # Balanced allocation
+            allocations = [33.33, 33.34, 33.33]
+        else:  # Aggressive
+            # Higher allocation to higher yield ETFs
+            etf_data = etf_data.sort_values("Yield (%)", ascending=False).reset_index(drop=True)
+            allocations = [20.0, 30.0, 50.0]  # More to higher yield
+
+        # Create final allocation DataFrame in the sorted order
+        final_alloc = etf_data.copy()
+        final_alloc["Allocation (%)"] = allocations
+
+        if mode == "income_target":
+            # Calculate required capital for income target
+            monthly_income = target
+            annual_income = monthly_income * 12
- results.append(result)
-
- suggestion_df = pd.DataFrame(results)
- suggestion_df = assign_risk_level(suggestion_df, ai_allocations)
-
- if adjustments or ai_allocations != user_allocations:
- notes = ["The AI balanced high-yield ETFs with older, stable ETFs to mitigate risk while meeting the income target. Newer ETFs with high yields are capped to reduce exposure to unsustainable distributions."]
- if adjustments:
- notes.extend(adjustments)
- st.info("AI Suggestion Notes:\n- " + "\n- ".join(notes))
- return suggestion_df
-
-# --- Yield Trends ---
-def yield_chart(tickers: List[str], debug: bool = False):
- """Plot TTM yield trends."""
- fig = go.Figure()
- for ticker in tickers:
- try:
- yf_ticker = yf.Ticker(ticker)
+            # Calculate weighted average yield
+            weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
- # Get dividends with retry
- dividends, debug_info = fetch_with_retry(lambda: yf_ticker.dividends)
- if debug:
- st.write(f"DEBUG: {ticker} dividend data: {dividends.to_dict() if dividends is not None else 'None'}")
-
- if dividends is None or dividends.empty:
- continue
-
- dividends = dividends.reset_index()
- dividends.columns = ["date", "amount"]
- dividends["date"] = pd.to_datetime(dividends["date"])
-
- monthly = dividends.set_index("date")["amount"].resample("ME").sum()
- ttm_dividend = monthly.rolling(12).sum()
-
- # Get prices with retry
- prices, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y")["Close"])
- if prices is None:
- continue
+            # Validate weighted yield
+            if weighted_yield <= 0 or weighted_yield > 30:
+                st.error(f"Invalid weighted yield calculated: {weighted_yield:.2f}%")
+                return pd.DataFrame(), pd.DataFrame()
- avg_price = prices.rolling(252).mean()
- ttm_yield = (ttm_dividend / avg_price) * 100
- ttm_yield = ttm_yield.dropna()
+            # Calculate required capital based on weighted yield
+            required_capital = (annual_income / weighted_yield) * 100
+        else:
+            required_capital = target
+
+        # Calculate capital allocation and income
+        final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
+        final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
+        final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
+
+        # Apply erosion if enabled
+        if enable_erosion:
+            # Apply a small erosion factor to yield and price
+            erosion_factor = 0.98  # 2% erosion per year
+            final_alloc["Yield (%)"] = final_alloc["Yield (%)"] * erosion_factor
+            final_alloc["Price"] = final_alloc["Price"] * erosion_factor
+            final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
+
+        # Validate final calculations
+        total_capital = final_alloc["Capital Allocated ($)"].sum()
+        total_income = final_alloc["Income Contributed ($)"].sum()
+        effective_yield = (total_income / total_capital) * 100
+
+        if effective_yield <= 0 or effective_yield > 30:
+            st.error(f"Invalid effective yield calculated: {effective_yield:.2f}%")
+            return pd.DataFrame(), pd.DataFrame()
+
+        return etf_data, final_alloc
+    except Exception as e:
+        st.error(f"Error in portfolio simulation: {str(e)}")
+        logger.error(f"Error in run_portfolio_simulation: {str(e)}")
+        logger.error(traceback.format_exc())
+        return pd.DataFrame(), pd.DataFrame()
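In income-target mode the capital requirement above is just annual income divided by the weighted yield expressed as a fraction. A worked example with hypothetical numbers:

```python
# Worked example of the income-target arithmetic above (numbers are hypothetical)
monthly_income = 1_000.0
weighted_yield = 8.0                                     # percent
required_capital = (monthly_income * 12 / weighted_yield) * 100
print(required_capital)                                  # 150000.0
```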
- fig.add_trace(go.Scatter(
- x=ttm_yield.index,
- y=ttm_yield,
- name=ticker,
- mode="lines",
- hovertemplate="%{x|%Y-%m}: %{y:.2f}%"
- ))
- except Exception as e:
- if debug:
- st.write(f"DEBUG: Error plotting {ticker}: {str(e)}")
- continue
-
- fig.update_layout(
- title="TTM Dividend Yield Trend",
- xaxis_title="Date",
- yaxis_title="Yield (%)",
- template="plotly_dark",
- hovermode="x unified",
- xaxis=dict(tickformat="%Y-%m"),
- yaxis=dict(gridcolor="rgba(255,255,255,0.2)")
- )
- st.plotly_chart(fig, use_container_width=True)
-
-# --- NAV Trends ---
-def nav_chart(tickers: List[str], debug: bool = False):
- """Plot NAV (Net Asset Value) trends over time for selected ETFs."""
- fig = go.Figure()
+def portfolio_summary(final_alloc: pd.DataFrame) -> None:
+ """
+ Display a summary of the portfolio allocation.
- for ticker in tickers:
- try:
- yf_ticker = yf.Ticker(ticker)
-
- # Get historical price data
- price_history, debug_info = fetch_with_retry(lambda: yf_ticker.history(period="2y"))
-
- if price_history is None or price_history.empty:
- if debug:
- st.write(f"DEBUG: No price history for {ticker}")
- continue
-
- # Extract NAV data - for most ETFs, we'll use Close price as a proxy for NAV
- # For some closed-end funds, there might be specific NAV history available
- nav_series = price_history["Close"]
-
- # Plot the NAV trend
- fig.add_trace(go.Scatter(
- x=nav_series.index,
- y=nav_series,
- name=f"{ticker} NAV",
- mode="lines",
- hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}"
- ))
-
- # Add price for comparison (slight transparency)
- fig.add_trace(go.Scatter(
- x=price_history.index,
- y=price_history["Close"],
- name=f"{ticker} Price",
- mode="lines",
- line=dict(dash="dot", width=1),
- opacity=0.7,
- hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}"
- ))
-
- except Exception as e:
- if debug:
- st.write(f"DEBUG: Error plotting NAV for {ticker}: {str(e)}")
- continue
-
- fig.update_layout(
- title="ETF NAV and Price Trends (2-Year)",
- xaxis_title="Date",
- yaxis_title="Value ($)",
- template="plotly_dark",
- hovermode="x unified",
- legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
- yaxis=dict(gridcolor="rgba(255,255,255,0.2)")
- )
- st.plotly_chart(fig, use_container_width=True)
-
-# --- Price/NAV Premium-Discount Chart ---
-def premium_discount_chart(tickers: List[str], df: pd.DataFrame, debug: bool = False):
- """Plot Premium/Discount to NAV for selected ETFs."""
- # Create dataframe for the bar chart
- premium_data = []
-
- for ticker in tickers:
- try:
- # Get data from our processed dataframe
- ticker_row = df[df["Ticker"] == ticker]
- if ticker_row.empty:
- continue
-
- premium = ticker_row["Premium/Discount (%)"].iloc[0]
- if premium is None:
- continue
-
- premium_data.append({
- "Ticker": ticker,
- "Premium/Discount (%)": premium
- })
-
- except Exception as e:
- if debug:
- st.write(f"DEBUG: Error getting premium/discount for {ticker}: {str(e)}")
- continue
-
- if not premium_data:
- st.info("No premium/discount data available for the selected ETFs.")
+ Args:
+ final_alloc: DataFrame containing the portfolio allocation
+ """
+ if final_alloc is None or final_alloc.empty:
+ st.warning("No portfolio data available.")
return
- premium_df = pd.DataFrame(premium_data)
-
- # Create the bar chart
- fig = px.bar(
- premium_df,
- x="Ticker",
- y="Premium/Discount (%)",
- title="Current Premium/Discount to NAV",
- template="plotly_dark",
- color="Premium/Discount (%)",
- color_continuous_scale=["red", "white", "green"],
- range_color=[-5, 5] # Typical range for premium/discount
- )
-
- # Add a reference line at 0
- fig.add_hline(
- y=0,
- line_width=1,
- line_dash="dash",
- line_color="white",
- annotation_text="NAV",
- annotation_position="bottom right"
- )
-
- fig.update_layout(
- yaxis=dict(zeroline=True, zerolinewidth=2, zerolinecolor="rgba(255,255,255,0.5)")
- )
-
- st.plotly_chart(fig, use_container_width=True)
-
-# --- Portfolio Summary ---
-def portfolio_summary(df: pd.DataFrame):
- """Display portfolio summary metrics."""
- total_capital = df["Capital Allocated ($)"].sum()
- total_income = df["Income Contributed ($)"].sum()
- weighted_yield = (df["Income Contributed ($)"] * df["Yield (%)"]).sum() / total_income if total_income else 0
-
- col1, col2, col3 = st.columns(3)
- with col1:
- st.metric("Total Capital", f"${total_capital:,.2f}")
- with col2:
- st.metric("Annual Income", f"${total_income:,.2f}")
- st.metric("Monthly Income", f"${total_income/12:,.2f}")
- with col3:
- st.metric("Weighted Yield", f"{weighted_yield:.2f}%")
-
-# --- Allocation Functions ---
-def allocate_for_income(df: pd.DataFrame, target: float, allocations: List[Dict]) -> pd.DataFrame:
- """Allocate capital to ETFs to meet the annual income target."""
try:
- # Store initial capital in session state
- st.session_state.initial_capital = target
- st.session_state.mode = "Income Target"
- st.session_state.target = target
+ # Calculate key metrics
+ total_capital = final_alloc["Capital Allocated ($)"].sum()
+ total_income = final_alloc["Income Contributed ($)"].sum()
- results = []
- weighted_yield = 0
- for _, row in df.iterrows():
- ticker = row["Ticker"]
- alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0)
- if alloc_pct == 0:
- continue
- weighted_yield += alloc_pct * (row["Yield (%)"] / 100)
-
- if weighted_yield <= 0:
- st.error("Weighted yield is zero or negative. Check ETF data.")
- return None
-
- total_capital = target / weighted_yield
-
- for _, row in df.iterrows():
- ticker = row["Ticker"]
- alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0)
- if alloc_pct == 0:
- continue
- capital = total_capital * alloc_pct
- shares = capital / row["Price"]
- income = shares * row["Dividend Rate"]
-
- # Create result dictionary with all available data
- result = {
- "Ticker": ticker,
- "Yield (%)": row["Yield (%)"],
- "Dividend Rate": round(row["Dividend Rate"], 2),
- "Capital Allocated ($)": round(capital, 2),
- "Income Contributed ($)": round(income, 2),
- "Allocation (%)": round(alloc_pct * 100, 2),
- "Inception Date": row["Inception Date"],
- "Distribution Period": row.get("Distribution Period", "Unknown"),
- "Price": row["Price"]
+ # Calculate weighted average yield
+ weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
+
+ # Display metrics in columns
+ col1, col2, col3 = st.columns(3)
+
+ with col1:
+ st.metric("Total Capital", f"${total_capital:,.2f}")
+
+ with col2:
+ st.metric("Annual Income", f"${total_income:,.2f}")
+ st.metric("Monthly Income", f"${total_income/12:,.2f}")
+
+ with col3:
+ st.metric("Average Yield", f"{weighted_yield:.2f}%")
+ st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
+
+ # Display allocation chart
+ fig = px.pie(
+ final_alloc,
+ values="Allocation (%)",
+ names="Ticker",
+ title="Portfolio Allocation by ETF",
+ hover_data={
+ "Ticker": True,
+ "Allocation (%)": ":.2f",
+ "Yield (%)": ":.2f",
+ "Capital Allocated ($)": ":,.2f",
+ "Income Contributed ($)": ":,.2f"
}
-
- # Add NAV data if available
- if "NAV" in row and row["NAV"] is not None:
- result["NAV"] = row["NAV"]
- if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None:
- result["Premium/Discount (%)"] = row["Premium/Discount (%)"]
-
- results.append(result)
-
- alloc_df = pd.DataFrame(results)
- alloc_df = assign_risk_level(alloc_df, allocations)
- return alloc_df
+ )
+ st.plotly_chart(fig, use_container_width=True)
+ # Display detailed allocation table
+ st.subheader("Detailed Allocation")
+ display_df = final_alloc.copy()
+ display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12
+ display_df = display_df[[
+ "Ticker",
+ "Allocation (%)",
+ "Yield (%)",
+ "Price",
+ "Shares",
+ "Capital Allocated ($)",
+ "Monthly Income",
+ "Income Contributed ($)",
+ "Risk Level"
+ ]]
+
+ # Format the display
+ st.dataframe(
+ display_df.style.format({
+ "Allocation (%)": "{:.2f}%",
+ "Yield (%)": "{:.2f}%",
+ "Price": "${:,.2f}",
+ "Shares": "{:,.4f}",
+ "Capital Allocated ($)": "${:,.2f}",
+ "Monthly Income": "${:,.2f}",
+ "Income Contributed ($)": "${:,.2f}"
+ }),
+ use_container_width=True
+ )
+
+ except Exception as e:
+ st.error(f"Error calculating portfolio summary: {str(e)}")
+ logger.error(f"Error in portfolio_summary: {str(e)}")
+ logger.error(traceback.format_exc())
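+
+# Note on the two yield metrics shown above: "Average Yield" is the allocation-weighted
+# yield of the input data (sum of allocation% * yield% / 100), while "Effective Yield"
+# is realised income divided by deployed capital. The two agree when the allocations
+# sum to exactly 100% and diverge otherwise.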
+
+def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool:
+ """
+ Save portfolio allocation to a JSON file.
+
+ Args:
+ portfolio_name: Name of the portfolio
+ final_alloc: DataFrame containing portfolio allocation
+ mode: Portfolio mode ("Income Target" or "Capital Target")
+ target: Target value (income or capital)
+
+ Returns:
+ bool: True if save was successful, False otherwise
+ """
+ try:
+ # Create portfolios directory if it doesn't exist
+ portfolios_dir = Path("portfolios")
+ portfolios_dir.mkdir(exist_ok=True)
+
+ # Prepare portfolio data
+ portfolio_data = {
+ "name": portfolio_name,
+ "created_at": datetime.now().isoformat(),
+ "mode": mode,
+ "target": target,
+ "allocations": []
+ }
+
+ # Convert DataFrame to list of dictionaries
+ for _, row in final_alloc.iterrows():
+ allocation = {
+ "ticker": row["Ticker"],
+ "allocation": float(row["Allocation (%)"]),
+ "yield": float(row["Yield (%)"]),
+ "price": float(row["Price"]),
+ "risk_level": row["Risk Level"]
+ }
+ portfolio_data["allocations"].append(allocation)
+
+ # Save to JSON file
+ file_path = portfolios_dir / f"{portfolio_name}.json"
+ with open(file_path, 'w') as f:
+ json.dump(portfolio_data, f, indent=2)
+
+ return True
+
+ except Exception as e:
+ st.error(f"Error saving portfolio: {str(e)}")
+ return False
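+
+# Example of the on-disk layout written by save_portfolio (all values illustrative):
+#   portfolios/income_mix.json
+#   {
+#     "name": "income_mix",
+#     "created_at": "2024-01-01T12:00:00",
+#     "mode": "Income Target",
+#     "target": 1500.0,
+#     "allocations": [
+#       {"ticker": "JEPI", "allocation": 50.0, "yield": 7.5, "price": 55.0, "risk_level": "Medium"},
+#       {"ticker": "SCHD", "allocation": 50.0, "yield": 3.5, "price": 78.0, "risk_level": "Low"}
+#     ]
+#   }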
+
+def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]:
+ """
+ Load portfolio allocation from a JSON file.
+
+ Args:
+ portfolio_name: Name of the portfolio to load
+
+ Returns:
+ Tuple containing:
+ - DataFrame with portfolio allocation
+ - Portfolio mode
+ - Target value
+ """
+ try:
+ # Check if portfolio exists
+ file_path = Path("portfolios") / f"{portfolio_name}.json"
+ if not file_path.exists():
+ st.error(f"Portfolio '{portfolio_name}' not found.")
+ return None, None, None
+
+ # Load portfolio data
+ with open(file_path, 'r') as f:
+ portfolio_data = json.load(f)
+
+ # Convert allocations to DataFrame
+ allocations = portfolio_data["allocations"]
+ df = pd.DataFrame(allocations)
+
+        # Rename columns to the format used elsewhere in the app
+        # (the JSON keys are lowercase; downstream code expects "Ticker" and "Risk Level")
+        df = df.rename(columns={
+            "ticker": "Ticker",
+            "allocation": "Allocation (%)",
+            "yield": "Yield (%)",
+            "price": "Price",
+            "risk_level": "Risk Level"
+        })
+
+ return df, portfolio_data["mode"], portfolio_data["target"]
+
+ except Exception as e:
+ st.error(f"Error loading portfolio: {str(e)}")
+ return None, None, None
+
+def list_saved_portfolios() -> List[str]:
+ """
+ List all saved portfolios.
+
+ Returns:
+ List of portfolio names
+ """
+ try:
+ portfolios_dir = Path("portfolios")
+ if not portfolios_dir.exists():
+ return []
+
+ # Get all JSON files in the portfolios directory
+ portfolio_files = list(portfolios_dir.glob("*.json"))
+
+ # Extract portfolio names from filenames
+ portfolio_names = [f.stem for f in portfolio_files]
+
+ return sorted(portfolio_names)
+
+ except Exception as e:
+ st.error(f"Error listing portfolios: {str(e)}")
+ return []
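+
+# Round-trip sketch for the three helpers above (hypothetical portfolio name):
+#   if save_portfolio("income_mix", final_alloc, "Income Target", 1500.0):
+#       print(list_saved_portfolios())                   # -> ["income_mix", ...]
+#       df, mode, target = load_portfolio("income_mix")  # mode == "Income Target"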
+
+def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
+ """
+ Allocate portfolio for income target.
+
+ Args:
+ df: DataFrame with ETF data
+ target: Monthly income target
+ etf_allocations: List of ETF allocations
+
+ Returns:
+ DataFrame with final allocation
+ """
+ try:
+ # Create final allocation DataFrame
+ final_alloc = df.copy()
+
+ # Set allocations
+ for alloc in etf_allocations:
+ mask = final_alloc["Ticker"] == alloc["ticker"]
+ final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
+
+        # Calculate required capital for the income target using the allocation-weighted
+        # yield (a simple mean would misstate the capital needed when allocations differ)
+        monthly_income = target
+        annual_income = monthly_income * 12
+        weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
+        if weighted_yield <= 0:
+            st.error("Weighted yield is zero or negative. Check ETF data.")
+            return None
+        required_capital = (annual_income / weighted_yield) * 100
+
+ # Calculate capital allocation and income
+ final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
+ final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
+ final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
+
+ return final_alloc
except Exception as e:
st.error(f"Error in income allocation: {str(e)}")
return None
-def allocate_for_capital(df: pd.DataFrame, capital: float, allocations: List[Dict]) -> pd.DataFrame:
- """Allocate a fixed amount of capital across ETFs and calculate resulting income."""
+def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
+ """
+ Allocate portfolio for capital target.
+
+ Args:
+ df: DataFrame with ETF data
+ initial_capital: Initial capital amount
+ etf_allocations: List of ETF allocations
+
+ Returns:
+ DataFrame with final allocation
+ """
try:
- # Store initial capital in session state
- st.session_state.initial_capital = capital
- st.session_state.mode = "Capital Target"
- st.session_state.target = capital
+ # Create final allocation DataFrame
+ final_alloc = df.copy()
- results = []
- total_income = 0
+ # Set allocations
+ for alloc in etf_allocations:
+ mask = final_alloc["Ticker"] == alloc["ticker"]
+ final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
- for _, row in df.iterrows():
- ticker = row["Ticker"]
- alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0)
- if alloc_pct == 0:
- continue
-
- # Calculate capital allocation
- allocated_capital = capital * alloc_pct
- shares = allocated_capital / row["Price"]
- income = shares * row["Dividend Rate"]
- total_income += income
-
- # Create result dictionary with all available data
- result = {
- "Ticker": ticker,
- "Yield (%)": row["Yield (%)"],
- "Dividend Rate": round(row["Dividend Rate"], 2),
- "Capital Allocated ($)": round(allocated_capital, 2),
- "Income Contributed ($)": round(income, 2),
- "Allocation (%)": round(alloc_pct * 100, 2),
- "Inception Date": row["Inception Date"],
- "Distribution Period": row.get("Distribution Period", "Unknown"),
- "Price": row["Price"]
- }
-
- # Add NAV data if available
- if "NAV" in row and row["NAV"] is not None:
- result["NAV"] = row["NAV"]
- if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None:
- result["Premium/Discount (%)"] = row["Premium/Discount (%)"]
-
- results.append(result)
-
- alloc_df = pd.DataFrame(results)
- alloc_df = assign_risk_level(alloc_df, allocations)
- return alloc_df
+ # Calculate capital allocation and income
+ final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * initial_capital
+ final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
+ final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
+ return final_alloc
except Exception as e:
st.error(f"Error in capital allocation: {str(e)}")
return None
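+
+# Worked example for the capital mode above (illustrative numbers): a $100,000
+# portfolio with 40% in an ETF priced at $50 and yielding 8% allocates $40,000,
+# buys 800 shares, and contributes $40,000 * 8 / 100 = $3,200 of annual income.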
-# --- Portfolio Management Functions ---
-def recalculate_portfolio(allocations):
- """Recalculate portfolio with new allocations"""
- try:
- # Get the initial capital from session state
- initial_capital = st.session_state.get('initial_capital')
- if initial_capital is None:
- st.error("Initial capital not found. Please run the portfolio simulation first.")
- return None
-
- # Create a new DataFrame for the recalculated portfolio
- final_alloc = pd.DataFrame()
-
- # Add tickers and their allocations
- final_alloc["Ticker"] = list(allocations.keys())
- final_alloc["Allocation (%)"] = list(allocations.values())
-
- # Merge with the global df to get other information
- final_alloc = final_alloc.merge(df, on="Ticker", how="left")
-
- # Calculate capital allocation
- final_alloc["Capital Allocated ($)"] = final_alloc["Allocation (%)"] * initial_capital / 100
-
- # Calculate income contribution
- final_alloc["Income Contributed ($)"] = final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"] / 100
-
- # Store the recalculated portfolio in session state
- st.session_state.final_alloc = final_alloc
-
- return final_alloc
-
- except Exception as e:
- st.error(f"Error recalculating portfolio: {str(e)}")
- return None
-
-def update_allocation(ticker, new_alloc):
- """Update the allocation for a specific ticker."""
- st.session_state.etf_allocations[ticker] = new_alloc
-
-# --- DRIP Calculation Function ---
-def calculate_drip_growth(portfolio_df: pd.DataFrame, months: int = DRIP_FORECAST_MONTHS,
- erosion_type: str = "None", erosion_level: Any = 0) -> pd.DataFrame:
- """
- Calculate the growth of a portfolio with dividend reinvestment (DRIP) over time.
-
- Args:
- portfolio_df: DataFrame containing portfolio allocation data
- months: Number of months to forecast
- erosion_type: Type of erosion simulation ("None", "NAV & Yield Erosion")
- erosion_level: Erosion configuration (0 or dict with global and per-ticker settings)
-
- Returns:
- DataFrame with monthly portfolio growth data (exactly 'months' rows)
- """
- # Extract needed data
- initial_capital = portfolio_df["Capital Allocated ($)"].sum()
- tickers = portfolio_df["Ticker"].tolist()
-
- # Calculate monthly erosion rate(s) if applicable
- max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
-
- # Initialize erosion rates for each ticker
- ticker_nav_rates = {}
- ticker_yield_rates = {}
-
- # Handle different erosion configurations
- if erosion_type != "None" and isinstance(erosion_level, dict):
- # Check if using per-ticker rates
- if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level:
- # Get global defaults
- global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion
- global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion
-
- # Apply per-ticker rates where available, global rates otherwise
- for ticker in tickers:
- ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0})
- ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion
- ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion
- else:
- # Use global rates for all tickers
- global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion
- global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion
-
- for ticker in tickers:
- ticker_nav_rates[ticker] = global_nav
- ticker_yield_rates[ticker] = global_yield
- else:
- # No erosion
- for ticker in tickers:
- ticker_nav_rates[ticker] = 0
- ticker_yield_rates[ticker] = 0
-
- # Create a dictionary of ticker-specific data for easier access
- ticker_data = {}
- for _, row in portfolio_df.iterrows():
- ticker = row["Ticker"]
- ticker_data[ticker] = {
- "price": row["Price"],
- "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal
- "initial_shares": row["Capital Allocated ($)"] / row["Price"],
- "initial_allocation": row["Allocation (%)"] / 100, # Convert from % to decimal
- "distribution": row.get("Distribution Period", "Monthly")
- }
-
- # Initialize result data structure
- results = []
-
- # Initial portfolio state
- current_shares = {ticker: data["initial_shares"] for ticker, data in ticker_data.items()}
- current_prices = {ticker: data["price"] for ticker, data in ticker_data.items()}
- current_yields = {ticker: data["yield_annual"] for ticker, data in ticker_data.items()}
- current_total_value = initial_capital
-
- # Calculate the monthly dividend for each ETF based on distribution period
- dividend_frequency = {
- "Monthly": 12,
- "Quarterly": 4,
- "Semi-Annually": 2,
- "Annually": 1,
- "Unknown": 12 # Default to monthly if unknown
- }
-
- # Calculate growth for each month (exactly 'months' total rows)
- cumulative_income = 0
-
- for month in range(1, months + 1):
- # Calculate expected monthly income based on current portfolio and yields
- monthly_income = sum(
- (current_yields[ticker] / 12) *
- (current_shares[ticker] * current_prices[ticker])
- for ticker in tickers
- )
-
- # Store month data (this reflects the portfolio at the START of the month)
- month_data = {
- "Month": month,
- "Total Value ($)": current_total_value,
- "Monthly Income ($)": monthly_income,
- "Cumulative Income ($)": cumulative_income
- }
-
- # Add shares and current price/yield for each ticker
- for ticker in tickers:
- month_data[f"{ticker} Shares"] = current_shares[ticker]
- month_data[f"{ticker} Price ($)"] = current_prices[ticker]
- month_data[f"{ticker} Yield (%)"] = current_yields[ticker] * 100 # Convert back to percentage
-
- results.append(month_data)
-
- # After recording the month's data, apply erosion and calculate dividends
- # for the current month
-
- # Apply NAV and yield erosion to each ticker
- for ticker in tickers:
- # Apply NAV erosion
- if ticker_nav_rates[ticker] > 0:
- current_prices[ticker] *= (1 - ticker_nav_rates[ticker])
-
- # Apply yield erosion
- if ticker_yield_rates[ticker] > 0:
- current_yields[ticker] *= (1 - ticker_yield_rates[ticker])
-
- # Calculate dividends for each ETF
- month_dividends = {}
- for ticker, data in ticker_data.items():
- freq = dividend_frequency[data["distribution"]]
- # Check if dividend is paid this month
- if month % (12 / freq) == 0:
- # Annual dividend / frequency = dividend per distribution
- # Use current yield if yield erosion is being simulated
- if ticker_yield_rates[ticker] > 0:
- dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker]
- else:
- dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker]
- else:
- dividend = 0
- month_dividends[ticker] = dividend
-
- # Total dividends for this month
- total_month_dividend = sum(month_dividends.values())
- cumulative_income += total_month_dividend
-
- # Only reinvest for the next month if we're not at the last month
- if month < months:
- # Reinvest dividends proportionally to original allocation
- for ticker, data in ticker_data.items():
- # Calculate new shares purchased with reinvested dividends
- # Use current price for calculation
- if current_prices[ticker] > 0: # Avoid division by zero
- new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker]
- current_shares[ticker] += new_shares
-
- # Recalculate portfolio value with updated shares and prices
- current_total_value = sum(current_shares[ticker] * current_prices[ticker] for ticker in tickers)
-
- return pd.DataFrame(results)
-
-# --- AI Erosion Risk Assessment ---
-def analyze_etf_erosion_risk(tickers: List[str], debug: bool = False) -> pd.DataFrame:
- """
- Analyze historical ETF data to estimate realistic NAV and yield erosion likelihood.
-
- Args:
- tickers: List of ETF tickers to analyze
- debug: Whether to show debug information
-
- Returns:
- DataFrame with erosion risk assessment for each ETF
- """
- risk_data = []
- current_date = pd.Timestamp.now(tz='UTC')
-
- for ticker in tickers:
- try:
- yf_ticker = yf.Ticker(ticker)
-
- # Get basic info with retry
- info, _ = fetch_with_retry(lambda: yf_ticker.info)
- if not info:
- continue
-
- # Get historical price data (5 years or since inception)
- hist, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y"))
- if hist.empty:
- continue
-
- # Check ETF age
- inception_date = info.get("fundInceptionDate")
- etf_age_years = None
- if inception_date:
- try:
- inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True)
- etf_age_years = (current_date - inception_date_dt).days / 365.25
- except:
- pass
-
- # Get historical dividends
- dividends, _ = fetch_with_retry(lambda: yf_ticker.dividends)
- if dividends is None or dividends.empty:
- continue
-
- # Calculate historical metrics
-
- # 1. NAV Erosion Analysis (using price as proxy for NAV)
- # Calculate max drawdowns in different timeframes
- rolling_max = hist["Close"].rolling(window=252, min_periods=1).max()
- daily_drawdown = hist["Close"] / rolling_max - 1.0
- max_drawdown_1y = abs(daily_drawdown[-252:].min()) if len(daily_drawdown) >= 252 else None
-
- # Calculate annualized volatility
- returns = hist["Close"].pct_change().dropna()
- volatility = returns.std() * (252**0.5) # Annualized
-
- # 2. Yield Erosion Analysis
- # Convert to pandas Series with a DatetimeIndex
- if not isinstance(dividends, pd.Series):
- dividends = dividends.reset_index()
- dividends.columns = ["date", "amount"]
- dividends = dividends.set_index("date")["amount"]
-
- # Calculate rolling 12-month dividend total
- monthly_div = dividends.resample('M').sum()
- rolling_12m_div = monthly_div.rolling(window=12, min_periods=6).sum()
-
- # Calculate the trend over time
- if len(rolling_12m_div) > 12:
- earliest_ttm = rolling_12m_div[11]
- latest_ttm = rolling_12m_div[-1]
- if earliest_ttm > 0:
- dividend_trend = (latest_ttm / earliest_ttm) - 1
- else:
- dividend_trend = 0
-
- # Calculate worst dividend reduction
- div_changes = rolling_12m_div.pct_change()
- worst_div_change = div_changes.min() if not div_changes.empty else 0
- else:
- dividend_trend = None
- worst_div_change = None
-
- # 3. Risk Assessment
- # Determine if ETF is new or established
- is_new = etf_age_years is not None and etf_age_years < 2
-
- # Assign erosion risk levels
- if is_new:
- # For new ETFs, use higher default risk
- nav_erosion_risk = 5 # Medium
- yield_erosion_risk = 6 # Medium-high
- nav_risk_reason = "New ETF without significant history"
- yield_risk_reason = "New ETF dividend pattern not established"
- else:
- # For established ETFs, base on historical data
-
- # NAV Erosion Risk (0-9 scale)
- if max_drawdown_1y is not None:
- if max_drawdown_1y > 0.40:
- nav_erosion_risk = 7 # High risk
- nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year"
- elif max_drawdown_1y > 0.25:
- nav_erosion_risk = 5 # Medium risk
- nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year"
- elif max_drawdown_1y > 0.15:
- nav_erosion_risk = 3 # Lower-medium risk
- nav_risk_reason = f"Moderate {max_drawdown_1y:.1%} max drawdown in the past year"
- else:
- nav_erosion_risk = 2 # Low risk
- nav_risk_reason = f"Limited {max_drawdown_1y:.1%} max drawdown in the past year"
- else:
- nav_erosion_risk = 4 # Default medium-low
- nav_risk_reason = "Insufficient price history"
-
- # Yield Erosion Risk (0-9 scale)
- if worst_div_change is not None and not pd.isna(worst_div_change):
- if worst_div_change < -0.30:
- yield_erosion_risk = 8 # Very high risk
- yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}"
- elif worst_div_change < -0.15:
- yield_erosion_risk = 6 # High risk
- yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}"
- elif worst_div_change < -0.05:
- yield_erosion_risk = 4 # Medium risk
- yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}"
- elif dividend_trend is not None and dividend_trend < -0.10:
- yield_erosion_risk = 5 # Medium risk due to declining trend
- yield_risk_reason = f"Dividend trend declined by {abs(dividend_trend):.1%}"
- elif dividend_trend is not None and dividend_trend > 0.10:
- yield_erosion_risk = 2 # Low risk due to growing trend
- yield_risk_reason = f"Dividend trend growing by {dividend_trend:.1%}"
- else:
- yield_erosion_risk = 3 # Low-medium risk
- yield_risk_reason = "Stable dividend history"
- else:
- yield_erosion_risk = 4 # Default medium
- yield_risk_reason = "Insufficient dividend history"
-
- # Adjust for volatility
- if volatility > 0.40:
- nav_erosion_risk = min(9, nav_erosion_risk + 2) # Increase risk for high volatility
- nav_risk_reason += f" with high volatility ({volatility:.1%})"
- elif volatility > 0.25:
- nav_erosion_risk = min(9, nav_erosion_risk + 1) # Slightly increase risk
- nav_risk_reason += f" with elevated volatility ({volatility:.1%})"
-
- # Convert to annual erosion percentage estimate
- nav_erosion_pct = nav_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion
- yield_erosion_pct = yield_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion
-
- risk_data.append({
- "Ticker": ticker,
- "NAV Erosion Risk (0-9)": nav_erosion_risk,
- "Yield Erosion Risk (0-9)": yield_erosion_risk,
- "Estimated Annual NAV Erosion": f"{nav_erosion_pct:.1%}",
- "Estimated Annual Yield Erosion": f"{yield_erosion_pct:.1%}",
- "NAV Risk Explanation": nav_risk_reason,
- "Yield Risk Explanation": yield_risk_reason,
- "ETF Age (Years)": etf_age_years,
- "Is New ETF": is_new,
- "Max Drawdown (1Y)": max_drawdown_1y,
- "Volatility (Annual)": volatility,
- "Dividend Trend": dividend_trend
- })
-
- except Exception as e:
- if debug:
- st.error(f"Error analyzing {ticker}: {str(e)}")
- continue
-
- return pd.DataFrame(risk_data)
-
-# --- Streamlit Setup ---
-st.title("💸 ETF Dividend Portfolio Builder")
-
-# Initialize session state for real-time updates
-if "simulation_run" not in st.session_state:
+def reset_simulation():
+ """Reset all simulation data and state."""
st.session_state.simulation_run = False
-if "df_data" not in st.session_state:
st.session_state.df_data = None
-if "edited_allocations" not in st.session_state:
- st.session_state.edited_allocations = None
-if "show_recalculation" not in st.session_state:
- st.session_state.show_recalculation = False
-if "simulation_mode" not in st.session_state:
- st.session_state.simulation_mode = "income_target" # Default mode
-if "drip_enabled" not in st.session_state:
- st.session_state.drip_enabled = False # Default DRIP setting
-if "erosion_level" not in st.session_state:
- st.session_state.erosion_level = 0 # Default erosion level
-if "erosion_type" not in st.session_state:
- st.session_state.erosion_type = "None" # Default erosion type
-if "run_fmp_test" not in st.session_state:
- st.session_state.run_fmp_test = False # Flag for FMP API test
-if "fmp_api_key" not in st.session_state:
- st.session_state.fmp_api_key = os.environ.get("FMP_API_KEY", "") # FMP API key
-if "api_calls" not in st.session_state:
- st.session_state.api_calls = 0 # API call counter
-if "force_refresh_data" not in st.session_state:
- st.session_state.force_refresh_data = False # Flag to force refresh data
-
-# Radio button to select simulation mode
-simulation_mode = st.sidebar.radio(
- "Choose Simulation Mode",
- options=["Income Target Mode", "Capital Investment Mode"],
- index=0 if st.session_state.simulation_mode == "income_target" else 1,
- help="Choose whether to start with a monthly income goal or a fixed capital amount"
-)
-
-# Update session state with selected mode
-st.session_state.simulation_mode = "income_target" if simulation_mode == "Income Target Mode" else "capital_investment"
-
-# Add DRIP toggle
-drip_enabled = st.sidebar.toggle(
- "Enable Dividend Reinvestment (DRIP)",
- value=st.session_state.drip_enabled,
- help="When enabled, shows how reinvesting dividends compounds growth over time instead of taking income"
-)
-st.session_state.drip_enabled = drip_enabled
-
-# Initialize erosion_type from session state
-erosion_type = st.session_state.erosion_type
-
-# Add erosion simulation slider
-st.sidebar.subheader("Portfolio Risk Simulation")
-erosion_enabled = st.sidebar.checkbox(
- "Enable NAV & Yield Erosion",
- value=erosion_type != "None",
- help="Simulate price drops and dividend cuts over time"
-)
-
-# Define max erosion constants
-MAX_EROSION_LEVEL = 9
-max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
-
-if erosion_enabled:
- # Store the previous erosion type
- erosion_type = "NAV & Yield Erosion"
- st.session_state.erosion_type = erosion_type
-
- # Initialize per-ticker erosion settings if not already in session state
- if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict):
- st.session_state.per_ticker_erosion = {}
-
- # Create advanced per-ticker erosion controls
- st.sidebar.subheader("ETF Erosion Settings")
- st.sidebar.write("Set custom erosion levels for each ETF")
-
- # Use the ETFs from the final allocation if simulation has run
- if st.session_state.simulation_run and hasattr(st.session_state, 'final_alloc'):
- tickers = st.session_state.final_alloc["Ticker"].unique().tolist()
- # Otherwise use the ETFs from user input
- elif "etf_allocations" in st.session_state:
- tickers = [etf["ticker"] for etf in st.session_state.etf_allocations]
- else:
- tickers = []
-
- # Initialize or update per-ticker erosion settings
- per_ticker_erosion = {}
-
- if tickers:
- # Create a DataFrame for the per-ticker settings
- per_ticker_data = []
-
- for ticker in tickers:
- # Get existing settings or use defaults (5 = medium erosion)
- existing_settings = st.session_state.per_ticker_erosion.get(ticker, {
- "nav": 5,
- "yield": 5
- })
-
- per_ticker_data.append({
- "Ticker": ticker,
- "NAV Erosion (0-9)": existing_settings["nav"],
- "Yield Erosion (0-9)": existing_settings["yield"],
- })
-
- # Create a data editor for the per-ticker settings
- per_ticker_df = pd.DataFrame(per_ticker_data)
- edited_df = st.sidebar.data_editor(
- per_ticker_df,
- column_config={
- "Ticker": st.column_config.TextColumn("Ticker", disabled=True),
- "NAV Erosion (0-9)": st.column_config.NumberColumn(
- "NAV Erosion (0-9)",
- min_value=0,
- max_value=MAX_EROSION_LEVEL,
- step=1,
- format="%d"
- ),
- "Yield Erosion (0-9)": st.column_config.NumberColumn(
- "Yield Erosion (0-9)",
- min_value=0,
- max_value=MAX_EROSION_LEVEL,
- step=1,
- format="%d"
- ),
- },
- use_container_width=True,
- num_rows="fixed",
- hide_index=True,
- key="per_ticker_editor"
- )
-
- # Save the edited values back to session state
- for _, row in edited_df.iterrows():
- ticker = row["Ticker"]
- per_ticker_erosion[ticker] = {
- "nav": row["NAV Erosion (0-9)"],
- "yield": row["Yield Erosion (0-9)"]
- }
-
- st.session_state.per_ticker_erosion = per_ticker_erosion
-
- # Calculate some example annual erosion rates for display
- max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
-
- # Show sample erosion rates for different levels
- st.sidebar.info("""
- **Erosion Level Guide:**
- - Level 0: No erosion (0%)
- - Level 5: Medium erosion (~40% annually)
- - Level 9: Severe erosion (~90% annually)
- """)
- else:
- st.sidebar.write("Add ETFs to enable per-ETF erosion settings")
-
- # Always use per-ticker settings
- st.session_state.use_per_ticker_erosion = True
-
- # Store erosion settings for DRIP calculation
- erosion_level = {
- "global": {
- "nav": 5, # Default medium level for global fallback
- "yield": 5
- },
- "per_ticker": st.session_state.per_ticker_erosion,
- "use_per_ticker": True
- }
-
- # Update session state erosion level to match current settings
- st.session_state.erosion_level = erosion_level
-else:
- # No erosion
- erosion_type = "None"
- st.session_state.erosion_type = erosion_type
- erosion_level = 0
-
-# Display appropriate input field based on mode
-if st.session_state.simulation_mode == "income_target":
- monthly_target = st.sidebar.number_input(
- "Monthly Income Target ($)",
- value=1500,
- min_value=100,
- help="Desired monthly dividend income. We'll calculate the required capital."
- )
- initial_capital = None
- ANNUAL_TARGET = monthly_target * 12
-else:
- initial_capital = st.sidebar.number_input(
- "Initial Capital ($)",
- value=250000,
- min_value=1000,
- help="Amount of capital you want to invest. We'll calculate the expected monthly income."
- )
- monthly_target = None
- ANNUAL_TARGET = None # Will be calculated based on allocations and yields
-
-# Add PDF export information
-with st.sidebar.expander("PDF Export Information"):
- st.info("""
- For PDF export functionality, you'll need to install wkhtmltopdf on your system:
-
- - **Windows**: Download from [wkhtmltopdf.org](https://wkhtmltopdf.org/downloads.html) and install
- - **Mac**: Run `brew install wkhtmltopdf` in Terminal
- - **Linux**: Run `sudo apt-get install wkhtmltopdf` for Debian/Ubuntu or `sudo yum install wkhtmltopdf` for CentOS/RHEL
-
- After installation, restart the app for PDF export to work correctly.
- """)
-
-# Manual ETF allocation input
-st.sidebar.header("ETF Allocation")
-if "etf_allocations" not in st.session_state:
- st.session_state.etf_allocations = []
-
-# Only show input fields if not in real-time adjustment mode or if no ETFs added yet
-if not st.session_state.simulation_run or not st.session_state.etf_allocations:
- col1, col2 = st.sidebar.columns([2, 1])
- with col1:
- new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., JEPI)")
- with col2:
- new_allocation = st.number_input(
- "Allocation (%)",
- min_value=0.0,
- max_value=100.0,
- value=0.0,
- step=1.0,
- help="Percentage of total capital"
- )
-
- # Add button to add ETF
- add_etf_button = st.sidebar.button("ADD ETF", use_container_width=True)
- if add_etf_button and new_ticker and new_allocation > 0:
- # Check if ticker already exists
- if any(etf["ticker"] == new_ticker.upper() for etf in st.session_state.etf_allocations):
- st.sidebar.warning(f"{new_ticker.upper()} is already in your portfolio. Please adjust its allocation instead.")
- else:
- # Validate ticker exists and has data before adding
- validation_status = st.sidebar.empty()
- validation_status.info(f"Validating {new_ticker.upper()}...")
-
- is_valid = False
-
- # Check ticker format first
- if not re.match(r'^[A-Z]{1,7}$', new_ticker.upper()):
- validation_status.error(f"Invalid ticker: {new_ticker.upper()}. Must be 1-7 uppercase letters.")
- else:
- try:
- # Check if ticker has data
- yf_ticker = yf.Ticker(new_ticker.upper())
- info, _ = fetch_with_retry(lambda: yf_ticker.info)
-
- if info and info.get("previousClose"):
- is_valid = True
- validation_status.success(f"Validated {new_ticker.upper()} successfully.")
- else:
- # If YFinance fails, try FMP API for high-yield ETFs if enabled
- if USE_FMP_API and st.session_state.get("fmp_api_key"):
- fmp_data, _ = fetch_fmp_data(new_ticker.upper())
- if fmp_data["quote"] and len(fmp_data["quote"]) > 0:
- is_valid = True
- validation_status.success(f"Validated {new_ticker.upper()} using FMP API data.")
-
- if not is_valid:
- validation_status.error(f"Could not validate {new_ticker.upper()}. No price data available.")
- except Exception as e:
- validation_status.error(f"Error validating {new_ticker.upper()}: {str(e)}")
-
- if is_valid:
- # Add new ETF to allocations
- st.session_state.etf_allocations.append({
- "ticker": new_ticker.upper(),
- "allocation": new_allocation
- })
- st.sidebar.success(f"Added {new_ticker.upper()} with {new_allocation}% allocation.")
- st.rerun()
- elif add_etf_button:
- # Show error if missing data
- if not new_ticker:
- st.sidebar.error("Please enter an ETF ticker.")
- if new_allocation <= 0:
- st.sidebar.error("Allocation must be greater than 0%.")
-
-# Calculate total allocation after potential addition
-total_alloc = sum(etf["allocation"] for etf in st.session_state.etf_allocations) if st.session_state.etf_allocations else 0
-
-# Display ETF allocations
-if st.session_state.etf_allocations:
- st.sidebar.subheader("Selected ETFs")
- alloc_df = pd.DataFrame(st.session_state.etf_allocations)
- alloc_df["Remove"] = [st.button(f"Remove {etf['ticker']}", key=f"remove_{i}") for i, etf in enumerate(st.session_state.etf_allocations)]
- st.sidebar.dataframe(alloc_df[["ticker", "allocation"]], use_container_width=True)
- st.sidebar.metric("Total Allocation (%)", f"{total_alloc:.2f}")
- if total_alloc > 100:
- st.error(f"Total allocation is {total_alloc:.2f}%, which exceeds 100%. Please adjust allocations.")
-
-# Advanced Options section in sidebar
-with st.sidebar.expander("Advanced Options"):
- # Option to toggle FMP API usage
- use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API,
- help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs")
- if use_fmp_api != USE_FMP_API:
- # Update global setting if changed
- globals()["USE_FMP_API"] = use_fmp_api
- st.success("FMP API usage setting updated")
-
- # Add FMP API Key input
- fmp_api_key = st.text_input(
- "FMP API Key",
- value=os.environ.get("FMP_API_KEY", st.session_state.get("fmp_api_key", "")),
- type="password",
- help="Enter your Financial Modeling Prep API key for more accurate yield data."
- )
- if fmp_api_key:
- st.session_state.fmp_api_key = fmp_api_key
-
- # Add cache controls
- st.subheader("Cache Settings")
-
- # Display cache statistics
- cache_stats = get_cache_stats()
- st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)")
-
- # Force refresh option
- st.session_state.force_refresh_data = st.checkbox(
- "Force refresh data (ignore cache)",
- value=st.session_state.get("force_refresh_data", False),
- help="When enabled, always fetch fresh data from APIs"
- )
-
- # Cache clearing options
- col1, col2 = st.columns(2)
- with col1:
- if st.button("Clear All Cache", key="clear_all_cache"):
- clear_cache()
- st.success("All cache files cleared!")
- st.session_state.api_calls = 0
-
- with col2:
- ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker")
- if st.button("Clear") and ticker_to_clear:
- clear_cache(ticker_to_clear)
- st.success(f"Cache cleared for {ticker_to_clear.upper()}")
-
- # Show API call counter
- st.write(f"API calls this session: {st.session_state.api_calls}")
-
- # Add button to run the FMP API test
- if st.button("Test FMP API Connection"):
- # Check if API key is available
- if not fmp_api_key and not os.environ.get("FMP_API_KEY"):
- st.error("Please provide an FMP API key first.")
- else:
- st.info("Opening FMP API test panel...")
- # Set a flag to trigger the test in the main UI
- st.session_state.run_fmp_test = True
- st.rerun()
-
- # Add option for debug mode and parallel processing
- debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.")
- parallel_processing = st.checkbox("Enable Parallel Processing", value=True,
- help="Fetch data for multiple ETFs simultaneously")
-
-api_key = st.sidebar.text_input(
- "OpenAI API Key",
- type="password",
- help="Enter your OpenAI API key for ChatGPT summaries."
-)
-
-# Show simulation button only initially, hide after simulation is run
-if not st.session_state.simulation_run:
- run_simulation = st.sidebar.button("Run Simulation", help="Launch capital allocation simulation", disabled=abs(total_alloc - 100) > 1 or total_alloc == 0)
-else:
- run_simulation = False
- st.sidebar.success("Simulation ready - adjust allocations in the table below")
- if st.sidebar.button("Reset Simulation", help="Start over with new ETFs"):
- st.session_state.simulation_run = False
- st.session_state.df_data = None
- st.session_state.edited_allocations = None
- st.session_state.show_recalculation = False
- st.rerun()
-
-refresh_button = st.sidebar.button("Refresh Data", help="Clear cache and fetch new data")
-
-# Handle remove buttons
-for i, etf in enumerate(st.session_state.etf_allocations.copy()):
- if st.session_state.get(f"remove_{i}"):
- st.session_state.etf_allocations.pop(i)
- st.rerun()
-
-# --- Run App Logic ---
-if refresh_button:
- st.session_state.force_refresh_data = True # Force refresh when manually requested
- st.cache_data.clear()
+ st.session_state.final_alloc = None
+ st.session_state.mode = 'Capital Target'
+ st.session_state.target = 0
+ st.session_state.initial_capital = 0
+ st.session_state.enable_drip = False
+ st.session_state.enable_erosion = False
st.rerun()
-# Check if FMP API test should be run
-if st.session_state.get("run_fmp_test", False):
- # Display the FMP API test UI
- st.header("🔍 FMP API Test Tool")
- st.write("""
- This tool allows you to test the Financial Modeling Prep API responses for ETF yield data.
- Use this to verify accurate dividend yield information, especially for high-yield ETFs.
- """)
-
- # Clear the flag so it won't show again unless requested
- st.session_state.run_fmp_test = False
-
- # Run the API test function
- test_fmp_api()
-
- # Stop execution to prevent the main app from rendering
- st.stop()
-
-# Initial simulation run
-if run_simulation:
+def test_fmp_connection():
+ """Test the FMP API connection and display status."""
try:
- with st.spinner("Validating ETFs..."):
- tickers = validate_etf_input(st.session_state.etf_allocations)
- if not tickers:
- st.stop()
-
- with st.spinner("Fetching ETF data..."):
- df, skipped = fetch_etfs(",".join(tickers), debug_mode, parallel_processing)
- # Store data in session state for reuse
- st.session_state.df_data = df
-
- if not df.empty:
- # Run appropriate allocation based on mode
- if st.session_state.simulation_mode == "income_target":
- final_alloc = allocate_for_income(df, ANNUAL_TARGET, st.session_state.etf_allocations)
- else:
- final_alloc = allocate_for_capital(df, initial_capital, st.session_state.etf_allocations)
-
- if final_alloc.empty:
- st.error("Failed to allocate capital. Check ETF data or allocations.")
- st.stop()
-
- # Mark simulation as run successfully
- st.session_state.simulation_run = True
- st.session_state.final_alloc = final_alloc
- else:
- st.error("❌ No valid ETF data retrieved. Check tickers or enable Debug Mode for details.")
- st.session_state.simulation_run = False
- if skipped:
- st.subheader("🛑 Skipped Tickers")
- st.write("The following tickers could not be processed. Enable Debug Mode for detailed logs.")
- st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]).drop(columns=["Debug Info"]), use_container_width=True)
-
+ if not FMP_API_KEY:
+ return False, "No API key found"
+
+ session = get_fmp_session()
+ test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}"
+ response = session.get(test_url)
+
+ if response.status_code == 200:
+ data = response.json()
+ if data and isinstance(data, list) and len(data) > 0:
+ return True, "Connected"
+ return False, f"Error: {response.status_code}"
except Exception as e:
- st.error(f"Simulation failed: {str(e)}. Please check inputs or try again.")
- st.session_state.simulation_run = False
+ return False, f"Error: {str(e)}"
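+
+# Note: this helper assumes FMP_API_KEY, FMP_BASE_URL and a get_fmp_session()
+# factory are defined elsewhere in this module; they are not visible in this hunk.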
+
+# Set page config
+st.set_page_config(
+ page_title="ETF Portfolio Builder",
+ page_icon="📈",
+ layout="wide",
+ initial_sidebar_state="expanded"
+)
+
+# Initialize session state variables
+if 'simulation_run' not in st.session_state:
+ st.session_state.simulation_run = False
+if 'df_data' not in st.session_state:
+ st.session_state.df_data = None
+if 'final_alloc' not in st.session_state:
+ st.session_state.final_alloc = None
+if 'mode' not in st.session_state:
+ st.session_state.mode = 'Capital Target'
+if 'target' not in st.session_state:
+ st.session_state.target = 0
+if 'initial_capital' not in st.session_state:
+ st.session_state.initial_capital = 0
+if 'enable_drip' not in st.session_state:
+ st.session_state.enable_drip = False
+if 'enable_erosion' not in st.session_state:
+ st.session_state.enable_erosion = False
+
+# Main title
+st.title("📈 ETF Portfolio Builder")
+
+# Sidebar for simulation parameters
+with st.sidebar:
+ st.header("Simulation Parameters")
+
+ # Add refresh data button at the top
+    if st.button("🔄 Refresh Data", use_container_width=True):
+        # Clear Streamlit's data cache so the next simulation fetches fresh ETF data
+        st.cache_data.clear()
+        st.success("Cached data cleared. Fresh ETF data will be fetched on the next run.")
+
+ # Mode selection
+ simulation_mode = st.radio(
+ "Select Simulation Mode",
+ ["Capital Target", "Income Target"]
+ )
+
+ if simulation_mode == "Income Target":
+ monthly_target = st.number_input(
+ "Monthly Income Target ($)",
+ min_value=100.0,
+ max_value=100000.0,
+ value=1000.0,
+ step=100.0
+ )
+ ANNUAL_TARGET = monthly_target * 12
+ else:
+ initial_capital = st.number_input(
+ "Initial Capital ($)",
+ min_value=1000.0,
+ max_value=1000000.0,
+ value=100000.0,
+ step=1000.0
+ )
+
+ # Risk tolerance
+ risk_tolerance = st.select_slider(
+ "Risk Tolerance",
+ options=["Conservative", "Moderate", "Aggressive"],
+ value="Moderate"
+ )
+
+ # Additional options
+ st.subheader("Additional Options")
+
+ # DRIP option
+ enable_drip = st.radio(
+ "Enable Dividend Reinvestment (DRIP)",
+ ["Yes", "No"],
+ index=1
+ )
+
+ # Erosion options
+ enable_erosion = st.radio(
+ "Enable NAV & Yield Erosion",
+ ["Yes", "No"],
+ index=1
+ )
+
+ # ETF Selection
+ st.subheader("ETF Selection")
+
+    # Number of ETFs is selected outside the form: widgets inside a form only send
+    # their values on submit, so the ticker inputs would not re-render when this changes
+    num_etfs = st.number_input("Number of ETFs", min_value=1, max_value=10, value=3, step=1)
+
+    # Create a form for ETF selection
+    with st.form("etf_selection_form"):
+        # Ticker inputs for the selected number of ETFs
+        etf_inputs = []
+        for i in range(num_etfs):
+            ticker = st.text_input(f"ETF {i+1} Ticker", key=f"ticker_{i}")
+            etf_inputs.append({"ticker": ticker})
+
+ # Submit button
+ submitted = st.form_submit_button("Run Portfolio Simulation", type="primary")
+
+ if submitted:
+ try:
+ # Store parameters in session state
+ st.session_state.mode = simulation_mode
+ st.session_state.enable_drip = enable_drip == "Yes"
+ st.session_state.enable_erosion = enable_erosion == "Yes"
+
+ if simulation_mode == "Income Target":
+ st.session_state.target = monthly_target
+ else:
+ st.session_state.target = initial_capital
+ st.session_state.initial_capital = initial_capital
+
+ # Run simulation
+ df_data, final_alloc = run_portfolio_simulation(
+ simulation_mode.lower().replace(" ", "_"),
+ st.session_state.target,
+ risk_tolerance,
+ etf_inputs,
+ st.session_state.enable_drip,
+ st.session_state.enable_erosion
+ )
+
+ # Store results in session state
+ st.session_state.simulation_run = True
+ st.session_state.df_data = df_data
+ st.session_state.final_alloc = final_alloc
+
+ st.success("Portfolio simulation completed!")
+ st.rerun()
+
+ except Exception as e:
+ st.error(f"Error running simulation: {str(e)}")
+
+ # Add reset simulation button at the bottom of sidebar
+ if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"):
+ reset_simulation()
+
+ # Add FMP connection status to the navigation bar
+ st.sidebar.markdown("---")
+ st.sidebar.subheader("FMP API Status")
+ connection_status, message = test_fmp_connection()
+ if connection_status:
+ st.sidebar.success(f"✅ FMP API: {message}")
+ else:
+ st.sidebar.error(f"❌ FMP API: {message}")
# Display results and interactive allocation adjustment UI after simulation is run
if st.session_state.simulation_run and st.session_state.df_data is not None:
@@ -2536,13 +791,48 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
portfolio_summary(final_alloc)
# Display mode-specific information
- if st.session_state.simulation_mode == "income_target":
+ if st.session_state.mode == "Income Target":
st.info(f"🎯 **Income Target Mode**: You need ${final_alloc['Capital Allocated ($)'].sum():,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).")
else:
annual_income = final_alloc["Income Contributed ($)"].sum()
monthly_income = annual_income / 12
st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).")
+ # Add save/load section
+ st.subheader("💾 Save/Load Portfolio")
+
+ # Create two columns for save and load
+ save_col, load_col = st.columns(2)
+
+ with save_col:
+ st.write("Save current portfolio")
+ portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name")
+ if st.button("Save Portfolio", key="save_portfolio"):
+ if portfolio_name:
+ if save_portfolio(portfolio_name, final_alloc,
+ st.session_state.mode,
+ st.session_state.target):
+ st.success(f"Portfolio '{portfolio_name}' saved successfully!")
+ else:
+ st.warning("Please enter a portfolio name.")
+
+    with load_col:
+        st.write("Load saved portfolio")
+        # List portfolios directly; a nested "show" button would not survive the
+        # rerun triggered by clicking "Load Portfolio"
+        saved_portfolios = list_saved_portfolios()
+        if saved_portfolios:
+            selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio")
+            if st.button("Load Portfolio", key="load_portfolio_btn"):
+                loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio)
+                if loaded_df is not None:
+                    # Rebuild the derived columns (capital, shares, income) for the loaded mix
+                    etf_allocs = [{"ticker": t, "allocation": a} for t, a in
+                                  zip(loaded_df["Ticker"], loaded_df["Allocation (%)"])]
+                    if loaded_mode == "Income Target":
+                        rebuilt = allocate_for_income(loaded_df, loaded_target, etf_allocs)
+                    else:
+                        rebuilt = allocate_for_capital(loaded_df, loaded_target, etf_allocs)
+                    st.session_state.final_alloc = rebuilt if rebuilt is not None else loaded_df
+                    st.session_state.mode = loaded_mode
+                    st.session_state.target = loaded_target
+                    st.success(f"Portfolio '{selected_portfolio}' loaded successfully!")
+                    st.rerun()
+        else:
+            st.info("No saved portfolios found.")
+
# Display full detailed allocation table
st.subheader("📊 Capital Allocation Details")
@@ -2567,7 +857,7 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
"Allocation (%)",
min_value=0.0,
max_value=100.0,
- step=1.0,
+ step=0.1,
format="%.1f"
),
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
@@ -2581,11 +871,15 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
# Calculate total allocation
total_alloc = edited_df["Allocation (%)"].sum()
- # Display total allocation
- st.metric("Total Allocation (%)", f"{total_alloc:.2f}",
- delta=f"{total_alloc - 100:.2f}" if abs(total_alloc - 100) > 0.01 else None)
+    # Display total allocation; show the deviation from 100% (uncolored) only when it exceeds the tolerance
+ if abs(total_alloc - 100) <= 0.1:
+ st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None)
+ else:
+ st.metric("Total Allocation (%)", f"{total_alloc:.2f}",
+ delta=f"{total_alloc - 100:.2f}",
+ delta_color="off")
- if abs(total_alloc - 100) > 0.01:
+ if abs(total_alloc - 100) > 0.1:
st.warning("Total allocation should be 100%")
# Create columns for quick actions
@@ -2602,7 +896,7 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
# Submit button for manual edits
submitted = st.form_submit_button("Update Allocations",
- disabled=abs(total_alloc - 100) > 1,
+ disabled=abs(total_alloc - 100) > 0.1,
type="primary",
use_container_width=True)
@@ -2616,9 +910,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()]
# Get the mode and target from session state
- mode = st.session_state.get('mode', 'Capital Target')
- target = st.session_state.get('target', 0)
- initial_capital = st.session_state.get('initial_capital', 0)
+ mode = st.session_state.mode
+ target = st.session_state.target
+ initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
@@ -2646,9 +940,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
- mode = st.session_state.get('mode', 'Capital Target')
- target = st.session_state.get('target', 0)
- initial_capital = st.session_state.get('initial_capital', 0)
+ mode = st.session_state.mode
+ target = st.session_state.target
+ initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
@@ -2677,9 +971,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation})
# Get the mode and target from session state
- mode = st.session_state.get('mode', 'Capital Target')
- target = st.session_state.get('target', 0)
- initial_capital = st.session_state.get('initial_capital', 0)
+ mode = st.session_state.mode
+ target = st.session_state.target
+ initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
@@ -2704,9 +998,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
- mode = st.session_state.get('mode', 'Capital Target')
- target = st.session_state.get('target', 0)
- initial_capital = st.session_state.get('initial_capital', 0)
+ mode = st.session_state.mode
+ target = st.session_state.target
+ initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
@@ -2719,654 +1013,4 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
st.success("Portfolio adjusted to focus on capital!")
st.rerun()
except Exception as e:
- st.error(f"Error focusing on capital: {str(e)}")
-
- # Display charts
- col1, col2 = st.columns(2)
- with col1:
- fig1 = px.bar(
- final_alloc,
- x="Ticker",
- y="Capital Allocated ($)",
- title="Capital Allocation by ETF",
- template="plotly_dark",
- hover_data=["Yield (%)", "Income Contributed ($)", "Allocation (%)", "Risk Level"],
- labels={"Capital Allocated ($)": "Capital ($)"}
- )
- fig1.update_traces(marker_color="#1f77b4")
- st.plotly_chart(fig1, use_container_width=True)
- with col2:
- fig2 = px.bar(
- final_alloc,
- x="Ticker",
- y="Income Contributed ($)",
- title="Income Contribution by ETF",
- template="plotly_dark",
- hover_data=["Yield (%)", "Capital Allocated ($)", "Allocation (%)", "Risk Level"],
- labels={"Income Contributed ($)": "Income ($)"}
- )
- fig2.update_traces(marker_color="#ff7f0e")
- st.plotly_chart(fig2, use_container_width=True)
-
- # Display NAV Premium/Discount chart if data is available
- st.subheader("📈 NAV Premium/Discount")
- premium_discount_chart(final_alloc["Ticker"].tolist(), df, debug_mode)
-
- # Display trend charts
- trend_tabs = st.tabs(["📉 Yield Trends", "📊 NAV Trends"])
-
- with trend_tabs[0]:
- yield_chart(final_alloc["Ticker"].tolist(), debug_mode)
-
- with trend_tabs[1]:
- nav_chart(final_alloc["Ticker"].tolist(), debug_mode)
-
- with tab2:
- st.subheader("📈 Dividend Reinvestment (DRIP) Forecast")
-
- # Calculate DRIP growth with erosion simulation if enabled
- drip_forecast = calculate_drip_growth(
- final_alloc,
- erosion_type=erosion_type,
- erosion_level=erosion_level
- )
-
- # Display explanatory text
- st.write("This forecast shows the growth of your portfolio over time if dividends are reinvested instead of taken as income.")
-        base_assumptions = "dividends are reinvested proportionally to the original allocations; ETF prices remain constant unless erosion is enabled"
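# Hedged sketch (editor's annotation, not part of the patch): the core mechanic
# assumed by calculate_drip_growth() under the base assumptions above -- constant
# price, dividends reinvested monthly. Figures are invented; the real function
# also handles multiple tickers, distribution periods, and erosion.
shares, price, annual_yield = 1_000.0, 25.0, 0.12
for month in range(1, 13):
    dividend = shares * price * (annual_yield / 12)  # cash distribution this month
    shares += dividend / price                       # reinvest at the constant price
print(f"Shares after 12 months: {shares:.4f}")       # value and income compound ~1%/month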
-
- # Show erosion information if enabled
- if erosion_type != "None" and isinstance(erosion_level, dict):
- # Check if using per-ticker rates
- if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level:
- st.write("**Erosion Simulation:** Custom erosion rates applied per ETF")
-
- # Format the per-ticker erosion rates for display
- per_ticker_display = []
- for ticker in tickers:
- if ticker in erosion_level["per_ticker"]:
- ticker_settings = erosion_level["per_ticker"][ticker]
- nav_rate = (1 - (1 - (ticker_settings["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
- yield_rate = (1 - (1 - (ticker_settings["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
-
- per_ticker_display.append({
- "Ticker": ticker,
- "NAV Erosion (Annual %)": f"{nav_rate:.1f}%",
- "Yield Erosion (Annual %)": f"{yield_rate:.1f}%"
- })
-
- # Display the per-ticker settings in a table
- st.dataframe(
- pd.DataFrame(per_ticker_display),
- use_container_width=True,
- hide_index=True
- )
-
- st.write(f"Assumptions: {base_assumptions}, with custom erosion applied monthly per ETF.")
- else:
- # Global rates only
- nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
- yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
- st.write(f"**Erosion Simulation:** NAV erosion at {nav_annual:.1f}% annual rate, Yield erosion at {yield_annual:.1f}% annual rate")
- st.write(f"Assumptions: {base_assumptions}, with erosion applied monthly to all ETFs.")
- else:
- st.write(f"Assumptions: {base_assumptions}.")
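# Hedged sketch (editor's annotation, not part of the patch): how the annualized
# erosion percentages shown above are derived from a 0-9 slider level.
# MAX_EROSION_LEVEL and max_monthly_erosion are assumed example values; the real
# constants are defined earlier in ETF_Portfolio_Builder.py.
MAX_EROSION_LEVEL = 9        # assumed slider maximum
max_monthly_erosion = 0.02   # assumed worst-case monthly erosion (2%)

def annualized_erosion_pct(level: int) -> float:
    monthly = (level / MAX_EROSION_LEVEL) * max_monthly_erosion
    return (1 - (1 - monthly) ** 12) * 100

print(f"{annualized_erosion_pct(5):.1f}%")  # level 5 -> roughly 12.5% with these constants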
-
- # Create columns for key metrics
- col1, col2, col3, col4 = st.columns(4)
-
- # Extract key metrics from forecast
- initial_value = drip_forecast["Total Value ($)"].iloc[0]
- final_value = drip_forecast["Total Value ($)"].iloc[-1]
- value_growth = final_value - initial_value
- value_growth_pct = (value_growth / initial_value) * 100
-
- initial_income = drip_forecast["Monthly Income ($)"].iloc[0]
- final_income = drip_forecast["Monthly Income ($)"].iloc[-1]
- income_growth = final_income - initial_income
- income_growth_pct = (income_growth / initial_income) * 100
-
- total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1]
- capital_recovery_pct = (total_dividends / initial_value) * 100
-
- # Display key metrics
- with col1:
- st.metric(
- "Portfolio Value Growth",
- f"${value_growth:,.2f}",
- f"{value_growth_pct:.2f}%"
- )
- with col2:
- st.metric(
- "Monthly Income Growth",
- f"${income_growth:,.2f}",
- f"{income_growth_pct:.2f}%"
- )
- with col3:
- st.metric(
- "Total Dividends Earned",
- f"${total_dividends:,.2f}"
- )
- with col4:
- st.metric(
- "Capital Recovery",
- f"{capital_recovery_pct:.2f}%"
- )
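# Hedged sketch (editor's annotation, invented numbers): the four summary metrics
# above are plain arithmetic on the first and last rows of the DRIP forecast.
initial_value, final_value = 100_000.0, 112_000.0
initial_income, final_income = 850.0, 975.0
total_dividends = 11_400.0  # cumulative income over the forecast horizon

value_growth_pct = (final_value - initial_value) / initial_value * 100      # 12.00%
income_growth_pct = (final_income - initial_income) / initial_income * 100  # ~14.71%
capital_recovery_pct = total_dividends / initial_value * 100                # 11.40%
print(value_growth_pct, income_growth_pct, capital_recovery_pct)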
-
- # Display a line chart showing portfolio growth
- st.subheader("Portfolio Value Growth")
- fig1 = px.line(
- drip_forecast,
- x="Month",
- y="Total Value ($)",
- title="Portfolio Value Growth with DRIP",
- markers=True,
- template="plotly_dark",
- )
- fig1.update_traces(line=dict(color="#1f77b4", width=3))
- fig1.update_layout(
- xaxis=dict(tickmode='linear', tick0=0, dtick=1),
- yaxis=dict(title="Portfolio Value ($)")
- )
- st.plotly_chart(fig1, use_container_width=True)
-
- # Display a line chart showing monthly income growth
- st.subheader("Monthly Income Growth")
- fig2 = px.line(
- drip_forecast,
- x="Month",
- y="Monthly Income ($)",
- title="Monthly Income Growth with DRIP",
- markers=True,
- template="plotly_dark"
- )
- fig2.update_traces(line=dict(color="#ff7f0e", width=3))
- fig2.update_layout(
- xaxis=dict(tickmode='linear', tick0=0, dtick=1),
- yaxis=dict(title="Monthly Income ($)")
- )
- st.plotly_chart(fig2, use_container_width=True)
-
- # Display detailed forecast table
- st.subheader("DRIP Forecast Details")
-
- # Format the data for display
- display_forecast = drip_forecast.copy()
- display_forecast["Total Value ($)"] = display_forecast["Total Value ($)"].apply(lambda x: f"${x:,.2f}")
- display_forecast["Monthly Income ($)"] = display_forecast["Monthly Income ($)"].apply(lambda x: f"${x:,.2f}")
- display_forecast["Cumulative Income ($)"] = display_forecast["Cumulative Income ($)"].apply(lambda x: f"${x:,.2f}")
-
- # Format share counts and prices
- share_columns = [col for col in display_forecast.columns if "Shares" in col]
- price_columns = [col for col in display_forecast.columns if "Price" in col]
- yield_columns = [col for col in display_forecast.columns if "Yield (%)" in col]
-
- for col in share_columns:
- display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.4f}")
-
- for col in price_columns:
- display_forecast[col] = display_forecast[col].apply(lambda x: f"${x:.2f}")
-
- for col in yield_columns:
- display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.2f}%")
-
- # Create a more organized view by grouping columns
- basic_columns = ["Month", "Total Value ($)", "Monthly Income ($)", "Cumulative Income ($)"]
-
- # Create tabs for different views of the data
- detail_tabs = st.tabs(["Summary View", "Full Details"])
-
- with detail_tabs[0]:
- st.dataframe(display_forecast[basic_columns], use_container_width=True)
-
- with detail_tabs[1]:
- # Group columns by ticker for better readability
- ticker_columns = {}
- for ticker in tickers:
- ticker_columns[ticker] = [
- f"{ticker} Shares",
- f"{ticker} Price ($)",
- f"{ticker} Yield (%)"
- ]
-
- # Create ordered columns list: first basic columns, then grouped by ticker
- ordered_columns = basic_columns.copy()
- for ticker in tickers:
- ordered_columns.extend(ticker_columns[ticker])
-
- st.dataframe(
- display_forecast[ordered_columns],
- use_container_width=True,
- height=500 # Increase height to show more rows
- )
-
- # Add comparison between DRIP and No-DRIP strategies
- st.subheader("📊 1-Year DRIP vs. No-DRIP Comparison")
-
- # Add note about erosion effects if applicable
- if erosion_type != "None" and isinstance(erosion_level, dict):
- if erosion_level.get("use_per_ticker", False):
- st.info("""
- This comparison factors in the custom per-ETF erosion rates.
- Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares.
- """)
- else:
- nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
- yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100
- st.info(f"""
- This comparison factors in:
- - NAV Erosion: {nav_annual:.1f}% annually
- - Yield Erosion: {yield_annual:.1f}% annually
-
- Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares.
- """)
-
- # Calculate no-drip scenario (taking dividends as income)
- initial_value = drip_forecast["Total Value ($)"].iloc[0]
- initial_monthly_income = drip_forecast["Monthly Income ($)"].iloc[0]
- annual_income = initial_monthly_income * 12
-
- # Get the final prices after erosion from the last month of the DRIP forecast
- final_prices = {}
- for ticker in tickers:
- price_col = f"{ticker} Price ($)"
- if price_col in drip_forecast.columns:
- final_prices[ticker] = drip_forecast[price_col].iloc[-1]
- else:
- # Fallback to initial price if column doesn't exist
- final_prices[ticker] = ticker_data_dict[ticker]["price"]
-
- # Extract initial shares for each ETF from month 1
- initial_shares = {ticker: drip_forecast.iloc[0][f"{ticker} Shares"] for ticker in tickers}
-
- # Calculate the No-DRIP final value by multiplying initial shares by final prices
- # This correctly accounts for NAV erosion while keeping shares constant
- nodrip_final_value = sum(initial_shares[ticker] * final_prices[ticker] for ticker in tickers)
-
- # The final income should account for erosion but not compounding growth
- # This requires simulation of the erosion that would have happened
- if erosion_type != "None" and isinstance(erosion_level, dict):
- # Initialize the current prices and yields from the final_alloc dataframe
- ticker_data_dict = {}
- current_prices = {}
- current_yields = {}
-
- # Reconstruct ticker data from final_alloc
- for _, row in final_alloc.iterrows():
- ticker = row["Ticker"]
- ticker_data_dict[ticker] = {
- "price": row["Price"],
- "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal
- "distribution": row.get("Distribution Period", "Monthly")
- }
- current_prices[ticker] = row["Price"]
- current_yields[ticker] = row["Yield (%)"] / 100
-
- # Get the erosion rates for each ticker
- if erosion_level.get("use_per_ticker", False):
- ticker_nav_rates = {}
- ticker_yield_rates = {}
- for ticker in tickers:
- ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0})
- ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion
- ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion
- else:
- # Use global rates for all tickers
- global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion
- global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion
- ticker_nav_rates = {ticker: global_nav for ticker in tickers}
- ticker_yield_rates = {ticker: global_yield for ticker in tickers}
-
- # Apply 12 months of erosion
- for month in range(1, 13):
- # Apply erosion to each ticker
- for ticker in tickers:
- # Apply NAV erosion
- if ticker_nav_rates[ticker] > 0:
- current_prices[ticker] *= (1 - ticker_nav_rates[ticker])
-
- # Apply yield erosion
- if ticker_yield_rates[ticker] > 0:
- current_yields[ticker] *= (1 - ticker_yield_rates[ticker])
-
- # Calculate final monthly income with eroded prices and yields but original shares
- final_monthly_income_nodrip = sum(
- (current_yields[ticker] / 12) *
- (initial_shares[ticker] * current_prices[ticker])
- for ticker in tickers
- )
- else:
- # No erosion, so final income is the same as initial income
- final_monthly_income_nodrip = initial_monthly_income
-
- nodrip_final_annual_income = final_monthly_income_nodrip * 12
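# Hedged sketch (editor's annotation, invented data): the No-DRIP branch above
# keeps share counts fixed and only lets prices and yields erode month by month.
initial_shares = {"AAA": 120.0, "BBB": 80.0}
prices = {"AAA": 25.0, "BBB": 40.0}
annual_yields = {"AAA": 0.12, "BBB": 0.08}
nav_erosion = {"AAA": 0.010, "BBB": 0.005}    # assumed monthly NAV erosion rates
yield_erosion = {"AAA": 0.008, "BBB": 0.000}  # assumed monthly yield erosion rates

for _ in range(12):  # 12 months, no reinvestment
    for t in prices:
        prices[t] *= (1 - nav_erosion[t])
        annual_yields[t] *= (1 - yield_erosion[t])

nodrip_final_value = sum(initial_shares[t] * prices[t] for t in prices)
final_monthly_income = sum(annual_yields[t] / 12 * initial_shares[t] * prices[t] for t in prices)
print(f"${nodrip_final_value:,.2f}", f"${final_monthly_income:,.2f}/month")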
-
- # Get values for DRIP scenario from forecast
- drip_final_value = drip_forecast["Total Value ($)"].iloc[-1]
- drip_final_monthly_income = drip_forecast["Monthly Income ($)"].iloc[-1]
- drip_annual_income_end = drip_final_monthly_income * 12
-
- # Create comparison dataframe with withdrawn income for a more complete financial picture
-
- # For No-DRIP strategy, calculate cumulative withdrawn income (sum of monthly dividends)
- # This is equivalent to the cumulative income in the DRIP forecast, but in No-DRIP it's withdrawn
- withdrawn_income = 0
- monthly_dividends = []
-
- # Reconstruct the monthly dividend calculation for No-DRIP
- current_prices_monthly = {ticker: ticker_data_dict[ticker]["price"] for ticker in tickers}
- current_yields_monthly = {ticker: ticker_data_dict[ticker]["yield_annual"] for ticker in tickers}
-
- for month in range(1, 13):
- # Calculate dividends for this month based on current yields and prices
- month_dividend = sum(
- (current_yields_monthly[ticker] / 12) *
- (initial_shares[ticker] * current_prices_monthly[ticker])
- for ticker in tickers
- )
- withdrawn_income += month_dividend
- monthly_dividends.append(month_dividend)
-
- # Apply erosion for next month
- if erosion_type != "None":
- for ticker in tickers:
- # Apply NAV erosion
- if ticker in ticker_nav_rates and ticker_nav_rates[ticker] > 0:
- current_prices_monthly[ticker] *= (1 - ticker_nav_rates[ticker])
-
- # Apply yield erosion
- if ticker in ticker_yield_rates and ticker_yield_rates[ticker] > 0:
- current_yields_monthly[ticker] *= (1 - ticker_yield_rates[ticker])
-
- # Calculate total economic result (final value + withdrawn income)
- nodrip_economic_result = nodrip_final_value + withdrawn_income
- drip_economic_result = drip_final_value # No withdrawals
-
- comparison_data = {
- "Strategy": ["Take Income (No DRIP)", "Reinvest Dividends (DRIP)"],
- "Initial Portfolio Value": [f"${initial_value:,.2f}", f"${initial_value:,.2f}"],
- "Final Portfolio Value": [f"${nodrip_final_value:,.2f}", f"${drip_final_value:,.2f}"],
- "Value Change": [
- f"${nodrip_final_value - initial_value:,.2f} ({(nodrip_final_value/initial_value - 1)*100:.2f}%)",
- f"${drip_final_value - initial_value:,.2f} ({(drip_final_value/initial_value - 1)*100:.2f}%)"
- ],
- "Income Withdrawn": [f"${withdrawn_income:,.2f}", "$0.00"],
- "Total Economic Result": [
- f"${nodrip_economic_result:,.2f} ({(nodrip_economic_result/initial_value - 1)*100:.2f}%)",
- f"${drip_economic_result:,.2f} ({(drip_economic_result/initial_value - 1)*100:.2f}%)"
- ],
- "Initial Monthly Income": [f"${initial_monthly_income:,.2f}", f"${initial_monthly_income:,.2f}"],
- "Final Monthly Income": [f"${final_monthly_income_nodrip:,.2f}", f"${drip_final_monthly_income:,.2f}"],
- "Income Change": [
- f"${final_monthly_income_nodrip - initial_monthly_income:,.2f} ({(final_monthly_income_nodrip/initial_monthly_income - 1)*100:.2f}%)",
- f"${drip_final_monthly_income - initial_monthly_income:,.2f} ({(drip_final_monthly_income/initial_monthly_income - 1)*100:.2f}%)"
- ],
- }
-
- # Add chart to visualize the No-DRIP income stream
- if erosion_type != "None":
- # Calculate the effect of erosion on value
- pure_nav_effect = sum(
- initial_shares[ticker] * (final_prices[ticker] - ticker_data_dict[ticker]["price"])
- for ticker in tickers
- )
-
- # Explain the NAV erosion impact on no-DRIP strategy and highlight the benefit of income
- st.info(f"""
- **Economic Comparison:**
-        - No-DRIP: While portfolio value decreased by ${abs(pure_nav_effect):,.2f} ({abs(pure_nav_effect)/initial_value*100:.2f}%),
- you received ${withdrawn_income:,.2f} in income ({withdrawn_income/initial_value*100:.2f}% of initial investment)
- - DRIP: No income taken, but final portfolio value is ${drip_final_value:,.2f}
- ({(drip_final_value/initial_value - 1)*100:.2f}% vs. initial investment)
- - **Total Economic Result** combines final portfolio value with total withdrawn income for the complete financial picture
- """)
-
- # Show a chart of monthly income for the No-DRIP scenario
- monthly_income_df = pd.DataFrame({
- "Month": list(range(1, 13)),
- "Monthly Income": monthly_dividends
- })
-
- fig = px.line(
- monthly_income_df,
- x="Month",
- y="Monthly Income",
- title="Monthly Income Withdrawn (No-DRIP Strategy)",
- markers=True,
- template="plotly_dark"
- )
- fig.update_traces(line=dict(color="#ff7f0e", width=3))
- st.plotly_chart(fig, use_container_width=True)
-
- comparison_df = pd.DataFrame(comparison_data)
- st.dataframe(comparison_df, use_container_width=True, hide_index=True)
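# Hedged sketch (editor's annotation, invented figures): "Total Economic Result"
# in the table above adds withdrawn income back to the final portfolio value so
# both strategies are judged on the same footing.
initial_value = 100_000.0
nodrip_final_value, withdrawn_income = 94_500.0, 10_800.0  # value eroded, income taken
drip_final_value = 104_200.0                               # nothing withdrawn

results = {
    "Take Income (No DRIP)": nodrip_final_value + withdrawn_income,  # 105,300.00
    "Reinvest Dividends (DRIP)": drip_final_value,                   # 104,200.00
}
for strategy, result in results.items():
    print(f"{strategy}: ${result:,.2f} ({(result / initial_value - 1) * 100:+.2f}%)")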
-
- # Show time to recover initial capital - CORRECTED CALCULATION
-
- # For DRIP: Calculate what remains to recover (initial value - current value)
- drip_remaining_to_recover = max(0, initial_value - drip_final_value)
- # Time to recover the remaining amount at the final income rate
- years_to_recover_with_drip = drip_remaining_to_recover / drip_annual_income_end if drip_annual_income_end > 0 else 0
-
- # For No-DRIP: Calculate what remains to recover (initial value - [final value + withdrawn income])
- nodrip_economic_result = nodrip_final_value + withdrawn_income
- nodrip_remaining_to_recover = max(0, initial_value - nodrip_economic_result)
- # Time to recover the remaining amount at the final income rate
- years_to_recover_no_drip = nodrip_remaining_to_recover / nodrip_final_annual_income if nodrip_final_annual_income > 0 else 0
-
- # Convert to months and format display
- def format_recovery_time(years):
- if years <= 0:
- return "0 months"
-
- months = int(years * 12)
- if months < 12:
- return f"{months} months"
- else:
- years_part = months // 12
- months_part = months % 12
- if months_part == 0:
- return f"{years_part} years"
- else:
- return f"{years_part} years, {months_part} months"
-
- col1, col2 = st.columns(2)
- with col1:
- st.metric(
- "Remaining Time to Recover Capital (No DRIP)",
- format_recovery_time(years_to_recover_no_drip)
- )
- with col2:
- st.metric(
- "Remaining Time to Recover Capital (DRIP)",
- format_recovery_time(years_to_recover_with_drip),
- f"Difference: {format_recovery_time(abs(years_to_recover_no_drip - years_to_recover_with_drip))}"
- )
-
- st.write("""
- **Note:** This shows the *remaining* time needed to fully recover your initial investment,
- taking into account both current portfolio value and income already withdrawn.
-
- For No-DRIP: Initial Value - (Current Value + Withdrawn Income) = Amount left to recover
- For DRIP: Initial Value - Current Value = Amount left to recover
- """)
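# Hedged sketch (editor's annotation, invented figures): the recovery metrics
# above divide the still-unrecovered part of the initial investment by each
# strategy's final annual income rate.
initial_value = 100_000.0
drip_final_value, drip_annual_income = 96_800.0, 12_600.0
nodrip_final_value, withdrawn_income, nodrip_annual_income = 88_000.0, 9_500.0, 10_200.0

drip_shortfall = max(0.0, initial_value - drip_final_value)                           # $3,200 left
nodrip_shortfall = max(0.0, initial_value - (nodrip_final_value + withdrawn_income))  # $2,500 left
drip_months = drip_shortfall / drip_annual_income * 12 if drip_annual_income else 0
nodrip_months = nodrip_shortfall / nodrip_annual_income * 12 if nodrip_annual_income else 0
print(f"DRIP: ~{drip_months:.0f} months, No-DRIP: ~{nodrip_months:.0f} months")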
-
- with tab3:
- st.subheader("📉 AI Erosion Risk Assessment")
-
- # Add explanatory text
- st.write("""
- This analysis uses historical ETF data to estimate reasonable erosion settings
- based on past performance, volatility, and dividend history.
- """)
-
- # Run the analysis in a spinner
- with st.spinner("Analyzing historical ETF data..."):
- risk_df = analyze_etf_erosion_risk(final_alloc["Ticker"].tolist(), debug_mode)
-
- if not risk_df.empty:
- # Create a summary table with key insights
- display_risk_df = risk_df.copy()
-
- # Format columns for display
- if "ETF Age (Years)" in display_risk_df.columns:
- display_risk_df["ETF Age (Years)"] = display_risk_df["ETF Age (Years)"].apply(
- lambda x: f"{x:.1f} years" if pd.notna(x) else "Unknown"
- )
- if "Volatility (Annual)" in display_risk_df.columns:
- display_risk_df["Volatility (Annual)"] = display_risk_df["Volatility (Annual)"].apply(
- lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown"
- )
- if "Max Drawdown (1Y)" in display_risk_df.columns:
- display_risk_df["Max Drawdown (1Y)"] = display_risk_df["Max Drawdown (1Y)"].apply(
- lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown"
- )
- if "Dividend Trend" in display_risk_df.columns:
- display_risk_df["Dividend Trend"] = display_risk_df["Dividend Trend"].apply(
- lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown"
- )
-
- # Display main assessment table
- st.subheader("Recommended Erosion Settings")
- main_columns = [
- "Ticker",
- "NAV Erosion Risk (0-9)",
- "Yield Erosion Risk (0-9)",
- "Estimated Annual NAV Erosion",
- "Estimated Annual Yield Erosion",
- "NAV Risk Explanation",
- "Yield Risk Explanation"
- ]
-
- st.dataframe(
- display_risk_df[main_columns],
- use_container_width=True,
- hide_index=True
- )
-
- # Allow applying these settings to the simulation
- if st.button("Apply Recommended Erosion Settings", type="primary"):
- # Initialize or update per-ticker erosion settings
- if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict):
- st.session_state.per_ticker_erosion = {}
-
- # Update the session state with recommended settings
- for _, row in risk_df.iterrows():
- ticker = row["Ticker"]
- st.session_state.per_ticker_erosion[ticker] = {
- "nav": int(row["NAV Erosion Risk (0-9)"]),
- "yield": int(row["Yield Erosion Risk (0-9)"])
- }
-
- # Enable erosion and per-ticker settings
- st.session_state.erosion_type = "NAV & Yield Erosion"
- st.session_state.use_per_ticker_erosion = True
-
- # Update the erosion_level variable to match the new settings
- erosion_level = {
- "global": {
- "nav": 5, # Default medium level for global fallback
- "yield": 5
- },
- "per_ticker": st.session_state.per_ticker_erosion,
- "use_per_ticker": True
- }
-
- # Update session state erosion level for DRIP forecast
- st.session_state.erosion_level = erosion_level
-
- st.success("Applied recommended erosion settings. They will be used in the DRIP forecast.")
- st.info("Go to the DRIP Forecast tab to see the impact of these settings.")
-
- # Display additional risk metrics in an expander
- with st.expander("View Detailed Risk Metrics"):
- detail_columns = [
- "Ticker",
- "ETF Age (Years)",
- "Is New ETF",
- "Volatility (Annual)",
- "Max Drawdown (1Y)",
- "Dividend Trend"
- ]
-
- st.dataframe(
- display_risk_df[detail_columns],
- use_container_width=True,
- hide_index=True
- )
-
- st.write("""
- **Understanding the Metrics:**
- - **ETF Age**: Newer ETFs have less historical data and may be assigned higher risk
- - **Volatility**: Higher volatility suggests higher NAV erosion risk
- - **Max Drawdown**: Maximum peak-to-trough decline, indicating worst historical NAV erosion
- - **Dividend Trend**: Positive values indicate growing dividends, negative values indicate declining dividends
- """)
- else:
- st.warning("Unable to perform risk assessment. Check ticker data or try again.")
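# Hedged sketch (editor's annotation): one plausible way to derive the metrics
# listed above from a daily price history and a dividend series; the actual
# analyze_etf_erosion_risk() implementation elsewhere in this file may differ.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
prices = pd.Series(100 * np.cumprod(1 + rng.normal(0, 0.01, 252)))  # synthetic 1y of closes
dividends = pd.Series([0.25, 0.24, 0.23, 0.22])                     # synthetic quarterly payouts

daily_returns = prices.pct_change().dropna()
volatility_annual = daily_returns.std() * np.sqrt(252)       # annualized return volatility
max_drawdown = (prices / prices.cummax() - 1).min()          # worst peak-to-trough decline
dividend_trend = dividends.iloc[-1] / dividends.iloc[0] - 1  # + growing, - declining payouts
print(f"vol={volatility_annual:.1%}, drawdown={max_drawdown:.1%}, div trend={dividend_trend:.1%}")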
-
- with tab4:
- st.subheader("🤖 AI Portfolio Suggestions")
-
- # Update AI suggestion to match simulation mode
- if st.session_state.simulation_mode == "income_target":
- suggestion_df = ai_suggestion(df, ANNUAL_TARGET, st.session_state.etf_allocations)
- else:
- # For capital mode, we need to modify the AI suggestion logic
- # First generate optimized allocations
- ai_allocations = ai_suggestion(df, 1000, st.session_state.etf_allocations) # Use dummy target
- if not ai_allocations.empty:
- # Then use those allocations with the actual capital
- ai_allocs_list = [{"ticker": row["Ticker"], "allocation": row["Allocation (%)"]}
- for _, row in ai_allocations.iterrows()]
- suggestion_df = allocate_for_capital(df, initial_capital, ai_allocs_list)
- else:
- suggestion_df = pd.DataFrame()
-
- if not suggestion_df.empty:
- mode_message = "The AI has optimized the portfolio to minimize capital while mitigating risk" if st.session_state.simulation_mode == "income_target" else "The AI has optimized the income from your investment while mitigating risk"
- st.write(f"{mode_message}, using validated 2024 yield data and ETF longevity.")
- portfolio_summary(suggestion_df)
-
- # Format currencies for better readability
- ai_display_df = suggestion_df.copy()
- ai_display_df["Capital Allocated ($)"] = ai_display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}")
- ai_display_df["Income Contributed ($)"] = ai_display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}")
- ai_display_df["Yield (%)"] = ai_display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%")
-
- st.dataframe(
- ai_display_df[["Ticker", "Capital Allocated ($)", "Income Contributed ($)",
- "Allocation (%)", "Yield (%)", "Risk Level"]],
- use_container_width=True,
- hide_index=True
- )
-
- # Add button to apply AI suggestions
- if st.button("Apply AI Suggested Allocations", type="primary"):
- # Convert AI suggestions to the format needed for recalculation
- ai_allocations = {row["Ticker"]: row["Allocation (%)"] for _, row in suggestion_df.iterrows()}
-
- # Update sidebar ETF allocations to match AI suggestions
- new_etf_allocations = []
- for ticker, allocation in ai_allocations.items():
- new_etf_allocations.append({
- "ticker": ticker,
- "allocation": allocation
- })
-
- # Update session state with new allocations
- st.session_state.etf_allocations = new_etf_allocations
-
- # Recalculate portfolio with new allocations
- final_alloc = recalculate_portfolio(ai_allocations)
- st.session_state.final_alloc = final_alloc
- st.success("Applied AI suggested allocations to your portfolio!")
- st.rerun()
- else:
- st.error("AI Suggestion failed to generate. Check ETF data.")
-
- with tab5:
- st.subheader("📊 ETF Details")
- # ... existing code ...
-
-# End of file
\ No newline at end of file
+ st.error(f"Error focusing on capital: {str(e)}")
\ No newline at end of file