From 2687b63d3f6f36cd1958edc9a3a8cdae9efe2876 Mon Sep 17 00:00:00 2001 From: Pascal Date: Sat, 24 May 2025 23:24:40 +0000 Subject: [PATCH] Initial commit for fixing broken code: - Fixed ETF_Analyzer page config issue - Updated ETF_Portfolio_Builder with improved error handling and data validation --- pages/ETF_Analyzer.py | 47 + pages/ETF_Portfolio_Builder.py | 3886 +++++++------------------------- 2 files changed, 812 insertions(+), 3121 deletions(-) diff --git a/pages/ETF_Analyzer.py b/pages/ETF_Analyzer.py index 5a59781..e50d523 100644 --- a/pages/ETF_Analyzer.py +++ b/pages/ETF_Analyzer.py @@ -1,3 +1,11 @@ +# Set page config first, before any other Streamlit commands +st.set_page_config( + page_title="ETF Analyzer", + page_icon="📊", + layout="wide", + initial_sidebar_state="expanded" +) + """ ETF Analyzer - Comprehensive ETF Analysis Tool @@ -21,7 +29,46 @@ import time from typing import Dict, List, Tuple, Any, Optional, Union import sys import yfinance as yf +from dotenv import load_dotenv +import logging +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# FMP API configuration +FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', '')) +FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" + +def test_fmp_connection(): + """Test the FMP API connection and display status.""" + try: + if not FMP_API_KEY: + return False, "No API key found" + + session = get_fmp_session() + test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}" + response = session.get(test_url) + + if response.status_code == 200: + data = response.json() + if data and isinstance(data, list) and len(data) > 0: + return True, "Connected" + return False, f"Error: {response.status_code}" + except Exception as e: + return False, f"Error: {str(e)}" + +# Add FMP connection status to the navigation bar +st.sidebar.markdown("---") +st.sidebar.subheader("FMP API Status") +connection_status, message = test_fmp_connection() +if connection_status: + st.sidebar.success(f"✅ FMP API: {message}") +else: + st.sidebar.error(f"❌ FMP API: {message}") # --- Constants and Settings --- CACHE_DIR = Path("cache") diff --git a/pages/ETF_Portfolio_Builder.py b/pages/ETF_Portfolio_Builder.py index f452012..52647fa 100644 --- a/pages/ETF_Portfolio_Builder.py +++ b/pages/ETF_Portfolio_Builder.py @@ -1,2527 +1,782 @@ import streamlit as st -st.set_page_config( - page_title="ETF Dividend Portfolio Builder", - page_icon="💼", - layout="wide", - initial_sidebar_state="expanded" -) - -# Add navigation in sidebar -with st.sidebar: - st.markdown("### Navigation") - if st.button("🏠 ETF Suite Launcher", key="launcher_portfolio"): - st.switch_page("ETF_Suite_Launcher.py") - if st.button("📈 ETF Analyzer", key="analyzer_portfolio"): - st.switch_page("ETF_Analyzer.py") - -# --- Imports & Settings --- import pandas as pd +import numpy as np import plotly.express as px import plotly.graph_objects as go -import yfinance as yf -import time -import re -from typing import Tuple, List, Dict, Any, Optional, TypeVar, Callable -from io import StringIO, BytesIO -from openai import OpenAI -from functools import lru_cache -from datetime import datetime, timezone, timedelta -import asyncio -import concurrent.futures -import pdfkit -import base64 -import tempfile -import os -import requests -import json -import hashlib from pathlib import Path -import numpy as np +import json +from datetime import datetime +from typing import List, Dict, Tuple, 
Optional, Any, Callable, T +import time +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +import yfinance as yf +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +import os +import sys +import logging +import traceback +from dotenv import load_dotenv -# --- Settings --- -REQUESTS_PER_MINUTE = 5 # yfinance rate limit -MAX_TICKERS = 10 -RETRY_ATTEMPTS = 5 -RETRY_DELAY = 2 # Seconds between retries -MAX_YIELD_THRESHOLD = 50 # Warn if yield exceeds 50% -MAX_WORKERS = 5 # Maximum number of parallel workers for API requests -DRIP_FORECAST_MONTHS = 12 # Number of months to forecast DRIP compounding -USE_FMP_API = True # Whether to use FMP API as an additional data source -CACHE_EXPIRATION_DAYS = 7 # Number of days before cache expires +# Load environment variables +load_dotenv() -# --- Cache Setup --- -def setup_cache_dir(): - """Set up cache directory if it doesn't exist""" - cache_dir = Path("cache") - cache_dir.mkdir(exist_ok=True) - return cache_dir +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) -CACHE_DIR = setup_cache_dir() +# FMP API configuration +FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', '')) +FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" -# Expected yield ranges for validation (based on 2024 data) -EXPECTED_YIELDS = { - "JEPI": (7, 9), # 7-9% - "FEPI": (10, 12), # 10-12% - "CONY": (20, 30), # 20-30% - "SMCY": (20, 30), - "ULTY": (20, 30), - "MSTY": (20, 30) -} +def get_fmp_session(): + """Create a session with retry logic for FMP API calls.""" + session = requests.Session() + retries = Retry(total=3, backoff_factor=0.5) + session.mount('https://', HTTPAdapter(max_retries=retries)) + return session -# Reference dictionary with accurate ETF yields (2024 data) -ETF_REFERENCE_DB = { - "SPY": 1.35, "VOO": 1.40, "VTI": 1.34, "QQQ": 0.70, - "SCHD": 3.50, "VYM": 2.95, "HDV": 3.80, "SPYD": 4.20, - "SPHD": 4.65, "SDIV": 8.30, "JEPI": 7.80, "DGRO": 2.15, - "VIG": 1.85, "BND": 2.80, "AGG": 2.65, "TLT": 3.10, - "GLD": 0.00, "VNQ": 3.90, "XLF": 1.75, "XLV": 1.40, "XLE": 3.20, - "PFF": 6.20, "SDY": 2.65, "DVY": 3.95, "IDV": 5.10, "NOBL": 2.30, - "DGRW": 1.90, "DIV": 6.80, "VGT": 0.70, "VDC": 2.40 -} - -# High-yield ETFs that benefit from FMP API verification (2024 data) -HIGH_YIELD_ETFS = { - "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"}, - "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"}, - "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"}, - "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"}, - "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"}, - "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"}, - "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"}, - "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, - "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, - "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"} -} - -# --- Helper Functions --- -T = TypeVar('T') - -def generate_cache_key(source: str, ticker: str, endpoint: str = None) -> str: - """Generate a unique cache key for a data request. +def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]: + """ + Fetch ETF data from Financial Modeling Prep API. 
Args: - source: Data source (e.g., 'yfinance', 'fmp') - ticker: Ticker symbol - endpoint: API endpoint or data type + ticker: ETF ticker symbol Returns: - A string hash key + Dictionary with ETF data or None if failed """ - components = [source, ticker.upper()] - if endpoint: - components.append(endpoint) - key_string = "_".join(components) - return hashlib.md5(key_string.encode()).hexdigest() - -def get_cache_path(cache_key: str) -> Path: - """Get the file path for a cache key.""" - return CACHE_DIR / f"{cache_key}.json" - -def save_to_cache(cache_key: str, data: Any) -> None: - """Save data to cache with timestamp.""" - cache_file = get_cache_path(cache_key) - - # Process data to ensure it's JSON serializable - processed_data = convert_to_serializable(data) - - cache_data = { - "data": processed_data, - "timestamp": datetime.now().isoformat() - } - try: - with open(cache_file, 'w') as f: - json.dump(cache_data, f) - except Exception as e: - print(f"Error saving to cache: {str(e)}") - # If caching fails, we continue without raising an exception - # This allows the app to work even if caching doesn't - -def convert_to_serializable(obj: Any) -> Any: - """Convert an object to a JSON serializable format.""" - if obj is None: - return None - - # Handle pandas Series - if isinstance(obj, pd.Series): - try: - # Handle special case of pandas Series with non-serializable index - return { - "__pandas_series__": True, - "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index), - "values": obj.values.tolist() if hasattr(obj.values, 'tolist') else list(obj.values), - "name": obj.name - } - except Exception as e: - # If all else fails, convert to list - return list(obj) - - # Handle pandas DataFrame - elif isinstance(obj, pd.DataFrame): - try: - return { - "__pandas_dataframe__": True, - "columns": obj.columns.tolist() if hasattr(obj.columns, 'tolist') else list(obj.columns), - "data": obj.values.tolist() if hasattr(obj.values, 'tolist') else obj.values.tolist(), - "index": obj.index.tolist() if hasattr(obj.index, 'tolist') else list(obj.index) - } - except Exception as e: - # Fall back to records format - return obj.to_dict(orient='records') - - # Handle numpy arrays - elif isinstance(obj, np.ndarray): - try: - return obj.tolist() - except Exception as e: - return list(obj) - - # Handle dictionaries with non-serializable values - elif isinstance(obj, dict): - return {str(k): convert_to_serializable(v) for k, v in obj.items()} - - # Handle lists with non-serializable items - elif isinstance(obj, (list, tuple)): - return [convert_to_serializable(item) for item in obj] - - # Handle datetime objects - elif isinstance(obj, datetime): - return obj.isoformat() - - # Handle other objects by converting to string if needed - try: - json.dumps(obj) - return obj - except (TypeError, OverflowError): - return str(obj) - -def load_from_cache(cache_key: str) -> Tuple[Any, bool]: - """Load data from cache if it exists and is not expired. 
- - Returns: - Tuple of (data, is_valid) - """ - cache_file = get_cache_path(cache_key) - if not cache_file.exists(): - return None, False - - try: - with open(cache_file, 'r') as f: - cache_data = json.load(f) + if not FMP_API_KEY: + logger.warning("FMP API key not configured, skipping FMP data fetch") + return None - # Check if cache is expired - timestamp = datetime.fromisoformat(cache_data["timestamp"]) - if datetime.now() - timestamp > timedelta(days=CACHE_EXPIRATION_DAYS): - return cache_data["data"], False # Expired but usable as fallback + session = get_fmp_session() - # Restore any special data structures - data = restore_from_serializable(cache_data["data"]) - return data, True # Valid cache - except Exception as e: - print(f"Error loading from cache: {str(e)}") - return None, False - -def restore_from_serializable(obj): - """Restore special data structures from serialized format.""" - if obj is None: - return None + # Get profile data for current price + profile_url = f"{FMP_BASE_URL}/profile/{ticker}?apikey={FMP_API_KEY}" + logger.info(f"Fetching FMP profile data for {ticker}") + profile_response = session.get(profile_url) - # Restore pandas Series - if isinstance(obj, dict) and obj.get("__pandas_series__"): - return pd.Series( - data=obj["values"], - index=obj["index"], - name=obj["name"] - ) - - # Restore pandas DataFrame - elif isinstance(obj, dict) and obj.get("__pandas_dataframe__"): - return pd.DataFrame( - data=obj["data"], - columns=obj["columns"], - index=obj["index"] - ) - - # Restore nested dictionaries - elif isinstance(obj, dict): - return {k: restore_from_serializable(v) for k, v in obj.items()} - - # Restore nested lists - elif isinstance(obj, list): - return [restore_from_serializable(item) for item in obj] - - # Return original object - return obj - -def get_cache_stats() -> Dict: - """Get cache statistics.""" - cache_files = list(CACHE_DIR.glob("*.json")) - stats = { - "file_count": len(cache_files), - "total_size_kb": sum(f.stat().st_size for f in cache_files) / 1024, - "sources": {}, - "tickers": set() - } - - for file in cache_files: - # Extract info from filename - name = file.stem - if "_" in name: - parts = name.split("_") - if len(parts) >= 2: - source = parts[0] - ticker = parts[1] - - if source not in stats["sources"]: - stats["sources"][source] = 0 - stats["sources"][source] += 1 - stats["tickers"].add(ticker) - - stats["tickers"] = list(stats["tickers"]) - stats["ticker_count"] = len(stats["tickers"]) - - return stats - -def clear_cache(ticker: str = None) -> None: - """Clear cache files. - - Args: - ticker: If provided, only clear cache for this ticker - """ - if ticker: - # Clear only files for this ticker - for file in CACHE_DIR.glob(f"*_{ticker.upper()}_*.json"): - file.unlink() - else: - # Clear all cache files - for file in CACHE_DIR.glob("*.json"): - file.unlink() - -def fetch_with_retry(fetch_func: Callable[[], T], attempts: int = RETRY_ATTEMPTS, delay: int = RETRY_DELAY) -> Tuple[Optional[T], str]: - """Generic retry function for API calls that might fail temporarily. 
- - Args: - fetch_func: Function to execute - attempts: Number of retry attempts - delay: Delay between retries in seconds - - Returns: - Tuple of (result, debug_info) - """ - debug_info = "" - result = None - - for attempt in range(attempts): - try: - result = fetch_func() - return result, debug_info - except Exception as e: - debug_info += f"Attempt {attempt+1} failed: {str(e)}\n" - if attempt < attempts - 1: - time.sleep(delay) - - return None, debug_info - -def fetch_fmp_data(ticker: str) -> Tuple[Dict, str]: - """Fetch ETF data from FMP API with caching. - - Args: - ticker: The ETF ticker symbol - - Returns: - Tuple of (data_dict, debug_info) - """ - debug_info = "" - result = { - "profile": None, - "quote": None, - "dividend_history": None - } - - # Get API key - API_KEY = os.environ.get("FMP_API_KEY") - if not API_KEY: - API_KEY = st.session_state.get("fmp_api_key") - if not API_KEY: - return result, "FMP API key not found" - - # Check if we should use force refresh - force_refresh = st.session_state.get("force_refresh_data", False) - - try: - # Fetch profile data with cache - profile_cache_key = generate_cache_key("fmp", ticker, "profile") - profile_data, is_valid = load_from_cache(profile_cache_key) if not force_refresh else (None, False) - - if not is_valid: - # Need to fetch from API - profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}" - profile_response = requests.get(profile_url) - if profile_response.status_code == 200: - profile_data = profile_response.json() - save_to_cache(profile_cache_key, profile_data) - debug_info += f"Profile data fetched from API and cached\n" - # Track API call - if "api_calls" in st.session_state: - st.session_state.api_calls += 1 - else: - debug_info += f"Profile request failed with status {profile_response.status_code}\n" - profile_data = None - else: - debug_info += f"Profile data loaded from cache\n" + if profile_response.status_code != 200: + logger.error(f"FMP API error for {ticker}: {profile_response.status_code}") + return None - result["profile"] = profile_data + profile_data = profile_response.json() + + if not profile_data or not isinstance(profile_data, list) or len(profile_data) == 0: + logger.warning(f"No profile data found for {ticker} in FMP") + return None - # Fetch quote data with cache - quote_cache_key = generate_cache_key("fmp", ticker, "quote") - quote_data, is_valid = load_from_cache(quote_cache_key) if not force_refresh else (None, False) - - if not is_valid: - # Need to fetch from API - quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}" - quote_response = requests.get(quote_url) - if quote_response.status_code == 200: - quote_data = quote_response.json() - save_to_cache(quote_cache_key, quote_data) - debug_info += f"Quote data fetched from API and cached\n" - # Track API call - if "api_calls" in st.session_state: - st.session_state.api_calls += 1 - else: - debug_info += f"Quote request failed with status {quote_response.status_code}\n" - quote_data = None - else: - debug_info += f"Quote data loaded from cache\n" + profile = profile_data[0] + current_price = float(profile.get('price', 0)) + if current_price <= 0: + logger.error(f"Invalid price for {ticker}: {current_price}") + return None - result["quote"] = quote_data + # Get dividend history + dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}" + logger.info(f"Fetching FMP dividend data for {ticker}") + dividend_response = session.get(dividend_url) 
+ + if dividend_response.status_code != 200: + logger.error(f"FMP API error for dividend data: {dividend_response.status_code}") + return None - # Fetch dividend history with cache - dividend_cache_key = generate_cache_key("fmp", ticker, "dividend_history") - dividend_data, is_valid = load_from_cache(dividend_cache_key) if not force_refresh else (None, False) + dividend_data = dividend_response.json() - if not is_valid: - # Need to fetch from API - dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}" - dividend_response = requests.get(dividend_url) - if dividend_response.status_code == 200: - dividend_data = dividend_response.json() - save_to_cache(dividend_cache_key, dividend_data) - debug_info += f"Dividend history fetched from API and cached\n" - # Track API call - if "api_calls" in st.session_state: - st.session_state.api_calls += 1 - else: - debug_info += f"Dividend history request failed with status {dividend_response.status_code}\n" - dividend_data = None - else: - debug_info += f"Dividend history loaded from cache\n" + if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]: + logger.warning(f"No dividend history found for {ticker}") + return None - result["dividend_history"] = dividend_data + # Calculate TTM dividend + dividends = pd.DataFrame(dividend_data["historical"]) + dividends["date"] = pd.to_datetime(dividends["date"]) + dividends = dividends.sort_values("date") - return result, debug_info - except Exception as e: - debug_info += f"FMP API request failed: {str(e)}\n" - return result, debug_info - -def yf_fetch_with_cache(ticker: str, data_type: str, fetch_func: Callable) -> Tuple[Any, str]: - """Fetch data from yfinance with caching. - - Args: - ticker: Ticker symbol - data_type: Type of data (info, dividends, history) - fetch_func: Function to execute for fetching data + # Get dividends in the last 12 months + one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365) + recent_dividends = dividends[dividends["date"] >= one_year_ago] - Returns: - Tuple of (data, debug_info) - """ - debug_info = "" - - # Generate cache key for this request - cache_key = generate_cache_key("yf", ticker, data_type) - - # Check if we should force refresh - force_refresh = st.session_state.get("force_refresh_data", False) - - # Try to get data from cache first - data, is_valid = load_from_cache(cache_key) if not force_refresh else (None, False) - - if is_valid: - # We have valid cached data - debug_info = f"Data for {ticker} ({data_type}) loaded from cache" - return data, debug_info - - # Need to fetch data from API - data, api_debug = fetch_with_retry(fetch_func) - debug_info += api_debug - - # If successful, cache the data - if data is not None: - save_to_cache(cache_key, data) - debug_info += f"\nData for {ticker} ({data_type}) saved to cache" - - return data, debug_info - -def process_ticker_data(ticker: str, debug: bool = False) -> Tuple[Optional[Dict], Tuple[str, str, str], List[str]]: - """Process a single ticker to get all relevant data. 
- - Args: - ticker: The ticker symbol - debug: Whether to include debug information - - Returns: - Tuple of (ticker_data, error_info, warnings) where: - - ticker_data is the processed data or None - - error_info is (ticker, reason, debug_info) or None - - warnings is a list of warning messages - """ - debug_info = "" - warnings = [] - - # Check if this is a high-yield ETF that would benefit from FMP API verification - is_high_yield = ticker in HIGH_YIELD_ETFS - - try: - # First try yfinance as primary data source - yf_ticker = yf.Ticker(ticker) - - # Fetch price and inception data with caching - info, price_debug = yf_fetch_with_cache( - ticker, - "info", - lambda: yf_ticker.info - ) - debug_info += price_debug - - if not info or not info.get("previousClose"): - if not USE_FMP_API or not is_high_yield: - return None, (ticker, "No price data available", debug_info), [] + if recent_dividends.empty: + logger.warning(f"No recent dividends found for {ticker}") + return None - # Default values from yfinance - price = info.get("previousClose", 0) if info else 0 - inception_date = info.get("fundInceptionDate") if info else None + # Calculate TTM dividend + ttm_dividend = recent_dividends["dividend"].sum() - # Get NAV data - nav = info.get("navPrice", None) if info else None - if nav is None and info: - # For some ETFs, this might be stored under a different key - nav = info.get("regularMarketPrice", price) + # Calculate yield + yield_pct = (ttm_dividend / current_price) * 100 - # Calculate premium/discount to NAV - nav_premium = 0 - if nav and nav > 0: - nav_premium = ((price / nav) - 1) * 100 # as percentage + logger.info(f"Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})") - if debug: - debug_info += f"\nYFinance - Price: {price}\nInception Date: {inception_date}\nNAV: {nav}\nPremium/Discount: {nav_premium:.2f}%\n" - - # If this is a high-yield ETF and FMP API is enabled, get additional data from FMP API - fmp_yield_calculated = None - fmp_price = None - fmp_dist_period = None - - if USE_FMP_API and is_high_yield: - fmp_data, fmp_debug = fetch_fmp_data(ticker) - debug_info += f"\nFMP API Debug: {fmp_debug}\n" - - # Extract data from FMP response - if fmp_data["profile"] and len(fmp_data["profile"]) > 0: - profile = fmp_data["profile"][0] - fmp_price = profile.get("price", price) # Default to yfinance price if not available - last_div = profile.get("lastDiv", 0) - - if fmp_price > 0 and last_div > 0: - # Calculate yield from profile data - fmp_profile_yield = (last_div / fmp_price) * 100 - debug_info += f"FMP Profile Yield: {fmp_profile_yield:.2f}%\n" - - # Extract yield from quote data - if fmp_data["quote"] and len(fmp_data["quote"]) > 0: - quote = fmp_data["quote"][0] - if "dividendYield" in quote: - div_yield = quote["dividendYield"] - fmp_quote_yield = div_yield * 100 if div_yield < 1 else div_yield - debug_info += f"FMP Quote Yield: {fmp_quote_yield:.2f}%\n" - - # Update price if available - if "price" in quote and not fmp_price: - fmp_price = quote["price"] - - # Calculate yield from dividend history - if fmp_data["dividend_history"] and "historical" in fmp_data["dividend_history"] and fmp_data["dividend_history"]["historical"]: - recent_divs = fmp_data["dividend_history"]["historical"][:3] # Get last 3 dividends - - if recent_divs and "dividend" in recent_divs[0] and fmp_price: - # Try to figure out payment frequency - if len(recent_divs) >= 2: - try: - date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d") - date2 = 
datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d") - days_between = abs((date1 - date2).days) - - if days_between < 45: # Monthly - frequency = 12 - fmp_dist_period = "Monthly" - elif days_between < 100: # Quarterly - frequency = 4 - fmp_dist_period = "Quarterly" - elif days_between < 200: # Semi-annual - frequency = 2 - fmp_dist_period = "Semi-Annually" - else: # Annual - frequency = 1 - fmp_dist_period = "Annually" - - annual_div = recent_divs[0]["dividend"] * frequency - fmp_yield_calculated = (annual_div / fmp_price) * 100 - - debug_info += f"FMP Calculated Yield: {fmp_yield_calculated:.2f}% (Distribution: {fmp_dist_period})\n" - except Exception as e: - debug_info += f"Error calculating FMP yield: {str(e)}\n" - - # Decide which data source to use - for high-yield ETFs, prefer FMP if available - use_fmp = USE_FMP_API and is_high_yield and fmp_yield_calculated is not None - - # If we're using FMP data for high-yield ETFs - if use_fmp: - if debug: - debug_info += f"Using FMP data for {ticker} (high-yield ETF)\n" - - # For high-yield ETFs, FMP data is typically more accurate - yield_pct = fmp_yield_calculated - final_price = fmp_price if fmp_price else price - dist_period = fmp_dist_period if fmp_dist_period else HIGH_YIELD_ETFS[ticker]["frequency"] - income_per_1k = (1000 / final_price) * (yield_pct * final_price) / 100 - - # Add a note that we're using validated data - debug_info += f"Using validated FMP yield data: {yield_pct:.2f}%\n" - - else: - # For normal ETFs, proceed with yfinance data - # Fetch dividend data from yfinance with caching - dividends, div_debug = yf_fetch_with_cache( - ticker, - "dividends", - lambda: yf_ticker.dividends - ) - debug_info += div_debug - - if dividends is None or dividends.empty: - return None, (ticker, "No dividend data available", debug_info), [] - - dividends = dividends.reset_index() - dividends.columns = ["date", "amount"] - dividends["date"] = pd.to_datetime(dividends["date"]) - last_year = dividends[dividends["date"] >= pd.Timestamp.now(tz='America/New_York') - pd.Timedelta(days=365)] - ttm_dividend = last_year["amount"].sum() - - if not ttm_dividend and not last_year.empty: - ttm_dividend = dividends["amount"].mean() * 12 - debug_info += f"\nFallback: Estimated TTM dividend = {ttm_dividend:.2f}" - - if not ttm_dividend or not price: - return None, (ticker, f"Missing data: Price={price}, Dividend={ttm_dividend}", debug_info), [] - - yield_pct = (ttm_dividend / price) * 100 - income_per_1k = (1000 / price) * ttm_dividend - - # Check for unrealistic yields - if yield_pct > MAX_YIELD_THRESHOLD and ticker not in HIGH_YIELD_ETFS: - warnings.append(f"Unrealistic yield for {ticker}: {yield_pct:.2f}%. 
Verify data accuracy.") - debug_info += f"Warning: Yield {yield_pct:.2f}% exceeds {MAX_YIELD_THRESHOLD}% threshold\n" - - # Use reference yield if available - if ticker in ETF_REFERENCE_DB: - yield_pct = ETF_REFERENCE_DB[ticker] - debug_info += f"Corrected to reference yield: {yield_pct:.2f}%\n" - income_per_1k = (1000 / price) * (yield_pct * price) / 100 - - # Calculate distribution period - if len(last_year) >= 2: - intervals = (last_year["date"].diff().dt.days).dropna() - avg_interval = intervals.mean() - if avg_interval <= 45: - dist_period = "Monthly" - elif avg_interval <= 100: - dist_period = "Quarterly" - elif avg_interval <= 200: - dist_period = "Semi-Annually" - else: - dist_period = "Annually" - else: - dist_period = "Unknown" - - # Convert inception date to datetime - inception_date_str = None - if inception_date: - try: - # Ensure timestamp is valid and convert to UTC - inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) - inception_date_str = inception_date_dt.strftime("%Y-%m-%d") - except Exception as e: - debug_info += f"Invalid inception date format: {inception_date}, error: {str(e)}\n" - inception_date_str = None - - # Final data with validated yield info - final_price = fmp_price if use_fmp and fmp_price else price - - return { + etf_data = { "Ticker": ticker, - "Price": round(final_price, 2), - "NAV": round(nav, 2) if nav else None, - "Premium/Discount (%)": round(nav_premium, 2) if nav else None, - "Dividend Rate": round((yield_pct * final_price) / 100, 2), - "Yield (%)": round(yield_pct, 2), - "Income per $1K": round(income_per_1k, 2), - "Distribution Period": dist_period, - "Inception Date": inception_date_str, - "Data Source": "FMP API" if use_fmp else "YFinance" - }, None, warnings - - except Exception as e: - return None, (ticker, f"Error processing ticker: {str(e)}", debug_info), [] - -# --- Validate ETF Input --- -def validate_etf_input(etf_allocations: List[Dict]) -> List[str]: - """Validate ETF tickers from session state.""" - if not etf_allocations: - st.error("Please add at least one ETF.") - return [] - tickers = [etf["ticker"] for etf in etf_allocations] - valid_tickers = [] - for t in tickers: - if re.match(r'^[A-Z]{1,7}$', t): - try: - yf_ticker = yf.Ticker(t) - info, _ = fetch_with_retry(lambda: yf_ticker.info) - if info and info.get("previousClose"): - valid_tickers.append(t) - else: - st.warning(f"Skipping {t}: No price data available.") - except Exception as e: - st.warning(f"Skipping {t}: Failed to fetch data ({str(e)}).") - else: - st.warning(f"Invalid ticker: {t}. Must be 1-7 uppercase letters.") - if not valid_tickers: - st.error("No valid tickers found. 
Please check tickers and try again.") - return valid_tickers - -# --- Fetch ETF Data --- -@st.cache_data(show_spinner=False) -def fetch_etfs(tickers: str, debug: bool, use_parallel: bool = True) -> Tuple[pd.DataFrame, List[Tuple[str, str, str]]]: - """Fetch ETF data from yfinance and FMP API with retries.""" - tickers_list = tickers.split(",") - valid, skipped = [], [] - all_warnings = [] - progress = st.progress(0) - status = st.empty() - - # Initialize API call counter if not in session state - if "api_calls" not in st.session_state: - st.session_state.api_calls = 0 - - # Check if we need FMP API key - if USE_FMP_API and any(t in HIGH_YIELD_ETFS for t in tickers_list): - # Get API key - either from environment or input - API_KEY = os.environ.get("FMP_API_KEY") - if not API_KEY and "fmp_api_key" not in st.session_state: - API_KEY = st.text_input("Enter FMP API Key for more accurate yield data:", type="password") - st.session_state.fmp_api_key = API_KEY - if not API_KEY: - st.warning("Without FMP API key, high-yield ETF data may be less accurate.") - - # Define sequential processing function - def process_sequentially(): - for idx, ticker in enumerate(tickers_list): - status.text(f"Fetching {ticker} ({idx+1}/{len(tickers_list)})...") - progress.progress((idx + 1) / len(tickers_list)) - - ticker_data, error, warnings = process_ticker_data(ticker, debug) - - if ticker_data: - valid.append(ticker_data) - all_warnings.extend(warnings) - elif error: - skipped.append(error) - - # Rate limit - if idx < len(tickers_list) - 1: - time.sleep(60 / REQUESTS_PER_MINUTE) - - # Define parallel processing function - def process_parallel(): - def process_with_status(ticker): - # No Streamlit operations in this thread - return process_ticker_data(ticker, debug) - - # Create a list to collect results - results = [] - - # Show processing status - status.text(f"Fetching {len(tickers_list)} ETFs in parallel...") - - # Use ThreadPoolExecutor for parallel processing - with concurrent.futures.ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(tickers_list))) as executor: - # Submit all tasks - future_to_ticker = {executor.submit(process_with_status, ticker): ticker for ticker in tickers_list} - - # Process results as they complete - for i, future in enumerate(concurrent.futures.as_completed(future_to_ticker)): - progress.progress((i + 1) / len(tickers_list)) - ticker = future_to_ticker[future] - try: - ticker_data, error, warnings = future.result() - if ticker_data: - valid.append(ticker_data) - all_warnings.extend(warnings) - elif error: - skipped.append(error) - except Exception as e: - skipped.append((ticker, f"Thread error: {str(e)}", "")) - - # Choose processing method based on setting - if use_parallel and len(tickers_list) > 1: - try: - process_parallel() - except Exception as e: - st.error(f"Error in parallel processing: {str(e)}. 
Falling back to sequential processing.") - valid, skipped = [], [] - process_sequentially() - else: - process_sequentially() - - # Display warnings collected from all threads - for warning in all_warnings: - st.warning(warning) - - if debug and skipped: - st.subheader("🛑 Skipped Tickers (Debug)") - st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]), use_container_width=True) - - progress.empty() - status.empty() - - # Check if we have data source information and add it to the display - df = pd.DataFrame(valid) - - # Debug info about data sources - if debug and not df.empty and "Data Source" in df.columns: - source_counts = df["Data Source"].value_counts() - st.info(f"Data sources used: {dict(source_counts)}") - - return df, skipped - -# --- Test FMP API Function (for debugging) --- -def test_fmp_api(): - """Test function to verify FMP API responses for ETF yield data.""" - st.subheader("FMP API Test Results") - - # Get API key from environment or input - API_KEY = os.environ.get("FMP_API_KEY") - if not API_KEY and "fmp_api_key" not in st.session_state: - API_KEY = st.text_input("Enter your FMP API Key:", type="password") - st.session_state.fmp_api_key = API_KEY - else: - API_KEY = st.session_state.get("fmp_api_key", API_KEY) - - if not API_KEY: - st.warning("Please enter your FMP API key to continue") - return - - # List of ETFs to test (including high-yield ETFs) - test_tickers_default = "MSTY,SCHD,JEPI,SMCY,SPY" - test_tickers_input = st.text_input("Enter ETF tickers to test (comma separated):", test_tickers_default) - test_tickers = [ticker.strip() for ticker in test_tickers_input.split(",") if ticker.strip()] - - if st.button("Run FMP API Test"): - results = [] - - for ticker in test_tickers: - st.write(f"### Testing {ticker}") - - # Try profile endpoint - profile_url = f"https://financialmodelingprep.com/api/v3/profile/{ticker}?apikey={API_KEY}" - response = requests.get(profile_url) - - with st.expander(f"{ticker} Profile Response (Status: {response.status_code})"): - if response.status_code == 200: - data = response.json() - if data and len(data) > 0: - # Check if there's yield info - if "lastDiv" in data[0]: - price = data[0].get("price", 0) - last_div = data[0].get("lastDiv", 0) - if price > 0 and last_div > 0: - div_yield = (last_div / price) * 100 - st.write(f"- lastDiv: {last_div}") - st.write(f"- price: {price}") - st.write(f"- calculated yield: {div_yield:.2f}%") - else: - st.write(f"- lastDiv: {last_div}") - st.write(f"- price: {price}") - st.write("- Cannot calculate yield (price or lastDiv is zero)") - else: - st.write("- No 'lastDiv' found in response") - - # Save other useful fields - for field in ["companyName", "symbol", "industry", "sector"]: - if field in data[0]: - st.write(f"- {field}: {data[0][field]}") - else: - st.write("- Empty response data") - else: - st.write(f"- Error response: {response.text}") - - # Also try quote endpoint - quote_url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={API_KEY}" - response = requests.get(quote_url) - - with st.expander(f"{ticker} Quote Response (Status: {response.status_code})"): - if response.status_code == 200: - data = response.json() - if data and len(data) > 0: - # Check if there's yield info - if "dividendYield" in data[0]: - div_yield = data[0]["dividendYield"] * 100 if data[0]["dividendYield"] < 1 else data[0]["dividendYield"] - st.write(f"- dividendYield: {data[0]['dividendYield']}") - st.write(f"- formatted yield: {div_yield:.2f}%") - else: - st.write("- No 
'dividendYield' found in response") - - # Save other useful fields - for field in ["name", "price", "exchange", "marketCap"]: - if field in data[0]: - st.write(f"- {field}: {data[0][field]}") - else: - st.write("- Empty response data") - else: - st.write(f"- Error response: {response.text}") - - # Also try historical dividends endpoint - dividend_url = f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{ticker}?apikey={API_KEY}" - response = requests.get(dividend_url) - - with st.expander(f"{ticker} Dividend History Response (Status: {response.status_code})"): - if response.status_code == 200: - data = response.json() - if "historical" in data and data["historical"]: - recent_divs = data["historical"][:3] # Show last 3 dividends - for div in recent_divs: - st.write(f"- Date: {div.get('date')}, Dividend: {div.get('dividend')}") - - # Calculate annualized yield if possible - if recent_divs and "dividend" in recent_divs[0]: - # Try to get current price from previous quote response - current_price = None - quote_response = requests.get(quote_url) - if quote_response.status_code == 200: - quote_data = quote_response.json() - if quote_data and len(quote_data) > 0 and "price" in quote_data[0]: - current_price = quote_data[0]["price"] - - if current_price: - # Use most recent dividend payment and estimate annual yield - if len(recent_divs) >= 2: - # Try to figure out payment frequency - try: - date1 = datetime.strptime(recent_divs[0]["date"], "%Y-%m-%d") - date2 = datetime.strptime(recent_divs[1]["date"], "%Y-%m-%d") - days_between = abs((date1 - date2).days) - - if days_between < 45: # Monthly - frequency = 12 - freq_text = "monthly" - elif days_between < 100: # Quarterly - frequency = 4 - freq_text = "quarterly" - elif days_between < 200: # Semi-annual - frequency = 2 - freq_text = "semi-annual" - else: # Annual - frequency = 1 - freq_text = "annual" - - annual_div = recent_divs[0]["dividend"] * frequency - estimated_yield = (annual_div / current_price) * 100 - st.write(f"- Estimated {freq_text} yield: {estimated_yield:.2f}% (price: ${current_price})") - except Exception as e: - st.write(f"- Error calculating estimated yield: {str(e)}") - else: - st.write("- Not enough dividend history to determine frequency") - else: - st.write("- Cannot calculate yield (price not available)") - else: - st.write("- No dividend history found") - else: - st.write(f"- Error response: {response.text}") - - st.write("---") - - st.write("Note: Add the 'test_fmp_api' function to the sidebar to use it for debugging.") - -# --- PDF Export Function --- -def create_pdf_report(final_alloc, df_data, chat_summary=None): - """Generate a PDF report of the ETF portfolio. 
- - Args: - final_alloc: DataFrame of final allocations - df_data: DataFrame of raw ETF data - chat_summary: Optional text summary from ChatGPT - - Returns: - Base64 encoded PDF file - """ - # Calculate summary metrics - total_capital = final_alloc["Capital Allocated ($)"].sum() - total_income = final_alloc["Income Contributed ($)"].sum() - monthly_income = total_income / 12 - weighted_yield = (final_alloc["Income Contributed ($)"] * final_alloc["Yield (%)"]).sum() / total_income if total_income else 0 - - # Calculate DRIP forecast if needed - if st.session_state.drip_enabled: - drip_forecast = calculate_drip_growth(final_alloc) - - # Get key metrics for DRIP - initial_value = drip_forecast["Total Value ($)"].iloc[0] - final_value = drip_forecast["Total Value ($)"].iloc[-1] - value_growth = final_value - initial_value - value_growth_pct = (value_growth / initial_value) * 100 - - initial_income = drip_forecast["Monthly Income ($)"].iloc[0] * 12 - final_income = drip_forecast["Monthly Income ($)"].iloc[-1] * 12 - income_growth = final_income - initial_income - income_growth_pct = (income_growth / initial_income) * 100 - - total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1] - years_to_recover = 100 * initial_value / final_income # 100% recovery - - # Create HTML report - html = f""" - - - - - -
-        [stripped HTML template: document head and styles, report title "ETF Dividend Portfolio Report", timestamp "Generated on {datetime.now().strftime('%Y-%m-%d %H:%M')}", "Portfolio Summary" metric cards (Total Capital ${total_capital:,.2f}, Annual Income ${total_income:,.2f}, Monthly Income ${monthly_income:,.2f}, Weighted Yield {weighted_yield:.2f}%), and an "ETF Allocation Details" table header (Ticker, Capital ($), Income ($), Allocation (%), Yield (%), Risk Level, Distribution)]
-    """
-
-    # Add rows for each ETF
-    for _, row in final_alloc.iterrows():
-        risk_class = ""
-        if "High" in str(row.get("Risk Level", "")):
-            risk_class = "risk-high"
-        elif "Medium" in str(row.get("Risk Level", "")):
-            risk_class = "risk-medium"
-        elif "Average" in str(row.get("Risk Level", "")):
-            risk_class = "risk-average"
-
-        html += f"""[stripped HTML table row: {row["Ticker"]}, ${row["Capital Allocated ($)"]:,.2f}, ${row["Income Contributed ($)"]:,.2f}, {row["Allocation (%)"]:.2f}%, {row["Yield (%)"]:.2f}%, {row.get("Risk Level", "Unknown")}, {row.get("Distribution Period", "Unknown")}]"""
-
-    html += """[stripped HTML: closing table markup]"""
-
-    # Add DRIP forecast if enabled
-    if st.session_state.drip_enabled:
-        html += f"""
-        [stripped HTML: "Dividend Reinvestment (DRIP) Forecast" heading, metric cards (1-Year Value Growth ${value_growth:,.2f} ({value_growth_pct:.2f}%), 1-Year Income Growth ${income_growth:,.2f} ({income_growth_pct:.2f}%), Total Dividends Earned ${total_dividends:,.2f}, Years to Recover Capital {years_to_recover:.2f}), a note that the forecast assumes ETF prices remain constant and all dividends are reinvested proportionally to original allocations, and a forecast table header (Month, Portfolio Value ($), Monthly Income ($), Cumulative Income ($))]
-        """
-
-        # Add rows for each month
-        for _, row in drip_forecast.iterrows():
-            month = row["Month"]
-            value = row["Total Value ($)"]
-            monthly = row["Monthly Income ($)"]
-            cumulative = row["Cumulative Income ($)"]
-
-            html += f"""[stripped HTML table row: {month}, ${value:,.2f}, ${monthly:,.2f}, ${cumulative:,.2f}]"""
-
-        html += """[stripped HTML: closing table markup]"""
-
-    # Add ChatGPT summary if available
-    if chat_summary:
-        html += f"""
-        [stripped HTML: "ETF Analysis" heading and the chat_summary text with newlines (chr(10)) replaced by HTML line breaks lost with the markup]
- """ - - # Add footer - html += """ - - - - """ - - # Create PDF from HTML - try: - # Create temporary file for the PDF - with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp: - pdf_path = tmp.name - - # Convert HTML to PDF - pdfkit_options = { - 'quiet': '', - 'enable-local-file-access': None, - 'page-size': 'Letter', - 'margin-top': '0.75in', - 'margin-right': '0.75in', - 'margin-bottom': '0.75in', - 'margin-left': '0.75in', + "Price": current_price, + "Yield (%)": yield_pct, + "Risk Level": "High" # Default for high-yield ETFs } + logger.info(f"FMP data for {ticker}: {etf_data}") + return etf_data - pdfkit.from_string(html, pdf_path, options=pdfkit_options) - - # Read the PDF file - with open(pdf_path, 'rb') as pdf_file: - pdf_data = pdf_file.read() - - # Delete the temporary file - os.unlink(pdf_path) - - # Encode the PDF as base64 - return base64.b64encode(pdf_data).decode() - except Exception as e: - st.error(f"Failed to create PDF: {str(e)}") - if "wkhtmltopdf" in str(e).lower(): - st.error("Please install wkhtmltopdf: https://wkhtmltopdf.org/downloads.html") + logger.error(f"Error fetching FMP data for {ticker}: {str(e)}") return None -# --- ChatGPT Summary --- -@st.cache_data(show_spinner=False) -def get_chatgpt_summary(tickers: str, api_key: str) -> str: - """Generate ETF summary using ChatGPT.""" - tickers_list = tickers.split(",") - if not api_key: - return "Please enter a valid OpenAI API key." +def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]: + """ + Fetch ETF data from yfinance as fallback. + + Args: + ticker: ETF ticker symbol + + Returns: + Dictionary with ETF data or None if failed + """ try: - client = OpenAI(api_key=api_key) - prompt = f""" - Act as a financial analyst. Provide a concise summary (150-200 words) of these ETFs: {', '.join(tickers_list)}. - Include for each: - - Key characteristics (yield, sector exposure, strategy). - - Investment suitability (risk level, investor type). - - Recent performance trends (if available). - Highlight risks and benefits. Use public data or your knowledge. - """ - response = client.chat.completions.create( - model="gpt-4o-mini", - messages=[ - {"role": "system", "content": "You are a financial analyst specializing in ETFs."}, - {"role": "user", "content": prompt} - ], - max_tokens=300, - temperature=0.7 - ) - return response.choices[0].message.content - except Exception as e: - return f"ChatGPT error: {str(e)}. Check API key or try again." 
- -# --- Assign Risk Level --- -def assign_risk_level(df: pd.DataFrame, allocations: List[Dict]) -> pd.DataFrame: - """Assign risk level based on yield, allocation, and ETF age.""" - df = df.copy() - df["Risk Level"] = "Unknown" - current_date = pd.Timestamp.now(tz='UTC') - - # Calculate total allocation to high-yield, new ETFs - high_risk_alloc = 0 - for alloc in allocations: - ticker = alloc["ticker"] - alloc_pct = alloc["allocation"] - row = df[df["Ticker"] == ticker] - if row.empty: - continue - yield_pct = row["Yield (%)"].iloc[0] - inception_date = row["Inception Date"].iloc[0] - if pd.isna(inception_date): - continue - inception_date = pd.to_datetime(inception_date, utc=True) - age_years = (current_date - inception_date).days / 365.25 - if yield_pct > 20 and age_years < 2: - high_risk_alloc += alloc_pct - - for idx, row in df.iterrows(): - ticker = row["Ticker"] - yield_pct = row["Yield (%)"] - inception_date = row["Inception Date"] - alloc_pct = next((a["allocation"] for a in allocations if a["ticker"] == ticker), 0) - - if pd.isna(inception_date): - df.at[idx, "Risk Level"] = "Unknown (Missing Inception)" - continue - - inception_date = pd.to_datetime(inception_date, utc=True) - age_years = (current_date - inception_date).days / 365.25 - - # Risk criteria - is_new = age_years < 2 - is_mid_age = 2 <= age_years <= 5 - is_old = age_years > 5 - is_high_yield = yield_pct > 20 - is_mid_yield = 12 <= yield_pct <= 20 - is_low_yield = yield_pct < 12 - is_high_alloc = alloc_pct > 30 - is_mid_alloc = 20 <= alloc_pct <= 30 - is_portfolio_risky = high_risk_alloc > 50 - - if (is_new or is_high_yield) and (is_high_alloc or is_portfolio_risky): - df.at[idx, "Risk Level"] = "High Risk" - elif is_mid_age or is_mid_yield or is_mid_alloc: - df.at[idx, "Risk Level"] = "Medium Risk" - elif is_old and is_low_yield and alloc_pct < 20: - df.at[idx, "Risk Level"] = "Average Risk" + logger.info(f"Fetching yfinance data for {ticker}") + etf = yf.Ticker(ticker) + info = etf.info + + # Get the most recent dividend yield + if 'dividendYield' in info and info['dividendYield'] is not None: + yield_pct = info['dividendYield'] * 100 + logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%") else: - df.at[idx, "Risk Level"] = "Medium Risk" + # Try to calculate from dividend history + hist = etf.history(period="1y") + if not hist.empty and 'Dividends' in hist.columns: + annual_dividend = hist['Dividends'].sum() + current_price = info.get('regularMarketPrice', 0) + yield_pct = (annual_dividend / current_price) * 100 if current_price > 0 else 0 + logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%") + else: + yield_pct = 0 + logger.warning(f"No yield data found for {ticker} in yfinance") + + # Get current price + current_price = info.get('regularMarketPrice', 0) + if current_price <= 0: + current_price = info.get('regularMarketPreviousClose', 0) + logger.warning(f"Using previous close price for {ticker}: {current_price}") + + etf_data = { + "Ticker": ticker, + "Price": current_price, + "Yield (%)": yield_pct, + "Risk Level": "High" # Default for high-yield ETFs + } + logger.info(f"yfinance data for {ticker}: {etf_data}") + return etf_data + + except Exception as e: + logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}") + return None - return df - -# --- AI Suggestion --- -def ai_suggestion(df: pd.DataFrame, target: float, user_allocations: List[Dict]) -> pd.DataFrame: - """Generate AI-suggested portfolio with risk-mitigated allocations.""" - adjusted_df = 
df.copy() - adjustments = [] - current_date = pd.Timestamp.now(tz='UTC') - - # Validate and adjust yields - for idx, row in adjusted_df.iterrows(): - ticker = row["Ticker"] - yield_pct = row["Yield (%)"] - if ticker in EXPECTED_YIELDS: - min_yield, max_yield = EXPECTED_YIELDS[ticker] - if yield_pct > max_yield: - adjusted_yield = max_yield - adjustments.append(f"Adjusted {ticker} yield from {yield_pct:.2f}% to {max_yield:.2f}% (max expected).") - adjusted_df.at[idx, "Yield (%)"] = adjusted_yield - adjusted_df.at[idx, "Dividend Rate"] = (adjusted_yield / 100) * row["Price"] - adjusted_df.at[idx, "Income per $1K"] = (1000 / row["Price"]) * adjusted_df.at[idx, "Dividend Rate"] - - # Optimize allocations: prioritize older, stable ETFs, limit high-yield exposure - sorted_df = adjusted_df.sort_values(by=["Inception Date", "Yield (%)"], ascending=[True, False]) - ai_allocations = [] - total_alloc = 0 - high_yield_new_alloc = 0 - - for idx, row in sorted_df.iterrows(): - ticker = row["Ticker"] - yield_pct = row["Yield (%)"] - inception_date = row["Inception Date"] - if pd.isna(inception_date): - continue - inception_date = pd.to_datetime(inception_date, utc=True) - age_years = (current_date - inception_date).days / 365.25 - - # Allocate based on age and yield - if age_years > 5 and yield_pct < 12: # Stable, older ETFs (e.g., JEPI) - alloc = 20 - elif age_years >= 2 and yield_pct <= 20: # Mid-age, moderate yield (e.g., FEPI) - alloc = 15 - else: # Newer, high-yield ETFs (e.g., CONY, MSTY) - alloc = 10 # Limit to reduce risk - if yield_pct > 20 and age_years < 2: - high_yield_new_alloc += alloc - - # Cap high-yield, new ETF allocation - if high_yield_new_alloc > 40: - alloc = 0 - - if total_alloc + alloc > 100: - alloc = 100 - total_alloc - if alloc <= 0: - continue - - ai_allocations.append({"ticker": ticker, "allocation": alloc}) - total_alloc += alloc - - # Adjust to sum to 100% - if total_alloc < 100: - remaining = 100 - total_alloc - for alloc in ai_allocations: - if adjusted_df[adjusted_df["Ticker"] == alloc["ticker"]]["Yield (%)"].iloc[0] < 12: - alloc["allocation"] += remaining - break - elif total_alloc > 100: - excess = total_alloc - 100 - ai_allocations[-1]["allocation"] -= excess - - results = [] - weighted_yield = 0 - for _, row in adjusted_df.iterrows(): - ticker = row["Ticker"] - alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0) - if alloc_pct == 0: - continue - weighted_yield += alloc_pct * (row["Yield (%)"] / 100) - - if weighted_yield <= 0: - st.error("AI Suggestion: Weighted yield is zero or negative. Check ETF data.") +def fetch_etf_data(tickers: List[str]) -> pd.DataFrame: + """ + Fetch ETF data using FMP API with yfinance fallback. 
+ + Args: + tickers: List of ETF tickers + + Returns: + DataFrame with ETF data + """ + try: + data = {} + for ticker in tickers: + if not ticker: # Skip empty tickers + continue + + logger.info(f"Processing {ticker}") + + # Try FMP first + etf_data = fetch_etf_data_fmp(ticker) + + # If FMP fails, try yfinance + if etf_data is None: + logger.info(f"Falling back to yfinance for {ticker}") + etf_data = fetch_etf_data_yfinance(ticker) + + if etf_data is not None: + # Validate and cap yield at a reasonable maximum (e.g., 30%) + etf_data["Yield (%)"] = min(etf_data["Yield (%)"], 30.0) + data[ticker] = etf_data + logger.info(f"Final data for {ticker}: {etf_data}") + else: + logger.error(f"Failed to fetch data for {ticker} from both sources") + + if not data: + st.error("No ETF data could be fetched") + return pd.DataFrame() + + df = pd.DataFrame(data.values()) + + # Validate the data + if df.empty: + st.error("No ETF data could be fetched") + return pd.DataFrame() + + if (df["Price"] <= 0).any(): + st.error("Some ETFs have invalid prices") + return pd.DataFrame() + + if (df["Yield (%)"] <= 0).any(): + st.warning("Some ETFs have zero or negative yields") + + logger.info(f"Final DataFrame:\n{df}") + return df + + except Exception as e: + st.error(f"Error fetching ETF data: {str(e)}") + logger.error(f"Error in fetch_etf_data: {str(e)}") + logger.error(traceback.format_exc()) return pd.DataFrame() - total_capital = target / weighted_yield - - for _, row in adjusted_df.iterrows(): - ticker = row["Ticker"] - alloc_pct = next((a["allocation"] / 100 for a in ai_allocations if a["ticker"] == ticker), 0) - if alloc_pct == 0: - continue - capital = total_capital * alloc_pct - shares = capital / row["Price"] - income = shares * row["Dividend Rate"] +def run_portfolio_simulation( + mode: str, + target: float, + risk_tolerance: str, + etf_inputs: List[Dict[str, str]], + enable_drip: bool, + enable_erosion: bool +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Run the portfolio simulation. 
+ + Args: + mode: Simulation mode ("income_target" or "capital_target") + target: Target value (monthly income or initial capital) + risk_tolerance: Risk tolerance level + etf_inputs: List of ETF inputs + enable_drip: Whether to enable dividend reinvestment + enable_erosion: Whether to enable NAV & yield erosion - # Create result dictionary with all available data - result = { - "Ticker": ticker, - "Yield (%)": row["Yield (%)"], - "Dividend Rate": row["Dividend Rate"], - "Capital Allocated ($)": round(capital, 2), - "Income Contributed ($)": round(income, 2), - "Allocation (%)": round(alloc_pct * 100, 2), - "Inception Date": row["Inception Date"], - "Distribution Period": row.get("Distribution Period", "Unknown"), - "Price": row["Price"] - } + Returns: + Tuple of (ETF data DataFrame, Final allocation DataFrame) + """ + try: + # Fetch real ETF data + tickers = [input["ticker"] for input in etf_inputs] + etf_data = fetch_etf_data(tickers) - # Add NAV data if available - if "NAV" in row and row["NAV"] is not None: - result["NAV"] = row["NAV"] - if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: - result["Premium/Discount (%)"] = row["Premium/Discount (%)"] + if etf_data is None or etf_data.empty: + st.error("Failed to fetch ETF data") + return pd.DataFrame(), pd.DataFrame() + + # Calculate allocations based on risk tolerance + if risk_tolerance == "Conservative": + # Higher allocation to lower yield ETFs + sorted_data = etf_data.sort_values("Yield (%)") + allocations = [40.0, 40.0, 20.0] # More to lower yield + elif risk_tolerance == "Moderate": + # Balanced allocation + allocations = [33.33, 33.34, 33.33] + else: # Aggressive + # Higher allocation to higher yield ETFs + sorted_data = etf_data.sort_values("Yield (%)", ascending=False) + allocations = [20.0, 30.0, 50.0] # More to higher yield + + # Create final allocation DataFrame + final_alloc = etf_data.copy() + final_alloc["Allocation (%)"] = allocations + + if mode == "income_target": + # Calculate required capital for income target + monthly_income = target + annual_income = monthly_income * 12 - results.append(result) - - suggestion_df = pd.DataFrame(results) - suggestion_df = assign_risk_level(suggestion_df, ai_allocations) - - if adjustments or ai_allocations != user_allocations: - notes = ["The AI balanced high-yield ETFs with older, stable ETFs to mitigate risk while meeting the income target. 
Newer ETFs with high yields are capped to reduce exposure to unsustainable distributions."] - if adjustments: - notes.extend(adjustments) - st.info("AI Suggestion Notes:\n- " + "\n- ".join(notes)) - return suggestion_df - -# --- Yield Trends --- -def yield_chart(tickers: List[str], debug: bool = False): - """Plot TTM yield trends.""" - fig = go.Figure() - for ticker in tickers: - try: - yf_ticker = yf.Ticker(ticker) + # Calculate weighted average yield + weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100 - # Get dividends with retry - dividends, debug_info = fetch_with_retry(lambda: yf_ticker.dividends) - if debug: - st.write(f"DEBUG: {ticker} dividend data: {dividends.to_dict() if dividends is not None else 'None'}") - - if dividends is None or dividends.empty: - continue - - dividends = dividends.reset_index() - dividends.columns = ["date", "amount"] - dividends["date"] = pd.to_datetime(dividends["date"]) - - monthly = dividends.set_index("date")["amount"].resample("ME").sum() - ttm_dividend = monthly.rolling(12).sum() - - # Get prices with retry - prices, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y")["Close"]) - if prices is None: - continue + # Validate weighted yield + if weighted_yield <= 0 or weighted_yield > 30: + st.error(f"Invalid weighted yield calculated: {weighted_yield:.2f}%") + return pd.DataFrame(), pd.DataFrame() - avg_price = prices.rolling(252).mean() - ttm_yield = (ttm_dividend / avg_price) * 100 - ttm_yield = ttm_yield.dropna() + # Calculate required capital based on weighted yield + required_capital = (annual_income / weighted_yield) * 100 + else: + required_capital = target + + # Calculate capital allocation and income + final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital + final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"] + final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100 + + # Apply erosion if enabled + if enable_erosion: + # Apply a small erosion factor to yield and price + erosion_factor = 0.98 # 2% erosion per year + final_alloc["Yield (%)"] = final_alloc["Yield (%)"] * erosion_factor + final_alloc["Price"] = final_alloc["Price"] * erosion_factor + final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100 + + # Validate final calculations + total_capital = final_alloc["Capital Allocated ($)"].sum() + total_income = final_alloc["Income Contributed ($)"].sum() + effective_yield = (total_income / total_capital) * 100 + + if effective_yield <= 0 or effective_yield > 30: + st.error(f"Invalid effective yield calculated: {effective_yield:.2f}%") + return pd.DataFrame(), pd.DataFrame() + + return etf_data, final_alloc + except Exception as e: + st.error(f"Error in portfolio simulation: {str(e)}") + logger.error(f"Error in run_portfolio_simulation: {str(e)}") + logger.error(traceback.format_exc()) + return pd.DataFrame(), pd.DataFrame() - fig.add_trace(go.Scatter( - x=ttm_yield.index, - y=ttm_yield, - name=ticker, - mode="lines", - hovertemplate="%{x|%Y-%m}: %{y:.2f}%" - )) - except Exception as e: - if debug: - st.write(f"DEBUG: Error plotting {ticker}: {str(e)}") - continue - - fig.update_layout( - title="TTM Dividend Yield Trend", - xaxis_title="Date", - yaxis_title="Yield (%)", - template="plotly_dark", - hovermode="x unified", - xaxis=dict(tickformat="%Y-%m"), - yaxis=dict(gridcolor="rgba(255,255,255,0.2)") - ) - 
st.plotly_chart(fig, use_container_width=True) - -# --- NAV Trends --- -def nav_chart(tickers: List[str], debug: bool = False): - """Plot NAV (Net Asset Value) trends over time for selected ETFs.""" - fig = go.Figure() +def portfolio_summary(final_alloc: pd.DataFrame) -> None: + """ + Display a summary of the portfolio allocation. - for ticker in tickers: - try: - yf_ticker = yf.Ticker(ticker) - - # Get historical price data - price_history, debug_info = fetch_with_retry(lambda: yf_ticker.history(period="2y")) - - if price_history is None or price_history.empty: - if debug: - st.write(f"DEBUG: No price history for {ticker}") - continue - - # Extract NAV data - for most ETFs, we'll use Close price as a proxy for NAV - # For some closed-end funds, there might be specific NAV history available - nav_series = price_history["Close"] - - # Plot the NAV trend - fig.add_trace(go.Scatter( - x=nav_series.index, - y=nav_series, - name=f"{ticker} NAV", - mode="lines", - hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}" - )) - - # Add price for comparison (slight transparency) - fig.add_trace(go.Scatter( - x=price_history.index, - y=price_history["Close"], - name=f"{ticker} Price", - mode="lines", - line=dict(dash="dot", width=1), - opacity=0.7, - hovertemplate="%{x|%Y-%m-%d}: $%{y:.2f}" - )) - - except Exception as e: - if debug: - st.write(f"DEBUG: Error plotting NAV for {ticker}: {str(e)}") - continue - - fig.update_layout( - title="ETF NAV and Price Trends (2-Year)", - xaxis_title="Date", - yaxis_title="Value ($)", - template="plotly_dark", - hovermode="x unified", - legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), - yaxis=dict(gridcolor="rgba(255,255,255,0.2)") - ) - st.plotly_chart(fig, use_container_width=True) - -# --- Price/NAV Premium-Discount Chart --- -def premium_discount_chart(tickers: List[str], df: pd.DataFrame, debug: bool = False): - """Plot Premium/Discount to NAV for selected ETFs.""" - # Create dataframe for the bar chart - premium_data = [] - - for ticker in tickers: - try: - # Get data from our processed dataframe - ticker_row = df[df["Ticker"] == ticker] - if ticker_row.empty: - continue - - premium = ticker_row["Premium/Discount (%)"].iloc[0] - if premium is None: - continue - - premium_data.append({ - "Ticker": ticker, - "Premium/Discount (%)": premium - }) - - except Exception as e: - if debug: - st.write(f"DEBUG: Error getting premium/discount for {ticker}: {str(e)}") - continue - - if not premium_data: - st.info("No premium/discount data available for the selected ETFs.") + Args: + final_alloc: DataFrame containing the portfolio allocation + """ + if final_alloc is None or final_alloc.empty: + st.warning("No portfolio data available.") return - premium_df = pd.DataFrame(premium_data) - - # Create the bar chart - fig = px.bar( - premium_df, - x="Ticker", - y="Premium/Discount (%)", - title="Current Premium/Discount to NAV", - template="plotly_dark", - color="Premium/Discount (%)", - color_continuous_scale=["red", "white", "green"], - range_color=[-5, 5] # Typical range for premium/discount - ) - - # Add a reference line at 0 - fig.add_hline( - y=0, - line_width=1, - line_dash="dash", - line_color="white", - annotation_text="NAV", - annotation_position="bottom right" - ) - - fig.update_layout( - yaxis=dict(zeroline=True, zerolinewidth=2, zerolinecolor="rgba(255,255,255,0.5)") - ) - - st.plotly_chart(fig, use_container_width=True) - -# --- Portfolio Summary --- -def portfolio_summary(df: pd.DataFrame): - """Display portfolio summary metrics.""" - 
total_capital = df["Capital Allocated ($)"].sum() - total_income = df["Income Contributed ($)"].sum() - weighted_yield = (df["Income Contributed ($)"] * df["Yield (%)"]).sum() / total_income if total_income else 0 - - col1, col2, col3 = st.columns(3) - with col1: - st.metric("Total Capital", f"${total_capital:,.2f}") - with col2: - st.metric("Annual Income", f"${total_income:,.2f}") - st.metric("Monthly Income", f"${total_income/12:,.2f}") - with col3: - st.metric("Weighted Yield", f"{weighted_yield:.2f}%") - -# --- Allocation Functions --- -def allocate_for_income(df: pd.DataFrame, target: float, allocations: List[Dict]) -> pd.DataFrame: - """Allocate capital to ETFs to meet the annual income target.""" try: - # Store initial capital in session state - st.session_state.initial_capital = target - st.session_state.mode = "Income Target" - st.session_state.target = target + # Calculate key metrics + total_capital = final_alloc["Capital Allocated ($)"].sum() + total_income = final_alloc["Income Contributed ($)"].sum() - results = [] - weighted_yield = 0 - for _, row in df.iterrows(): - ticker = row["Ticker"] - alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) - if alloc_pct == 0: - continue - weighted_yield += alloc_pct * (row["Yield (%)"] / 100) - - if weighted_yield <= 0: - st.error("Weighted yield is zero or negative. Check ETF data.") - return None - - total_capital = target / weighted_yield - - for _, row in df.iterrows(): - ticker = row["Ticker"] - alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) - if alloc_pct == 0: - continue - capital = total_capital * alloc_pct - shares = capital / row["Price"] - income = shares * row["Dividend Rate"] - - # Create result dictionary with all available data - result = { - "Ticker": ticker, - "Yield (%)": row["Yield (%)"], - "Dividend Rate": round(row["Dividend Rate"], 2), - "Capital Allocated ($)": round(capital, 2), - "Income Contributed ($)": round(income, 2), - "Allocation (%)": round(alloc_pct * 100, 2), - "Inception Date": row["Inception Date"], - "Distribution Period": row.get("Distribution Period", "Unknown"), - "Price": row["Price"] + # Calculate weighted average yield + weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100 + + # Display metrics in columns + col1, col2, col3 = st.columns(3) + + with col1: + st.metric("Total Capital", f"${total_capital:,.2f}") + + with col2: + st.metric("Annual Income", f"${total_income:,.2f}") + st.metric("Monthly Income", f"${total_income/12:,.2f}") + + with col3: + st.metric("Average Yield", f"{weighted_yield:.2f}%") + st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%") + + # Display allocation chart + fig = px.pie( + final_alloc, + values="Allocation (%)", + names="Ticker", + title="Portfolio Allocation by ETF", + hover_data={ + "Ticker": True, + "Allocation (%)": ":.2f", + "Yield (%)": ":.2f", + "Capital Allocated ($)": ":,.2f", + "Income Contributed ($)": ":,.2f" } - - # Add NAV data if available - if "NAV" in row and row["NAV"] is not None: - result["NAV"] = row["NAV"] - if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: - result["Premium/Discount (%)"] = row["Premium/Discount (%)"] - - results.append(result) - - alloc_df = pd.DataFrame(results) - alloc_df = assign_risk_level(alloc_df, allocations) - return alloc_df + ) + st.plotly_chart(fig, use_container_width=True) + # Display detailed allocation table + st.subheader("Detailed 
Allocation") + display_df = final_alloc.copy() + display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12 + display_df = display_df[[ + "Ticker", + "Allocation (%)", + "Yield (%)", + "Price", + "Shares", + "Capital Allocated ($)", + "Monthly Income", + "Income Contributed ($)", + "Risk Level" + ]] + + # Format the display + st.dataframe( + display_df.style.format({ + "Allocation (%)": "{:.2f}%", + "Yield (%)": "{:.2f}%", + "Price": "${:,.2f}", + "Shares": "{:,.4f}", + "Capital Allocated ($)": "${:,.2f}", + "Monthly Income": "${:,.2f}", + "Income Contributed ($)": "${:,.2f}" + }), + use_container_width=True + ) + + except Exception as e: + st.error(f"Error calculating portfolio summary: {str(e)}") + logger.error(f"Error in portfolio_summary: {str(e)}") + logger.error(traceback.format_exc()) + +def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool: + """ + Save portfolio allocation to a JSON file. + + Args: + portfolio_name: Name of the portfolio + final_alloc: DataFrame containing portfolio allocation + mode: Portfolio mode ("Income Target" or "Capital Target") + target: Target value (income or capital) + + Returns: + bool: True if save was successful, False otherwise + """ + try: + # Create portfolios directory if it doesn't exist + portfolios_dir = Path("portfolios") + portfolios_dir.mkdir(exist_ok=True) + + # Prepare portfolio data + portfolio_data = { + "name": portfolio_name, + "created_at": datetime.now().isoformat(), + "mode": mode, + "target": target, + "allocations": [] + } + + # Convert DataFrame to list of dictionaries + for _, row in final_alloc.iterrows(): + allocation = { + "ticker": row["Ticker"], + "allocation": float(row["Allocation (%)"]), + "yield": float(row["Yield (%)"]), + "price": float(row["Price"]), + "risk_level": row["Risk Level"] + } + portfolio_data["allocations"].append(allocation) + + # Save to JSON file + file_path = portfolios_dir / f"{portfolio_name}.json" + with open(file_path, 'w') as f: + json.dump(portfolio_data, f, indent=2) + + return True + + except Exception as e: + st.error(f"Error saving portfolio: {str(e)}") + return False + +def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]: + """ + Load portfolio allocation from a JSON file. + + Args: + portfolio_name: Name of the portfolio to load + + Returns: + Tuple containing: + - DataFrame with portfolio allocation + - Portfolio mode + - Target value + """ + try: + # Check if portfolio exists + file_path = Path("portfolios") / f"{portfolio_name}.json" + if not file_path.exists(): + st.error(f"Portfolio '{portfolio_name}' not found.") + return None, None, None + + # Load portfolio data + with open(file_path, 'r') as f: + portfolio_data = json.load(f) + + # Convert allocations to DataFrame + allocations = portfolio_data["allocations"] + df = pd.DataFrame(allocations) + + # Rename columns to match expected format + df = df.rename(columns={ + "allocation": "Allocation (%)", + "yield": "Yield (%)", + "price": "Price" + }) + + return df, portfolio_data["mode"], portfolio_data["target"] + + except Exception as e: + st.error(f"Error loading portfolio: {str(e)}") + return None, None, None + +def list_saved_portfolios() -> List[str]: + """ + List all saved portfolios. 
+
+    Returns:
+        List of portfolio names
+    """
+    try:
+        portfolios_dir = Path("portfolios")
+        if not portfolios_dir.exists():
+            return []
+
+        # Get all JSON files in the portfolios directory
+        portfolio_files = list(portfolios_dir.glob("*.json"))
+
+        # Extract portfolio names from filenames
+        portfolio_names = [f.stem for f in portfolio_files]
+
+        return sorted(portfolio_names)
+
+    except Exception as e:
+        st.error(f"Error listing portfolios: {str(e)}")
+        return []
+
+def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
+    """
+    Allocate portfolio for income target.
+
+    Args:
+        df: DataFrame with ETF data
+        target: Monthly income target
+        etf_allocations: List of ETF allocations
+
+    Returns:
+        DataFrame with final allocation
+    """
+    try:
+        # Create final allocation DataFrame
+        final_alloc = df.copy()
+
+        # Set allocations
+        for alloc in etf_allocations:
+            mask = final_alloc["Ticker"] == alloc["ticker"]
+            final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
+
+        # Calculate required capital for the monthly income target
+        monthly_income = target
+        annual_income = monthly_income * 12
+
+        # Use the allocation-weighted yield so the income target is met for any mix of allocations
+        weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
+        if weighted_yield <= 0:
+            st.error("Weighted yield is zero or negative. Check ETF data.")
+            return None
+        required_capital = (annual_income / weighted_yield) * 100
+
+        # Calculate capital allocation and income
+        final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
+        final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
+        final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
+
+        return final_alloc
     except Exception as e:
         st.error(f"Error in income allocation: {str(e)}")
         return None
 
-def allocate_for_capital(df: pd.DataFrame, capital: float, allocations: List[Dict]) -> pd.DataFrame:
-    """Allocate a fixed amount of capital across ETFs and calculate resulting income."""
+def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
+    """
+    Allocate portfolio for capital target.
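+    Capital is split by the supplied allocation percentages: for each ETF,
+    capital = initial_capital * allocation% / 100, shares = capital / price,
+    and annual income = capital * yield% / 100 (so the portfolio's effective
+    yield is the allocation-weighted average of the ETF yields).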
+ + Args: + df: DataFrame with ETF data + initial_capital: Initial capital amount + etf_allocations: List of ETF allocations + + Returns: + DataFrame with final allocation + """ try: - # Store initial capital in session state - st.session_state.initial_capital = capital - st.session_state.mode = "Capital Target" - st.session_state.target = capital + # Create final allocation DataFrame + final_alloc = df.copy() - results = [] - total_income = 0 + # Set allocations + for alloc in etf_allocations: + mask = final_alloc["Ticker"] == alloc["ticker"] + final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] - for _, row in df.iterrows(): - ticker = row["Ticker"] - alloc_pct = next((etf["allocation"] / 100 for etf in allocations if etf["ticker"] == ticker), 0) - if alloc_pct == 0: - continue - - # Calculate capital allocation - allocated_capital = capital * alloc_pct - shares = allocated_capital / row["Price"] - income = shares * row["Dividend Rate"] - total_income += income - - # Create result dictionary with all available data - result = { - "Ticker": ticker, - "Yield (%)": row["Yield (%)"], - "Dividend Rate": round(row["Dividend Rate"], 2), - "Capital Allocated ($)": round(allocated_capital, 2), - "Income Contributed ($)": round(income, 2), - "Allocation (%)": round(alloc_pct * 100, 2), - "Inception Date": row["Inception Date"], - "Distribution Period": row.get("Distribution Period", "Unknown"), - "Price": row["Price"] - } - - # Add NAV data if available - if "NAV" in row and row["NAV"] is not None: - result["NAV"] = row["NAV"] - if "Premium/Discount (%)" in row and row["Premium/Discount (%)"] is not None: - result["Premium/Discount (%)"] = row["Premium/Discount (%)"] - - results.append(result) - - alloc_df = pd.DataFrame(results) - alloc_df = assign_risk_level(alloc_df, allocations) - return alloc_df + # Calculate capital allocation and income + final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * initial_capital + final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"] + final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100 + return final_alloc except Exception as e: st.error(f"Error in capital allocation: {str(e)}") return None -# --- Portfolio Management Functions --- -def recalculate_portfolio(allocations): - """Recalculate portfolio with new allocations""" - try: - # Get the initial capital from session state - initial_capital = st.session_state.get('initial_capital') - if initial_capital is None: - st.error("Initial capital not found. 
Please run the portfolio simulation first.") - return None - - # Create a new DataFrame for the recalculated portfolio - final_alloc = pd.DataFrame() - - # Add tickers and their allocations - final_alloc["Ticker"] = list(allocations.keys()) - final_alloc["Allocation (%)"] = list(allocations.values()) - - # Merge with the global df to get other information - final_alloc = final_alloc.merge(df, on="Ticker", how="left") - - # Calculate capital allocation - final_alloc["Capital Allocated ($)"] = final_alloc["Allocation (%)"] * initial_capital / 100 - - # Calculate income contribution - final_alloc["Income Contributed ($)"] = final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"] / 100 - - # Store the recalculated portfolio in session state - st.session_state.final_alloc = final_alloc - - return final_alloc - - except Exception as e: - st.error(f"Error recalculating portfolio: {str(e)}") - return None - -def update_allocation(ticker, new_alloc): - """Update the allocation for a specific ticker.""" - st.session_state.etf_allocations[ticker] = new_alloc - -# --- DRIP Calculation Function --- -def calculate_drip_growth(portfolio_df: pd.DataFrame, months: int = DRIP_FORECAST_MONTHS, - erosion_type: str = "None", erosion_level: Any = 0) -> pd.DataFrame: - """ - Calculate the growth of a portfolio with dividend reinvestment (DRIP) over time. - - Args: - portfolio_df: DataFrame containing portfolio allocation data - months: Number of months to forecast - erosion_type: Type of erosion simulation ("None", "NAV & Yield Erosion") - erosion_level: Erosion configuration (0 or dict with global and per-ticker settings) - - Returns: - DataFrame with monthly portfolio growth data (exactly 'months' rows) - """ - # Extract needed data - initial_capital = portfolio_df["Capital Allocated ($)"].sum() - tickers = portfolio_df["Ticker"].tolist() - - # Calculate monthly erosion rate(s) if applicable - max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion - - # Initialize erosion rates for each ticker - ticker_nav_rates = {} - ticker_yield_rates = {} - - # Handle different erosion configurations - if erosion_type != "None" and isinstance(erosion_level, dict): - # Check if using per-ticker rates - if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: - # Get global defaults - global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - - # Apply per-ticker rates where available, global rates otherwise - for ticker in tickers: - ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) - ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - else: - # Use global rates for all tickers - global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - - for ticker in tickers: - ticker_nav_rates[ticker] = global_nav - ticker_yield_rates[ticker] = global_yield - else: - # No erosion - for ticker in tickers: - ticker_nav_rates[ticker] = 0 - ticker_yield_rates[ticker] = 0 - - # Create a dictionary of ticker-specific data for easier access - ticker_data = {} - for _, row in portfolio_df.iterrows(): - ticker = row["Ticker"] - ticker_data[ticker] = { - "price": row["Price"], - 
"yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal - "initial_shares": row["Capital Allocated ($)"] / row["Price"], - "initial_allocation": row["Allocation (%)"] / 100, # Convert from % to decimal - "distribution": row.get("Distribution Period", "Monthly") - } - - # Initialize result data structure - results = [] - - # Initial portfolio state - current_shares = {ticker: data["initial_shares"] for ticker, data in ticker_data.items()} - current_prices = {ticker: data["price"] for ticker, data in ticker_data.items()} - current_yields = {ticker: data["yield_annual"] for ticker, data in ticker_data.items()} - current_total_value = initial_capital - - # Calculate the monthly dividend for each ETF based on distribution period - dividend_frequency = { - "Monthly": 12, - "Quarterly": 4, - "Semi-Annually": 2, - "Annually": 1, - "Unknown": 12 # Default to monthly if unknown - } - - # Calculate growth for each month (exactly 'months' total rows) - cumulative_income = 0 - - for month in range(1, months + 1): - # Calculate expected monthly income based on current portfolio and yields - monthly_income = sum( - (current_yields[ticker] / 12) * - (current_shares[ticker] * current_prices[ticker]) - for ticker in tickers - ) - - # Store month data (this reflects the portfolio at the START of the month) - month_data = { - "Month": month, - "Total Value ($)": current_total_value, - "Monthly Income ($)": monthly_income, - "Cumulative Income ($)": cumulative_income - } - - # Add shares and current price/yield for each ticker - for ticker in tickers: - month_data[f"{ticker} Shares"] = current_shares[ticker] - month_data[f"{ticker} Price ($)"] = current_prices[ticker] - month_data[f"{ticker} Yield (%)"] = current_yields[ticker] * 100 # Convert back to percentage - - results.append(month_data) - - # After recording the month's data, apply erosion and calculate dividends - # for the current month - - # Apply NAV and yield erosion to each ticker - for ticker in tickers: - # Apply NAV erosion - if ticker_nav_rates[ticker] > 0: - current_prices[ticker] *= (1 - ticker_nav_rates[ticker]) - - # Apply yield erosion - if ticker_yield_rates[ticker] > 0: - current_yields[ticker] *= (1 - ticker_yield_rates[ticker]) - - # Calculate dividends for each ETF - month_dividends = {} - for ticker, data in ticker_data.items(): - freq = dividend_frequency[data["distribution"]] - # Check if dividend is paid this month - if month % (12 / freq) == 0: - # Annual dividend / frequency = dividend per distribution - # Use current yield if yield erosion is being simulated - if ticker_yield_rates[ticker] > 0: - dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker] - else: - dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker] - else: - dividend = 0 - month_dividends[ticker] = dividend - - # Total dividends for this month - total_month_dividend = sum(month_dividends.values()) - cumulative_income += total_month_dividend - - # Only reinvest for the next month if we're not at the last month - if month < months: - # Reinvest dividends proportionally to original allocation - for ticker, data in ticker_data.items(): - # Calculate new shares purchased with reinvested dividends - # Use current price for calculation - if current_prices[ticker] > 0: # Avoid division by zero - new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker] - current_shares[ticker] += new_shares - - # Recalculate portfolio value with updated shares and prices 
- current_total_value = sum(current_shares[ticker] * current_prices[ticker] for ticker in tickers) - - return pd.DataFrame(results) - -# --- AI Erosion Risk Assessment --- -def analyze_etf_erosion_risk(tickers: List[str], debug: bool = False) -> pd.DataFrame: - """ - Analyze historical ETF data to estimate realistic NAV and yield erosion likelihood. - - Args: - tickers: List of ETF tickers to analyze - debug: Whether to show debug information - - Returns: - DataFrame with erosion risk assessment for each ETF - """ - risk_data = [] - current_date = pd.Timestamp.now(tz='UTC') - - for ticker in tickers: - try: - yf_ticker = yf.Ticker(ticker) - - # Get basic info with retry - info, _ = fetch_with_retry(lambda: yf_ticker.info) - if not info: - continue - - # Get historical price data (5 years or since inception) - hist, _ = fetch_with_retry(lambda: yf_ticker.history(period="5y")) - if hist.empty: - continue - - # Check ETF age - inception_date = info.get("fundInceptionDate") - etf_age_years = None - if inception_date: - try: - inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) - etf_age_years = (current_date - inception_date_dt).days / 365.25 - except: - pass - - # Get historical dividends - dividends, _ = fetch_with_retry(lambda: yf_ticker.dividends) - if dividends is None or dividends.empty: - continue - - # Calculate historical metrics - - # 1. NAV Erosion Analysis (using price as proxy for NAV) - # Calculate max drawdowns in different timeframes - rolling_max = hist["Close"].rolling(window=252, min_periods=1).max() - daily_drawdown = hist["Close"] / rolling_max - 1.0 - max_drawdown_1y = abs(daily_drawdown[-252:].min()) if len(daily_drawdown) >= 252 else None - - # Calculate annualized volatility - returns = hist["Close"].pct_change().dropna() - volatility = returns.std() * (252**0.5) # Annualized - - # 2. Yield Erosion Analysis - # Convert to pandas Series with a DatetimeIndex - if not isinstance(dividends, pd.Series): - dividends = dividends.reset_index() - dividends.columns = ["date", "amount"] - dividends = dividends.set_index("date")["amount"] - - # Calculate rolling 12-month dividend total - monthly_div = dividends.resample('M').sum() - rolling_12m_div = monthly_div.rolling(window=12, min_periods=6).sum() - - # Calculate the trend over time - if len(rolling_12m_div) > 12: - earliest_ttm = rolling_12m_div[11] - latest_ttm = rolling_12m_div[-1] - if earliest_ttm > 0: - dividend_trend = (latest_ttm / earliest_ttm) - 1 - else: - dividend_trend = 0 - - # Calculate worst dividend reduction - div_changes = rolling_12m_div.pct_change() - worst_div_change = div_changes.min() if not div_changes.empty else 0 - else: - dividend_trend = None - worst_div_change = None - - # 3. 
Risk Assessment - # Determine if ETF is new or established - is_new = etf_age_years is not None and etf_age_years < 2 - - # Assign erosion risk levels - if is_new: - # For new ETFs, use higher default risk - nav_erosion_risk = 5 # Medium - yield_erosion_risk = 6 # Medium-high - nav_risk_reason = "New ETF without significant history" - yield_risk_reason = "New ETF dividend pattern not established" - else: - # For established ETFs, base on historical data - - # NAV Erosion Risk (0-9 scale) - if max_drawdown_1y is not None: - if max_drawdown_1y > 0.40: - nav_erosion_risk = 7 # High risk - nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year" - elif max_drawdown_1y > 0.25: - nav_erosion_risk = 5 # Medium risk - nav_risk_reason = f"Experienced {max_drawdown_1y:.1%} max drawdown in the past year" - elif max_drawdown_1y > 0.15: - nav_erosion_risk = 3 # Lower-medium risk - nav_risk_reason = f"Moderate {max_drawdown_1y:.1%} max drawdown in the past year" - else: - nav_erosion_risk = 2 # Low risk - nav_risk_reason = f"Limited {max_drawdown_1y:.1%} max drawdown in the past year" - else: - nav_erosion_risk = 4 # Default medium-low - nav_risk_reason = "Insufficient price history" - - # Yield Erosion Risk (0-9 scale) - if worst_div_change is not None and not pd.isna(worst_div_change): - if worst_div_change < -0.30: - yield_erosion_risk = 8 # Very high risk - yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" - elif worst_div_change < -0.15: - yield_erosion_risk = 6 # High risk - yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" - elif worst_div_change < -0.05: - yield_erosion_risk = 4 # Medium risk - yield_risk_reason = f"Previously cut dividends by {abs(worst_div_change):.1%}" - elif dividend_trend is not None and dividend_trend < -0.10: - yield_erosion_risk = 5 # Medium risk due to declining trend - yield_risk_reason = f"Dividend trend declined by {abs(dividend_trend):.1%}" - elif dividend_trend is not None and dividend_trend > 0.10: - yield_erosion_risk = 2 # Low risk due to growing trend - yield_risk_reason = f"Dividend trend growing by {dividend_trend:.1%}" - else: - yield_erosion_risk = 3 # Low-medium risk - yield_risk_reason = "Stable dividend history" - else: - yield_erosion_risk = 4 # Default medium - yield_risk_reason = "Insufficient dividend history" - - # Adjust for volatility - if volatility > 0.40: - nav_erosion_risk = min(9, nav_erosion_risk + 2) # Increase risk for high volatility - nav_risk_reason += f" with high volatility ({volatility:.1%})" - elif volatility > 0.25: - nav_erosion_risk = min(9, nav_erosion_risk + 1) # Slightly increase risk - nav_risk_reason += f" with elevated volatility ({volatility:.1%})" - - # Convert to annual erosion percentage estimate - nav_erosion_pct = nav_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion - yield_erosion_pct = yield_erosion_risk / MAX_EROSION_LEVEL * 0.9 # Max 90% annual erosion - - risk_data.append({ - "Ticker": ticker, - "NAV Erosion Risk (0-9)": nav_erosion_risk, - "Yield Erosion Risk (0-9)": yield_erosion_risk, - "Estimated Annual NAV Erosion": f"{nav_erosion_pct:.1%}", - "Estimated Annual Yield Erosion": f"{yield_erosion_pct:.1%}", - "NAV Risk Explanation": nav_risk_reason, - "Yield Risk Explanation": yield_risk_reason, - "ETF Age (Years)": etf_age_years, - "Is New ETF": is_new, - "Max Drawdown (1Y)": max_drawdown_1y, - "Volatility (Annual)": volatility, - "Dividend Trend": dividend_trend - }) - - except Exception as e: - if debug: - 
st.error(f"Error analyzing {ticker}: {str(e)}") - continue - - return pd.DataFrame(risk_data) - -# --- Streamlit Setup --- -st.title("💸 ETF Dividend Portfolio Builder") - -# Initialize session state for real-time updates -if "simulation_run" not in st.session_state: +def reset_simulation(): + """Reset all simulation data and state.""" st.session_state.simulation_run = False -if "df_data" not in st.session_state: st.session_state.df_data = None -if "edited_allocations" not in st.session_state: - st.session_state.edited_allocations = None -if "show_recalculation" not in st.session_state: - st.session_state.show_recalculation = False -if "simulation_mode" not in st.session_state: - st.session_state.simulation_mode = "income_target" # Default mode -if "drip_enabled" not in st.session_state: - st.session_state.drip_enabled = False # Default DRIP setting -if "erosion_level" not in st.session_state: - st.session_state.erosion_level = 0 # Default erosion level -if "erosion_type" not in st.session_state: - st.session_state.erosion_type = "None" # Default erosion type -if "run_fmp_test" not in st.session_state: - st.session_state.run_fmp_test = False # Flag for FMP API test -if "fmp_api_key" not in st.session_state: - st.session_state.fmp_api_key = os.environ.get("FMP_API_KEY", "") # FMP API key -if "api_calls" not in st.session_state: - st.session_state.api_calls = 0 # API call counter -if "force_refresh_data" not in st.session_state: - st.session_state.force_refresh_data = False # Flag to force refresh data - -# Radio button to select simulation mode -simulation_mode = st.sidebar.radio( - "Choose Simulation Mode", - options=["Income Target Mode", "Capital Investment Mode"], - index=0 if st.session_state.simulation_mode == "income_target" else 1, - help="Choose whether to start with a monthly income goal or a fixed capital amount" -) - -# Update session state with selected mode -st.session_state.simulation_mode = "income_target" if simulation_mode == "Income Target Mode" else "capital_investment" - -# Add DRIP toggle -drip_enabled = st.sidebar.toggle( - "Enable Dividend Reinvestment (DRIP)", - value=st.session_state.drip_enabled, - help="When enabled, shows how reinvesting dividends compounds growth over time instead of taking income" -) -st.session_state.drip_enabled = drip_enabled - -# Initialize erosion_type from session state -erosion_type = st.session_state.erosion_type - -# Add erosion simulation slider -st.sidebar.subheader("Portfolio Risk Simulation") -erosion_enabled = st.sidebar.checkbox( - "Enable NAV & Yield Erosion", - value=erosion_type != "None", - help="Simulate price drops and dividend cuts over time" -) - -# Define max erosion constants -MAX_EROSION_LEVEL = 9 -max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion - -if erosion_enabled: - # Store the previous erosion type - erosion_type = "NAV & Yield Erosion" - st.session_state.erosion_type = erosion_type - - # Initialize per-ticker erosion settings if not already in session state - if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): - st.session_state.per_ticker_erosion = {} - - # Create advanced per-ticker erosion controls - st.sidebar.subheader("ETF Erosion Settings") - st.sidebar.write("Set custom erosion levels for each ETF") - - # Use the ETFs from the final allocation if simulation has run - if st.session_state.simulation_run and hasattr(st.session_state, 'final_alloc'): - tickers = st.session_state.final_alloc["Ticker"].unique().tolist() - 
# Otherwise use the ETFs from user input - elif "etf_allocations" in st.session_state: - tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] - else: - tickers = [] - - # Initialize or update per-ticker erosion settings - per_ticker_erosion = {} - - if tickers: - # Create a DataFrame for the per-ticker settings - per_ticker_data = [] - - for ticker in tickers: - # Get existing settings or use defaults (5 = medium erosion) - existing_settings = st.session_state.per_ticker_erosion.get(ticker, { - "nav": 5, - "yield": 5 - }) - - per_ticker_data.append({ - "Ticker": ticker, - "NAV Erosion (0-9)": existing_settings["nav"], - "Yield Erosion (0-9)": existing_settings["yield"], - }) - - # Create a data editor for the per-ticker settings - per_ticker_df = pd.DataFrame(per_ticker_data) - edited_df = st.sidebar.data_editor( - per_ticker_df, - column_config={ - "Ticker": st.column_config.TextColumn("Ticker", disabled=True), - "NAV Erosion (0-9)": st.column_config.NumberColumn( - "NAV Erosion (0-9)", - min_value=0, - max_value=MAX_EROSION_LEVEL, - step=1, - format="%d" - ), - "Yield Erosion (0-9)": st.column_config.NumberColumn( - "Yield Erosion (0-9)", - min_value=0, - max_value=MAX_EROSION_LEVEL, - step=1, - format="%d" - ), - }, - use_container_width=True, - num_rows="fixed", - hide_index=True, - key="per_ticker_editor" - ) - - # Save the edited values back to session state - for _, row in edited_df.iterrows(): - ticker = row["Ticker"] - per_ticker_erosion[ticker] = { - "nav": row["NAV Erosion (0-9)"], - "yield": row["Yield Erosion (0-9)"] - } - - st.session_state.per_ticker_erosion = per_ticker_erosion - - # Calculate some example annual erosion rates for display - max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion - - # Show sample erosion rates for different levels - st.sidebar.info(""" - **Erosion Level Guide:** - - Level 0: No erosion (0%) - - Level 5: Medium erosion (~40% annually) - - Level 9: Severe erosion (~90% annually) - """) - else: - st.sidebar.write("Add ETFs to enable per-ETF erosion settings") - - # Always use per-ticker settings - st.session_state.use_per_ticker_erosion = True - - # Store erosion settings for DRIP calculation - erosion_level = { - "global": { - "nav": 5, # Default medium level for global fallback - "yield": 5 - }, - "per_ticker": st.session_state.per_ticker_erosion, - "use_per_ticker": True - } - - # Update session state erosion level to match current settings - st.session_state.erosion_level = erosion_level -else: - # No erosion - erosion_type = "None" - st.session_state.erosion_type = erosion_type - erosion_level = 0 - -# Display appropriate input field based on mode -if st.session_state.simulation_mode == "income_target": - monthly_target = st.sidebar.number_input( - "Monthly Income Target ($)", - value=1500, - min_value=100, - help="Desired monthly dividend income. We'll calculate the required capital." - ) - initial_capital = None - ANNUAL_TARGET = monthly_target * 12 -else: - initial_capital = st.sidebar.number_input( - "Initial Capital ($)", - value=250000, - min_value=1000, - help="Amount of capital you want to invest. We'll calculate the expected monthly income." 
- ) - monthly_target = None - ANNUAL_TARGET = None # Will be calculated based on allocations and yields - -# Add PDF export information -with st.sidebar.expander("PDF Export Information"): - st.info(""" - For PDF export functionality, you'll need to install wkhtmltopdf on your system: - - - **Windows**: Download from [wkhtmltopdf.org](https://wkhtmltopdf.org/downloads.html) and install - - **Mac**: Run `brew install wkhtmltopdf` in Terminal - - **Linux**: Run `sudo apt-get install wkhtmltopdf` for Debian/Ubuntu or `sudo yum install wkhtmltopdf` for CentOS/RHEL - - After installation, restart the app for PDF export to work correctly. - """) - -# Manual ETF allocation input -st.sidebar.header("ETF Allocation") -if "etf_allocations" not in st.session_state: - st.session_state.etf_allocations = [] - -# Only show input fields if not in real-time adjustment mode or if no ETFs added yet -if not st.session_state.simulation_run or not st.session_state.etf_allocations: - col1, col2 = st.sidebar.columns([2, 1]) - with col1: - new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., JEPI)") - with col2: - new_allocation = st.number_input( - "Allocation (%)", - min_value=0.0, - max_value=100.0, - value=0.0, - step=1.0, - help="Percentage of total capital" - ) - - # Add button to add ETF - add_etf_button = st.sidebar.button("ADD ETF", use_container_width=True) - if add_etf_button and new_ticker and new_allocation > 0: - # Check if ticker already exists - if any(etf["ticker"] == new_ticker.upper() for etf in st.session_state.etf_allocations): - st.sidebar.warning(f"{new_ticker.upper()} is already in your portfolio. Please adjust its allocation instead.") - else: - # Validate ticker exists and has data before adding - validation_status = st.sidebar.empty() - validation_status.info(f"Validating {new_ticker.upper()}...") - - is_valid = False - - # Check ticker format first - if not re.match(r'^[A-Z]{1,7}$', new_ticker.upper()): - validation_status.error(f"Invalid ticker: {new_ticker.upper()}. Must be 1-7 uppercase letters.") - else: - try: - # Check if ticker has data - yf_ticker = yf.Ticker(new_ticker.upper()) - info, _ = fetch_with_retry(lambda: yf_ticker.info) - - if info and info.get("previousClose"): - is_valid = True - validation_status.success(f"Validated {new_ticker.upper()} successfully.") - else: - # If YFinance fails, try FMP API for high-yield ETFs if enabled - if USE_FMP_API and st.session_state.get("fmp_api_key"): - fmp_data, _ = fetch_fmp_data(new_ticker.upper()) - if fmp_data["quote"] and len(fmp_data["quote"]) > 0: - is_valid = True - validation_status.success(f"Validated {new_ticker.upper()} using FMP API data.") - - if not is_valid: - validation_status.error(f"Could not validate {new_ticker.upper()}. 
No price data available.") - except Exception as e: - validation_status.error(f"Error validating {new_ticker.upper()}: {str(e)}") - - if is_valid: - # Add new ETF to allocations - st.session_state.etf_allocations.append({ - "ticker": new_ticker.upper(), - "allocation": new_allocation - }) - st.sidebar.success(f"Added {new_ticker.upper()} with {new_allocation}% allocation.") - st.rerun() - elif add_etf_button: - # Show error if missing data - if not new_ticker: - st.sidebar.error("Please enter an ETF ticker.") - if new_allocation <= 0: - st.sidebar.error("Allocation must be greater than 0%.") - -# Calculate total allocation after potential addition -total_alloc = sum(etf["allocation"] for etf in st.session_state.etf_allocations) if st.session_state.etf_allocations else 0 - -# Display ETF allocations -if st.session_state.etf_allocations: - st.sidebar.subheader("Selected ETFs") - alloc_df = pd.DataFrame(st.session_state.etf_allocations) - alloc_df["Remove"] = [st.button(f"Remove {etf['ticker']}", key=f"remove_{i}") for i, etf in enumerate(st.session_state.etf_allocations)] - st.sidebar.dataframe(alloc_df[["ticker", "allocation"]], use_container_width=True) - st.sidebar.metric("Total Allocation (%)", f"{total_alloc:.2f}") - if total_alloc > 100: - st.error(f"Total allocation is {total_alloc:.2f}%, which exceeds 100%. Please adjust allocations.") - -# Advanced Options section in sidebar -with st.sidebar.expander("Advanced Options"): - # Option to toggle FMP API usage - use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API, - help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs") - if use_fmp_api != USE_FMP_API: - # Update global setting if changed - globals()["USE_FMP_API"] = use_fmp_api - st.success("FMP API usage setting updated") - - # Add FMP API Key input - fmp_api_key = st.text_input( - "FMP API Key", - value=os.environ.get("FMP_API_KEY", st.session_state.get("fmp_api_key", "")), - type="password", - help="Enter your Financial Modeling Prep API key for more accurate yield data." 
- ) - if fmp_api_key: - st.session_state.fmp_api_key = fmp_api_key - - # Add cache controls - st.subheader("Cache Settings") - - # Display cache statistics - cache_stats = get_cache_stats() - st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)") - - # Force refresh option - st.session_state.force_refresh_data = st.checkbox( - "Force refresh data (ignore cache)", - value=st.session_state.get("force_refresh_data", False), - help="When enabled, always fetch fresh data from APIs" - ) - - # Cache clearing options - col1, col2 = st.columns(2) - with col1: - if st.button("Clear All Cache", key="clear_all_cache"): - clear_cache() - st.success("All cache files cleared!") - st.session_state.api_calls = 0 - - with col2: - ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker") - if st.button("Clear") and ticker_to_clear: - clear_cache(ticker_to_clear) - st.success(f"Cache cleared for {ticker_to_clear.upper()}") - - # Show API call counter - st.write(f"API calls this session: {st.session_state.api_calls}") - - # Add button to run the FMP API test - if st.button("Test FMP API Connection"): - # Check if API key is available - if not fmp_api_key and not os.environ.get("FMP_API_KEY"): - st.error("Please provide an FMP API key first.") - else: - st.info("Opening FMP API test panel...") - # Set a flag to trigger the test in the main UI - st.session_state.run_fmp_test = True - st.rerun() - - # Add option for debug mode and parallel processing - debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.") - parallel_processing = st.checkbox("Enable Parallel Processing", value=True, - help="Fetch data for multiple ETFs simultaneously") - -api_key = st.sidebar.text_input( - "OpenAI API Key", - type="password", - help="Enter your OpenAI API key for ChatGPT summaries." -) - -# Show simulation button only initially, hide after simulation is run -if not st.session_state.simulation_run: - run_simulation = st.sidebar.button("Run Simulation", help="Launch capital allocation simulation", disabled=abs(total_alloc - 100) > 1 or total_alloc == 0) -else: - run_simulation = False - st.sidebar.success("Simulation ready - adjust allocations in the table below") - if st.sidebar.button("Reset Simulation", help="Start over with new ETFs"): - st.session_state.simulation_run = False - st.session_state.df_data = None - st.session_state.edited_allocations = None - st.session_state.show_recalculation = False - st.rerun() - -refresh_button = st.sidebar.button("Refresh Data", help="Clear cache and fetch new data") - -# Handle remove buttons -for i, etf in enumerate(st.session_state.etf_allocations.copy()): - if st.session_state.get(f"remove_{i}"): - st.session_state.etf_allocations.pop(i) - st.rerun() - -# --- Run App Logic --- -if refresh_button: - st.session_state.force_refresh_data = True # Force refresh when manually requested - st.cache_data.clear() + st.session_state.final_alloc = None + st.session_state.mode = 'Capital Target' + st.session_state.target = 0 + st.session_state.initial_capital = 0 + st.session_state.enable_drip = False + st.session_state.enable_erosion = False st.rerun() -# Check if FMP API test should be run -if st.session_state.get("run_fmp_test", False): - # Display the FMP API test UI - st.header("🔍 FMP API Test Tool") - st.write(""" - This tool allows you to test the Financial Modeling Prep API responses for ETF yield data. 
- Use this to verify accurate dividend yield information, especially for high-yield ETFs. - """) - - # Clear the flag so it won't show again unless requested - st.session_state.run_fmp_test = False - - # Run the API test function - test_fmp_api() - - # Stop execution to prevent the main app from rendering - st.stop() - -# Initial simulation run -if run_simulation: +def test_fmp_connection(): + """Test the FMP API connection and display status.""" try: - with st.spinner("Validating ETFs..."): - tickers = validate_etf_input(st.session_state.etf_allocations) - if not tickers: - st.stop() - - with st.spinner("Fetching ETF data..."): - df, skipped = fetch_etfs(",".join(tickers), debug_mode, parallel_processing) - # Store data in session state for reuse - st.session_state.df_data = df - - if not df.empty: - # Run appropriate allocation based on mode - if st.session_state.simulation_mode == "income_target": - final_alloc = allocate_for_income(df, ANNUAL_TARGET, st.session_state.etf_allocations) - else: - final_alloc = allocate_for_capital(df, initial_capital, st.session_state.etf_allocations) - - if final_alloc.empty: - st.error("Failed to allocate capital. Check ETF data or allocations.") - st.stop() - - # Mark simulation as run successfully - st.session_state.simulation_run = True - st.session_state.final_alloc = final_alloc - else: - st.error("❌ No valid ETF data retrieved. Check tickers or enable Debug Mode for details.") - st.session_state.simulation_run = False - if skipped: - st.subheader("🛑 Skipped Tickers") - st.write("The following tickers could not be processed. Enable Debug Mode for detailed logs.") - st.dataframe(pd.DataFrame(skipped, columns=["Ticker", "Reason", "Debug Info"]).drop(columns=["Debug Info"]), use_container_width=True) - + if not FMP_API_KEY: + return False, "No API key found" + + session = get_fmp_session() + test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}" + response = session.get(test_url) + + if response.status_code == 200: + data = response.json() + if data and isinstance(data, list) and len(data) > 0: + return True, "Connected" + return False, f"Error: {response.status_code}" except Exception as e: - st.error(f"Simulation failed: {str(e)}. 
Please check inputs or try again.") - st.session_state.simulation_run = False + return False, f"Error: {str(e)}" + +# Set page config +st.set_page_config( + page_title="ETF Portfolio Builder", + page_icon="📈", + layout="wide", + initial_sidebar_state="expanded" +) + +# Initialize session state variables +if 'simulation_run' not in st.session_state: + st.session_state.simulation_run = False +if 'df_data' not in st.session_state: + st.session_state.df_data = None +if 'final_alloc' not in st.session_state: + st.session_state.final_alloc = None +if 'mode' not in st.session_state: + st.session_state.mode = 'Capital Target' +if 'target' not in st.session_state: + st.session_state.target = 0 +if 'initial_capital' not in st.session_state: + st.session_state.initial_capital = 0 +if 'enable_drip' not in st.session_state: + st.session_state.enable_drip = False +if 'enable_erosion' not in st.session_state: + st.session_state.enable_erosion = False + +# Main title +st.title("📈 ETF Portfolio Builder") + +# Sidebar for simulation parameters +with st.sidebar: + st.header("Simulation Parameters") + + # Add refresh data button at the top + if st.button("🔄 Refresh Data", use_container_width=True): + st.info("Refreshing ETF data...") + # Add your data refresh logic here + st.success("Data refreshed successfully!") + + # Mode selection + simulation_mode = st.radio( + "Select Simulation Mode", + ["Capital Target", "Income Target"] + ) + + if simulation_mode == "Income Target": + monthly_target = st.number_input( + "Monthly Income Target ($)", + min_value=100.0, + max_value=100000.0, + value=1000.0, + step=100.0 + ) + ANNUAL_TARGET = monthly_target * 12 + else: + initial_capital = st.number_input( + "Initial Capital ($)", + min_value=1000.0, + max_value=1000000.0, + value=100000.0, + step=1000.0 + ) + + # Risk tolerance + risk_tolerance = st.select_slider( + "Risk Tolerance", + options=["Conservative", "Moderate", "Aggressive"], + value="Moderate" + ) + + # Additional options + st.subheader("Additional Options") + + # DRIP option + enable_drip = st.radio( + "Enable Dividend Reinvestment (DRIP)", + ["Yes", "No"], + index=1 + ) + + # Erosion options + enable_erosion = st.radio( + "Enable NAV & Yield Erosion", + ["Yes", "No"], + index=1 + ) + + # ETF Selection + st.subheader("ETF Selection") + + # Create a form for ETF selection + with st.form("etf_selection_form"): + # Number of ETFs + num_etfs = st.number_input("Number of ETFs", min_value=1, max_value=10, value=3, step=1) + + # Create columns for ETF inputs + etf_inputs = [] + for i in range(num_etfs): + ticker = st.text_input(f"ETF {i+1} Ticker", key=f"ticker_{i}") + etf_inputs.append({"ticker": ticker}) + + # Submit button + submitted = st.form_submit_button("Run Portfolio Simulation", type="primary") + + if submitted: + try: + # Store parameters in session state + st.session_state.mode = simulation_mode + st.session_state.enable_drip = enable_drip == "Yes" + st.session_state.enable_erosion = enable_erosion == "Yes" + + if simulation_mode == "Income Target": + st.session_state.target = monthly_target + else: + st.session_state.target = initial_capital + st.session_state.initial_capital = initial_capital + + # Run simulation + df_data, final_alloc = run_portfolio_simulation( + simulation_mode.lower().replace(" ", "_"), + st.session_state.target, + risk_tolerance, + etf_inputs, + st.session_state.enable_drip, + st.session_state.enable_erosion + ) + + # Store results in session state + st.session_state.simulation_run = True + st.session_state.df_data = df_data + 
st.session_state.final_alloc = final_alloc + + st.success("Portfolio simulation completed!") + st.rerun() + + except Exception as e: + st.error(f"Error running simulation: {str(e)}") + + # Add reset simulation button at the bottom of sidebar + if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"): + reset_simulation() + + # Add FMP connection status to the navigation bar + st.sidebar.markdown("---") + st.sidebar.subheader("FMP API Status") + connection_status, message = test_fmp_connection() + if connection_status: + st.sidebar.success(f"✅ FMP API: {message}") + else: + st.sidebar.error(f"❌ FMP API: {message}") # Display results and interactive allocation adjustment UI after simulation is run if st.session_state.simulation_run and st.session_state.df_data is not None: @@ -2536,13 +791,48 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: portfolio_summary(final_alloc) # Display mode-specific information - if st.session_state.simulation_mode == "income_target": + if st.session_state.mode == "Income Target": st.info(f"🎯 **Income Target Mode**: You need ${final_alloc['Capital Allocated ($)'].sum():,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).") else: annual_income = final_alloc["Income Contributed ($)"].sum() monthly_income = annual_income / 12 st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).") + # Add save/load section + st.subheader("💾 Save/Load Portfolio") + + # Create two columns for save and load + save_col, load_col = st.columns(2) + + with save_col: + st.write("Save current portfolio") + portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name") + if st.button("Save Portfolio", key="save_portfolio"): + if portfolio_name: + if save_portfolio(portfolio_name, final_alloc, + st.session_state.mode, + st.session_state.target): + st.success(f"Portfolio '{portfolio_name}' saved successfully!") + else: + st.warning("Please enter a portfolio name.") + + with load_col: + st.write("Load saved portfolio") + if st.button("Show Saved Portfolios", key="show_portfolios"): + saved_portfolios = list_saved_portfolios() + if saved_portfolios: + selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio") + if st.button("Load Portfolio", key="load_portfolio_btn"): + loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio) + if loaded_df is not None: + st.session_state.final_alloc = loaded_df + st.session_state.mode = loaded_mode + st.session_state.target = loaded_target + st.success(f"Portfolio '{selected_portfolio}' loaded successfully!") + st.rerun() + else: + st.info("No saved portfolios found.") + # Display full detailed allocation table st.subheader("📊 Capital Allocation Details") @@ -2567,7 +857,7 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: "Allocation (%)", min_value=0.0, max_value=100.0, - step=1.0, + step=0.1, format="%.1f" ), "Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True), @@ -2581,11 +871,15 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: # Calculate total allocation total_alloc = edited_df["Allocation (%)"].sum() - # Display total allocation - st.metric("Total Allocation (%)", f"{total_alloc:.2f}", - delta=f"{total_alloc - 100:.2f}" if abs(total_alloc - 100) > 0.01 else None) + # Display total allocation with color coding + if 
abs(total_alloc - 100) <= 0.1: + st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None) + else: + st.metric("Total Allocation (%)", f"{total_alloc:.2f}", + delta=f"{total_alloc - 100:.2f}", + delta_color="off") - if abs(total_alloc - 100) > 0.01: + if abs(total_alloc - 100) > 0.1: st.warning("Total allocation should be 100%") # Create columns for quick actions @@ -2602,7 +896,7 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: # Submit button for manual edits submitted = st.form_submit_button("Update Allocations", - disabled=abs(total_alloc - 100) > 1, + disabled=abs(total_alloc - 100) > 0.1, type="primary", use_container_width=True) @@ -2616,9 +910,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()] # Get the mode and target from session state - mode = st.session_state.get('mode', 'Capital Target') - target = st.session_state.get('target', 0) - initial_capital = st.session_state.get('initial_capital', 0) + mode = st.session_state.mode + target = st.session_state.target + initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": @@ -2646,9 +940,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state - mode = st.session_state.get('mode', 'Capital Target') - target = st.session_state.get('target', 0) - initial_capital = st.session_state.get('initial_capital', 0) + mode = st.session_state.mode + target = st.session_state.target + initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": @@ -2677,9 +971,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation}) # Get the mode and target from session state - mode = st.session_state.get('mode', 'Capital Target') - target = st.session_state.get('target', 0) - initial_capital = st.session_state.get('initial_capital', 0) + mode = st.session_state.mode + target = st.session_state.target + initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": @@ -2704,9 +998,9 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state - mode = st.session_state.get('mode', 'Capital Target') - target = st.session_state.get('target', 0) - initial_capital = st.session_state.get('initial_capital', 0) + mode = st.session_state.mode + target = st.session_state.target + initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": @@ -2719,654 +1013,4 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: st.success("Portfolio adjusted to focus on capital!") st.rerun() except Exception as e: - st.error(f"Error focusing on capital: {str(e)}") - - # Display charts - col1, col2 = st.columns(2) - with col1: - fig1 = px.bar( - final_alloc, - x="Ticker", - y="Capital Allocated ($)", - title="Capital Allocation by 
ETF", - template="plotly_dark", - hover_data=["Yield (%)", "Income Contributed ($)", "Allocation (%)", "Risk Level"], - labels={"Capital Allocated ($)": "Capital ($)"} - ) - fig1.update_traces(marker_color="#1f77b4") - st.plotly_chart(fig1, use_container_width=True) - with col2: - fig2 = px.bar( - final_alloc, - x="Ticker", - y="Income Contributed ($)", - title="Income Contribution by ETF", - template="plotly_dark", - hover_data=["Yield (%)", "Capital Allocated ($)", "Allocation (%)", "Risk Level"], - labels={"Income Contributed ($)": "Income ($)"} - ) - fig2.update_traces(marker_color="#ff7f0e") - st.plotly_chart(fig2, use_container_width=True) - - # Display NAV Premium/Discount chart if data is available - st.subheader("📈 NAV Premium/Discount") - premium_discount_chart(final_alloc["Ticker"].tolist(), df, debug_mode) - - # Display trend charts - trend_tabs = st.tabs(["📉 Yield Trends", "📊 NAV Trends"]) - - with trend_tabs[0]: - yield_chart(final_alloc["Ticker"].tolist(), debug_mode) - - with trend_tabs[1]: - nav_chart(final_alloc["Ticker"].tolist(), debug_mode) - - with tab2: - st.subheader("📈 Dividend Reinvestment (DRIP) Forecast") - - # Calculate DRIP growth with erosion simulation if enabled - drip_forecast = calculate_drip_growth( - final_alloc, - erosion_type=erosion_type, - erosion_level=erosion_level - ) - - # Display explanatory text - st.write("This forecast shows the growth of your portfolio over time if dividends are reinvested instead of taken as income.") - base_assumptions = "ETF prices remain constant, dividends are reinvested proportionally to original allocations" - - # Show erosion information if enabled - if erosion_type != "None" and isinstance(erosion_level, dict): - # Check if using per-ticker rates - if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: - st.write("**Erosion Simulation:** Custom erosion rates applied per ETF") - - # Format the per-ticker erosion rates for display - per_ticker_display = [] - for ticker in tickers: - if ticker in erosion_level["per_ticker"]: - ticker_settings = erosion_level["per_ticker"][ticker] - nav_rate = (1 - (1 - (ticker_settings["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - yield_rate = (1 - (1 - (ticker_settings["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - - per_ticker_display.append({ - "Ticker": ticker, - "NAV Erosion (Annual %)": f"{nav_rate:.1f}%", - "Yield Erosion (Annual %)": f"{yield_rate:.1f}%" - }) - - # Display the per-ticker settings in a table - st.dataframe( - pd.DataFrame(per_ticker_display), - use_container_width=True, - hide_index=True - ) - - st.write(f"Assumptions: {base_assumptions}, with custom erosion applied monthly per ETF.") - else: - # Global rates only - nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - st.write(f"**Erosion Simulation:** NAV erosion at {nav_annual:.1f}% annual rate, Yield erosion at {yield_annual:.1f}% annual rate") - st.write(f"Assumptions: {base_assumptions}, with erosion applied monthly to all ETFs.") - else: - st.write(f"Assumptions: {base_assumptions}.") - - # Create columns for key metrics - col1, col2, col3, col4 = st.columns(4) - - # Extract key metrics from forecast - initial_value = drip_forecast["Total Value ($)"].iloc[0] - final_value = drip_forecast["Total Value ($)"].iloc[-1] - value_growth = final_value - initial_value - 
value_growth_pct = (value_growth / initial_value) * 100 - - initial_income = drip_forecast["Monthly Income ($)"].iloc[0] - final_income = drip_forecast["Monthly Income ($)"].iloc[-1] - income_growth = final_income - initial_income - income_growth_pct = (income_growth / initial_income) * 100 - - total_dividends = drip_forecast["Cumulative Income ($)"].iloc[-1] - capital_recovery_pct = (total_dividends / initial_value) * 100 - - # Display key metrics - with col1: - st.metric( - "Portfolio Value Growth", - f"${value_growth:,.2f}", - f"{value_growth_pct:.2f}%" - ) - with col2: - st.metric( - "Monthly Income Growth", - f"${income_growth:,.2f}", - f"{income_growth_pct:.2f}%" - ) - with col3: - st.metric( - "Total Dividends Earned", - f"${total_dividends:,.2f}" - ) - with col4: - st.metric( - "Capital Recovery", - f"{capital_recovery_pct:.2f}%" - ) - - # Display a line chart showing portfolio growth - st.subheader("Portfolio Value Growth") - fig1 = px.line( - drip_forecast, - x="Month", - y="Total Value ($)", - title="Portfolio Value Growth with DRIP", - markers=True, - template="plotly_dark", - ) - fig1.update_traces(line=dict(color="#1f77b4", width=3)) - fig1.update_layout( - xaxis=dict(tickmode='linear', tick0=0, dtick=1), - yaxis=dict(title="Portfolio Value ($)") - ) - st.plotly_chart(fig1, use_container_width=True) - - # Display a line chart showing monthly income growth - st.subheader("Monthly Income Growth") - fig2 = px.line( - drip_forecast, - x="Month", - y="Monthly Income ($)", - title="Monthly Income Growth with DRIP", - markers=True, - template="plotly_dark" - ) - fig2.update_traces(line=dict(color="#ff7f0e", width=3)) - fig2.update_layout( - xaxis=dict(tickmode='linear', tick0=0, dtick=1), - yaxis=dict(title="Monthly Income ($)") - ) - st.plotly_chart(fig2, use_container_width=True) - - # Display detailed forecast table - st.subheader("DRIP Forecast Details") - - # Format the data for display - display_forecast = drip_forecast.copy() - display_forecast["Total Value ($)"] = display_forecast["Total Value ($)"].apply(lambda x: f"${x:,.2f}") - display_forecast["Monthly Income ($)"] = display_forecast["Monthly Income ($)"].apply(lambda x: f"${x:,.2f}") - display_forecast["Cumulative Income ($)"] = display_forecast["Cumulative Income ($)"].apply(lambda x: f"${x:,.2f}") - - # Format share counts and prices - share_columns = [col for col in display_forecast.columns if "Shares" in col] - price_columns = [col for col in display_forecast.columns if "Price" in col] - yield_columns = [col for col in display_forecast.columns if "Yield (%)" in col] - - for col in share_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.4f}") - - for col in price_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"${x:.2f}") - - for col in yield_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.2f}%") - - # Create a more organized view by grouping columns - basic_columns = ["Month", "Total Value ($)", "Monthly Income ($)", "Cumulative Income ($)"] - - # Create tabs for different views of the data - detail_tabs = st.tabs(["Summary View", "Full Details"]) - - with detail_tabs[0]: - st.dataframe(display_forecast[basic_columns], use_container_width=True) - - with detail_tabs[1]: - # Group columns by ticker for better readability - ticker_columns = {} - for ticker in tickers: - ticker_columns[ticker] = [ - f"{ticker} Shares", - f"{ticker} Price ($)", - f"{ticker} Yield (%)" - ] - - # Create ordered columns list: first basic columns, then 
grouped by ticker - ordered_columns = basic_columns.copy() - for ticker in tickers: - ordered_columns.extend(ticker_columns[ticker]) - - st.dataframe( - display_forecast[ordered_columns], - use_container_width=True, - height=500 # Increase height to show more rows - ) - - # Add comparison between DRIP and No-DRIP strategies - st.subheader("📊 1-Year DRIP vs. No-DRIP Comparison") - - # Add note about erosion effects if applicable - if erosion_type != "None" and isinstance(erosion_level, dict): - if erosion_level.get("use_per_ticker", False): - st.info(""" - This comparison factors in the custom per-ETF erosion rates. - Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. - """) - else: - nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - st.info(f""" - This comparison factors in: - - NAV Erosion: {nav_annual:.1f}% annually - - Yield Erosion: {yield_annual:.1f}% annually - - Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. - """) - - # Calculate no-drip scenario (taking dividends as income) - initial_value = drip_forecast["Total Value ($)"].iloc[0] - initial_monthly_income = drip_forecast["Monthly Income ($)"].iloc[0] - annual_income = initial_monthly_income * 12 - - # Get the final prices after erosion from the last month of the DRIP forecast - final_prices = {} - for ticker in tickers: - price_col = f"{ticker} Price ($)" - if price_col in drip_forecast.columns: - final_prices[ticker] = drip_forecast[price_col].iloc[-1] - else: - # Fallback to initial price if column doesn't exist - final_prices[ticker] = ticker_data_dict[ticker]["price"] - - # Extract initial shares for each ETF from month 1 - initial_shares = {ticker: drip_forecast.iloc[0][f"{ticker} Shares"] for ticker in tickers} - - # Calculate the No-DRIP final value by multiplying initial shares by final prices - # This correctly accounts for NAV erosion while keeping shares constant - nodrip_final_value = sum(initial_shares[ticker] * final_prices[ticker] for ticker in tickers) - - # The final income should account for erosion but not compounding growth - # This requires simulation of the erosion that would have happened - if erosion_type != "None" and isinstance(erosion_level, dict): - # Initialize the current prices and yields from the final_alloc dataframe - ticker_data_dict = {} - current_prices = {} - current_yields = {} - - # Reconstruct ticker data from final_alloc - for _, row in final_alloc.iterrows(): - ticker = row["Ticker"] - ticker_data_dict[ticker] = { - "price": row["Price"], - "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal - "distribution": row.get("Distribution Period", "Monthly") - } - current_prices[ticker] = row["Price"] - current_yields[ticker] = row["Yield (%)"] / 100 - - # Get the erosion rates for each ticker - if erosion_level.get("use_per_ticker", False): - ticker_nav_rates = {} - ticker_yield_rates = {} - for ticker in tickers: - ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) - ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - else: - # Use global rates for all tickers - global_nav = erosion_level["global"]["nav"] / 
MAX_EROSION_LEVEL * max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - ticker_nav_rates = {ticker: global_nav for ticker in tickers} - ticker_yield_rates = {ticker: global_yield for ticker in tickers} - - # Apply 12 months of erosion - for month in range(1, 13): - # Apply erosion to each ticker - for ticker in tickers: - # Apply NAV erosion - if ticker_nav_rates[ticker] > 0: - current_prices[ticker] *= (1 - ticker_nav_rates[ticker]) - - # Apply yield erosion - if ticker_yield_rates[ticker] > 0: - current_yields[ticker] *= (1 - ticker_yield_rates[ticker]) - - # Calculate final monthly income with eroded prices and yields but original shares - final_monthly_income_nodrip = sum( - (current_yields[ticker] / 12) * - (initial_shares[ticker] * current_prices[ticker]) - for ticker in tickers - ) - else: - # No erosion, so final income is the same as initial income - final_monthly_income_nodrip = initial_monthly_income - - nodrip_final_annual_income = final_monthly_income_nodrip * 12 - - # Get values for DRIP scenario from forecast - drip_final_value = drip_forecast["Total Value ($)"].iloc[-1] - drip_final_monthly_income = drip_forecast["Monthly Income ($)"].iloc[-1] - drip_annual_income_end = drip_final_monthly_income * 12 - - # Create comparison dataframe with withdrawn income for a more complete financial picture - - # For No-DRIP strategy, calculate cumulative withdrawn income (sum of monthly dividends) - # This is equivalent to the cumulative income in the DRIP forecast, but in No-DRIP it's withdrawn - withdrawn_income = 0 - monthly_dividends = [] - - # Reconstruct the monthly dividend calculation for No-DRIP - current_prices_monthly = {ticker: ticker_data_dict[ticker]["price"] for ticker in tickers} - current_yields_monthly = {ticker: ticker_data_dict[ticker]["yield_annual"] for ticker in tickers} - - for month in range(1, 13): - # Calculate dividends for this month based on current yields and prices - month_dividend = sum( - (current_yields_monthly[ticker] / 12) * - (initial_shares[ticker] * current_prices_monthly[ticker]) - for ticker in tickers - ) - withdrawn_income += month_dividend - monthly_dividends.append(month_dividend) - - # Apply erosion for next month - if erosion_type != "None": - for ticker in tickers: - # Apply NAV erosion - if ticker in ticker_nav_rates and ticker_nav_rates[ticker] > 0: - current_prices_monthly[ticker] *= (1 - ticker_nav_rates[ticker]) - - # Apply yield erosion - if ticker in ticker_yield_rates and ticker_yield_rates[ticker] > 0: - current_yields_monthly[ticker] *= (1 - ticker_yield_rates[ticker]) - - # Calculate total economic result (final value + withdrawn income) - nodrip_economic_result = nodrip_final_value + withdrawn_income - drip_economic_result = drip_final_value # No withdrawals - - comparison_data = { - "Strategy": ["Take Income (No DRIP)", "Reinvest Dividends (DRIP)"], - "Initial Portfolio Value": [f"${initial_value:,.2f}", f"${initial_value:,.2f}"], - "Final Portfolio Value": [f"${nodrip_final_value:,.2f}", f"${drip_final_value:,.2f}"], - "Value Change": [ - f"${nodrip_final_value - initial_value:,.2f} ({(nodrip_final_value/initial_value - 1)*100:.2f}%)", - f"${drip_final_value - initial_value:,.2f} ({(drip_final_value/initial_value - 1)*100:.2f}%)" - ], - "Income Withdrawn": [f"${withdrawn_income:,.2f}", "$0.00"], - "Total Economic Result": [ - f"${nodrip_economic_result:,.2f} ({(nodrip_economic_result/initial_value - 1)*100:.2f}%)", - f"${drip_economic_result:,.2f} 
({(drip_economic_result/initial_value - 1)*100:.2f}%)" - ], - "Initial Monthly Income": [f"${initial_monthly_income:,.2f}", f"${initial_monthly_income:,.2f}"], - "Final Monthly Income": [f"${final_monthly_income_nodrip:,.2f}", f"${drip_final_monthly_income:,.2f}"], - "Income Change": [ - f"${final_monthly_income_nodrip - initial_monthly_income:,.2f} ({(final_monthly_income_nodrip/initial_monthly_income - 1)*100:.2f}%)", - f"${drip_final_monthly_income - initial_monthly_income:,.2f} ({(drip_final_monthly_income/initial_monthly_income - 1)*100:.2f}%)" - ], - } - - # Add chart to visualize the No-DRIP income stream - if erosion_type != "None": - # Calculate the effect of erosion on value - pure_nav_effect = sum( - initial_shares[ticker] * (final_prices[ticker] - ticker_data_dict[ticker]["price"]) - for ticker in tickers - ) - - # Explain the NAV erosion impact on no-DRIP strategy and highlight the benefit of income - st.info(f""" - **Economic Comparison:** - - No-DRIP: While portfolio value decreased by ${abs(pure_nav_effect):,.2f} ({pure_nav_effect/initial_value*100:.2f}%), - you received ${withdrawn_income:,.2f} in income ({withdrawn_income/initial_value*100:.2f}% of initial investment) - - DRIP: No income taken, but final portfolio value is ${drip_final_value:,.2f} - ({(drip_final_value/initial_value - 1)*100:.2f}% vs. initial investment) - - **Total Economic Result** combines final portfolio value with total withdrawn income for the complete financial picture - """) - - # Show a chart of monthly income for the No-DRIP scenario - monthly_income_df = pd.DataFrame({ - "Month": list(range(1, 13)), - "Monthly Income": monthly_dividends - }) - - fig = px.line( - monthly_income_df, - x="Month", - y="Monthly Income", - title="Monthly Income Withdrawn (No-DRIP Strategy)", - markers=True, - template="plotly_dark" - ) - fig.update_traces(line=dict(color="#ff7f0e", width=3)) - st.plotly_chart(fig, use_container_width=True) - - comparison_df = pd.DataFrame(comparison_data) - st.dataframe(comparison_df, use_container_width=True, hide_index=True) - - # Show time to recover initial capital - CORRECTED CALCULATION - - # For DRIP: Calculate what remains to recover (initial value - current value) - drip_remaining_to_recover = max(0, initial_value - drip_final_value) - # Time to recover the remaining amount at the final income rate - years_to_recover_with_drip = drip_remaining_to_recover / drip_annual_income_end if drip_annual_income_end > 0 else 0 - - # For No-DRIP: Calculate what remains to recover (initial value - [final value + withdrawn income]) - nodrip_economic_result = nodrip_final_value + withdrawn_income - nodrip_remaining_to_recover = max(0, initial_value - nodrip_economic_result) - # Time to recover the remaining amount at the final income rate - years_to_recover_no_drip = nodrip_remaining_to_recover / nodrip_final_annual_income if nodrip_final_annual_income > 0 else 0 - - # Convert to months and format display - def format_recovery_time(years): - if years <= 0: - return "0 months" - - months = int(years * 12) - if months < 12: - return f"{months} months" - else: - years_part = months // 12 - months_part = months % 12 - if months_part == 0: - return f"{years_part} years" - else: - return f"{years_part} years, {months_part} months" - - col1, col2 = st.columns(2) - with col1: - st.metric( - "Remaining Time to Recover Capital (No DRIP)", - format_recovery_time(years_to_recover_no_drip) - ) - with col2: - st.metric( - "Remaining Time to Recover Capital (DRIP)", - 
format_recovery_time(years_to_recover_with_drip), - f"Difference: {format_recovery_time(abs(years_to_recover_no_drip - years_to_recover_with_drip))}" - ) - - st.write(""" - **Note:** This shows the *remaining* time needed to fully recover your initial investment, - taking into account both current portfolio value and income already withdrawn. - - For No-DRIP: Initial Value - (Current Value + Withdrawn Income) = Amount left to recover - For DRIP: Initial Value - Current Value = Amount left to recover - """) - - with tab3: - st.subheader("📉 AI Erosion Risk Assessment") - - # Add explanatory text - st.write(""" - This analysis uses historical ETF data to estimate reasonable erosion settings - based on past performance, volatility, and dividend history. - """) - - # Run the analysis in a spinner - with st.spinner("Analyzing historical ETF data..."): - risk_df = analyze_etf_erosion_risk(final_alloc["Ticker"].tolist(), debug_mode) - - if not risk_df.empty: - # Create a summary table with key insights - display_risk_df = risk_df.copy() - - # Format columns for display - if "ETF Age (Years)" in display_risk_df.columns: - display_risk_df["ETF Age (Years)"] = display_risk_df["ETF Age (Years)"].apply( - lambda x: f"{x:.1f} years" if pd.notna(x) else "Unknown" - ) - if "Volatility (Annual)" in display_risk_df.columns: - display_risk_df["Volatility (Annual)"] = display_risk_df["Volatility (Annual)"].apply( - lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" - ) - if "Max Drawdown (1Y)" in display_risk_df.columns: - display_risk_df["Max Drawdown (1Y)"] = display_risk_df["Max Drawdown (1Y)"].apply( - lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" - ) - if "Dividend Trend" in display_risk_df.columns: - display_risk_df["Dividend Trend"] = display_risk_df["Dividend Trend"].apply( - lambda x: f"{x:.1%}" if pd.notna(x) else "Unknown" - ) - - # Display main assessment table - st.subheader("Recommended Erosion Settings") - main_columns = [ - "Ticker", - "NAV Erosion Risk (0-9)", - "Yield Erosion Risk (0-9)", - "Estimated Annual NAV Erosion", - "Estimated Annual Yield Erosion", - "NAV Risk Explanation", - "Yield Risk Explanation" - ] - - st.dataframe( - display_risk_df[main_columns], - use_container_width=True, - hide_index=True - ) - - # Allow applying these settings to the simulation - if st.button("Apply Recommended Erosion Settings", type="primary"): - # Initialize or update per-ticker erosion settings - if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): - st.session_state.per_ticker_erosion = {} - - # Update the session state with recommended settings - for _, row in risk_df.iterrows(): - ticker = row["Ticker"] - st.session_state.per_ticker_erosion[ticker] = { - "nav": int(row["NAV Erosion Risk (0-9)"]), - "yield": int(row["Yield Erosion Risk (0-9)"]) - } - - # Enable erosion and per-ticker settings - st.session_state.erosion_type = "NAV & Yield Erosion" - st.session_state.use_per_ticker_erosion = True - - # Update the erosion_level variable to match the new settings - erosion_level = { - "global": { - "nav": 5, # Default medium level for global fallback - "yield": 5 - }, - "per_ticker": st.session_state.per_ticker_erosion, - "use_per_ticker": True - } - - # Update session state erosion level for DRIP forecast - st.session_state.erosion_level = erosion_level - - st.success("Applied recommended erosion settings. 
They will be used in the DRIP forecast.") - st.info("Go to the DRIP Forecast tab to see the impact of these settings.") - - # Display additional risk metrics in an expander - with st.expander("View Detailed Risk Metrics"): - detail_columns = [ - "Ticker", - "ETF Age (Years)", - "Is New ETF", - "Volatility (Annual)", - "Max Drawdown (1Y)", - "Dividend Trend" - ] - - st.dataframe( - display_risk_df[detail_columns], - use_container_width=True, - hide_index=True - ) - - st.write(""" - **Understanding the Metrics:** - - **ETF Age**: Newer ETFs have less historical data and may be assigned higher risk - - **Volatility**: Higher volatility suggests higher NAV erosion risk - - **Max Drawdown**: Maximum peak-to-trough decline, indicating worst historical NAV erosion - - **Dividend Trend**: Positive values indicate growing dividends, negative values indicate declining dividends - """) - else: - st.warning("Unable to perform risk assessment. Check ticker data or try again.") - - with tab4: - st.subheader("🤖 AI Portfolio Suggestions") - - # Update AI suggestion to match simulation mode - if st.session_state.simulation_mode == "income_target": - suggestion_df = ai_suggestion(df, ANNUAL_TARGET, st.session_state.etf_allocations) - else: - # For capital mode, we need to modify the AI suggestion logic - # First generate optimized allocations - ai_allocations = ai_suggestion(df, 1000, st.session_state.etf_allocations) # Use dummy target - if not ai_allocations.empty: - # Then use those allocations with the actual capital - ai_allocs_list = [{"ticker": row["Ticker"], "allocation": row["Allocation (%)"]} - for _, row in ai_allocations.iterrows()] - suggestion_df = allocate_for_capital(df, initial_capital, ai_allocs_list) - else: - suggestion_df = pd.DataFrame() - - if not suggestion_df.empty: - mode_message = "The AI has optimized the portfolio to minimize capital while mitigating risk" if st.session_state.simulation_mode == "income_target" else "The AI has optimized the income from your investment while mitigating risk" - st.write(f"{mode_message}, using validated 2024 yield data and ETF longevity.") - portfolio_summary(suggestion_df) - - # Format currencies for better readability - ai_display_df = suggestion_df.copy() - ai_display_df["Capital Allocated ($)"] = ai_display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}") - ai_display_df["Income Contributed ($)"] = ai_display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}") - ai_display_df["Yield (%)"] = ai_display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%") - - st.dataframe( - ai_display_df[["Ticker", "Capital Allocated ($)", "Income Contributed ($)", - "Allocation (%)", "Yield (%)", "Risk Level"]], - use_container_width=True, - hide_index=True - ) - - # Add button to apply AI suggestions - if st.button("Apply AI Suggested Allocations", type="primary"): - # Convert AI suggestions to the format needed for recalculation - ai_allocations = {row["Ticker"]: row["Allocation (%)"] for _, row in suggestion_df.iterrows()} - - # Update sidebar ETF allocations to match AI suggestions - new_etf_allocations = [] - for ticker, allocation in ai_allocations.items(): - new_etf_allocations.append({ - "ticker": ticker, - "allocation": allocation - }) - - # Update session state with new allocations - st.session_state.etf_allocations = new_etf_allocations - - # Recalculate portfolio with new allocations - final_alloc = recalculate_portfolio(ai_allocations) - st.session_state.final_alloc = final_alloc - st.success("Applied AI suggested allocations to your 
portfolio!") - st.rerun() - else: - st.error("AI Suggestion failed to generate. Check ETF data.") - - with tab5: - st.subheader("📊 ETF Details") - # ... existing code ... - -# End of file \ No newline at end of file + st.error(f"Error focusing on capital: {str(e)}") \ No newline at end of file