import streamlit as st st.set_page_config( page_title="ETF Dividend Portfolio Builder", page_icon="💼", layout="wide", initial_sidebar_state="expanded" ) # Add navigation in sidebar with st.sidebar: st.markdown("### Navigation") if st.button("🏠 ETF Suite Launcher", key="launcher_portfolio"): st.switch_page("ETF_Suite_Launcher.py") if st.button("📈 ETF Analyzer", key="analyzer_portfolio"): st.switch_page("ETF_Analyzer.py") # --- Imports & Settings --- import pandas as pd import plotly.express as px import plotly.graph_objects as go import yfinance as yf import time import re from typing import Tuple, List, Dict, Any, Optional, TypeVar, Callable from io import StringIO, BytesIO from openai import OpenAI from functools import lru_cache from datetime import datetime, timezone, timedelta import asyncio import concurrent.futures import pdfkit import base64 import tempfile import os import requests import json import hashlib from pathlib import Path import numpy as np # --- Settings --- REQUESTS_PER_MINUTE = 5 # yfinance rate limit MAX_TICKERS = 10 RETRY_ATTEMPTS = 5 RETRY_DELAY = 2 # Seconds between retries MAX_YIELD_THRESHOLD = 50 # Warn if yield exceeds 50% MAX_WORKERS = 5 # Maximum number of parallel workers for API requests DRIP_FORECAST_MONTHS = 12 # Number of months to forecast DRIP compounding USE_FMP_API = True # Whether to use FMP API as an additional data source CACHE_EXPIRATION_DAYS = 7 # Number of days before cache expires # --- Cache Setup --- def setup_cache_dir(): """Set up cache directory if it doesn't exist""" cache_dir = Path("cache") cache_dir.mkdir(exist_ok=True) return cache_dir CACHE_DIR = setup_cache_dir() # Expected yield ranges for validation (based on 2024 data) EXPECTED_YIELDS = { "JEPI": (7, 9), # 7-9% "FEPI": (10, 12), # 10-12% "CONY": (20, 30), # 20-30% "SMCY": (20, 30), "ULTY": (20, 30), "MSTY": (20, 30) } # Reference dictionary with accurate ETF yields (2024 data) ETF_REFERENCE_DB = { "SPY": 1.35, "VOO": 1.40, "VTI": 1.34, "QQQ": 0.70, "SCHD": 3.50, "VYM": 2.95, "HDV": 3.80, "SPYD": 4.20, "SPHD": 4.65, "SDIV": 8.30, "JEPI": 7.80, "DGRO": 2.15, "VIG": 1.85, "BND": 2.80, "AGG": 2.65, "TLT": 3.10, "GLD": 0.00, "VNQ": 3.90, "XLF": 1.75, "XLV": 1.40, "XLE": 3.20, "PFF": 6.20, "SDY": 2.65, "DVY": 3.95, "IDV": 5.10, "NOBL": 2.30, "DGRW": 1.90, "DIV": 6.80, "VGT": 0.70, "VDC": 2.40 } # High-yield ETFs that benefit from FMP API verification (2024 data) HIGH_YIELD_ETFS = { "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"}, "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"}, "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"}, "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"}, "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"}, "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"}, "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"}, "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"} } # --- Helper Functions --- T = TypeVar('T') def generate_cache_key(source: str, ticker: str, endpoint: str = None) -> str: """Generate a unique cache key for a data request. Args: source: Data source (e.g., 'yfinance', 'fmp') ticker: Ticker symbol endpoint: API endpoint or data type Returns: A string hash key """ components = [source, ticker.upper()] if endpoint: components.append(endpoint) key_string = "_".join(components) return hashlib.md5(key_string.encode()).hexdigest() def get_cache_path(cache_key: str) -> Path: """Get the file path for a cache key.""" return CACHE_DIR / f"{cache_key}.json" def save_to_cache(cache_key: str, data: Any) -> None: """Save data to cache with timestamp.""" cache_file = get_cache_path(cache_key) # Process data to ensure it's JSON serializable processed_data = convert_to_serializable(data) cache_data = { "data": processed_data, "timestamp": datetime.now().isoformat() } try: with open(cache_file, 'w') as f: json.dump(cache_data, f) except Exception as e: print(f"Error saving to cache: {str(e)}") # If caching fails, we continue without raising an exception # This allows the app to work even if caching doesn't def convert_to_serializable(obj: Any) -> Any: """Convert an object to a JSON serializable format.""" if obj is None: return None # Handle pandas Series if isinstance(obj, pd.Series): try: # Handle special case of pandas Series with non-serializable index return { "__pandas_series__": True, "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index), "values": obj.values.tolist() if hasattr(obj.values, 'tolist') else list(obj.values), "name": obj.name } except Exception as e: # If all else fails, convert to list return list(obj) # Handle pandas DataFrame elif isinstance(obj, pd.DataFrame): try: return { "__pandas_dataframe__": True, "columns": obj.columns.tolist() if hasattr(obj.columns, 'tolist') else list(obj.columns), "data": obj.values.tolist() if hasattr(obj.values, 'tolist') else obj.values.tolist(), "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index) } except Exception as e: # Fall back to records format return obj.to_dict(orient='records') # Handle numpy arrays elif isinstance(obj, np.ndarray): try: return obj.tolist() except Exception as e: return list(obj) # Handle dictionaries with non-serializable values elif isinstance(obj, dict): return {str(k): convert_to_serializable(v) for k, v in obj.items()} # Handle lists with non-serializable items elif isinstance(obj, (list, tuple)): return [convert_to_serializable(item) for item in obj] # Handle datetime objects elif isinstance(obj, datetime): return obj.isoformat() # Handle other objects by converting to string if needed try: json.dumps(obj) return obj except (TypeError, OverflowError): return str(obj) def load_from_cache(cache_key: str) -> Tuple[Any, bool]: """Load data from cache if it exists and is not expired. Returns: Tuple of (data, is_valid) """ cache_file = get_cache_path(cache_key) if not cache_file.exists(): return None, False try: with open(cache_file, 'r') as f: cache_data = json.load(f) # Check if cache is expired timestamp = datetime.fromisoformat(cache_data["timestamp"]) if datetime.now() - timestamp > timedelta(days=CACHE_EXPIRATION_DAYS): return cache_data["data"], False # Expired but usable as fallback # Restore any special data structures data = restore_from_serializable(cache_data["data"]) return data, True # Valid cache except Exception as e: print(f"Error loading from cache: {str(e)}") return None, False def restore_from_serializable(obj): """Restore special data structures from serialized format.""" if obj is None: return None # Restore pandas Series if isinstance(obj, dict) and obj.get("__pandas_series__"): return pd.Series( data=obj["values"], index=obj["index"], name=obj["name"] ) # Restore pandas DataFrame elif isinstance(obj, dict) and obj.get("__pandas_dataframe__"): return pd.DataFrame( data=obj["data"], columns=obj["columns"], index=obj["index"] ) # Restore nested dictionaries elif isinstance(obj, dict): return {k: restore_from_serializable(v) for k, v in obj.items()} # Restore nested lists elif isinstance(obj, list): return [restore_from_serializable(item) for item in obj] # Return original object return obj def get_cache_stats() -> Dict: """Get cache statistics.""" cache_files = list(CACHE_DIR.glob("*.json")) stats = { "file_count": len(cache_files), "total_size_kb": sum(f.stat().st_size for f in cache_files) / 1024, "sources": {}, "tickers": set() } for file in cache_files: # Extract info from filename name = file.stem if "_" in name: parts = name.split("_") if len(parts) >= 2: source = parts[0] ticker = parts[1] if source not in stats["sources"]: stats["sources"][source] = 0 stats["sources"][source] += 1 stats["tickers"].add(ticker) stats["tickers"] = list(stats["tickers"]) stats["ticker_count"] = len(stats["tickers"]) return stats def clear_cache(ticker: str = None) -> None: """Clear cache files. Args: ticker: If provided, only clear cache for this ticker """ if ticker: # Clear only files for this ticker for file in CACHE_DIR.glob(f"*_{ticker.upper()}_*.json"): file.unlink() else: # Clear all cache files for file in CACHE_DIR.glob("*.json"): file.unlink() def fetch_with_retry(fetch_func: Callable[[], T], attempts: int = RETRY_ATTEMPTS, delay: int = RETRY_DELAY) -> Tuple[Optional[T], str]: """Generic retry function for API calls that might fail temporarily. Args: fetch_func: Function to execute attempts: Number of retry attempts delay: Delay between retries in seconds Returns: Tuple of (result, debug_info) """ debug_info = "" result = None for attempt in range(attempts): try: result = fetch_func() return result, debug_info except Exception as e: debug_info += f"Attempt {attempt+1} failed: {str(e)}\n" if attempt < attempts - 1: time.sleep(delay) return None, debug_info def fetch_fmp_data(ticker: str) -> Tuple[Dict, str]: """Fetch ETF data from FMP API with caching. Args: ticker: The ETF ticker symbol Returns: Tuple of (data_dict, debug_info) """ debug_info = "" result = { "profile": None, "quote": None, "dividend_history": None } # Get API key API_KEY = os.environ.get("FMP_API_KEY") if not API_KEY: API_KEY = st.session_state.get("fmp_api_key") if not API_KEY: return result, "FMP API key not found" # Check if we should use force refresh force_refresh = st.session_state.get("force_refresh_data", False) try: # Fetch profile data with cache profile_cache_key = generate_cache_key("fmp", ticker, "profile") profile_data, is_valid = load_from_cache(profile_cache_key) if not force_refresh else (None, False) if not is_valid: # Need to fetch from API profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}" profile_response = requests.get(profile_url) if profile_response.status_code == 200: profile_data = profile_response.json() save_to_cache(profile_cache_key, profile_data) debug_info += f"Profile data fetched from API and cached\n" # Track API call if "api_calls" in st.session_state: st.session_state.api_calls += 1 else: debug_info += f"Profile request failed with status {profile_response.status_code}\n" profile_data = None else: debug_info += f"Profile data loaded from cache\n" result["profile"] = profile_data # Fetch quote data with cache quote_cache_key = generate_cache_key("fmp", ticker, "quote") quote_data, is_valid = load_from_cache(quote_cache_key) if not force_refresh else (None, False) if not is_valid: # Need to fetch from API quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}" quote_response = requests.get(quote_url) if quote_response.status_code == 200: quote_data = quote_response.json() save_to_cache(quote_cache_key, quote_data) debug_info += f"Quote data fetched from API and cached\n" # Track API call if "api_calls" in st.session_state: st.session_state.api_calls += 1 else: debug_info += f"Quote request failed with status {quote_response.status_code}\n" quote_data = None else: debug_info += f"Quote data loaded from cache\n" result["quote"] = quote_data # Fetch dividend history with cache dividend_cache_key = generate_cache_key("fmp", ticker, "dividend_history") dividend_data, is_valid = load_from_cache(dividend_cache_key) if not force_refresh else (None, False) if not is_valid: # Need to fetch from API dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}" dividend_response = requests.get(dividend_url) if dividend_response.status_code == 200: dividend_data = dividend_response.json() save_to_cache(dividend_cache_key, dividend_data) debug_info += f"Dividend history fetched from API and cached\n" # Track API call if "api_calls" in st.session_state: st.session_state.api_calls += 1 else: debug_info += f"Dividend history request failed with status {dividend_response.status_code}\n" dividend_data = None else: debug_info += f"Dividend history loaded from cache\n" result["dividend_history"] = dividend_data return result, debug_info except Exception as e: debug_info += f"FMP API request failed: {str(e)}\n" return result, debug_info def yf_fetch_with_cache(ticker: str, data_type: str, fetch_func: Callable) -> Tuple[Any, str]: """Fetch data from yfinance with caching. Args: ticker: Ticker symbol data_type: Type of data (info, dividends, history) fetch_func: Function to execute for fetching data Returns: Tuple of (data, debug_info) """ debug_info = "" # Generate cache key for this request cache_key = generate_cache_key("yf", ticker, data_type) # Check if we should force refresh force_refresh = st.session_state.get("force_refresh_data", False) # Try to get data from cache first data, is_valid = load_from_cache(cache_key) if not force_refresh else (None, False) if is_valid: # We have valid cached data debug_info = f"Data for {ticker} ({data_type}) loaded from cache" return data, debug_info # Need to fetch data from API data, api_debug = fetch_with_retry(fetch_func) debug_info += api_debug # If successful, cache the data if data is not None: save_to_cache(cache_key, data) debug_info += f"\nData for {ticker} ({data_type}) saved to cache" return data, debug_info def process_ticker_data(ticker: str, debug: bool = False) -> Tuple[Optional[Dict], Tuple[str, str, str], List[str]]: """Process a single ticker to get all relevant data. Args: ticker: The ticker symbol debug: Whether to include debug information Returns: Tuple of (ticker_data, error_info, warnings) where: - ticker_data is the processed data or None - error_info is (ticker, reason, debug_info) or None - warnings is a list of warning messages """ debug_info = "" warnings = [] # Check if this is a high-yield ETF that would benefit from FMP API verification is_high_yield = ticker in HIGH_YIELD_ETFS try: # First try yfinance as primary data source yf_ticker = yf.Ticker(ticker) # Fetch price and inception data with caching info, price_debug = yf_fetch_with_cache( ticker, "info", lambda: yf_ticker.info ) debug_info += price_debug if not info or not info.get("previousClose"): if not USE_FMP_API or not is_high_yield: return None, (ticker, "No price data available", debug_info), [] # Default values from yfinance price = info.get("previousClose", 0) if info else 0 inception_date = info.get("fundInceptionDate") if info else None # Get NAV data nav = info.get("navPrice", None) if info else None if nav is None and info: # For some ETFs, this might be stored under a different key nav = info.get("regularMarketPrice", price) # Calculate premium/discount to NAV nav_premium = 0 if nav and nav > 0: nav_premium = ((price / nav) - 1) * 100 # as percentage if debug: debug_info += f"\nYFinance - Price: {price}\nInception Date: {inception_date}\nNAV: {nav}\nPremium/Discount: {nav_premium:.2f}%\n" # If this is a high-yield ETF and FMP API is enabled, get additional data from FMP API fmp_yield_calculated = None fmp_price = None fmp_dist_period = None if USE_FMP_API and is_high_yield: fmp_data, fmp_debug = fetch_fmp_data(ticker) debug_info += f"\nFMP API Debug: {fmp_debug}\n" # Extract data from FMP response if fmp_data["profile"] and len(fmp_data["profile"]) > 0: profile = fmp_data["profile"][0] fmp_price = profile.get("price", price) # Default to yfinance price if not available last_div = profile.get("lastDiv", 0) if fmp_price > 0 and last_div > 0: # Calculate yield from profile data fmp_profile_yield = (last_div / fmp_price) * 100 debug_info += f"FMP Profile Yield: {fmp_profile_yield:.2f}%\n" # Extract yield from quote data if fmp_data["quote"] and len(fmp_data["quote"]) > 0: quote = fmp_data["quote"][0] if "dividendYield" in quote: div_yield = quote["dividendYield"] fmp_quote_yield = div_yield * 100 if div_yield < 1 else div_yield debug_info += f"FMP Quote Yield: {fmp_quote_yield:.2f}%\n" # Update price if available if "price" in quote and not fmp_price: fmp_price = quote["price"] # Calculate yield from dividend history if fmp_data["dividend_history"] and "historical" in fmp_data["dividend_history"] and fmp_data["dividend_history"]["historical"]: recent_divs = fmp_data["dividend_history"]["historical"][:3] # Get last 3 dividends if recent_divs and "dividend" in recent_divs[0] and fmp_price: # Try to figure out payment frequency if len(recent_divs) >= 2: try: date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d") date2 = datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d") days_between = abs((date1 - date2).days) if days_between < 45: # Monthly frequency = 12 fmp_dist_period = "Monthly" elif days_between < 100: # Quarterly frequency = 4 fmp_dist_period = "Quarterly" elif days_between < 200: # Semi-annual frequency = 2 fmp_dist_period = "Semi-Annually" else: # Annual frequency = 1 fmp_dist_period = "Annually" annual_div = recent_divs[0]["dividend"] * frequency fmp_yield_calculated = (annual_div / fmp_price) * 100 debug_info += f"FMP Calculated Yield: {fmp_yield_calculated:.2f}% (Distribution: {fmp_dist_period})\n" except Exception as e: debug_info += f"Error calculating FMP yield: {str(e)}\n" # Decide which data source to use - for high-yield ETFs, prefer FMP if available use_fmp = USE_FMP_API and is_high_yield and fmp_yield_calculated is not None # If we're using FMP data for high-yield ETFs if use_fmp: if debug: debug_info += f"Using FMP data for {ticker} (high-yield ETF)\n" # For high-yield ETFs, FMP data is typically more accurate yield_pct = fmp_yield_calculated final_price = fmp_price if fmp_price else price dist_period = fmp_dist_period if fmp_dist_period else HIGH_YIELD_ETFS[ticker]["frequency"] income_per_1k = (1000 / final_price) * (yield_pct * final_price) / 100 # Add a note that we're using validated data debug_info += f"Using validated FMP yield data: {yield_pct:.2f}%\n" else: # For normal ETFs, proceed with yfinance data # Fetch dividend data from yfinance with caching dividends, div_debug = yf_fetch_with_cache( ticker, "dividends", lambda: yf_ticker.dividends ) debug_info += div_debug if dividends is None or dividends.empty: return None, (ticker, "No dividend data available", debug_info), [] dividends = dividends.reset_index() dividends.columns = ["date", "amount"] dividends["date"] = pd.to_datetime(dividends["date"]) last_year = dividends[dividends["date"] >= pd.Timestamp.now(tz='America/New_York') - pd.Timedelta(days=365)] ttm_dividend = last_year["amount"].sum() if not ttm_dividend and not last_year.empty: ttm_dividend = dividends["amount"].mean() * 12 debug_info += f"\nFallback: Estimated TTM dividend = {ttm_dividend:.2f}" if not ttm_dividend or not price: return None, (ticker, f"Missing data: Price={price}, Dividend={ttm_dividend}", debug_info), [] yield_pct = (ttm_dividend / price) * 100 income_per_1k = (1000 / price) * ttm_dividend # Check for unrealistic yields if yield_pct > MAX_YIELD_THRESHOLD and ticker not in HIGH_YIELD_ETFS: warnings.append(f"Unrealistic yield for {ticker}: {yield_pct:.2f}%. Verify data accuracy.") debug_info += f"Warning: Yield {yield_pct:.2f}% exceeds {MAX_YIELD_THRESHOLD}% threshold\n" # Use reference yield if available if ticker in ETF_REFERENCE_DB: yield_pct = ETF_REFERENCE_DB[ticker] debug_info += f"Corrected to reference yield: {yield_pct:.2f}%\n" income_per_1k = (1000 / price) * (yield_pct * price) / 100 # Calculate distribution period if len(last_year) >= 2: intervals = (last_year["date"].diff().dt.days).dropna() avg_interval = intervals.mean() if avg_interval <= 45: dist_period = "Monthly" elif avg_interval <= 100: dist_period = "Quarterly" elif avg_interval <= 200: dist_period = "Semi-Annually" else: dist_period = "Annually" else: dist_period = "Unknown" # Convert inception date to datetime inception_date_str = None if inception_date: try: # Ensure timestamp is valid and convert to UTC inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) inception_date_str = inception_date_dt.strftime("%Y-%m-%d") except Exception as e: debug_info += f"Invalid inception date format: {inception_date}, error: {str(e)}\n" inception_date_str = None # Final data with validated yield info final_price = fmp_price if use_fmp and fmp_price else price return { "Ticker": ticker, "Price": round(final_price, 2), "NAV": round(nav, 2) if nav else None, "Premium/Discount (%)": round(nav_premium, 2) if nav else None, "Dividend Rate": round((yield_pct * final_price) / 100, 2), "Yield (%)": round(yield_pct, 2), "Income per $1K": round(income_per_1k, 2), "Distribution Period": dist_period, "Inception Date": inception_date_str, "Data Source": "FMP API" if use_fmp else "YFinance" }, None, warnings except Exception as e: return None, (ticker, f"Error processing ticker: {str(e)}", debug_info), [] # --- Validate ETF Input --- def validate_etf_input(etf_allocations: List[Dict]) -> List[str]: """Validate ETF tickers from session state.""" if not etf_allocations: st.error("Please add at least one ETF.") return [] tickers = [etf["ticker"] for etf in etf_allocations] valid_tickers = [] for t in tickers: if re.match(r'^[A-Z]{1,7}$', t): try: yf_ticker = yf.Ticker(t) info, _ = fetch_with_retry(lambda: yf_ticker.info) if info and info.get("previousClose"): valid_tickers.append(t) else: st.warning(f"Skipping {t}: No price data available.") except Exception as e: st.warning(f"Skipping {t}: Failed to fetch data ({str(e)}).") else: st.warning(f"Invalid ticker: {t}. Must be 1-7 uppercase letters.") if not valid_tickers: st.error("No valid tickers found. Please check tickers and try again.") return valid_tickers # --- Fetch ETF Data --- @st.cache_data(show_spinner=False) def fetch_etfs(tickers: str, debug: bool, use_parallel: bool = True) -> Tuple[pd.DataFrame, List[Tuple[str, str, str]]]: """Fetch ETF data from yfinance and FMP API with retries.""" tickers_list = tickers.split(",") valid, skipped = [], [] all_warnings = [] progress = st.progress(0) status = st.empty() # Initialize API call counter if not in session state if "api_calls" not in st.session_state: st.session_state.api_calls = 0 # Check if we need FMP API key if USE_FMP_API and any(t in HIGH_YIELD_ETFS for t in tickers_list): # Get API key - either from environment or input API_KEY = os.environ.get("FMP_API_KEY") if not API_KEY and "fmp_api_key" not in st.session_state: API_KEY = st.text_input("Enter FMP API Key for more accurate yield data:", type="password") st.session_state.fmp_api_key = API_KEY if not API_KEY: st.warning("Without FMP API key, high-yield ETF data may be less accurate.") # Define sequential processing function def process_sequentially(): for idx, ticker in enumerate(tickers_list): status.text(f"Fetching {ticker} ({idx+1}/{len(tickers_list)})...") progress.progress((idx + 1) / len(tickers_list)) ticker_data, error, warnings = process_ticker_data(ticker, debug) if ticker_data: valid.append(ticker_data) all_warnings.extend(warnings) elif error: skipped.append(error) # Rate limit if idx < len(tickers_list) - 1: time.sleep(60 / REQUESTS_PER_MINUTE) # Define parallel processing function def process_parallel(): def process_with_status(ticker): # No Streamlit operations in this thread return process_ticker_data(ticker, debug) # Create a list to collect results results = [] # Show processing status status.text(f"Fetching {len(tickers_list)} ETFs in parallel...") # Use ThreadPoolExecutor for parallel processing with concurrent.futures.ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(tickers_list))) as executor: # Submit all tasks future_to_ticker = {executor.submit(process_with_status, ticker): ticker for ticker in tickers_list} # Process results as they complete for i, future in enumerate(concurrent.futures.as_completed(future_to_ticker)): progress.progress((i + 1) / len(tickers_list)) ticker = future_to_ticker[future] try: ticker_data, error, warnings = future.result() if ticker_data: valid.append(ticker_data) all_warnings.extend(warnings) elif error: skipped.append(error) except Exception as e: skipped.append((ticker, f"Thread error: {str(e)}", "")) # Choose processing method based on setting if use_parallel and len(tickers_list) > 1: try: process_parallel() except Exception as e: st.error(f"Error in parallel processing: {str(e)}. Falling back to sequential processing.") valid, skipped = [], [] process_sequentially() else: process_sequentially() # Display warnings collected from all threads for warning in all_warnings: st.warning(warning) if debug and skipped: st.subheader("🛑 Skipped Tickers (Debug)") st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]), use_container_width=True) progress.empty() status.empty() # Check if we have data source information and add it to the display df = pd.DataFrame(valid) # Debug info about data sources if debug and not df.empty and "Data Source" in df.columns: source_counts = df["Data Source"].value_counts() st.info(f"Data sources used: {dict(source_counts)}") return df, skipped # --- Test FMP API Function (for debugging) --- def test_fmp_api(): """Test function to verify FMP API responses for ETF yield data.""" st.subheader("FMP API Test Results") # Get API key from environment or input API_KEY = os.environ.get("FMP_API_KEY") if not API_KEY and "fmp_api_key" not in st.session_state: API_KEY = st.text_input("Enter your FMP API Key:", type="password") st.session_state.fmp_api_key = API_KEY else: API_KEY = st.session_state.get("fmp_api_key", API_KEY) if not API_KEY: st.warning("Please enter your FMP API key to continue") return # List of ETFs to test (including high-yield ETFs) test_tickers_default = "MSTY,SCHD,JEPI,SMCY,SPY" test_tickers_input = st.text_input("Enter ETF tickers to test (comma separated):", test_tickers_default) test_tickers = [ticker.strip() for ticker in test_tickers_input.split(",") if ticker.strip()] if st.button("Run FMP API Test"): results = [] for ticker in test_tickers: st.write(f"### Testing {ticker}") # Try profile endpoint profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}" response = requests.get(profile_url) with st.expander(f"{ticker} Profile Response (Status: {response.status_code})"): if response.status_code == 200: data = response.json() if data and len(data) > 0: # Check if there's yield info if "lastDiv" in data[0]: price = data[0].get("price", 0) last_div = data[0].get("lastDiv", 0) if price > 0 and last_div > 0: div_yield = (last_div / price) * 100 st.write(f"- lastDiv: {last_div}") st.write(f"- price: {price}") st.write(f"- calculated yield: {div_yield:.2f}%") else: st.write(f"- lastDiv: {last_div}") st.write(f"- price: {price}") st.write("- Cannot calculate yield (price or lastDiv is zero)") else: st.write("- No 'lastDiv' found in response") # Save other useful fields for field in ["companyName", "symbol", "industry", "sector"]: if field in data[0]: st.write(f"- {field}: {data[0][field]}") else: st.write("- Empty response data") else: st.write(f"- Error response: {response.text}") # Also try quote endpoint quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}" response = requests.get(quote_url) with st.expander(f"{ticker} Quote Response (Status: {response.status_code})"): if response.status_code == 200: data = response.json() if data and len(data) > 0: # Check if there's yield info if "dividendYield" in data[0]: div_yield = data[0]["dividendYield"] * 100 if data[0]["dividendYield"] < 1 else data[0]["dividendYield"] st.write(f"- dividendYield: {data[0]['dividendYield']}") st.write(f"- formatted yield: {div_yield:.2f}%") else: st.write("- No 'dividendYield' found in response") # Save other useful fields for field in ["name", "price", "exchange", "marketCap"]: if field in data[0]: st.write(f"- {field}: {data[0][field]}") else: st.write("- Empty response data") else: st.write(f"- Error response: {response.text}") # Also try historical dividends endpoint dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}" response = requests.get(dividend_url) with st.expander(f"{ticker} Dividend History Response (Status: {response.status_code})"): if response.status_code == 200: data = response.json() if "historical" in data and data["historical"]: recent_divs = data["historical"][:3] # Show last 3 dividends for div in recent_divs: st.write(f"- Date: {div.get('date')}, Dividend: {div.get('dividend')}") # Calculate annualized yield if possible if recent_divs and "dividend" in recent_divs[0]: # Try to get current price from previous quote response current_price = None quote_response = requests.get(quote_url) if quote_response.status_code == 200: quote_data = quote_response.json() if quote_data and len(quote_data) > 0 and "price" in quote_data[0]: current_price = quote_data[0]["price"] if current_price: # Use most recent dividend payment and estimate annual yield if len(recent_divs) >= 2: # Try to figure out payment frequency try: date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d") date2 = datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d") days_between = abs((date1 - date2).days) if days_between < 45: # Monthly frequency = 12 freq_text = "monthly" elif days_between < 100: # Quarterly frequency = 4 freq_text = "quarterly" elif days_between < 200: # Semi-annual frequency = 2 freq_text = "semi-annual" else: # Annual frequency = 1 freq_text = "annual" annual_div = recent_divs[0]["dividend"] * frequency estimated_yield = (annual_div / current_price) * 100 st.write(f"- Estimated {freq_text} yield: {estimated_yield:.2f}% (price: ${current_price})") except Exception as e: st.write(f"- Error calculating estimated yield: {str(e)}") else: st.write("- Not enough dividend history to determine frequency") else: st.write("- Cannot calculate yield (price not available)") else: st.write("- No dividend history found") else: st.write(f"- Error response: {response.text}") st.write("---") st.write("Note: Add the 'test_fmp_api' function to the sidebar to use it for debugging.") # --- PDF Export Function --- def create_pdf_report(final_alloc, df_data, chat_summary=None): """Generate a PDF report of the ETF portfolio. Args: final_alloc: DataFrame of final allocations df_data: DataFrame of raw ETF data chat_summary: Optional text summary from ChatGPT Returns: Base64 encoded PDF file """ # Calculate summary metrics total_capital = final_alloc["Capital Allocated ($)"].sum() total_income = final_alloc["Income Contributed ($)"].sum() monthly_income = total_income / 12 weighted_yield = (final_alloc["Income Contributed ($)"] * final_alloc["Yield (%)"]).sum() / total_income if total_income else 0 # Calculate DRIP forecast if needed if st.session_state.drip_enabled: drip_forecast = calculate_drip_growth(final_alloc) # Get key metrics for DRIP initial_value = drip_forecast["Total Value ($)"].iloc[0] final_value = drip_forecast["Total Value ($)"].iloc[-1] value_growth = final_value - initial_value value_growth_pct = (value_growth / initial_value) * 100 initial_income = drip_forecast["Monthly Income ($)"].iloc[0] * 12 final_income = drip_forecast["Monthly Income ($)"].iloc[-1] * 12 income_growth = final_income - initial_income income_growth_pct = (income_growth / initial_income) * 100 total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1] years_to_recover = 100 * initial_value / final_income # 100% recovery # Create HTML report html = f"""

ETF Dividend Portfolio Report

Generated on {datetime.now().strftime('%Y-%m-%d %H:%M')}

Portfolio Summary

Total Capital

${total_capital:,.2f}

Annual Income

${total_income:,.2f}

Monthly Income

${monthly_income:,.2f}

Weighted Yield

{weighted_yield:.2f}%

ETF Allocation Details
""" # Add rows for each ETF for _, row in final_alloc.iterrows(): risk_class = "" if "High" in str(row.get("Risk Level", "")): risk_class = "risk-high" elif "Medium" in str(row.get("Risk Level", "")): risk_class = "risk-medium" elif "Average" in str(row.get("Risk Level", "")): risk_class = "risk-average" html += f""" """ html += """
Ticker Capital ($) Income ($) Allocation (%) Yield (%) Risk Level Distribution
{row["Ticker"]} ${row["Capital Allocated ($)"]:,.2f} ${row["Income Contributed ($)"]:,.2f} {row["Allocation (%)"]:.2f}% {row["Yield (%)"]:.2f}% {row.get("Risk Level", "Unknown")} {row.get("Distribution Period", "Unknown")}
""" # Add DRIP forecast if enabled if st.session_state.drip_enabled: html += f"""
Dividend Reinvestment (DRIP) Forecast

1-Year Value Growth

${value_growth:,.2f} ({value_growth_pct:.2f}%)

1-Year Income Growth

${income_growth:,.2f} ({income_growth_pct:.2f}%)

Total Dividends Earned

${total_dividends:,.2f}

Years to Recover Capital

{years_to_recover:.2f}

This forecast assumes ETF prices remain constant and all dividends are reinvested proportionally to original allocations.

""" # Add rows for each month for _, row in drip_forecast.iterrows(): month = row["Month"] value = row["Total Value ($)"] monthly = row["Monthly Income ($)"] cumulative = row["Cumulative Income ($)"] html += f""" """ html += """
Month Portfolio Value ($) Monthly Income ($) Cumulative Income ($)
{month} ${value:,.2f} ${monthly:,.2f} ${cumulative:,.2f}
""" # Add ChatGPT summary if available if chat_summary: html += f"""
ETF Analysis

{chat_summary.replace(chr(10), '
')}

""" # Add footer html += """ """ # Create PDF from HTML try: # Create temporary file for the PDF with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp: pdf_path = tmp.name # Convert HTML to PDF pdfkit_options = { 'quiet': '', 'enable-local-file-access': None, 'page-size': 'Letter', 'margin-top': '0.75in', 'margin-right': '0.75in', 'margin-bottom': '0.75in', 'margin-left': '0.75in', } pdfkit.from_string(html, pdf_path, options=pdfkit_options) # Read the PDF file with open(pdf_path, 'rb') as pdf_file: pdf_data = pdf_file.read() # Delete the temporary file os.unlink(pdf_path) # Encode the PDF as base64 return base64.b64encode(pdf_data).decode() except Exception as e: st.error(f"Failed to create PDF: {str(e)}") if "wkhtmltopdf" in str(e).lower(): st.error("Please install wkhtmltopdf: https://wkhtmltopdf.org/downloads.html") return None # --- ChatGPT Summary --- @st.cache_data(show_spinner=False) def get_chatgpt_summary(tickers: str, api_key: str) -> str: """Generate ETF summary using ChatGPT.""" tickers_list = tickers.split(",") if not api_key: return "Please enter a valid OpenAI API key." try: client = OpenAI(api_key=api_key) prompt = f""" Act as a financial analyst. Provide a concise summary (150-200 words) of these ETFs: {', '.join(tickers_list)}. Include for each: - Key characteristics (yield, sector exposure, strategy). - Investment suitability (risk level, investor type). - Recent performance trends (if available). Highlight risks and benefits. Use public data or your knowledge. """ response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a financial analyst specializing in ETFs."}, {"role": "user", "content": prompt} ], max_tokens=300, temperature=0.7 ) return response.choices[0].message.content except Exception as e: return f"ChatGPT error: {str(e)}. Check API key or try again." # --- Assign Risk Level --- def assign_risk_level(df: pd.DataFrame, allocations: List[Dict]) -> pd.DataFrame: """Assign risk level based on yield, allocation, and ETF age.""" df = df.copy() df["Risk Level"] = "Unknown" current_date = pd.Timestamp.now(tz='UTC') # Calculate total allocation to high-yield, new ETFs high_risk_alloc = 0 for alloc in allocations: ticker = alloc["ticker"] alloc_pct = alloc["allocation"] row = df[df["Ticker"] == ticker] if row.empty: continue yield_pct = row["Yield (%)"].iloc[0] inception_date = row["Inception Date"].iloc[0] if pd.isna(inception_date): continue inception_date = pd.to_datetime(inception_date, utc=True) age_years = (current_date - inception_date).days / 365.25 if yield_pct > 20 and age_years < 2: high_risk_alloc += alloc_pct for idx, row in df.iterrows(): ticker = row["Ticker"] yield_pct = row["Yield (%)"] inception_date = row["Inception Date"] alloc_pct = next((a["allocation"] for a in allocations if a["ticker"] == ticker), 0) if pd.isna(inception_date): df.at[idx, "Risk Level"] = "Unknown (Missing Inception)" continue inception_date = pd.to_datetime(inception_date, utc=True) age_years = (current_date - inception_date).days / 365.25 # Risk criteria is_new = age_years < 2 is_mid_age = 2 <= age_years <= 5 is_old = age_years > 5 is_high_yield = yield_pct > 20 is_mid_yield = 12 <= yield_pct <= 20 is_low_yield = yield_pct < 12 is_high_alloc = alloc_pct > 30 is_mid_alloc = 20 <= alloc_pct <= 30 is_portfolio_risky = high_risk_alloc > 50 if (is_new or is_high_yield) and (is_high_alloc or is_portfolio_risky): df.at[idx, "Risk Level"] = "High Risk" elif is_mid_age or is_mid_yield or is_mid_alloc: df.at[idx, "Risk Level"] = "Medium Risk" elif is_old and is_low_yield and alloc_pct < 20: df.at[idx, "Risk Level"] = "Average Risk" else: df.at[idx, "Risk Level"] = "Medium Risk" return df # --- AI Suggestion --- def ai_suggestion(df: pd.DataFrame, target: float, user_allocations: List[Dict]) -> pd.DataFrame: """Generate AI-suggested portfolio with risk-mitigated allocations.""" adjusted_df = df.copy() adjustments = [] current_date = pd.Timestamp.now(tz='UTC') # Validate and adjust yields for idx, row in adjusted_df.iterrows(): ticker = row["Ticker"] yield_pct = row["Yield (%)"] if ticker in EXPECTED_YIELDS: min_yield, max_yield = EXPECTED_YIELDS[ticker] if yield_pct > max_yield: adjusted_yield = max_yield adjustments.append(f"Adjusted {ticker} yield from {yield_pct:.2f}% to {max_yield:.2f}% (max expected).") adjusted_df.at[idx, "Yield (%)"] = adjusted_yield adjusted_df.at[idx, "Dividend Rate"] = (adjusted_yield / 100) * row["Price"] adjusted_df.at[idx, "Income per $1K"] = (1000 / row["Price"]) * adjusted_df.at[idx, "Dividend Rate"] # Optimize allocations: prioritize older, stable ETFs, limit high-yield exposure sorted_df = adjusted_df.sort_values(by=["Inception Date", "Yield (%)"], ascending=[True, False]) ai_allocations = [] total_alloc = 0 high_yield_new_alloc = 0 for idx, row in sorted_df.iterrows(): ticker = row["Ticker"] yield_pct = row["Yield (%)"] inception_date = row["Inception Date"] if pd.isna(inception_date): continue inception_date = pd.to_datetime(inception_date, utc=True) age_years = (current_date - inception_date).days / 365.25 # Allocate based on age and yield if age_years > 5 and yield_pct < 12: # Stable, older ETFs (e.g., JEPI) alloc = 20 elif age_years >= 2 and yield_pct <= 20: # Mid-age, moderate yield (e.g., FEPI) alloc = 15 else: # Newer, high-yield ETFs (e.g., CONY, MSTY) alloc = 10 # Limit to reduce risk if yield_pct > 20 and age_years < 2: high_yield_new_alloc += alloc # Cap high-yield, new ETF allocation if high_yield_new_alloc > 40: alloc = 0 if total_alloc + alloc > 100: alloc = 100 - total_alloc if alloc <= 0: continue ai_allocations.append({"ticker": ticker, "allocation": alloc}) total_alloc += alloc # Adjust to sum to 100% if total_alloc < 100: remaining = 100 - total_alloc for alloc in ai_allocations: if adjusted_df[adjusted_df["Ticker"] == alloc["ticker"]]["Yield (%)"].iloc[0] < 12: alloc["allocation"] += remaining break elif total_alloc > 100: excess = total_alloc - 100 ai_allocations[-1]["allocation"] -= excess results = [] weighted_yield = 0 for _, row in adjusted_df.iterrows(): ticker = row["Ticker"] alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0) if alloc_pct == 0: continue weighted_yield += alloc_pct * (row["Yield (%)"] / 100) if weighted_yield <= 0: st.error("AI Suggestion: Weighted yield is zero or negative. Check ETF data.") return pd.DataFrame() total_capital = target / weighted_yield for _, row in adjusted_df.iterrows(): ticker = row["Ticker"] alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0) if alloc_pct == 0: continue capital = total_capital * alloc_pct shares = capital / row["Price"] income = shares * row["Dividend Rate"] # Create result dictionary with all available data result = { "Ticker": ticker, "Yield (%)": row["Yield (%)"], "Dividend Rate": row["Dividend Rate"], "Capital Allocated ($)": round(capital, 2), "Income Contributed ($)": round(income, 2), "Allocation (%)": round(alloc_pct * 100, 2), "Inception Date": row["Inception Date"], "Distribution Period": row.get("Distribution Period", "Unknown"), "Price": row["Price"] } # Add NAV data if available if "NAV" in row and row["NAV"] is not None: result["NAV"] = row["NAV"] if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: result["Premium/Discount (%)"] = row["Premium/Discount (%)"] results.append(result) suggestion_df = pd.DataFrame(results) suggestion_df = assign_risk_level(suggestion_df, ai_allocations) if adjustments or ai_allocations != user_allocations: notes = ["The AI balanced high-yield ETFs with older, stable ETFs to mitigate risk while meeting the income target. Newer ETFs with high yields are capped to reduce exposure to unsustainable distributions."] if adjustments: notes.extend(adjustments) st.info("AI Suggestion Notes:\n- " + "\n- ".join(notes)) return suggestion_df # --- Yield Trends --- def yield_chart(tickers: List[str], debug: bool = False): """Plot TTM yield trends.""" fig = go.Figure() for ticker in tickers: try: yf_ticker = yf.Ticker(ticker) # Get dividends with retry dividends, debug_info = fetch_with_retry(lambda: yf_ticker.dividends) if debug: st.write(f"DEBUG: {ticker} dividend data: {dividends.to_dict() if dividends is not None else 'None'}") if dividends is None or dividends.empty: continue dividends = dividends.reset_index() dividends.columns = ["date", "amount"] dividends["date"] = pd.to_datetime(dividends["date"]) monthly = dividends.set_index("date")["amount"].resample("ME").sum() ttm_dividend = monthly.rolling(12).sum() # Get prices with retry prices, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y")["Close"]) if prices is None: continue avg_price = prices.rolling(252).mean() ttm_yield = (ttm_dividend / avg_price) * 100 ttm_yield = ttm_yield.dropna() fig.add_trace(go.Scatter( x=ttm_yield.index, y=ttm_yield, name=ticker, mode="lines", hovertemplate="%{x|%Y-%m}: %{y:.2f}%" )) except Exception as e: if debug: st.write(f"DEBUG: Error plotting {ticker}: {str(e)}") continue fig.update_layout( title="TTM Dividend Yield Trend", xaxis_title="Date", yaxis_title="Yield (%)", template="plotly_dark", hovermode="x unified", xaxis=dict(tickformat="%Y-%m"), yaxis=dict(gridcolor="rgba(255,255,255,0.2)") ) st.plotly_chart(fig, use_container_width=True) # --- NAV Trends --- def nav_chart(tickers: List[str], debug: bool = False): """Plot NAV (Net Asset Value) trends over time for selected ETFs.""" fig = go.Figure() for ticker in tickers: try: yf_ticker = yf.Ticker(ticker) # Get historical price data price_history, debug_info = fetch_with_retry(lambda: yf_ticker.history(period="2y")) if price_history is None or price_history.empty: if debug: st.write(f"DEBUG: No price history for {ticker}") continue # Extract NAV data - for most ETFs, we'll use Close price as a proxy for NAV # For some closed-end funds, there might be specific NAV history available nav_series = price_history["Close"] # Plot the NAV trend fig.add_trace(go.Scatter( x=nav_series.index, y=nav_series, name=f"{ticker} NAV", mode="lines", hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}" )) # Add price for comparison (slight transparency) fig.add_trace(go.Scatter( x=price_history.index, y=price_history["Close"], name=f"{ticker} Price", mode="lines", line=dict(dash="dot", width=1), opacity=0.7, hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}" )) except Exception as e: if debug: st.write(f"DEBUG: Error plotting NAV for {ticker}: {str(e)}") continue fig.update_layout( title="ETF NAV and Price Trends (2-Year)", xaxis_title="Date", yaxis_title="Value ($)", template="plotly_dark", hovermode="x unified", legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), yaxis=dict(gridcolor="rgba(255,255,255,0.2)") ) st.plotly_chart(fig, use_container_width=True) # --- Price/NAV Premium-Discount Chart --- def premium_discount_chart(tickers: List[str], df: pd.DataFrame, debug: bool = False): """Plot Premium/Discount to NAV for selected ETFs.""" # Create dataframe for the bar chart premium_data = [] for ticker in tickers: try: # Get data from our processed dataframe ticker_row = df[df["Ticker"] == ticker] if ticker_row.empty: continue premium = ticker_row["Premium/Discount (%)"].iloc[0] if premium is None: continue premium_data.append({ "Ticker": ticker, "Premium/Discount (%)": premium }) except Exception as e: if debug: st.write(f"DEBUG: Error getting premium/discount for {ticker}: {str(e)}") continue if not premium_data: st.info("No premium/discount data available for the selected ETFs.") return premium_df = pd.DataFrame(premium_data) # Create the bar chart fig = px.bar( premium_df, x="Ticker", y="Premium/Discount (%)", title="Current Premium/Discount to NAV", template="plotly_dark", color="Premium/Discount (%)", color_continuous_scale=["red", "white", "green"], range_color=[-5, 5] # Typical range for premium/discount ) # Add a reference line at 0 fig.add_hline( y=0, line_width=1, line_dash="dash", line_color="white", annotation_text="NAV", annotation_position="bottom right" ) fig.update_layout( yaxis=dict(zeroline=True, zerolinewidth=2, zerolinecolor="rgba(255,255,255,0.5)") ) st.plotly_chart(fig, use_container_width=True) # --- Portfolio Summary --- def portfolio_summary(df: pd.DataFrame): """Display portfolio summary metrics.""" total_capital = df["Capital Allocated ($)"].sum() total_income = df["Income Contributed ($)"].sum() weighted_yield = (df["Income Contributed ($)"] * df["Yield (%)"]).sum() / total_income if total_income else 0 col1, col2, col3 = st.columns(3) with col1: st.metric("Total Capital", f"${total_capital:,.2f}") with col2: st.metric("Annual Income", f"${total_income:,.2f}") st.metric("Monthly Income", f"${total_income/12:,.2f}") with col3: st.metric("Weighted Yield", f"{weighted_yield:.2f}%") # --- Allocation Functions --- def allocate_for_income(df: pd.DataFrame, target: float, allocations: List[Dict]) -> pd.DataFrame: """Allocate capital to ETFs to meet the annual income target.""" try: # Store initial capital in session state st.session_state.initial_capital = target st.session_state.mode = "Income Target" st.session_state.target = target results = [] weighted_yield = 0 for _, row in df.iterrows(): ticker = row["Ticker"] alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) if alloc_pct == 0: continue weighted_yield += alloc_pct * (row["Yield (%)"] / 100) if weighted_yield <= 0: st.error("Weighted yield is zero or negative. Check ETF data.") return None total_capital = target / weighted_yield for _, row in df.iterrows(): ticker = row["Ticker"] alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) if alloc_pct == 0: continue capital = total_capital * alloc_pct shares = capital / row["Price"] income = shares * row["Dividend Rate"] # Create result dictionary with all available data result = { "Ticker": ticker, "Yield (%)": row["Yield (%)"], "Dividend Rate": round(row["Dividend Rate"], 2), "Capital Allocated ($)": round(capital, 2), "Income Contributed ($)": round(income, 2), "Allocation (%)": round(alloc_pct * 100, 2), "Inception Date": row["Inception Date"], "Distribution Period": row.get("Distribution Period", "Unknown"), "Price": row["Price"] } # Add NAV data if available if "NAV" in row and row["NAV"] is not None: result["NAV"] = row["NAV"] if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: result["Premium/Discount (%)"] = row["Premium/Discount (%)"] results.append(result) alloc_df = pd.DataFrame(results) alloc_df = assign_risk_level(alloc_df, allocations) return alloc_df except Exception as e: st.error(f"Error in income allocation: {str(e)}") return None def allocate_for_capital(df: pd.DataFrame, capital: float, allocations: List[Dict]) -> pd.DataFrame: """Allocate a fixed amount of capital across ETFs and calculate resulting income.""" try: # Store initial capital in session state st.session_state.initial_capital = capital st.session_state.mode = "Capital Target" st.session_state.target = capital results = [] total_income = 0 for _, row in df.iterrows(): ticker = row["Ticker"] alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) if alloc_pct == 0: continue # Calculate capital allocation allocated_capital = capital * alloc_pct shares = allocated_capital / row["Price"] income = shares * row["Dividend Rate"] total_income += income # Create result dictionary with all available data result = { "Ticker": ticker, "Yield (%)": row["Yield (%)"], "Dividend Rate": round(row["Dividend Rate"], 2), "Capital Allocated ($)": round(allocated_capital, 2), "Income Contributed ($)": round(income, 2), "Allocation (%)": round(alloc_pct * 100, 2), "Inception Date": row["Inception Date"], "Distribution Period": row.get("Distribution Period", "Unknown"), "Price": row["Price"] } # Add NAV data if available if "NAV" in row and row["NAV"] is not None: result["NAV"] = row["NAV"] if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: result["Premium/Discount (%)"] = row["Premium/Discount (%)"] results.append(result) alloc_df = pd.DataFrame(results) alloc_df = assign_risk_level(alloc_df, allocations) return alloc_df except Exception as e: st.error(f"Error in capital allocation: {str(e)}") return None # --- Portfolio Management Functions --- def recalculate_portfolio(allocations): """Recalculate portfolio with new allocations""" try: # Get the initial capital from session state initial_capital = st.session_state.get('initial_capital') if initial_capital is None: st.error("Initial capital not found. Please run the portfolio simulation first.") return None # Create a new DataFrame for the recalculated portfolio final_alloc = pd.DataFrame() # Add tickers and their allocations final_alloc["Ticker"] = list(allocations.keys()) final_alloc["Allocation (%)"] = list(allocations.values()) # Merge with the global df to get other information final_alloc = final_alloc.merge(df, on="Ticker", how="left") # Calculate capital allocation final_alloc["Capital Allocated ($)"] = final_alloc["Allocation (%)"] * initial_capital / 100 # Calculate income contribution final_alloc["Income Contributed ($)"] = final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"] / 100 # Store the recalculated portfolio in session state st.session_state.final_alloc = final_alloc return final_alloc except Exception as e: st.error(f"Error recalculating portfolio: {str(e)}") return None def update_allocation(ticker, new_alloc): """Update the allocation for a specific ticker.""" st.session_state.etf_allocations[ticker] = new_alloc # --- DRIP Calculation Function --- def calculate_drip_growth(portfolio_df: pd.DataFrame, months: int = DRIP_FORECAST_MONTHS, erosion_type: str = "None", erosion_level: Any = 0) -> pd.DataFrame: """ Calculate the growth of a portfolio with dividend reinvestment (DRIP) over time. Args: portfolio_df: DataFrame containing portfolio allocation data months: Number of months to forecast erosion_type: Type of erosion simulation ("None", "NAV & Yield Erosion") erosion_level: Erosion configuration (0 or dict with global and per-ticker settings) Returns: DataFrame with monthly portfolio growth data (exactly 'months' rows) """ # Extract needed data initial_capital = portfolio_df["Capital Allocated ($)"].sum() tickers = portfolio_df["Ticker"].tolist() # Calculate monthly erosion rate(s) if applicable max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion # Initialize erosion rates for each ticker ticker_nav_rates = {} ticker_yield_rates = {} # Handle different erosion configurations if erosion_type != "None" and isinstance(erosion_level, dict): # Check if using per-ticker rates if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: # Get global defaults global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion # Apply per-ticker rates where available, global rates otherwise for ticker in tickers: ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion else: # Use global rates for all tickers global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion for ticker in tickers: ticker_nav_rates[ticker] = global_nav ticker_yield_rates[ticker] = global_yield else: # No erosion for ticker in tickers: ticker_nav_rates[ticker] = 0 ticker_yield_rates[ticker] = 0 # Create a dictionary of ticker-specific data for easier access ticker_data = {} for _, row in portfolio_df.iterrows(): ticker = row["Ticker"] ticker_data[ticker] = { "price": row["Price"], "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal "initial_shares": row["Capital Allocated ($)"] / row["Price"], "initial_allocation": row["Allocation (%)"] / 100, # Convert from % to decimal "distribution": row.get("Distribution Period", "Monthly") } # Initialize result data structure results = [] # Initial portfolio state current_shares = {ticker: data["initial_shares"] for ticker, data in ticker_data.items()} current_prices = {ticker: data["price"] for ticker, data in ticker_data.items()} current_yields = {ticker: data["yield_annual"] for ticker, data in ticker_data.items()} current_total_value = initial_capital # Calculate the monthly dividend for each ETF based on distribution period dividend_frequency = { "Monthly": 12, "Quarterly": 4, "Semi-Annually": 2, "Annually": 1, "Unknown": 12 # Default to monthly if unknown } # Calculate growth for each month (exactly 'months' total rows) cumulative_income = 0 for month in range(1, months + 1): # Calculate expected monthly income based on current portfolio and yields monthly_income = sum( (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker]) for ticker in tickers ) # Store month data (this reflects the portfolio at the START of the month) month_data = { "Month": month, "Total Value ($)": current_total_value, "Monthly Income ($)": monthly_income, "Cumulative Income ($)": cumulative_income } # Add shares and current price/yield for each ticker for ticker in tickers: month_data[f"{ticker} Shares"] = current_shares[ticker] month_data[f"{ticker} Price ($)"] = current_prices[ticker] month_data[f"{ticker} Yield (%)"] = current_yields[ticker] * 100 # Convert back to percentage results.append(month_data) # After recording the month's data, apply erosion and calculate dividends # for the current month # Apply NAV and yield erosion to each ticker for ticker in tickers: # Apply NAV erosion if ticker_nav_rates[ticker] > 0: current_prices[ticker] *= (1 - ticker_nav_rates[ticker]) # Apply yield erosion if ticker_yield_rates[ticker] > 0: current_yields[ticker] *= (1 - ticker_yield_rates[ticker]) # Calculate dividends for each ETF month_dividends = {} for ticker, data in ticker_data.items(): freq = dividend_frequency[data["distribution"]] # Check if dividend is paid this month if month % (12 / freq) == 0: # Annual dividend / frequency = dividend per distribution # Use current yield if yield erosion is being simulated if ticker_yield_rates[ticker] > 0: dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker] else: dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker] else: dividend = 0 month_dividends[ticker] = dividend # Total dividends for this month total_month_dividend = sum(month_dividends.values()) cumulative_income += total_month_dividend # Only reinvest for the next month if we're not at the last month if month < months: # Reinvest dividends proportionally to original allocation for ticker, data in ticker_data.items(): # Calculate new shares purchased with reinvested dividends # Use current price for calculation if current_prices[ticker] > 0: # Avoid division by zero new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker] current_shares[ticker] += new_shares # Recalculate portfolio value with updated shares and prices current_total_value = sum(current_shares[ticker] * current_prices[ticker] for ticker in tickers) return pd.DataFrame(results) # --- AI Erosion Risk Assessment --- def analyze_etf_erosion_risk(tickers: List[str], debug: bool = False) -> pd.DataFrame: """ Analyze historical ETF data to estimate realistic NAV and yield erosion likelihood. Args: tickers: List of ETF tickers to analyze debug: Whether to show debug information Returns: DataFrame with erosion risk assessment for each ETF """ risk_data = [] current_date = pd.Timestamp.now(tz='UTC') for ticker in tickers: try: yf_ticker = yf.Ticker(ticker) # Get basic info with retry info, _ = fetch_with_retry(lambda: yf_ticker.info) if not info: continue # Get historical price data (5 years or since inception) hist, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y")) if hist.empty: continue # Check ETF age inception_date = info.get("fundInceptionDate") etf_age_years = None if inception_date: try: inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) etf_age_years = (current_date - inception_date_dt).days / 365.25 except: pass # Get historical dividends dividends, _ = fetch_with_retry(lambda: yf_ticker.dividends) if dividends is None or dividends.empty: continue # Calculate historical metrics # 1. NAV Erosion Analysis (using price as proxy for NAV) # Calculate max drawdowns in different timeframes rolling_max = hist["Close"].rolling(window=252, min_periods=1).max() daily_drawdown = hist["Close"] / rolling_max - 1.0 max_drawdown_1y = abs(daily_drawdown[-252:].min()) if len(daily_drawdown) >= 252 else None # Calculate annualized volatility returns = hist["Close"].pct_change().dropna() volatility = returns.std() * (252**0.5) # Annualized # 2. Yield Erosion Analysis # Convert to pandas Series with a DatetimeIndex if not isinstance(dividends, pd.Series): dividends = dividends.reset_index() dividends.columns = ["date", "amount"] dividends = dividends.set_index("date")["amount"] # Calculate rolling 12-month dividend total monthly_div = dividends.resample('M').sum() rolling_12m_div = monthly_div.rolling(window=12, min_periods=6).sum() # Calculate the trend over time if len(rolling_12m_div) > 12: earliest_ttm = rolling_12m_div[11] latest_ttm = rolling_12m_div[-1] if earliest_ttm > 0: dividend_trend = (latest_ttm / earliest_ttm) - 1 else: dividend_trend = 0 # Calculate worst dividend reduction div_changes = rolling_12m_div.pct_change() worst_div_change = div_changes.min() if not div_changes.empty else 0 else: dividend_trend = None worst_div_change = None # 3. Risk Assessment # Determine if ETF is new or established is_new = etf_age_years is not None and etf_age_years < 2 # Assign erosion risk levels if is_new: # For new ETFs, use higher default risk nav_erosion_risk = 5 # Medium yield_erosion_risk = 6 # Medium-high nav_risk_reason = "New ETF without significant history" yield_risk_reason = "New ETF dividend pattern not established" else: # For established ETFs, base on historical data # NAV Erosion Risk (0-9 scale) if max_drawdown_1y is not None: if max_drawdown_1y > 0.40: nav_erosion_risk = 7 # High risk nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year" elif max_drawdown_1y > 0.25: nav_erosion_risk = 5 # Medium risk nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year" elif max_drawdown_1y > 0.15: nav_erosion_risk = 3 # Lower-medium risk nav_risk_reason = f"Moderate {max_drawdown_1y:.1%} max drawdown in the past year" else: nav_erosion_risk = 2 # Low risk nav_risk_reason = f"Limited {max_drawdown_1y:.1%} max drawdown in the past year" else: nav_erosion_risk = 4 # Default medium-low nav_risk_reason = "Insufficient price history" # Yield Erosion Risk (0-9 scale) if worst_div_change is not None and not pd.isna(worst_div_change): if worst_div_change < -0.30: yield_erosion_risk = 8 # Very high risk yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" elif worst_div_change < -0.15: yield_erosion_risk = 6 # High risk yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" elif worst_div_change < -0.05: yield_erosion_risk = 4 # Medium risk yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" elif dividend_trend is not None and dividend_trend < -0.10: yield_erosion_risk = 5 # Medium risk due to declining trend yield_risk_reason = f"Dividend trend declined by {abs(dividend_trend):.1%}" elif dividend_trend is not None and dividend_trend > 0.10: yield_erosion_risk = 2 # Low risk due to growing trend yield_risk_reason = f"Dividend trend growing by {dividend_trend:.1%}" else: yield_erosion_risk = 3 # Low-medium risk yield_risk_reason = "Stable dividend history" else: yield_erosion_risk = 4 # Default medium yield_risk_reason = "Insufficient dividend history" # Adjust for volatility if volatility > 0.40: nav_erosion_risk = min(9, nav_erosion_risk + 2) # Increase risk for high volatility nav_risk_reason += f" with high volatility ({volatility:.1%})" elif volatility > 0.25: nav_erosion_risk = min(9, nav_erosion_risk + 1) # Slightly increase risk nav_risk_reason += f" with elevated volatility ({volatility:.1%})" # Convert to annual erosion percentage estimate nav_erosion_pct = nav_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion yield_erosion_pct = yield_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion risk_data.append({ "Ticker": ticker, "NAV Erosion Risk (0-9)": nav_erosion_risk, "Yield Erosion Risk (0-9)": yield_erosion_risk, "Estimated Annual NAV Erosion": f"{nav_erosion_pct:.1%}", "Estimated Annual Yield Erosion": f"{yield_erosion_pct:.1%}", "NAV Risk Explanation": nav_risk_reason, "Yield Risk Explanation": yield_risk_reason, "ETF Age (Years)": etf_age_years, "Is New ETF": is_new, "Max Drawdown (1Y)": max_drawdown_1y, "Volatility (Annual)": volatility, "Dividend Trend": dividend_trend }) except Exception as e: if debug: st.error(f"Error analyzing {ticker}: {str(e)}") continue return pd.DataFrame(risk_data) # --- Streamlit Setup --- st.title("💸 ETF Dividend Portfolio Builder") # Initialize session state for real-time updates if "simulation_run" not in st.session_state: st.session_state.simulation_run = False if "df_data" not in st.session_state: st.session_state.df_data = None if "edited_allocations" not in st.session_state: st.session_state.edited_allocations = None if "show_recalculation" not in st.session_state: st.session_state.show_recalculation = False if "simulation_mode" not in st.session_state: st.session_state.simulation_mode = "income_target" # Default mode if "drip_enabled" not in st.session_state: st.session_state.drip_enabled = False # Default DRIP setting if "erosion_level" not in st.session_state: st.session_state.erosion_level = 0 # Default erosion level if "erosion_type" not in st.session_state: st.session_state.erosion_type = "None" # Default erosion type if "run_fmp_test" not in st.session_state: st.session_state.run_fmp_test = False # Flag for FMP API test if "fmp_api_key" not in st.session_state: st.session_state.fmp_api_key = os.environ.get("FMP_API_KEY", "") # FMP API key if "api_calls" not in st.session_state: st.session_state.api_calls = 0 # API call counter if "force_refresh_data" not in st.session_state: st.session_state.force_refresh_data = False # Flag to force refresh data # Radio button to select simulation mode simulation_mode = st.sidebar.radio( "Choose Simulation Mode", options=["Income Target Mode", "Capital Investment Mode"], index=0 if st.session_state.simulation_mode == "income_target" else 1, help="Choose whether to start with a monthly income goal or a fixed capital amount" ) # Update session state with selected mode st.session_state.simulation_mode = "income_target" if simulation_mode == "Income Target Mode" else "capital_investment" # Add DRIP toggle drip_enabled = st.sidebar.toggle( "Enable Dividend Reinvestment (DRIP)", value=st.session_state.drip_enabled, help="When enabled, shows how reinvesting dividends compounds growth over time instead of taking income" ) st.session_state.drip_enabled = drip_enabled # Initialize erosion_type from session state erosion_type = st.session_state.erosion_type # Add erosion simulation slider st.sidebar.subheader("Portfolio Risk Simulation") erosion_enabled = st.sidebar.checkbox( "Enable NAV & Yield Erosion", value=erosion_type != "None", help="Simulate price drops and dividend cuts over time" ) # Define max erosion constants MAX_EROSION_LEVEL = 9 max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion if erosion_enabled: # Store the previous erosion type erosion_type = "NAV & Yield Erosion" st.session_state.erosion_type = erosion_type # Initialize per-ticker erosion settings if not already in session state if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): st.session_state.per_ticker_erosion = {} # Create advanced per-ticker erosion controls st.sidebar.subheader("ETF Erosion Settings") st.sidebar.write("Set custom erosion levels for each ETF") # Use the ETFs from the final allocation if simulation has run if st.session_state.simulation_run and hasattr(st.session_state, 'final_alloc'): tickers = st.session_state.final_alloc["Ticker"].unique().tolist() # Otherwise use the ETFs from user input elif "etf_allocations" in st.session_state: tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] else: tickers = [] # Initialize or update per-ticker erosion settings per_ticker_erosion = {} if tickers: # Create a DataFrame for the per-ticker settings per_ticker_data = [] for ticker in tickers: # Get existing settings or use defaults (5 = medium erosion) existing_settings = st.session_state.per_ticker_erosion.get(ticker, { "nav": 5, "yield": 5 }) per_ticker_data.append({ "Ticker": ticker, "NAV Erosion (0-9)": existing_settings["nav"], "Yield Erosion (0-9)": existing_settings["yield"], }) # Create a data editor for the per-ticker settings per_ticker_df = pd.DataFrame(per_ticker_data) edited_df = st.sidebar.data_editor( per_ticker_df, column_config={ "Ticker": st.column_config.TextColumn("Ticker", disabled=True), "NAV Erosion (0-9)": st.column_config.NumberColumn( "NAV Erosion (0-9)", min_value=0, max_value=MAX_EROSION_LEVEL, step=1, format="%d" ), "Yield Erosion (0-9)": st.column_config.NumberColumn( "Yield Erosion (0-9)", min_value=0, max_value=MAX_EROSION_LEVEL, step=1, format="%d" ), }, use_container_width=True, num_rows="fixed", hide_index=True, key="per_ticker_editor" ) # Save the edited values back to session state for _, row in edited_df.iterrows(): ticker = row["Ticker"] per_ticker_erosion[ticker] = { "nav": row["NAV Erosion (0-9)"], "yield": row["Yield Erosion (0-9)"] } st.session_state.per_ticker_erosion = per_ticker_erosion # Calculate some example annual erosion rates for display max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion # Show sample erosion rates for different levels st.sidebar.info(""" **Erosion Level Guide:** - Level 0: No erosion (0%) - Level 5: Medium erosion (~40% annually) - Level 9: Severe erosion (~90% annually) """) else: st.sidebar.write("Add ETFs to enable per-ETF erosion settings") # Always use per-ticker settings st.session_state.use_per_ticker_erosion = True # Store erosion settings for DRIP calculation erosion_level = { "global": { "nav": 5, # Default medium level for global fallback "yield": 5 }, "per_ticker": st.session_state.per_ticker_erosion, "use_per_ticker": True } # Update session state erosion level to match current settings st.session_state.erosion_level = erosion_level else: # No erosion erosion_type = "None" st.session_state.erosion_type = erosion_type erosion_level = 0 # Display appropriate input field based on mode if st.session_state.simulation_mode == "income_target": monthly_target = st.sidebar.number_input( "Monthly Income Target ($)", value=1500, min_value=100, help="Desired monthly dividend income. We'll calculate the required capital." ) initial_capital = None ANNUAL_TARGET = monthly_target * 12 else: initial_capital = st.sidebar.number_input( "Initial Capital ($)", value=250000, min_value=1000, help="Amount of capital you want to invest. We'll calculate the expected monthly income." ) monthly_target = None ANNUAL_TARGET = None # Will be calculated based on allocations and yields # Add PDF export information with st.sidebar.expander("PDF Export Information"): st.info(""" For PDF export functionality, you'll need to install wkhtmltopdf on your system: - **Windows**: Download from [wkhtmltopdf.org](https://wkhtmltopdf.org/downloads.html) and install - **Mac**: Run `brew install wkhtmltopdf` in Terminal - **Linux**: Run `sudo apt-get install wkhtmltopdf` for Debian/Ubuntu or `sudo yum install wkhtmltopdf` for CentOS/RHEL After installation, restart the app for PDF export to work correctly. """) # Manual ETF allocation input st.sidebar.header("ETF Allocation") if "etf_allocations" not in st.session_state: st.session_state.etf_allocations = [] # Only show input fields if not in real-time adjustment mode or if no ETFs added yet if not st.session_state.simulation_run or not st.session_state.etf_allocations: col1, col2 = st.sidebar.columns([2, 1]) with col1: new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., JEPI)") with col2: new_allocation = st.number_input( "Allocation (%)", min_value=0.0, max_value=100.0, value=0.0, step=1.0, help="Percentage of total capital" ) # Add button to add ETF add_etf_button = st.sidebar.button("ADD ETF", use_container_width=True) if add_etf_button and new_ticker and new_allocation > 0: # Check if ticker already exists if any(etf["ticker"] == new_ticker.upper() for etf in st.session_state.etf_allocations): st.sidebar.warning(f"{new_ticker.upper()} is already in your portfolio. Please adjust its allocation instead.") else: # Validate ticker exists and has data before adding validation_status = st.sidebar.empty() validation_status.info(f"Validating {new_ticker.upper()}...") is_valid = False # Check ticker format first if not re.match(r'^[A-Z]{1,7}$', new_ticker.upper()): validation_status.error(f"Invalid ticker: {new_ticker.upper()}. Must be 1-7 uppercase letters.") else: try: # Check if ticker has data yf_ticker = yf.Ticker(new_ticker.upper()) info, _ = fetch_with_retry(lambda: yf_ticker.info) if info and info.get("previousClose"): is_valid = True validation_status.success(f"Validated {new_ticker.upper()} successfully.") else: # If YFinance fails, try FMP API for high-yield ETFs if enabled if USE_FMP_API and st.session_state.get("fmp_api_key"): fmp_data, _ = fetch_fmp_data(new_ticker.upper()) if fmp_data["quote"] and len(fmp_data["quote"]) > 0: is_valid = True validation_status.success(f"Validated {new_ticker.upper()} using FMP API data.") if not is_valid: validation_status.error(f"Could not validate {new_ticker.upper()}. No price data available.") except Exception as e: validation_status.error(f"Error validating {new_ticker.upper()}: {str(e)}") if is_valid: # Add new ETF to allocations st.session_state.etf_allocations.append({ "ticker": new_ticker.upper(), "allocation": new_allocation }) st.sidebar.success(f"Added {new_ticker.upper()} with {new_allocation}% allocation.") st.rerun() elif add_etf_button: # Show error if missing data if not new_ticker: st.sidebar.error("Please enter an ETF ticker.") if new_allocation <= 0: st.sidebar.error("Allocation must be greater than 0%.") # Calculate total allocation after potential addition total_alloc = sum(etf["allocation"] for etf in st.session_state.etf_allocations) if st.session_state.etf_allocations else 0 # Display ETF allocations if st.session_state.etf_allocations: st.sidebar.subheader("Selected ETFs") alloc_df = pd.DataFrame(st.session_state.etf_allocations) alloc_df["Remove"] = [st.button(f"Remove {etf['ticker']}", key=f"remove_{i}") for i, etf in enumerate(st.session_state.etf_allocations)] st.sidebar.dataframe(alloc_df[["ticker", "allocation"]], use_container_width=True) st.sidebar.metric("Total Allocation (%)", f"{total_alloc:.2f}") if total_alloc > 100: st.error(f"Total allocation is {total_alloc:.2f}%, which exceeds 100%. Please adjust allocations.") # Advanced Options section in sidebar with st.sidebar.expander("Advanced Options"): # Option to toggle FMP API usage use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API, help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs") if use_fmp_api != USE_FMP_API: # Update global setting if changed globals()["USE_FMP_API"] = use_fmp_api st.success("FMP API usage setting updated") # Add FMP API Key input fmp_api_key = st.text_input( "FMP API Key", value=os.environ.get("FMP_API_KEY", st.session_state.get("fmp_api_key", "")), type="password", help="Enter your Financial Modeling Prep API key for more accurate yield data." ) if fmp_api_key: st.session_state.fmp_api_key = fmp_api_key # Add cache controls st.subheader("Cache Settings") # Display cache statistics cache_stats = get_cache_stats() st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)") # Force refresh option st.session_state.force_refresh_data = st.checkbox( "Force refresh data (ignore cache)", value=st.session_state.get("force_refresh_data", False), help="When enabled, always fetch fresh data from APIs" ) # Cache clearing options col1, col2 = st.columns(2) with col1: if st.button("Clear All Cache", key="clear_all_cache"): clear_cache() st.success("All cache files cleared!") st.session_state.api_calls = 0 with col2: ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker") if st.button("Clear") and ticker_to_clear: clear_cache(ticker_to_clear) st.success(f"Cache cleared for {ticker_to_clear.upper()}") # Show API call counter st.write(f"API calls this session: {st.session_state.api_calls}") # Add button to run the FMP API test if st.button("Test FMP API Connection"): # Check if API key is available if not fmp_api_key and not os.environ.get("FMP_API_KEY"): st.error("Please provide an FMP API key first.") else: st.info("Opening FMP API test panel...") # Set a flag to trigger the test in the main UI st.session_state.run_fmp_test = True st.rerun() # Add option for debug mode and parallel processing debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.") parallel_processing = st.checkbox("Enable Parallel Processing", value=True, help="Fetch data for multiple ETFs simultaneously") api_key = st.sidebar.text_input( "OpenAI API Key", type="password", help="Enter your OpenAI API key for ChatGPT summaries." ) # Show simulation button only initially, hide after simulation is run if not st.session_state.simulation_run: run_simulation = st.sidebar.button("Run Simulation", help="Launch capital allocation simulation", disabled=abs(total_alloc - 100) > 1 or total_alloc == 0) else: run_simulation = False st.sidebar.success("Simulation ready - adjust allocations in the table below") if st.sidebar.button("Reset Simulation", help="Start over with new ETFs"): st.session_state.simulation_run = False st.session_state.df_data = None st.session_state.edited_allocations = None st.session_state.show_recalculation = False st.rerun() refresh_button = st.sidebar.button("Refresh Data", help="Clear cache and fetch new data") # Handle remove buttons for i, etf in enumerate(st.session_state.etf_allocations.copy()): if st.session_state.get(f"remove_{i}"): st.session_state.etf_allocations.pop(i) st.rerun() # --- Run App Logic --- if refresh_button: st.session_state.force_refresh_data = True # Force refresh when manually requested st.cache_data.clear() st.rerun() # Check if FMP API test should be run if st.session_state.get("run_fmp_test", False): # Display the FMP API test UI st.header("🔍 FMP API Test Tool") st.write(""" This tool allows you to test the Financial Modeling Prep API responses for ETF yield data. Use this to verify accurate dividend yield information, especially for high-yield ETFs. """) # Clear the flag so it won't show again unless requested st.session_state.run_fmp_test = False # Run the API test function test_fmp_api() # Stop execution to prevent the main app from rendering st.stop() # Initial simulation run if run_simulation: try: with st.spinner("Validating ETFs..."): tickers = validate_etf_input(st.session_state.etf_allocations) if not tickers: st.stop() with st.spinner("Fetching ETF data..."): df, skipped = fetch_etfs(",".join(tickers), debug_mode, parallel_processing) # Store data in session state for reuse st.session_state.df_data = df if not df.empty: # Run appropriate allocation based on mode if st.session_state.simulation_mode == "income_target": final_alloc = allocate_for_income(df, ANNUAL_TARGET, st.session_state.etf_allocations) else: final_alloc = allocate_for_capital(df, initial_capital, st.session_state.etf_allocations) if final_alloc.empty: st.error("Failed to allocate capital. Check ETF data or allocations.") st.stop() # Mark simulation as run successfully st.session_state.simulation_run = True st.session_state.final_alloc = final_alloc else: st.error("❌ No valid ETF data retrieved. Check tickers or enable Debug Mode for details.") st.session_state.simulation_run = False if skipped: st.subheader("🛑 Skipped Tickers") st.write("The following tickers could not be processed. Enable Debug Mode for detailed logs.") st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]).drop(columns=["Debug Info"]), use_container_width=True) except Exception as e: st.error(f"Simulation failed: {str(e)}. Please check inputs or try again.") st.session_state.simulation_run = False # Display results and interactive allocation adjustment UI after simulation is run if st.session_state.simulation_run and st.session_state.df_data is not None: df = st.session_state.df_data final_alloc = st.session_state.final_alloc if hasattr(st.session_state, 'final_alloc') else None # Create tabs for better organization tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"]) with tab1: st.subheader("💰 Portfolio Summary") portfolio_summary(final_alloc) # Display mode-specific information if st.session_state.simulation_mode == "income_target": st.info(f"🎯 **Income Target Mode**: You need ${final_alloc['Capital Allocated ($)'].sum():,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).") else: annual_income = final_alloc["Income Contributed ($)"].sum() monthly_income = annual_income / 12 st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).") # Display full detailed allocation table st.subheader("📊 Capital Allocation Details") # Format currencies for better readability display_df = final_alloc.copy() # Calculate shares for each ETF display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"] display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}") display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}") display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}") display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%") display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}") # Create a form for the allocation table with st.form("allocation_form"): # Create an editable DataFrame edited_df = st.data_editor( display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]], column_config={ "Ticker": st.column_config.TextColumn("Ticker", disabled=True), "Allocation (%)": st.column_config.NumberColumn( "Allocation (%)", min_value=0.0, max_value=100.0, step=1.0, format="%.1f" ), "Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True), "Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True), "Risk Level": st.column_config.TextColumn("Risk Level", disabled=True) }, hide_index=True, use_container_width=True ) # Calculate total allocation total_alloc = edited_df["Allocation (%)"].sum() # Display total allocation st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=f"{total_alloc - 100:.2f}" if abs(total_alloc - 100) > 0.01 else None) if abs(total_alloc - 100) > 0.01: st.warning("Total allocation should be 100%") # Create columns for quick actions col1, col2, col3 = st.columns(3) with col1: equal_weight = st.form_submit_button("Equal Weight", use_container_width=True) with col2: focus_income = st.form_submit_button("Focus on Income", use_container_width=True) with col3: focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True) # Submit button for manual edits submitted = st.form_submit_button("Update Allocations", disabled=abs(total_alloc - 100) > 1, type="primary", use_container_width=True) # Handle form submission if submitted: try: # Convert the edited allocations to a dictionary new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()} # Convert to the format expected by allocation functions etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()] # Get the mode and target from session state mode = st.session_state.get('mode', 'Capital Target') target = st.session_state.get('target', 0) initial_capital = st.session_state.get('initial_capital', 0) # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio updated with new allocations!") st.rerun() else: st.error("Failed to update portfolio. Please try again.") except Exception as e: st.error(f"Error updating allocations: {str(e)}") # Handle quick actions if equal_weight: try: # Calculate equal weight allocation num_etfs = len(edited_df) equal_allocation = 100 / num_etfs # Create new allocations in the format expected by allocation functions etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state mode = st.session_state.get('mode', 'Capital Target') target = st.session_state.get('target', 0) initial_capital = st.session_state.get('initial_capital', 0) # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to equal weight!") st.rerun() except Exception as e: st.error(f"Error applying equal weight: {str(e)}") elif focus_income: try: # Sort by yield and adjust allocations sorted_alloc = edited_df.sort_values("Yield (%)", ascending=False) total_yield = sorted_alloc["Yield (%)"].str.rstrip('%').astype('float').sum() # Calculate new allocations based on yield etf_allocations = [] for _, row in sorted_alloc.iterrows(): yield_val = float(row["Yield (%)"].rstrip('%')) allocation = (yield_val / total_yield) * 100 etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation}) # Get the mode and target from session state mode = st.session_state.get('mode', 'Capital Target') target = st.session_state.get('target', 0) initial_capital = st.session_state.get('initial_capital', 0) # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to focus on income!") st.rerun() except Exception as e: st.error(f"Error focusing on income: {str(e)}") elif focus_capital: try: # Calculate equal weight allocation (same as equal weight) num_etfs = len(edited_df) equal_allocation = 100 / num_etfs # Create new allocations in the format expected by allocation functions etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state mode = st.session_state.get('mode', 'Capital Target') target = st.session_state.get('target', 0) initial_capital = st.session_state.get('initial_capital', 0) # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to focus on capital!") st.rerun() except Exception as e: st.error(f"Error focusing on capital: {str(e)}") # Display charts col1, col2 = st.columns(2) with col1: fig1 = px.bar( final_alloc, x="Ticker", y="Capital Allocated ($)", title="Capital Allocation by ETF", template="plotly_dark", hover_data=["Yield (%)", "Income Contributed ($)", "Allocation (%)", "Risk Level"], labels={"Capital Allocated ($)": "Capital ($)"} ) fig1.update_traces(marker_color="#1f77b4") st.plotly_chart(fig1, use_container_width=True) with col2: fig2 = px.bar( final_alloc, x="Ticker", y="Income Contributed ($)", title="Income Contribution by ETF", template="plotly_dark", hover_data=["Yield (%)", "Capital Allocated ($)", "Allocation (%)", "Risk Level"], labels={"Income Contributed ($)": "Income ($)"} ) fig2.update_traces(marker_color="#ff7f0e") st.plotly_chart(fig2, use_container_width=True) # Display NAV Premium/Discount chart if data is available st.subheader("📈 NAV Premium/Discount") premium_discount_chart(final_alloc["Ticker"].tolist(), df, debug_mode) # Display trend charts trend_tabs = st.tabs(["📉 Yield Trends", "📊 NAV Trends"]) with trend_tabs[0]: yield_chart(final_alloc["Ticker"].tolist(), debug_mode) with trend_tabs[1]: nav_chart(final_alloc["Ticker"].tolist(), debug_mode) with tab2: st.subheader("📈 Dividend Reinvestment (DRIP) Forecast") # Calculate DRIP growth with erosion simulation if enabled drip_forecast = calculate_drip_growth( final_alloc, erosion_type=erosion_type, erosion_level=erosion_level ) # Display explanatory text st.write("This forecast shows the growth of your portfolio over time if dividends are reinvested instead of taken as income.") base_assumptions = "ETF prices remain constant, dividends are reinvested proportionally to original allocations" # Show erosion information if enabled if erosion_type != "None" and isinstance(erosion_level, dict): # Check if using per-ticker rates if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: st.write("**Erosion Simulation:** Custom erosion rates applied per ETF") # Format the per-ticker erosion rates for display per_ticker_display = [] for ticker in tickers: if ticker in erosion_level["per_ticker"]: ticker_settings = erosion_level["per_ticker"][ticker] nav_rate = (1 - (1 - (ticker_settings["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 yield_rate = (1 - (1 - (ticker_settings["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 per_ticker_display.append({ "Ticker": ticker, "NAV Erosion (Annual %)": f"{nav_rate:.1f}%", "Yield Erosion (Annual %)": f"{yield_rate:.1f}%" }) # Display the per-ticker settings in a table st.dataframe( pd.DataFrame(per_ticker_display), use_container_width=True, hide_index=True ) st.write(f"Assumptions: {base_assumptions}, with custom erosion applied monthly per ETF.") else: # Global rates only nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 st.write(f"**Erosion Simulation:** NAV erosion at {nav_annual:.1f}% annual rate, Yield erosion at {yield_annual:.1f}% annual rate") st.write(f"Assumptions: {base_assumptions}, with erosion applied monthly to all ETFs.") else: st.write(f"Assumptions: {base_assumptions}.") # Create columns for key metrics col1, col2, col3, col4 = st.columns(4) # Extract key metrics from forecast initial_value = drip_forecast["Total Value ($)"].iloc[0] final_value = drip_forecast["Total Value ($)"].iloc[-1] value_growth = final_value - initial_value value_growth_pct = (value_growth / initial_value) * 100 initial_income = drip_forecast["Monthly Income ($)"].iloc[0] final_income = drip_forecast["Monthly Income ($)"].iloc[-1] income_growth = final_income - initial_income income_growth_pct = (income_growth / initial_income) * 100 total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1] capital_recovery_pct = (total_dividends / initial_value) * 100 # Display key metrics with col1: st.metric( "Portfolio Value Growth", f"${value_growth:,.2f}", f"{value_growth_pct:.2f}%" ) with col2: st.metric( "Monthly Income Growth", f"${income_growth:,.2f}", f"{income_growth_pct:.2f}%" ) with col3: st.metric( "Total Dividends Earned", f"${total_dividends:,.2f}" ) with col4: st.metric( "Capital Recovery", f"{capital_recovery_pct:.2f}%" ) # Display a line chart showing portfolio growth st.subheader("Portfolio Value Growth") fig1 = px.line( drip_forecast, x="Month", y="Total Value ($)", title="Portfolio Value Growth with DRIP", markers=True, template="plotly_dark", ) fig1.update_traces(line=dict(color="#1f77b4", width=3)) fig1.update_layout( xaxis=dict(tickmode='linear', tick0=0, dtick=1), yaxis=dict(title="Portfolio Value ($)") ) st.plotly_chart(fig1, use_container_width=True) # Display a line chart showing monthly income growth st.subheader("Monthly Income Growth") fig2 = px.line( drip_forecast, x="Month", y="Monthly Income ($)", title="Monthly Income Growth with DRIP", markers=True, template="plotly_dark" ) fig2.update_traces(line=dict(color="#ff7f0e", width=3)) fig2.update_layout( xaxis=dict(tickmode='linear', tick0=0, dtick=1), yaxis=dict(title="Monthly Income ($)") ) st.plotly_chart(fig2, use_container_width=True) # Display detailed forecast table st.subheader("DRIP Forecast Details") # Format the data for display display_forecast = drip_forecast.copy() display_forecast["Total Value ($)"] = display_forecast["Total Value ($)"].apply(lambda x: f"${x:,.2f}") display_forecast["Monthly Income ($)"] = display_forecast["Monthly Income ($)"].apply(lambda x: f"${x:,.2f}") display_forecast["Cumulative Income ($)"] = display_forecast["Cumulative Income ($)"].apply(lambda x: f"${x:,.2f}") # Format share counts and prices share_columns = [col for col in display_forecast.columns if "Shares" in col] price_columns = [col for col in display_forecast.columns if "Price" in col] yield_columns = [col for col in display_forecast.columns if "Yield (%)" in col] for col in share_columns: display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.4f}") for col in price_columns: display_forecast[col] = display_forecast[col].apply(lambda x: f"${x:.2f}") for col in yield_columns: display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.2f}%") # Create a more organized view by grouping columns basic_columns = ["Month", "Total Value ($)", "Monthly Income ($)", "Cumulative Income ($)"] # Create tabs for different views of the data detail_tabs = st.tabs(["Summary View", "Full Details"]) with detail_tabs[0]: st.dataframe(display_forecast[basic_columns], use_container_width=True) with detail_tabs[1]: # Group columns by ticker for better readability ticker_columns = {} for ticker in tickers: ticker_columns[ticker] = [ f"{ticker} Shares", f"{ticker} Price ($)", f"{ticker} Yield (%)" ] # Create ordered columns list: first basic columns, then grouped by ticker ordered_columns = basic_columns.copy() for ticker in tickers: ordered_columns.extend(ticker_columns[ticker]) st.dataframe( display_forecast[ordered_columns], use_container_width=True, height=500 # Increase height to show more rows ) # Add comparison between DRIP and No-DRIP strategies st.subheader("📊 1-Year DRIP vs. No-DRIP Comparison") # Add note about erosion effects if applicable if erosion_type != "None" and isinstance(erosion_level, dict): if erosion_level.get("use_per_ticker", False): st.info(""" This comparison factors in the custom per-ETF erosion rates. Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. """) else: nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 st.info(f""" This comparison factors in: - NAV Erosion: {nav_annual:.1f}% annually - Yield Erosion: {yield_annual:.1f}% annually Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. """) # Calculate no-drip scenario (taking dividends as income) initial_value = drip_forecast["Total Value ($)"].iloc[0] initial_monthly_income = drip_forecast["Monthly Income ($)"].iloc[0] annual_income = initial_monthly_income * 12 # Get the final prices after erosion from the last month of the DRIP forecast final_prices = {} for ticker in tickers: price_col = f"{ticker} Price ($)" if price_col in drip_forecast.columns: final_prices[ticker] = drip_forecast[price_col].iloc[-1] else: # Fallback to initial price if column doesn't exist final_prices[ticker] = ticker_data_dict[ticker]["price"] # Extract initial shares for each ETF from month 1 initial_shares = {ticker: drip_forecast.iloc[0][f"{ticker} Shares"] for ticker in tickers} # Calculate the No-DRIP final value by multiplying initial shares by final prices # This correctly accounts for NAV erosion while keeping shares constant nodrip_final_value = sum(initial_shares[ticker] * final_prices[ticker] for ticker in tickers) # The final income should account for erosion but not compounding growth # This requires simulation of the erosion that would have happened if erosion_type != "None" and isinstance(erosion_level, dict): # Initialize the current prices and yields from the final_alloc dataframe ticker_data_dict = {} current_prices = {} current_yields = {} # Reconstruct ticker data from final_alloc for _, row in final_alloc.iterrows(): ticker = row["Ticker"] ticker_data_dict[ticker] = { "price": row["Price"], "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal "distribution": row.get("Distribution Period", "Monthly") } current_prices[ticker] = row["Price"] current_yields[ticker] = row["Yield (%)"] / 100 # Get the erosion rates for each ticker if erosion_level.get("use_per_ticker", False): ticker_nav_rates = {} ticker_yield_rates = {} for ticker in tickers: ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion else: # Use global rates for all tickers global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion ticker_nav_rates = {ticker: global_nav for ticker in tickers} ticker_yield_rates = {ticker: global_yield for ticker in tickers} # Apply 12 months of erosion for month in range(1, 13): # Apply erosion to each ticker for ticker in tickers: # Apply NAV erosion if ticker_nav_rates[ticker] > 0: current_prices[ticker] *= (1 - ticker_nav_rates[ticker]) # Apply yield erosion if ticker_yield_rates[ticker] > 0: current_yields[ticker] *= (1 - ticker_yield_rates[ticker]) # Calculate final monthly income with eroded prices and yields but original shares final_monthly_income_nodrip = sum( (current_yields[ticker] / 12) * (initial_shares[ticker] * current_prices[ticker]) for ticker in tickers ) else: # No erosion, so final income is the same as initial income final_monthly_income_nodrip = initial_monthly_income nodrip_final_annual_income = final_monthly_income_nodrip * 12 # Get values for DRIP scenario from forecast drip_final_value = drip_forecast["Total Value ($)"].iloc[-1] drip_final_monthly_income = drip_forecast["Monthly Income ($)"].iloc[-1] drip_annual_income_end = drip_final_monthly_income * 12 # Create comparison dataframe with withdrawn income for a more complete financial picture # For No-DRIP strategy, calculate cumulative withdrawn income (sum of monthly dividends) # This is equivalent to the cumulative income in the DRIP forecast, but in No-DRIP it's withdrawn withdrawn_income = 0 monthly_dividends = [] # Reconstruct the monthly dividend calculation for No-DRIP current_prices_monthly = {ticker: ticker_data_dict[ticker]["price"] for ticker in tickers} current_yields_monthly = {ticker: ticker_data_dict[ticker]["yield_annual"] for ticker in tickers} for month in range(1, 13): # Calculate dividends for this month based on current yields and prices month_dividend = sum( (current_yields_monthly[ticker] / 12) * (initial_shares[ticker] * current_prices_monthly[ticker]) for ticker in tickers ) withdrawn_income += month_dividend monthly_dividends.append(month_dividend) # Apply erosion for next month if erosion_type != "None": for ticker in tickers: # Apply NAV erosion if ticker in ticker_nav_rates and ticker_nav_rates[ticker] > 0: current_prices_monthly[ticker] *= (1 - ticker_nav_rates[ticker]) # Apply yield erosion if ticker in ticker_yield_rates and ticker_yield_rates[ticker] > 0: current_yields_monthly[ticker] *= (1 - ticker_yield_rates[ticker]) # Calculate total economic result (final value + withdrawn income) nodrip_economic_result = nodrip_final_value + withdrawn_income drip_economic_result = drip_final_value # No withdrawals comparison_data = { "Strategy": ["Take Income (No DRIP)", "Reinvest Dividends (DRIP)"], "Initial Portfolio Value": [f"${initial_value:,.2f}", f"${initial_value:,.2f}"], "Final Portfolio Value": [f"${nodrip_final_value:,.2f}", f"${drip_final_value:,.2f}"], "Value Change": [ f"${nodrip_final_value - initial_value:,.2f} ({(nodrip_final_value/initial_value - 1)*100:.2f}%)", f"${drip_final_value - initial_value:,.2f} ({(drip_final_value/initial_value - 1)*100:.2f}%)" ], "Income Withdrawn": [f"${withdrawn_income:,.2f}", "$0.00"], "Total Economic Result": [ f"${nodrip_economic_result:,.2f} ({(nodrip_economic_result/initial_value - 1)*100:.2f}%)", f"${drip_economic_result:,.2f} ({(drip_economic_result/initial_value - 1)*100:.2f}%)" ], "Initial Monthly Income": [f"${initial_monthly_income:,.2f}", f"${initial_monthly_income:,.2f}"], "Final Monthly Income": [f"${final_monthly_income_nodrip:,.2f}", f"${drip_final_monthly_income:,.2f}"], "Income Change": [ f"${final_monthly_income_nodrip - initial_monthly_income:,.2f} ({(final_monthly_income_nodrip/initial_monthly_income - 1)*100:.2f}%)", f"${drip_final_monthly_income - initial_monthly_income:,.2f} ({(drip_final_monthly_income/initial_monthly_income - 1)*100:.2f}%)" ], } # Add chart to visualize the No-DRIP income stream if erosion_type != "None": # Calculate the effect of erosion on value pure_nav_effect = sum( initial_shares[ticker] * (final_prices[ticker] - ticker_data_dict[ticker]["price"]) for ticker in tickers ) # Explain the NAV erosion impact on no-DRIP strategy and highlight the benefit of income st.info(f""" **Economic Comparison:** - No-DRIP: While portfolio value decreased by ${abs(pure_nav_effect):,.2f} ({pure_nav_effect/initial_value*100:.2f}%), you received ${withdrawn_income:,.2f} in income ({withdrawn_income/initial_value*100:.2f}% of initial investment) - DRIP: No income taken, but final portfolio value is ${drip_final_value:,.2f} ({(drip_final_value/initial_value - 1)*100:.2f}% vs. initial investment) - **Total Economic Result** combines final portfolio value with total withdrawn income for the complete financial picture """) # Show a chart of monthly income for the No-DRIP scenario monthly_income_df = pd.DataFrame({ "Month": list(range(1, 13)), "Monthly Income": monthly_dividends }) fig = px.line( monthly_income_df, x="Month", y="Monthly Income", title="Monthly Income Withdrawn (No-DRIP Strategy)", markers=True, template="plotly_dark" ) fig.update_traces(line=dict(color="#ff7f0e", width=3)) st.plotly_chart(fig, use_container_width=True) comparison_df = pd.DataFrame(comparison_data) st.dataframe(comparison_df, use_container_width=True, hide_index=True) # Show time to recover initial capital - CORRECTED CALCULATION # For DRIP: Calculate what remains to recover (initial value - current value) drip_remaining_to_recover = max(0, initial_value - drip_final_value) # Time to recover the remaining amount at the final income rate years_to_recover_with_drip = drip_remaining_to_recover / drip_annual_income_end if drip_annual_income_end > 0 else 0 # For No-DRIP: Calculate what remains to recover (initial value - [final value + withdrawn income]) nodrip_economic_result = nodrip_final_value + withdrawn_income nodrip_remaining_to_recover = max(0, initial_value - nodrip_economic_result) # Time to recover the remaining amount at the final income rate years_to_recover_no_drip = nodrip_remaining_to_recover / nodrip_final_annual_income if nodrip_final_annual_income > 0 else 0 # Convert to months and format display def format_recovery_time(years): if years <= 0: return "0 months" months = int(years * 12) if months < 12: return f"{months} months" else: years_part = months // 12 months_part = months % 12 if months_part == 0: return f"{years_part} years" else: return f"{years_part} years, {months_part} months" col1, col2 = st.columns(2) with col1: st.metric( "Remaining Time to Recover Capital (No DRIP)", format_recovery_time(years_to_recover_no_drip) ) with col2: st.metric( "Remaining Time to Recover Capital (DRIP)", format_recovery_time(years_to_recover_with_drip), f"Difference: {format_recovery_time(abs(years_to_recover_no_drip - years_to_recover_with_drip))}" ) st.write(""" **Note:** This shows the *remaining* time needed to fully recover your initial investment, taking into account both current portfolio value and income already withdrawn. For No-DRIP: Initial Value - (Current Value + Withdrawn Income) = Amount left to recover For DRIP: Initial Value - Current Value = Amount left to recover """) with tab3: st.subheader("📉 AI Erosion Risk Assessment") # Add explanatory text st.write(""" This analysis uses historical ETF data to estimate reasonable erosion settings based on past performance, volatility, and dividend history. """) # Run the analysis in a spinner with st.spinner("Analyzing historical ETF data..."): risk_df = analyze_etf_erosion_risk(final_alloc["Ticker"].tolist(), debug_mode) if not risk_df.empty: # Create a summary table with key insights display_risk_df = risk_df.copy() # Format columns for display if "ETF Age (Years)" in display_risk_df.columns: display_risk_df["ETF Age (Years)"] = display_risk_df["ETF Age (Years)"].apply( lambda x: f"{x:.1f} years" if pd.notna(x) else "Unknown" ) if "Volatility (Annual)" in display_risk_df.columns: display_risk_df["Volatility (Annual)"] = display_risk_df["Volatility (Annual)"].apply( lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" ) if "Max Drawdown (1Y)" in display_risk_df.columns: display_risk_df["Max Drawdown (1Y)"] = display_risk_df["Max Drawdown (1Y)"].apply( lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" ) if "Dividend Trend" in display_risk_df.columns: display_risk_df["Dividend Trend"] = display_risk_df["Dividend Trend"].apply( lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" ) # Display main assessment table st.subheader("Recommended Erosion Settings") main_columns = [ "Ticker", "NAV Erosion Risk (0-9)", "Yield Erosion Risk (0-9)", "Estimated Annual NAV Erosion", "Estimated Annual Yield Erosion", "NAV Risk Explanation", "Yield Risk Explanation" ] st.dataframe( display_risk_df[main_columns], use_container_width=True, hide_index=True ) # Allow applying these settings to the simulation if st.button("Apply Recommended Erosion Settings", type="primary"): # Initialize or update per-ticker erosion settings if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): st.session_state.per_ticker_erosion = {} # Update the session state with recommended settings for _, row in risk_df.iterrows(): ticker = row["Ticker"] st.session_state.per_ticker_erosion[ticker] = { "nav": int(row["NAV Erosion Risk (0-9)"]), "yield": int(row["Yield Erosion Risk (0-9)"]) } # Enable erosion and per-ticker settings st.session_state.erosion_type = "NAV & Yield Erosion" st.session_state.use_per_ticker_erosion = True # Update the erosion_level variable to match the new settings erosion_level = { "global": { "nav": 5, # Default medium level for global fallback "yield": 5 }, "per_ticker": st.session_state.per_ticker_erosion, "use_per_ticker": True } # Update session state erosion level for DRIP forecast st.session_state.erosion_level = erosion_level st.success("Applied recommended erosion settings. They will be used in the DRIP forecast.") st.info("Go to the DRIP Forecast tab to see the impact of these settings.") # Display additional risk metrics in an expander with st.expander("View Detailed Risk Metrics"): detail_columns = [ "Ticker", "ETF Age (Years)", "Is New ETF", "Volatility (Annual)", "Max Drawdown (1Y)", "Dividend Trend" ] st.dataframe( display_risk_df[detail_columns], use_container_width=True, hide_index=True ) st.write(""" **Understanding the Metrics:** - **ETF Age**: Newer ETFs have less historical data and may be assigned higher risk - **Volatility**: Higher volatility suggests higher NAV erosion risk - **Max Drawdown**: Maximum peak-to-trough decline, indicating worst historical NAV erosion - **Dividend Trend**: Positive values indicate growing dividends, negative values indicate declining dividends """) else: st.warning("Unable to perform risk assessment. Check ticker data or try again.") with tab4: st.subheader("🤖 AI Portfolio Suggestions") # Update AI suggestion to match simulation mode if st.session_state.simulation_mode == "income_target": suggestion_df = ai_suggestion(df, ANNUAL_TARGET, st.session_state.etf_allocations) else: # For capital mode, we need to modify the AI suggestion logic # First generate optimized allocations ai_allocations = ai_suggestion(df, 1000, st.session_state.etf_allocations) # Use dummy target if not ai_allocations.empty: # Then use those allocations with the actual capital ai_allocs_list = [{"ticker": row["Ticker"], "allocation": row["Allocation (%)"]} for _, row in ai_allocations.iterrows()] suggestion_df = allocate_for_capital(df, initial_capital, ai_allocs_list) else: suggestion_df = pd.DataFrame() if not suggestion_df.empty: mode_message = "The AI has optimized the portfolio to minimize capital while mitigating risk" if st.session_state.simulation_mode == "income_target" else "The AI has optimized the income from your investment while mitigating risk" st.write(f"{mode_message}, using validated 2024 yield data and ETF longevity.") portfolio_summary(suggestion_df) # Format currencies for better readability ai_display_df = suggestion_df.copy() ai_display_df["Capital Allocated ($)"] = ai_display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}") ai_display_df["Income Contributed ($)"] = ai_display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}") ai_display_df["Yield (%)"] = ai_display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%") st.dataframe( ai_display_df[["Ticker", "Capital Allocated ($)", "Income Contributed ($)", "Allocation (%)", "Yield (%)", "Risk Level"]], use_container_width=True, hide_index=True ) # Add button to apply AI suggestions if st.button("Apply AI Suggested Allocations", type="primary"): # Convert AI suggestions to the format needed for recalculation ai_allocations = {row["Ticker"]: row["Allocation (%)"] for _, row in suggestion_df.iterrows()} # Update sidebar ETF allocations to match AI suggestions new_etf_allocations = [] for ticker, allocation in ai_allocations.items(): new_etf_allocations.append({ "ticker": ticker, "allocation": allocation }) # Update session state with new allocations st.session_state.etf_allocations = new_etf_allocations # Recalculate portfolio with new allocations final_alloc = recalculate_portfolio(ai_allocations) st.session_state.final_alloc = final_alloc st.success("Applied AI suggested allocations to your portfolio!") st.rerun() else: st.error("AI Suggestion failed to generate. Check ETF data.") with tab5: st.subheader("📊 ETF Details") # ... existing code ... # End of file