import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from pathlib import Path
import json
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional, Any, Callable
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import yfinance as yf
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
import sys
import logging
import traceback
from dotenv import load_dotenv
import re

# Load environment variables
load_dotenv(override=True)  # Force reload of environment variables

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global settings
USE_FMP_API = True  # Default to using FMP API if available

# FMP API configuration
FMP_API_KEY = os.getenv('FMP_API_KEY')
if not FMP_API_KEY:
    logger.warning("FMP_API_KEY not found in environment variables")
    logger.warning("Current environment variables: %s", dict(os.environ))
    logger.warning("Current working directory: %s", os.getcwd())
    logger.warning("Files in current directory: %s", os.listdir('.'))
    if os.path.exists('.env'):
        logger.warning(".env file exists")
        with open('.env', 'r') as f:
            logger.warning("Contents of .env file: %s", f.read())
    else:
        logger.warning(".env file does not exist")
else:
    logger.info("FMP_API_KEY loaded successfully")
    # Mask the API key for security in logs
    masked_key = FMP_API_KEY[:4] + '*' * (len(FMP_API_KEY) - 8) + FMP_API_KEY[-4:]
    logger.info("FMP_API_KEY (masked): %s", masked_key)

FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"

# High-yield ETFs reference data
HIGH_YIELD_ETFS = {
    "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"},
    "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"},
    "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"},
    "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"},
    "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"},
    "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"},
    "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"},
    "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
    "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
    "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"}
}


def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Calculate ETF metrics based on available data.
Args: ticker: ETF ticker price_data: DataFrame with price history dividend_data: DataFrame with dividend history Returns: Dictionary with calculated metrics """ metrics = { "Ticker": ticker, "Yield (%)": 0.0, "Price": 0.0, "volatility": 0.0, "sharpe_ratio": 0.0, "sortino_ratio": 0.0, "correlation": 0.0, "payout_ratio": 0.0, "score": 0.0, "Risk Level": "Unknown", "missing_metrics": [] } try: # Get current price from price data if not price_data.empty: metrics["Price"] = price_data["close"].iloc[-1] else: metrics["missing_metrics"].append("Price") # Calculate yield if dividend data is available if not dividend_data.empty and metrics["Price"] > 0: # Convert date column to datetime if it's not already dividend_data["date"] = pd.to_datetime(dividend_data["date"]) # Get dividends from the last 12 months one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365) recent_dividends = dividend_data[dividend_data["date"] >= one_year_ago] if not recent_dividends.empty: # Calculate TTM dividend ttm_dividend = recent_dividends["dividend"].sum() # Calculate annual yield metrics["Yield (%)"] = (ttm_dividend / metrics["Price"]) * 100 logger.info(f"Calculated yield for {ticker}: {metrics['Yield (%)']:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${metrics['Price']:.2f})") else: logger.warning(f"No recent dividends found for {ticker}") metrics["missing_metrics"].append("Yield (%)") else: metrics["missing_metrics"].append("Yield (%)") # Calculate volatility if price data is available if len(price_data) > 1: returns = price_data["close"].pct_change().dropna() metrics["volatility"] = returns.std() * np.sqrt(252) * 100 # Annualized volatility else: metrics["missing_metrics"].append("volatility") # Calculate Sharpe ratio if we have returns and risk-free rate if len(price_data) > 1: risk_free_rate = 0.05 # Assuming 5% risk-free rate excess_returns = returns - (risk_free_rate / 252) if excess_returns.std() != 0: metrics["sharpe_ratio"] = (excess_returns.mean() / excess_returns.std()) * np.sqrt(252) else: metrics["missing_metrics"].append("sharpe_ratio") # Calculate Sortino ratio if we have returns if len(price_data) > 1: downside_returns = returns[returns < 0] if len(downside_returns) > 0 and downside_returns.std() != 0: metrics["sortino_ratio"] = (returns.mean() / downside_returns.std()) * np.sqrt(252) else: metrics["missing_metrics"].append("sortino_ratio") # Categorize risk based on available metrics metrics["Risk Level"] = categorize_etf_risk(metrics) # Calculate overall score metrics["score"] = calculate_etf_score(metrics) logger.info(f"Calculated metrics for {ticker}: {metrics}") return metrics except Exception as e: logger.error(f"Error calculating metrics for {ticker}: {str(e)}") logger.error(traceback.format_exc()) return metrics def categorize_etf_risk(metrics: Dict[str, Any]) -> str: """ Categorize ETF risk based on available metrics. 
Args: metrics: Dictionary with ETF metrics Returns: Risk category: "Low", "Medium", or "High" """ try: # Initialize risk score risk_score = 0 available_metrics = 0 # Yield-based risk (higher yield = higher risk) if "Yield (%)" not in metrics["missing_metrics"]: if metrics["Yield (%)"] > 10: risk_score += 3 elif metrics["Yield (%)"] > 6: risk_score += 2 else: risk_score += 1 available_metrics += 1 # Volatility-based risk if "volatility" not in metrics["missing_metrics"]: if metrics["volatility"] > 20: risk_score += 3 elif metrics["volatility"] > 15: risk_score += 2 else: risk_score += 1 available_metrics += 1 # Sharpe ratio-based risk (lower Sharpe = higher risk) if "sharpe_ratio" not in metrics["missing_metrics"]: if metrics["sharpe_ratio"] < 0.5: risk_score += 3 elif metrics["sharpe_ratio"] < 1.0: risk_score += 2 else: risk_score += 1 available_metrics += 1 # Sortino ratio-based risk (lower Sortino = higher risk) if "sortino_ratio" not in metrics["missing_metrics"]: if metrics["sortino_ratio"] < 0.5: risk_score += 3 elif metrics["sortino_ratio"] < 1.0: risk_score += 2 else: risk_score += 1 available_metrics += 1 # Calculate average risk score if available_metrics > 0: avg_risk_score = risk_score / available_metrics if avg_risk_score > 2.5: return "High" elif avg_risk_score > 1.5: return "Medium" else: return "Low" # If no metrics available, use yield as fallback if metrics["Yield (%)"] > 10: return "High" elif metrics["Yield (%)"] > 6: return "Medium" else: return "Low" except Exception as e: logger.error(f"Error categorizing ETF risk: {str(e)}") return "Unknown" def calculate_etf_score(metrics: Dict[str, Any]) -> float: """ Calculate overall ETF score based on available metrics. Args: metrics: Dictionary with ETF metrics Returns: Overall score (0-100) """ try: score = 0 available_metrics = 0 # Yield score (0-25 points) if "Yield (%)" not in metrics["missing_metrics"]: if metrics["Yield (%)"] > 10: score += 25 elif metrics["Yield (%)"] > 6: score += 20 elif metrics["Yield (%)"] > 3: score += 15 else: score += 10 available_metrics += 1 # Volatility score (0-25 points) if "volatility" not in metrics["missing_metrics"]: if metrics["volatility"] < 10: score += 25 elif metrics["volatility"] < 15: score += 20 elif metrics["volatility"] < 20: score += 15 else: score += 10 available_metrics += 1 # Sharpe ratio score (0-25 points) if "sharpe_ratio" not in metrics["missing_metrics"]: if metrics["sharpe_ratio"] > 1.5: score += 25 elif metrics["sharpe_ratio"] > 1.0: score += 20 elif metrics["sharpe_ratio"] > 0.5: score += 15 else: score += 10 available_metrics += 1 # Sortino ratio score (0-25 points) if "sortino_ratio" not in metrics["missing_metrics"]: if metrics["sortino_ratio"] > 1.5: score += 25 elif metrics["sortino_ratio"] > 1.0: score += 20 elif metrics["sortino_ratio"] > 0.5: score += 15 else: score += 10 available_metrics += 1 # Calculate final score if available_metrics > 0: return score / available_metrics return 0 except Exception as e: logger.error(f"Error calculating ETF score: {str(e)}") return 0 def calculate_correlation_matrix(price_data_dict: Dict[str, pd.DataFrame]) -> pd.DataFrame: """ Calculate correlation matrix between ETFs. 
Args: price_data_dict: Dictionary of price DataFrames for each ETF Returns: DataFrame with correlation matrix """ try: # Create a DataFrame with returns for all ETFs returns_df = pd.DataFrame() for ticker, price_data in price_data_dict.items(): if len(price_data) > 1: returns = price_data["close"].pct_change().dropna() returns_df[ticker] = returns if returns_df.empty: logger.warning("No valid price data for correlation calculation") return pd.DataFrame() # Calculate correlation matrix corr_matrix = returns_df.corr() logger.info(f"Correlation matrix calculated:\n{corr_matrix}") return corr_matrix except Exception as e: logger.error(f"Error calculating correlation matrix: {str(e)}") logger.error(traceback.format_exc()) return pd.DataFrame() def calculate_etf_risk_score(etf: Dict[str, Any]) -> float: """ Calculate a comprehensive risk score for an ETF based on multiple metrics. Args: etf: Dictionary containing ETF metrics Returns: float: Risk score (0-100, higher means higher risk) """ try: score = 0 metrics_used = 0 # Primary Metrics (60% of total score) # 1. Volatility (20%) if 'volatility' in etf: volatility = etf['volatility'] if volatility < 10: score += 20 elif volatility < 15: score += 15 elif volatility < 20: score += 10 else: score += 5 metrics_used += 1 # 2. Yield (20%) if 'yield' in etf: yield_value = etf['yield'] if yield_value < 3: score += 5 elif yield_value < 6: score += 10 elif yield_value < 10: score += 15 else: score += 20 metrics_used += 1 # 3. Sharpe/Sortino Ratio (20%) if 'sharpe_ratio' in etf: sharpe = etf['sharpe_ratio'] if sharpe > 1.5: score += 5 elif sharpe > 1.0: score += 10 elif sharpe > 0.8: score += 15 else: score += 20 metrics_used += 1 # Secondary Metrics (40% of total score) # 1. Dividend Growth (10%) if 'dividend_growth' in etf: growth = etf['dividend_growth'] if growth > 10: score += 5 elif growth > 5: score += 7 elif growth > 0: score += 10 else: score += 15 metrics_used += 1 # 2. Payout Ratio (10%) if 'payout_ratio' in etf: ratio = etf['payout_ratio'] if ratio < 40: score += 5 elif ratio < 60: score += 7 elif ratio < 80: score += 10 else: score += 15 metrics_used += 1 # 3. Expense Ratio (10%) if 'expense_ratio' in etf: ratio = etf['expense_ratio'] if ratio < 0.2: score += 5 elif ratio < 0.4: score += 7 elif ratio < 0.6: score += 10 else: score += 15 metrics_used += 1 # 4. AUM/Volume (10%) if 'aum' in etf: aum = etf['aum'] if aum > 5e9: # > $5B score += 5 elif aum > 1e9: # > $1B score += 7 elif aum > 500e6: # > $500M score += 10 else: score += 15 metrics_used += 1 # Normalize score based on available metrics if metrics_used > 0: return score / metrics_used return 50 # Default middle score if no metrics available except Exception as e: logger.error(f"Error calculating ETF risk score: {str(e)}") return 50 def optimize_portfolio_allocation(etf_metrics: List[Dict[str, Any]], risk_tolerance: str, correlation_matrix: pd.DataFrame) -> List[Dict[str, Any]]: """ Optimize portfolio allocation based on risk tolerance and ETF metrics. 
Args: etf_metrics: List of ETF metrics dictionaries risk_tolerance: Risk tolerance level ("Conservative", "Moderate", "Aggressive") correlation_matrix: Correlation matrix between ETFs Returns: List of dictionaries with ETF tickers and their allocations """ try: logger.info(f"Optimizing portfolio allocation for {risk_tolerance} risk tolerance") logger.info(f"ETF metrics: {etf_metrics}") # Sort ETFs by yield (higher yield = higher risk) sorted_etfs = sorted(etf_metrics, key=lambda x: x.get('Yield (%)', 0), reverse=True) logger.info(f"Sorted ETFs by yield: {[etf['Ticker'] for etf in sorted_etfs]}") # Calculate base allocations based on risk tolerance num_etfs = len(sorted_etfs) if num_etfs == 0: return [] if risk_tolerance == "Conservative": # For conservative, allocate more to lower yielding ETFs # This naturally requires more capital for the same income base_allocations = [] remaining_alloc = 100 for i in range(num_etfs): if i < num_etfs - 1: # Allocate more to lower yielding ETFs alloc = remaining_alloc * 0.4 # 40% of remaining base_allocations.append(alloc) remaining_alloc -= alloc else: # Last ETF gets remaining allocation base_allocations.append(remaining_alloc) elif risk_tolerance == "Moderate": # For moderate, allocate more to middle yielding ETFs # This naturally requires medium capital for the same income base_allocations = [] remaining_alloc = 100 for i in range(num_etfs): if i < num_etfs - 1: # Allocate more to middle yielding ETFs alloc = remaining_alloc * 0.5 # 50% of remaining base_allocations.append(alloc) remaining_alloc -= alloc else: # Last ETF gets remaining allocation base_allocations.append(remaining_alloc) else: # Aggressive # For aggressive, allocate more to higher yielding ETFs # This naturally requires less capital for the same income base_allocations = [] remaining_alloc = 100 for i in range(num_etfs): if i < num_etfs - 1: # Allocate more to higher yielding ETFs alloc = remaining_alloc * 0.6 # 60% of remaining base_allocations.append(alloc) remaining_alloc -= alloc else: # Last ETF gets remaining allocation base_allocations.append(remaining_alloc) # Create final allocation list final_allocations = [] for etf, allocation in zip(sorted_etfs, base_allocations): final_allocations.append({ "ticker": etf["Ticker"], "allocation": allocation # Already in percentage }) logger.info(f"Final allocations: {final_allocations}") return final_allocations except Exception as e: logger.error(f"Error in optimize_portfolio_allocation: {str(e)}") logger.error(traceback.format_exc()) return [] def adjust_allocations_for_correlation( allocations: Dict[str, float], correlation_matrix: pd.DataFrame ) -> Dict[str, float]: """ Adjust allocations to reduce correlation between ETFs. 
    Args:
        allocations: Dictionary with current allocations
        correlation_matrix: Correlation matrix between ETFs

    Returns:
        Dictionary with adjusted allocations
    """
    try:
        adjusted_allocations = allocations.copy()

        # Get highly correlated pairs (correlation > 0.7)
        high_corr_pairs = []
        for i in range(len(correlation_matrix.columns)):
            for j in range(i + 1, len(correlation_matrix.columns)):
                ticker1 = correlation_matrix.columns[i]
                ticker2 = correlation_matrix.columns[j]
                if abs(correlation_matrix.iloc[i, j]) > 0.7:
                    high_corr_pairs.append((ticker1, ticker2))

        # Adjust allocations for highly correlated pairs
        for ticker1, ticker2 in high_corr_pairs:
            if ticker1 in adjusted_allocations and ticker2 in adjusted_allocations:
                # Shift 10% of the larger allocation to the smaller one
                if adjusted_allocations[ticker1] > adjusted_allocations[ticker2]:
                    reduction = adjusted_allocations[ticker1] * 0.1  # Reduce by 10%
                    adjusted_allocations[ticker1] -= reduction
                    adjusted_allocations[ticker2] += reduction
                else:
                    reduction = adjusted_allocations[ticker2] * 0.1  # Reduce by 10%
                    adjusted_allocations[ticker2] -= reduction
                    adjusted_allocations[ticker1] += reduction

        logger.info(f"Adjusted allocations for correlation: {adjusted_allocations}")
        return adjusted_allocations
    except Exception as e:
        logger.error(f"Error adjusting allocations for correlation: {str(e)}")
        logger.error(traceback.format_exc())
        return allocations


def get_fmp_session():
    """Create a session with retry logic for FMP API calls."""
    session = requests.Session()
    retries = Retry(total=3, backoff_factor=0.5)
    session.mount('https://', HTTPAdapter(max_retries=retries))
    return session


def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]:
    """
    Fetch ETF data from Financial Modeling Prep API.

    Args:
        ticker: ETF ticker symbol

    Returns:
        Dictionary with ETF data or None if failed
    """
    try:
        if not FMP_API_KEY:
            logger.warning("FMP API key not configured in environment variables")
            st.warning("FMP API key not found in environment variables. 
Some features may be limited.") return None session = get_fmp_session() # Get profile data for current price profile_url = f"{FMP_BASE_URL}/profile/{ticker}?apikey={FMP_API_KEY}" logger.info(f"Making FMP API call to {profile_url}") profile_response = session.get(profile_url) st.session_state.api_calls += 1 logger.info(f"FMP API call count: {st.session_state.api_calls}") if profile_response.status_code != 200: logger.error(f"FMP API error for {ticker}: {profile_response.status_code}") logger.error(f"Response content: {profile_response.text}") return None profile_data = profile_response.json() logger.info(f"FMP profile response for {ticker}: {profile_data}") if not profile_data or not isinstance(profile_data, list) or len(profile_data) == 0: logger.warning(f"No profile data found for {ticker} in FMP") return None profile = profile_data[0] current_price = float(profile.get('price', 0)) if current_price <= 0: logger.error(f"Invalid price for {ticker}: {current_price}") return None # Get dividend history dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}" logger.info(f"Making FMP API call to {dividend_url}") dividend_response = session.get(dividend_url) st.session_state.api_calls += 1 logger.info(f"FMP API call count: {st.session_state.api_calls}") if dividend_response.status_code != 200: logger.error(f"FMP API error for dividend data: {dividend_response.status_code}") logger.error(f"Response content: {dividend_response.text}") return None dividend_data = dividend_response.json() logger.info(f"FMP dividend response for {ticker}: {dividend_data}") if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]: logger.warning(f"No dividend history found for {ticker}") return None # Calculate TTM dividend dividends = pd.DataFrame(dividend_data["historical"]) dividends["date"] = pd.to_datetime(dividends["date"]) dividends = dividends.sort_values("date") # Get dividends in the last 12 months one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365) recent_dividends = dividends[dividends["date"] >= one_year_ago] if recent_dividends.empty: logger.warning(f"No recent dividends found for {ticker}") return None # Calculate TTM dividend ttm_dividend = recent_dividends["dividend"].sum() # Calculate yield yield_pct = (ttm_dividend / current_price) * 100 logger.info(f"Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})") # For high-yield ETFs, verify the yield is reasonable if ticker in HIGH_YIELD_ETFS: expected_yield = HIGH_YIELD_ETFS[ticker]["expected_yield"] if yield_pct < expected_yield * 0.5: # If yield is less than 50% of expected logger.error(f"Calculated yield {yield_pct:.2f}% for {ticker} is much lower than expected {expected_yield}%") logger.error(f"TTM dividend: ${ttm_dividend:.2f}") logger.error(f"Current price: ${current_price:.2f}") logger.error(f"Recent dividends:\n{recent_dividends}") # Determine distribution period if len(recent_dividends) >= 2: intervals = recent_dividends["date"].diff().dt.days.dropna() avg_interval = intervals.mean() if avg_interval <= 45: dist_period = "Monthly" elif avg_interval <= 100: dist_period = "Quarterly" elif avg_interval <= 200: dist_period = "Semi-Annually" else: dist_period = "Annually" else: dist_period = "Unknown" etf_data = { "Ticker": ticker, "Price": current_price, "Yield (%)": yield_pct, "Distribution Period": dist_period, "Risk Level": "High" if ticker in HIGH_YIELD_ETFS else "Moderate" } logger.info(f"FMP data for 
{ticker}: {etf_data}") return etf_data except Exception as e: logger.error(f"Error fetching FMP data for {ticker}: {str(e)}") logger.error(traceback.format_exc()) return None def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]: """ Fetch ETF data from yfinance as fallback. Args: ticker: ETF ticker symbol Returns: Dictionary with ETF data or None if failed """ try: logger.info(f"Fetching yfinance data for {ticker}") etf = yf.Ticker(ticker) info = etf.info # Get the most recent dividend yield if 'dividendYield' in info and info['dividendYield'] is not None: yield_pct = info['dividendYield'] * 100 logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%") else: # Try to calculate from dividend history hist = etf.history(period="1y") if not hist.empty and 'Dividends' in hist.columns: annual_dividend = hist['Dividends'].sum() current_price = info.get('regularMarketPrice', 0) yield_pct = (annual_dividend / current_price) * 100 if current_price > 0 else 0 logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%") else: yield_pct = 0 logger.warning(f"No yield data found for {ticker} in yfinance") # Get current price current_price = info.get('regularMarketPrice', 0) if current_price <= 0: current_price = info.get('regularMarketPreviousClose', 0) logger.warning(f"Using previous close price for {ticker}: {current_price}") etf_data = { "Ticker": ticker, "Price": current_price, "Yield (%)": yield_pct, "Risk Level": "High" # Default for high-yield ETFs } logger.info(f"yfinance data for {ticker}: {etf_data}") return etf_data except Exception as e: logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}") return None def fetch_etf_data(tickers: List[str]) -> pd.DataFrame: """ Fetch ETF data using FMP API with yfinance fallback. Uses HIGH_YIELD_ETFS data only as a last resort. 
Args: tickers: List of ETF tickers Returns: DataFrame with ETF data """ try: data = {} cache_dir = Path("cache") cache_dir.mkdir(exist_ok=True) logger.info("=== Starting ETF data fetch ===") logger.info(f"Force refresh enabled: {st.session_state.get('force_refresh_data', False)}") logger.info(f"Cache directory: {cache_dir.absolute()}") for ticker in tickers: if not ticker: # Skip empty tickers continue logger.info(f"\n=== Processing {ticker} ===") # Check cache first if not forcing refresh cache_file = cache_dir / f"{ticker}_data.json" logger.info(f"Cache file path: {cache_file.absolute()}") logger.info(f"Cache file exists: {cache_file.exists()}") if not st.session_state.get("force_refresh_data", False) and cache_file.exists(): try: with open(cache_file, 'r') as f: cached_data = json.load(f) cache_time = datetime.fromisoformat(cached_data.get('timestamp', '2000-01-01')) cache_age = datetime.now() - cache_time logger.info(f"Cache age: {cache_age.total_seconds() / 3600:.2f} hours") if cache_age < timedelta(hours=24): logger.info(f"Using cached data for {ticker}") data[ticker] = cached_data['data'] continue else: logger.info(f"Cache expired for {ticker} (age: {cache_age.total_seconds() / 3600:.2f} hours)") except Exception as e: logger.warning(f"Error reading cache for {ticker}: {str(e)}") logger.warning(traceback.format_exc()) else: logger.info(f"No cache found or force refresh enabled for {ticker}") # Try FMP first if enabled if USE_FMP_API and FMP_API_KEY: logger.info(f"Making FMP API call for {ticker}") etf_data = fetch_etf_data_fmp(ticker) if etf_data is not None: # Cache the data try: cache_data = { 'timestamp': datetime.now().isoformat(), 'data': etf_data } with open(cache_file, 'w') as f: json.dump(cache_data, f) logger.info(f"Cached FMP data for {ticker}") except Exception as e: logger.warning(f"Error caching FMP data for {ticker}: {str(e)}") logger.warning(traceback.format_exc()) data[ticker] = etf_data st.session_state.api_calls += 1 logger.info(f"Total API calls: {st.session_state.api_calls}") continue # If FMP fails, try yfinance logger.info(f"Falling back to yfinance for {ticker}") etf_data = fetch_etf_data_yfinance(ticker) if etf_data is not None: # Cache the data try: cache_data = { 'timestamp': datetime.now().isoformat(), 'data': etf_data } with open(cache_file, 'w') as f: json.dump(cache_data, f) logger.info(f"Cached yfinance data for {ticker}") except Exception as e: logger.warning(f"Error caching yfinance data for {ticker}: {str(e)}") logger.warning(traceback.format_exc()) data[ticker] = etf_data continue # Only use HIGH_YIELD_ETFS data if both FMP and yfinance failed if ticker in HIGH_YIELD_ETFS: logger.info(f"Using fallback data from HIGH_YIELD_ETFS for {ticker}") etf_data = { "Ticker": ticker, "Price": 25.0, # Default price for fallback "Yield (%)": HIGH_YIELD_ETFS[ticker]["expected_yield"], "Distribution Period": HIGH_YIELD_ETFS[ticker]["frequency"], "Risk Level": "High" } data[ticker] = etf_data else: logger.error(f"Failed to fetch data for {ticker} from all sources") if not data: st.error("No ETF data could be fetched") return pd.DataFrame() df = pd.DataFrame(data.values()) # Validate the data if df.empty: st.error("No ETF data could be fetched") return pd.DataFrame() if (df["Price"] <= 0).any(): st.error("Some ETFs have invalid prices") return pd.DataFrame() if (df["Yield (%)"] <= 0).any(): st.warning("Some ETFs have zero or negative yields") logger.info(f"Final DataFrame:\n{df}") return df except Exception as e: st.error(f"Error fetching ETF data: {str(e)}") 
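        # On any failure, the full traceback is logged below for diagnosis and an
        # empty DataFrame is returned so callers can detect that no ETF data was fetched.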
logger.error(f"Error in fetch_etf_data: {str(e)}") logger.error(traceback.format_exc()) return pd.DataFrame() def run_portfolio_simulation( tickers: List[str], weights: List[float], initial_investment: float, start_date: str, end_date: str, rebalance_frequency: str = 'monthly', use_fmp: bool = True ) -> Dict[str, Any]: """ Run portfolio simulation with the given parameters. Args: tickers: List of ETF tickers weights: List of portfolio weights initial_investment: Initial investment amount start_date: Start date for simulation end_date: End date for simulation rebalance_frequency: Frequency of rebalancing use_fmp: Whether to use FMP API for data Returns: Dictionary with simulation results """ try: # Validate inputs if not tickers or not weights: raise ValueError("No tickers or weights provided") if len(tickers) != len(weights): raise ValueError("Number of tickers must match number of weights") if not all(0 <= w <= 1 for w in weights): raise ValueError("Weights must be between 0 and 1") if sum(weights) != 1: raise ValueError("Weights must sum to 1") # Get historical data historical_data = {} for ticker in tickers: if use_fmp and FMP_API_KEY: data = fetch_etf_data_fmp(ticker) if data and 'historical' in data: historical_data[ticker] = data['historical'] else: logger.warning(f"Falling back to yfinance for {ticker}") data = fetch_etf_data_yfinance(ticker) if data and 'historical' in data: historical_data[ticker] = data['historical'] else: data = fetch_etf_data_yfinance(ticker) if data and 'historical' in data: historical_data[ticker] = data['historical'] if not historical_data: raise ValueError("No historical data available for any tickers") # Create portfolio DataFrame portfolio = pd.DataFrame() for ticker, data in historical_data.items(): portfolio[ticker] = data['close'] # Calculate portfolio returns portfolio_returns = portfolio.pct_change() portfolio_returns = portfolio_returns.fillna(0) # Calculate weighted returns weighted_returns = pd.DataFrame() for i, ticker in enumerate(tickers): weighted_returns[ticker] = portfolio_returns[ticker] * weights[i] portfolio_returns['portfolio'] = weighted_returns.sum(axis=1) # Calculate cumulative returns cumulative_returns = (1 + portfolio_returns).cumprod() # Calculate portfolio value portfolio_value = initial_investment * cumulative_returns['portfolio'] # Calculate metrics total_return = (portfolio_value.iloc[-1] / initial_investment) - 1 annual_return = (1 + total_return) ** (252 / len(portfolio_value)) - 1 volatility = portfolio_returns['portfolio'].std() * np.sqrt(252) sharpe_ratio = annual_return / volatility if volatility != 0 else 0 # Calculate drawdown rolling_max = portfolio_value.expanding().max() drawdown = (portfolio_value - rolling_max) / rolling_max max_drawdown = drawdown.min() return { 'portfolio_value': portfolio_value, 'returns': portfolio_returns, 'cumulative_returns': cumulative_returns, 'total_return': total_return, 'annual_return': annual_return, 'volatility': volatility, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'drawdown': drawdown } except Exception as e: logger.error(f"Error in portfolio simulation: {str(e)}") st.error(f"Error running portfolio simulation: {str(e)}") return None def portfolio_summary(final_alloc: pd.DataFrame) -> None: """ Display a summary of the portfolio allocation. 
Args: final_alloc: DataFrame containing the portfolio allocation """ if final_alloc is None or final_alloc.empty: st.warning("No portfolio data available.") return try: # Calculate key metrics total_capital = final_alloc["Capital Allocated ($)"].sum() total_income = final_alloc["Income Contributed ($)"].sum() # Calculate weighted average yield weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100 # Display metrics in columns col1, col2, col3 = st.columns(3) with col1: st.metric("Total Capital", f"${total_capital:,.2f}") with col2: st.metric("Annual Income", f"${total_income:,.2f}") st.metric("Monthly Income", f"${total_income/12:,.2f}") with col3: st.metric("Average Yield", f"{weighted_yield:.2f}%") st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%") # Display allocation chart fig = px.pie( final_alloc, values="Allocation (%)", names="Ticker", title="Portfolio Allocation by ETF", hover_data={ "Ticker": True, "Allocation (%)": ":.2f", "Yield (%)": ":.2f", "Capital Allocated ($)": ":,.2f", "Income Contributed ($)": ":,.2f" } ) st.plotly_chart(fig, use_container_width=True) # Display detailed allocation table st.subheader("Detailed Allocation") display_df = final_alloc.copy() display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12 # Format the display st.dataframe( display_df.style.format({ "Allocation (%)": "{:.2f}%", "Yield (%)": "{:.2f}%", "Price": "${:,.2f}", "Shares": "{:,.4f}", "Capital Allocated ($)": "${:,.2f}", "Monthly Income": "${:,.2f}", "Income Contributed ($)": "${:,.2f}" }), use_container_width=True ) except Exception as e: st.error(f"Error calculating portfolio summary: {str(e)}") logger.error(f"Error in portfolio_summary: {str(e)}") logger.error(traceback.format_exc()) def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool: """ Save portfolio allocation to a JSON file. Args: portfolio_name: Name of the portfolio final_alloc: DataFrame containing portfolio allocation mode: Portfolio mode ("Income Target" or "Capital Target") target: Target value (income or capital) Returns: bool: True if save was successful, False otherwise """ try: # Create portfolios directory if it doesn't exist portfolios_dir = Path("portfolios") portfolios_dir.mkdir(exist_ok=True) # Prepare portfolio data portfolio_data = { "name": portfolio_name, "created_at": datetime.now().isoformat(), "mode": mode, "target": target, "allocations": [] } # Convert DataFrame to list of dictionaries for _, row in final_alloc.iterrows(): allocation = { "ticker": row["Ticker"], "allocation": float(row["Allocation (%)"]), "yield": float(row["Yield (%)"]), "price": float(row["Price"]), "risk_level": row["Risk Level"] } portfolio_data["allocations"].append(allocation) # Save to JSON file file_path = portfolios_dir / f"{portfolio_name}.json" with open(file_path, 'w') as f: json.dump(portfolio_data, f, indent=2) return True except Exception as e: st.error(f"Error saving portfolio: {str(e)}") return False def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]: """ Load portfolio allocation from a JSON file. 
Args: portfolio_name: Name of the portfolio to load Returns: Tuple containing: - DataFrame with portfolio allocation - Portfolio mode - Target value """ try: # Check if portfolio exists file_path = Path("portfolios") / f"{portfolio_name}.json" if not file_path.exists(): st.error(f"Portfolio '{portfolio_name}' not found.") return None, None, None # Load portfolio data with open(file_path, 'r') as f: portfolio_data = json.load(f) # Convert allocations to DataFrame allocations = portfolio_data["allocations"] df = pd.DataFrame(allocations) # Rename columns to match expected format df = df.rename(columns={ "allocation": "Allocation (%)", "yield": "Yield (%)", "price": "Price" }) return df, portfolio_data["mode"], portfolio_data["target"] except Exception as e: st.error(f"Error loading portfolio: {str(e)}") return None, None, None def list_saved_portfolios() -> List[str]: """ List all saved portfolios. Returns: List of portfolio names """ try: portfolios_dir = Path("portfolios") if not portfolios_dir.exists(): return [] # Get all JSON files in the portfolios directory portfolio_files = list(portfolios_dir.glob("*.json")) # Extract portfolio names from filenames portfolio_names = [f.stem for f in portfolio_files] return sorted(portfolio_names) except Exception as e: st.error(f"Error listing portfolios: {str(e)}") return [] def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame: """ Allocate portfolio for income target. Args: df: DataFrame with ETF data target: Monthly income target etf_allocations: List of ETF allocations Returns: DataFrame with final allocation """ try: # Create final allocation DataFrame final_alloc = df.copy() # Initialize allocation column if it doesn't exist if "Allocation (%)" not in final_alloc.columns: final_alloc["Allocation (%)"] = 0.0 # Set allocations for alloc in etf_allocations: mask = final_alloc["Ticker"] == alloc["ticker"] if mask.any(): final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] else: logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame") # Verify allocations are set if final_alloc["Allocation (%)"].sum() == 0: logger.error("No allocations were set") return None # Calculate required capital for income target monthly_income = target annual_income = monthly_income * 12 # Calculate weighted average yield weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100 if weighted_yield == 0: logger.error("Weighted yield is zero") return None # Calculate required capital based on weighted yield required_capital = (annual_income / weighted_yield) * 100 # Calculate capital allocation and income final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"] final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100 # Verify calculations total_income = final_alloc["Income Contributed ($)"].sum() if abs(total_income - annual_income) > 1.0: # Allow for small rounding errors logger.warning(f"Total income ({total_income}) does not match target ({annual_income})") logger.info(f"Income allocation completed. 
Required capital: ${required_capital:,.2f}") logger.info(f"Final allocations:\n{final_alloc}") return final_alloc except Exception as e: logger.error(f"Error in income allocation: {str(e)}") logger.error(traceback.format_exc()) return None def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame: """ Allocate portfolio for capital target. Args: df: DataFrame with ETF data initial_capital: Initial capital amount etf_allocations: List of ETF allocations Returns: DataFrame with final allocation """ try: # Create final allocation DataFrame final_alloc = df.copy() # Initialize allocation column if it doesn't exist if "Allocation (%)" not in final_alloc.columns: final_alloc["Allocation (%)"] = 0.0 # Set allocations for alloc in etf_allocations: mask = final_alloc["Ticker"] == alloc["ticker"] if mask.any(): final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] else: logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame") # Verify allocations are set if final_alloc["Allocation (%)"].sum() == 0: logger.error("No allocations were set") return None # Calculate capital allocation and income final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * initial_capital final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"] final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100 # Verify calculations total_capital = final_alloc["Capital Allocated ($)"].sum() if abs(total_capital - initial_capital) > 1.0: # Allow for small rounding errors logger.warning(f"Total capital ({total_capital}) does not match initial capital ({initial_capital})") logger.info(f"Capital allocation completed. Initial capital: ${initial_capital:,.2f}") logger.info(f"Final allocations:\n{final_alloc}") return final_alloc except Exception as e: logger.error(f"Error in capital allocation: {str(e)}") logger.error(traceback.format_exc()) return None def reset_simulation(): """Reset all simulation data and state.""" st.session_state.simulation_run = False st.session_state.df_data = None st.session_state.final_alloc = None st.session_state.mode = 'Capital Target' st.session_state.target = 0 st.session_state.initial_capital = 0 st.session_state.enable_drip = False st.session_state.enable_erosion = False st.rerun() def test_fmp_connection() -> bool: """Test connection to FMP API.""" try: if not FMP_API_KEY: st.error("FMP API key not found in environment variables") return False session = get_fmp_session() test_url = f"{FMP_BASE_URL}/profile/SPY?apikey={FMP_API_KEY}" logger.info(f"Making FMP API test call to {test_url}") response = session.get(test_url) st.session_state.api_calls += 1 logger.info(f"FMP API call count: {st.session_state.api_calls}") if response.status_code == 200: st.success("Successfully connected to FMP API") return True else: st.error(f"Failed to connect to FMP API: {response.status_code}") logger.error(f"FMP API test failed: {response.text}") return False except Exception as e: st.error(f"Error testing FMP connection: {str(e)}") logger.error(f"FMP API test error: {str(e)}") return False def get_cache_stats() -> Dict[str, Any]: """ Get statistics about the cache usage. 
Returns: Dictionary containing cache statistics """ try: cache_dir = Path("cache") if not cache_dir.exists(): return { "ticker_count": 0, "file_count": 0, "total_size_kb": 0 } # Get all cache files cache_files = list(cache_dir.glob("**/*.json")) # Count unique tickers tickers = set() for file in cache_files: # Extract ticker from filename (assuming format: ticker_data_type.json) ticker = file.stem.split('_')[0] tickers.add(ticker) # Calculate total size total_size = sum(file.stat().st_size for file in cache_files) return { "ticker_count": len(tickers), "file_count": len(cache_files), "total_size_kb": total_size / 1024 # Convert to KB } except Exception as e: logger.error(f"Error getting cache stats: {str(e)}") return { "ticker_count": 0, "file_count": 0, "total_size_kb": 0 } def clear_cache(ticker: Optional[str] = None) -> None: """ Clear cache files for a specific ticker or all tickers. Args: ticker: Optional ticker symbol to clear cache for. If None, clears all cache. """ try: cache_dir = Path("cache") if not cache_dir.exists(): return if ticker: # Clear cache for specific ticker pattern = f"{ticker.upper()}_*.json" cache_files = list(cache_dir.glob(f"**/{pattern}")) else: # Clear all cache files cache_files = list(cache_dir.glob("**/*.json")) # Delete cache files for file in cache_files: try: file.unlink() logger.info(f"Deleted cache file: {file}") except Exception as e: logger.error(f"Error deleting cache file {file}: {str(e)}") except Exception as e: logger.error(f"Error clearing cache: {str(e)}") # Set page config st.set_page_config( page_title="ETF Portfolio Builder", page_icon="📈", layout="wide", initial_sidebar_state="expanded" ) # Initialize session state variables if 'simulation_run' not in st.session_state: st.session_state.simulation_run = False logger.info("Initialized simulation_run in session state") if 'df_data' not in st.session_state: st.session_state.df_data = None logger.info("Initialized df_data in session state") if 'final_alloc' not in st.session_state: st.session_state.final_alloc = None logger.info("Initialized final_alloc in session state") if 'mode' not in st.session_state: st.session_state.mode = 'Capital Target' logger.info("Initialized mode in session state") if 'target' not in st.session_state: st.session_state.target = 0 logger.info("Initialized target in session state") if 'initial_capital' not in st.session_state: st.session_state.initial_capital = 0 logger.info("Initialized initial_capital in session state") if 'enable_drip' not in st.session_state: st.session_state.enable_drip = False logger.info("Initialized enable_drip in session state") if 'enable_erosion' not in st.session_state: st.session_state.enable_erosion = False logger.info("Initialized enable_erosion in session state") if 'api_calls' not in st.session_state: st.session_state.api_calls = 0 logger.info("Initialized api_calls in session state") if 'force_refresh_data' not in st.session_state: st.session_state.force_refresh_data = False logger.info("Initialized force_refresh_data in session state") if 'etf_allocations' not in st.session_state: st.session_state.etf_allocations = [] logger.info("Initialized empty etf_allocations in session state") if 'risk_tolerance' not in st.session_state: st.session_state.risk_tolerance = "Moderate" logger.info("Initialized risk_tolerance in session state") # Main title st.title("📈 ETF Portfolio Builder") # Function to remove ticker def remove_ticker(ticker_to_remove: str) -> None: """Remove a ticker from the portfolio.""" try: logger.info(f"Removing ticker: 
{ticker_to_remove}") current_allocations = list(st.session_state.etf_allocations) st.session_state.etf_allocations = [etf for etf in current_allocations if etf["ticker"] != ticker_to_remove] logger.info(f"Updated allocations after removal: {st.session_state.etf_allocations}") st.rerun() except Exception as e: logger.error(f"Error removing ticker: {str(e)}") st.error(f"Error removing ticker: {str(e)}") # Display current tickers in the main space if st.session_state.etf_allocations: st.subheader("Selected ETFs") st.markdown(""" """, unsafe_allow_html=True) # Create a container for tickers ticker_container = st.container() with ticker_container: # Display each ticker with a close button for etf in st.session_state.etf_allocations: col1, col2 = st.columns([0.05, 0.95]) # Adjusted column ratio with col1: if st.button("×", key=f"remove_{etf['ticker']}", help=f"Remove {etf['ticker']} from portfolio"): remove_ticker(etf['ticker']) with col2: st.markdown(f"
{etf['ticker']}
", unsafe_allow_html=True) # Debug information logger.info("=== Session State Debug ===") logger.info(f"Full session state: {dict(st.session_state)}") logger.info(f"ETF allocations type: {type(st.session_state.etf_allocations)}") logger.info(f"ETF allocations content: {st.session_state.etf_allocations}") logger.info("=== End Session State Debug ===") def add_etf_to_portfolio(ticker: str) -> bool: """Add an ETF to the portfolio with proper validation and error handling.""" try: logger.info("=== Adding ETF to Portfolio ===") logger.info(f"Input ticker: {ticker}") logger.info(f"Current allocations before adding: {st.session_state.etf_allocations}") logger.info(f"Current allocations type: {type(st.session_state.etf_allocations)}") # Validate ticker format if not re.match(r'^[A-Z]{1,7}$', ticker.upper()): logger.warning(f"Invalid ticker format: {ticker}") st.error("Invalid ticker format. Must be 1-7 uppercase letters.") return False # Check if ticker already exists if any(etf["ticker"] == ticker.upper() for etf in st.session_state.etf_allocations): logger.warning(f"Ticker {ticker.upper()} already exists in portfolio") st.warning(f"{ticker.upper()} is already in your portfolio.") return False # Verify ticker exists by fetching data logger.info(f"Fetching data for ticker: {ticker.upper()}") etf_data = fetch_etf_data([ticker.upper()]) logger.info(f"Fetched ETF data: {etf_data}") if etf_data is None or etf_data.empty: logger.warning(f"Unknown ticker: {ticker.upper()}") st.error(f"Unknown ticker: {ticker.upper()}. Please enter a valid ETF ticker.") return False # Create new ETF entry new_etf = { "ticker": ticker.upper(), "allocation": 0.0 } logger.info(f"Created new ETF entry: {new_etf}") # Update session state current_allocations = list(st.session_state.etf_allocations) current_allocations.append(new_etf) st.session_state.etf_allocations = current_allocations logger.info(f"Updated session state allocations: {st.session_state.etf_allocations}") logger.info(f"Updated allocations type: {type(st.session_state.etf_allocations)}") # Recalculate allocations based on risk tolerance if len(st.session_state.etf_allocations) > 0: risk_tolerance = st.session_state.risk_tolerance tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] logger.info(f"Recalculating allocations for tickers: {tickers}") df_data = fetch_etf_data(tickers) logger.info(f"Fetched data for recalculation: {df_data}") if df_data is not None and not df_data.empty: etf_metrics = df_data.to_dict('records') new_allocations = optimize_portfolio_allocation( etf_metrics, risk_tolerance, pd.DataFrame() ) logger.info(f"Calculated new allocations: {new_allocations}") st.session_state.etf_allocations = new_allocations logger.info(f"Updated session state with new allocations: {st.session_state.etf_allocations}") logger.info("=== End Adding ETF to Portfolio ===") return True except Exception as e: logger.error("=== Error Adding ETF to Portfolio ===") logger.error(f"Error: {str(e)}") logger.error(traceback.format_exc()) st.error(f"Error adding ETF: {str(e)}") return False def remove_etf_from_portfolio(index: int) -> bool: """Remove an ETF from the portfolio with proper validation and error handling.""" try: logger.info(f"Attempting to remove ETF at index: {index}") logger.info(f"Current allocations before removal: {st.session_state.etf_allocations}") if not st.session_state.etf_allocations or index >= len(st.session_state.etf_allocations): logger.warning(f"Invalid ETF index for removal: {index}") return False # Create new list without the 
removed ETF current_allocations = list(st.session_state.etf_allocations) removed_etf = current_allocations.pop(index) st.session_state.etf_allocations = current_allocations logger.info(f"Successfully removed ETF: {removed_etf}") logger.info(f"Updated allocations: {st.session_state.etf_allocations}") # Recalculate allocations if there are remaining ETFs if st.session_state.etf_allocations: risk_tolerance = st.session_state.risk_tolerance tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] df_data = fetch_etf_data(tickers) if df_data is not None and not df_data.empty: etf_metrics = df_data.to_dict('records') new_allocations = optimize_portfolio_allocation( etf_metrics, risk_tolerance, pd.DataFrame() ) st.session_state.etf_allocations = new_allocations logger.info(f"Recalculated allocations: {new_allocations}") return True except Exception as e: logger.error(f"Error removing ETF: {str(e)}") logger.error(traceback.format_exc()) st.error(f"Error removing ETF: {str(e)}") return False # Sidebar for ETF input with st.sidebar: st.header("ETF Allocation") # Create a container for ETF input with st.container(): # Input field for ETF ticker only new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., SCHD)") # Add button to add ETF add_etf_button = st.button("ADD ETF", use_container_width=True) if add_etf_button: logger.info("=== Add ETF Button Clicked ===") logger.info(f"Input ticker: {new_ticker}") logger.info(f"Current allocations: {st.session_state.etf_allocations}") if not new_ticker: st.error("Please enter an ETF ticker.") logger.warning("No ticker provided") elif len(st.session_state.etf_allocations) >= 10: st.error("Maximum of 10 ETFs allowed in portfolio.") logger.warning("Maximum ETF limit reached") else: if add_etf_to_portfolio(new_ticker): st.success(f"Added {new_ticker.upper()} to portfolio.") logger.info("Successfully added ETF, triggering rerun") st.rerun() # Display total allocation if st.session_state.etf_allocations: current_total = sum(etf["allocation"] for etf in st.session_state.etf_allocations) st.metric("Total Allocation (%)", f"{current_total:.2f}") # Add a warning if total is not 100% if abs(current_total - 100) > 0.1: st.warning("Total allocation should be 100%") else: st.info("No ETFs added yet. 
Please add ETFs to your portfolio.") logger.info("No ETFs in portfolio") # Mode selection simulation_mode = st.radio( "Select Simulation Mode", ["Capital Target", "Income Target"] ) if simulation_mode == "Income Target": monthly_target = st.number_input( "Monthly Income Target ($)", min_value=100.0, max_value=100000.0, value=1000.0, step=100.0 ) ANNUAL_TARGET = monthly_target * 12 else: initial_capital = st.number_input( "Initial Capital ($)", min_value=1000.0, max_value=1000000.0, value=100000.0, step=1000.0 ) # Risk tolerance risk_tolerance = st.select_slider( "Risk Tolerance", options=["Conservative", "Moderate", "Aggressive"], value=st.session_state.get("risk_tolerance", "Moderate"), key="risk_tolerance_slider" ) # Check if risk tolerance changed if risk_tolerance != st.session_state.get("risk_tolerance"): logger.info("=== Risk Tolerance Change Detection ===") logger.info(f"Current risk tolerance in session state: {st.session_state.get('risk_tolerance')}") logger.info(f"New risk tolerance from slider: {risk_tolerance}") # Update session state st.session_state.risk_tolerance = risk_tolerance # Recalculate allocations if we have ETFs if st.session_state.etf_allocations: logger.info("Recalculating allocations due to risk tolerance change") tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] df_data = fetch_etf_data(tickers) if df_data is not None and not df_data.empty: etf_metrics = df_data.to_dict('records') new_allocations = optimize_portfolio_allocation( etf_metrics, risk_tolerance, pd.DataFrame() ) st.session_state.etf_allocations = new_allocations logger.info(f"New allocations after risk tolerance change: {new_allocations}") # If simulation has been run, update final allocation if st.session_state.simulation_run and st.session_state.df_data is not None: if st.session_state.mode == "Income Target": final_alloc = allocate_for_income( st.session_state.df_data, st.session_state.target, new_allocations ) else: final_alloc = allocate_for_capital( st.session_state.df_data, st.session_state.initial_capital, new_allocations ) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.rerun() # Additional options st.subheader("Additional Options") # DRIP option enable_drip = st.radio( "Enable Dividend Reinvestment (DRIP)", ["Yes", "No"], index=1 ) # Erosion options enable_erosion = st.radio( "Enable NAV & Yield Erosion", ["Yes", "No"], index=1 ) # Run simulation button if st.button("Run Portfolio Simulation", type="primary", use_container_width=True): if not st.session_state.etf_allocations: st.error("Please add at least one ETF to your portfolio.") else: try: # Store parameters in session state st.session_state.mode = simulation_mode st.session_state.enable_drip = enable_drip == "Yes" st.session_state.enable_erosion = enable_erosion == "Yes" if simulation_mode == "Income Target": st.session_state.target = monthly_target else: st.session_state.target = initial_capital st.session_state.initial_capital = initial_capital # Run simulation logger.info("Starting portfolio simulation...") logger.info(f"ETF allocations: {st.session_state.etf_allocations}") tickers = [etf["ticker"] for etf in st.session_state.etf_allocations] df_data = fetch_etf_data(tickers) logger.info(f"Fetched ETF data:\n{df_data}") if df_data is not None and not df_data.empty: if simulation_mode == "Income Target": logger.info(f"Allocating for income target: ${monthly_target}") final_alloc = allocate_for_income(df_data, monthly_target, st.session_state.etf_allocations) else: 
logger.info(f"Allocating for capital target: ${initial_capital}") final_alloc = allocate_for_capital(df_data, initial_capital, st.session_state.etf_allocations) logger.info(f"Final allocation result:\n{final_alloc}") if final_alloc is not None and not final_alloc.empty: st.session_state.simulation_run = True st.session_state.df_data = df_data st.session_state.final_alloc = final_alloc st.success("Portfolio simulation completed!") st.rerun() else: st.error("Failed to generate portfolio allocation. Please check your inputs and try again.") else: st.error("Failed to fetch ETF data. Please check your tickers and try again.") except Exception as e: st.error(f"Error running simulation: {str(e)}") logger.error(f"Error in simulation: {str(e)}") logger.error(traceback.format_exc()) # Add reset simulation button at the bottom of sidebar if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"): reset_simulation() # Add FMP connection status to the navigation bar st.sidebar.markdown("---") st.sidebar.subheader("FMP API Status") connection_status = test_fmp_connection() if connection_status: st.sidebar.success("✅ FMP API: Connected") else: st.sidebar.error("❌ FMP API: Connection failed") # Advanced Options section in sidebar with st.sidebar.expander("Advanced Options"): # Option to toggle FMP API usage use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API, help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs") if use_fmp_api != USE_FMP_API: # Update global setting if changed globals()["USE_FMP_API"] = use_fmp_api st.success("FMP API usage setting updated") # Add cache controls st.subheader("Cache Settings") # Display cache statistics cache_stats = get_cache_stats() st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)") # Force refresh option st.session_state.force_refresh_data = st.checkbox( "Force refresh data (ignore cache)", value=st.session_state.get("force_refresh_data", False), help="When enabled, always fetch fresh data from APIs" ) # Cache clearing options col1, col2 = st.columns(2) with col1: if st.button("Clear All Cache"): clear_cache() st.success("All cache files cleared!") st.session_state.api_calls = 0 with col2: ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker") if st.button("Clear") and ticker_to_clear: clear_cache(ticker_to_clear) st.success(f"Cache cleared for {ticker_to_clear.upper()}") # Show API call counter st.write(f"API calls this session: {st.session_state.api_calls}") # Add option for debug mode and parallel processing debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.") parallel_processing = st.checkbox("Enable Parallel Processing", value=True, help="Fetch data for multiple ETFs simultaneously") # Display results and interactive allocation adjustment UI after simulation is run if st.session_state.simulation_run and st.session_state.df_data is not None: df = st.session_state.df_data final_alloc = st.session_state.final_alloc if hasattr(st.session_state, 'final_alloc') else None # Validate final_alloc DataFrame if final_alloc is None or final_alloc.empty: st.error("No portfolio data available. 
Please run the simulation again.") st.session_state.simulation_run = False else: # Verify required columns exist required_columns = ["Capital Allocated ($)", "Yield (%)", "Price", "Ticker"] missing_columns = [col for col in required_columns if col not in final_alloc.columns] if missing_columns: st.error(f"Missing required columns in portfolio data: {', '.join(missing_columns)}") st.session_state.simulation_run = False else: # Create tabs for better organization tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"]) with tab1: st.subheader("💰 Portfolio Summary") portfolio_summary(final_alloc) # Display mode-specific information if st.session_state.mode == "Income Target": try: monthly_target = st.session_state.target ANNUAL_TARGET = monthly_target * 12 total_capital = final_alloc["Capital Allocated ($)"].sum() st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).") except Exception as e: st.error(f"Error displaying income target information: {str(e)}") else: try: initial_capital = st.session_state.initial_capital annual_income = final_alloc["Income Contributed ($)"].sum() monthly_income = annual_income / 12 st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).") except Exception as e: st.error(f"Error displaying capital investment information: {str(e)}") # Add save/load section st.subheader("💾 Save/Load Portfolio") # Create two columns for save and load save_col, load_col = st.columns(2) with save_col: st.write("Save current portfolio") portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name") if st.button("Save Portfolio", key="save_portfolio"): if portfolio_name: if save_portfolio(portfolio_name, final_alloc, st.session_state.mode, st.session_state.target): st.success(f"Portfolio '{portfolio_name}' saved successfully!") else: st.warning("Please enter a portfolio name.") with load_col: st.write("Load saved portfolio") if st.button("Show Saved Portfolios", key="show_portfolios"): saved_portfolios = list_saved_portfolios() if saved_portfolios: selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio") if st.button("Load Portfolio", key="load_portfolio_btn"): loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio) if loaded_df is not None: st.session_state.final_alloc = loaded_df st.session_state.mode = loaded_mode st.session_state.target = loaded_target st.success(f"Portfolio '{selected_portfolio}' loaded successfully!") st.rerun() else: st.info("No saved portfolios found.") # Display full detailed allocation table st.subheader("📊 Capital Allocation Details") try: # Format currencies for better readability display_df = final_alloc.copy() # Calculate shares for each ETF display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"] display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}") display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}") display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}") display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%") display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}") # Create a form for the allocation 
table with st.form("allocation_form"): # Create an editable DataFrame edited_df = st.data_editor( display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]], column_config={ "Ticker": st.column_config.TextColumn("Ticker", disabled=True), "Allocation (%)": st.column_config.NumberColumn( "Allocation (%)", min_value=0.0, max_value=100.0, step=0.1, format="%.1f", required=True ), "Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True), "Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True), "Risk Level": st.column_config.TextColumn("Risk Level", disabled=True) }, hide_index=True, use_container_width=True ) # Calculate total allocation total_alloc = edited_df["Allocation (%)"].sum() # Validate individual allocations invalid_allocations = edited_df[ (edited_df["Allocation (%)"] <= 0) | (edited_df["Allocation (%)"] > 100) ] if not invalid_allocations.empty: for _, row in invalid_allocations.iterrows(): st.error(f"Invalid allocation for {row['Ticker']}: must be between 0% and 100%") # Display total allocation with color coding if abs(total_alloc - 100) <= 0.1: st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None) else: st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=f"{total_alloc - 100:.2f}", delta_color="off") st.error("Total allocation must be exactly 100%") # Create columns for quick actions col1, col2, col3 = st.columns(3) with col1: equal_weight = st.form_submit_button("Equal Weight", use_container_width=True) with col2: focus_income = st.form_submit_button("Focus on Income", use_container_width=True) with col3: focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True) # Submit button for manual edits submitted = st.form_submit_button("Update Allocations", disabled=abs(total_alloc - 100) > 0.1 or not invalid_allocations.empty, type="primary", use_container_width=True) # Handle form submission if submitted: try: # Convert the edited allocations to a dictionary new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()} # Convert to the format expected by allocation functions etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()] # Get the mode and target from session state mode = st.session_state.mode target = st.session_state.target initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio updated with new allocations!") st.rerun() else: st.error("Failed to update portfolio. 
Please try again.") except Exception as e: st.error(f"Error updating allocations: {str(e)}") # Handle quick actions if equal_weight: try: # Calculate equal weight allocation num_etfs = len(edited_df) equal_allocation = 100 / num_etfs # Create new allocations in the format expected by allocation functions etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state mode = st.session_state.mode target = st.session_state.target initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to equal weight!") st.rerun() except Exception as e: st.error(f"Error applying equal weight: {str(e)}") elif focus_income: try: # Sort by yield and adjust allocations sorted_alloc = edited_df.sort_values("Yield (%)", ascending=False) total_yield = sorted_alloc["Yield (%)"].str.rstrip('%').astype('float').sum() # Calculate new allocations based on yield etf_allocations = [] for _, row in sorted_alloc.iterrows(): yield_val = float(row["Yield (%)"].rstrip('%')) allocation = (yield_val / total_yield) * 100 etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation}) # Get the mode and target from session state mode = st.session_state.mode target = st.session_state.target initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to focus on income!") st.rerun() except Exception as e: st.error(f"Error focusing on income: {str(e)}") elif focus_capital: try: # Calculate equal weight allocation (same as equal weight) num_etfs = len(edited_df) equal_allocation = 100 / num_etfs # Create new allocations in the format expected by allocation functions etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] # Get the mode and target from session state mode = st.session_state.mode target = st.session_state.target initial_capital = st.session_state.initial_capital # Use the same allocation functions as the main navigation if mode == "Income Target": final_alloc = allocate_for_income(df, target, etf_allocations) else: # Capital Target final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) if final_alloc is not None: st.session_state.final_alloc = final_alloc st.success("Portfolio adjusted to focus on capital!") st.rerun() except Exception as e: st.error(f"Error focusing on capital: {str(e)}") except Exception as e: st.error(f"Error displaying allocation details: {str(e)}") logger.error(f"Error in allocation display: {str(e)}") logger.error(traceback.format_exc())