ETF_Suite_Portal/pages/ETF_Portfolio_Builder.py

import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from pathlib import Path
import json
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional, Any, Callable
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import yfinance as yf
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
import sys
import logging
import traceback
from dotenv import load_dotenv
import re
# Load environment variables
load_dotenv(override=True) # Force reload of environment variables
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global settings
USE_FMP_API = True # Default to using FMP API if available
# FMP API configuration
FMP_API_KEY = os.getenv('FMP_API_KEY')
if not FMP_API_KEY:
logger.warning("FMP_API_KEY not found in environment variables")
# Log only non-sensitive diagnostics; never dump os.environ or the .env contents, since they may hold secrets
logger.warning("Available environment variable names: %s", sorted(os.environ.keys()))
logger.warning("Current working directory: %s", os.getcwd())
logger.warning("Files in current directory: %s", os.listdir('.'))
if os.path.exists('.env'):
logger.warning(".env file exists but FMP_API_KEY was not loaded from it")
else:
logger.warning(".env file does not exist")
else:
logger.info("FMP_API_KEY loaded successfully")
# Mask the API key for security in logs
masked_key = FMP_API_KEY[:4] + '*' * (len(FMP_API_KEY) - 8) + FMP_API_KEY[-4:]
logger.info("FMP_API_KEY (masked): %s", masked_key)
FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
def test_fmp_data_fetching():
"""Test FMP API data fetching with detailed logging."""
try:
logger.info("=== Starting FMP API Test ===")
logger.info(f"FMP API Key available: {bool(FMP_API_KEY)}")
logger.info(f"FMP API enabled: {USE_FMP_API}")
# Test a few high-yield ETFs
test_tickers = ["JEPI", "JEPQ", "QYLD"]
for ticker in test_tickers:
logger.info(f"\nTesting {ticker}:")
data = fetch_etf_data_fmp(ticker)
if data:
logger.info(f"Successfully fetched data for {ticker}")
logger.info(f"Data source: {data.get('data_source', 'Not specified')}")
logger.info(f"Raw data: {data.get('raw_data', 'No raw data')}")
else:
logger.error(f"Failed to fetch data for {ticker}")
logger.info("=== FMP API Test Complete ===")
except Exception as e:
logger.error(f"Error in FMP API test: {str(e)}")
logger.error(traceback.format_exc())
# High-yield ETFs reference data
HIGH_YIELD_ETFS = {
"MSTY": {"expected_yield": 125.0, "frequency": "Monthly"},
"SMCY": {"expected_yield": 100.0, "frequency": "Monthly"},
"TSLY": {"expected_yield": 85.0, "frequency": "Monthly"},
"NVDY": {"expected_yield": 75.0, "frequency": "Monthly"},
"ULTY": {"expected_yield": 70.0, "frequency": "Monthly"},
"JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"},
"JEPI": {"expected_yield": 7.8, "frequency": "Monthly"},
"XYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
"QYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
"RYLD": {"expected_yield": 12.0, "frequency": "Monthly"}
}
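# These reference figures serve two purposes further down: fetch_etf_data() falls back to them
# when both FMP and yfinance fail, and fetch_etf_data_fmp() uses expected_yield to sanity-check
# the yields it computes for these tickers.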
def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate ETF metrics based on available data.
Args:
ticker: ETF ticker
price_data: DataFrame with price history
dividend_data: DataFrame with dividend history
Returns:
Dictionary with calculated metrics
"""
metrics = {
"Ticker": ticker,
"Yield (%)": 0.0,
"Price": 0.0,
"volatility": 0.0,
"sharpe_ratio": 0.0,
"sortino_ratio": 0.0,
"correlation": 0.0,
"payout_ratio": 0.0,
"score": 0.0,
"Risk Level": "Unknown",
"missing_metrics": []
}
try:
# Get current price from price data
if not price_data.empty:
metrics["Price"] = price_data["close"].iloc[-1]
else:
metrics["missing_metrics"].append("Price")
# Calculate yield if dividend data is available
if not dividend_data.empty and metrics["Price"] > 0:
# Convert date column to datetime if it's not already
dividend_data["date"] = pd.to_datetime(dividend_data["date"])
# Get dividends from the last 12 months
one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
recent_dividends = dividend_data[dividend_data["date"] >= one_year_ago]
if not recent_dividends.empty:
# Calculate TTM dividend
ttm_dividend = recent_dividends["dividend"].sum()
# Calculate annual yield
metrics["Yield (%)"] = (ttm_dividend / metrics["Price"]) * 100
logger.info(f"Calculated yield for {ticker}: {metrics['Yield (%)']:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${metrics['Price']:.2f})")
else:
logger.warning(f"No recent dividends found for {ticker}")
metrics["missing_metrics"].append("Yield (%)")
else:
metrics["missing_metrics"].append("Yield (%)")
# Calculate volatility if price data is available
if len(price_data) > 1:
returns = price_data["close"].pct_change().dropna()
metrics["volatility"] = returns.std() * np.sqrt(252) * 100 # Annualized volatility
else:
metrics["missing_metrics"].append("volatility")
# Calculate Sharpe ratio if we have returns and risk-free rate
if len(price_data) > 1:
risk_free_rate = 0.05 # Assuming 5% risk-free rate
excess_returns = returns - (risk_free_rate / 252)
if excess_returns.std() != 0:
metrics["sharpe_ratio"] = (excess_returns.mean() / excess_returns.std()) * np.sqrt(252)
else:
metrics["missing_metrics"].append("sharpe_ratio")
# Calculate Sortino ratio if we have returns
if len(price_data) > 1:
downside_returns = returns[returns < 0]
if len(downside_returns) > 0 and downside_returns.std() != 0:
metrics["sortino_ratio"] = (returns.mean() / downside_returns.std()) * np.sqrt(252)
else:
metrics["missing_metrics"].append("sortino_ratio")
# Categorize risk based on available metrics
metrics["Risk Level"] = categorize_etf_risk(metrics)
# Calculate overall score
metrics["score"] = calculate_etf_score(metrics)
logger.info(f"Calculated metrics for {ticker}: {metrics}")
return metrics
except Exception as e:
logger.error(f"Error calculating metrics for {ticker}: {str(e)}")
logger.error(traceback.format_exc())
return metrics
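# Illustrative call (names assumed, not part of the page flow): price_data needs a "close"
# column and dividend_data needs "date" and "dividend" columns, e.g.
#   metrics = calculate_etf_metrics("JEPI", jepi_prices_df, jepi_dividends_df)
#   metrics["Yield (%)"], metrics["Risk Level"], metrics["missing_metrics"]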
def categorize_etf_risk(metrics: Dict[str, Any]) -> str:
"""
Categorize ETF risk based on available metrics.
Args:
metrics: Dictionary with ETF metrics
Returns:
Risk category: "Low", "Medium", or "High"
"""
try:
# Initialize risk score
risk_score = 0
available_metrics = 0
# Yield-based risk (higher yield = higher risk)
if "Yield (%)" not in metrics["missing_metrics"]:
if metrics["Yield (%)"] > 10:
risk_score += 3
elif metrics["Yield (%)"] > 6:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Volatility-based risk
if "volatility" not in metrics["missing_metrics"]:
if metrics["volatility"] > 20:
risk_score += 3
elif metrics["volatility"] > 15:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Sharpe ratio-based risk (lower Sharpe = higher risk)
if "sharpe_ratio" not in metrics["missing_metrics"]:
if metrics["sharpe_ratio"] < 0.5:
risk_score += 3
elif metrics["sharpe_ratio"] < 1.0:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Sortino ratio-based risk (lower Sortino = higher risk)
if "sortino_ratio" not in metrics["missing_metrics"]:
if metrics["sortino_ratio"] < 0.5:
risk_score += 3
elif metrics["sortino_ratio"] < 1.0:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Calculate average risk score
if available_metrics > 0:
avg_risk_score = risk_score / available_metrics
if avg_risk_score > 2.5:
return "High"
elif avg_risk_score > 1.5:
return "Medium"
else:
return "Low"
# If no metrics available, use yield as fallback
if metrics["Yield (%)"] > 10:
return "High"
elif metrics["Yield (%)"] > 6:
return "Medium"
else:
return "Low"
except Exception as e:
logger.error(f"Error categorizing ETF risk: {str(e)}")
return "Unknown"
def calculate_etf_score(metrics: Dict[str, Any]) -> float:
"""
Calculate overall ETF score based on available metrics.
Args:
metrics: Dictionary with ETF metrics
Returns:
Overall score (0-100)
"""
try:
score = 0
available_metrics = 0
# Yield score (0-25 points)
if "Yield (%)" not in metrics["missing_metrics"]:
if metrics["Yield (%)"] > 10:
score += 25
elif metrics["Yield (%)"] > 6:
score += 20
elif metrics["Yield (%)"] > 3:
score += 15
else:
score += 10
available_metrics += 1
# Volatility score (0-25 points)
if "volatility" not in metrics["missing_metrics"]:
if metrics["volatility"] < 10:
score += 25
elif metrics["volatility"] < 15:
score += 20
elif metrics["volatility"] < 20:
score += 15
else:
score += 10
available_metrics += 1
# Sharpe ratio score (0-25 points)
if "sharpe_ratio" not in metrics["missing_metrics"]:
if metrics["sharpe_ratio"] > 1.5:
score += 25
elif metrics["sharpe_ratio"] > 1.0:
score += 20
elif metrics["sharpe_ratio"] > 0.5:
score += 15
else:
score += 10
available_metrics += 1
# Sortino ratio score (0-25 points)
if "sortino_ratio" not in metrics["missing_metrics"]:
if metrics["sortino_ratio"] > 1.5:
score += 25
elif metrics["sortino_ratio"] > 1.0:
score += 20
elif metrics["sortino_ratio"] > 0.5:
score += 15
else:
score += 10
available_metrics += 1
# Normalize for missing metrics so that a full score still maps onto the documented 0-100 scale
if available_metrics > 0:
return (score / available_metrics) * 4
return 0
except Exception as e:
logger.error(f"Error calculating ETF score: {str(e)}")
return 0
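# Worked example (illustrative values): an 8% yield scores 20 points, 14% volatility 20 points,
# a 1.2 Sharpe ratio 20 points and a 1.6 Sortino ratio 25 points, i.e. 85 of the 100 points
# available across the four metrics before normalization.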
def calculate_correlation_matrix(price_data_dict: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
Calculate correlation matrix between ETFs.
Args:
price_data_dict: Dictionary of price DataFrames for each ETF
Returns:
DataFrame with correlation matrix
"""
try:
# Create a DataFrame with returns for all ETFs
returns_df = pd.DataFrame()
for ticker, price_data in price_data_dict.items():
if len(price_data) > 1:
returns = price_data["close"].pct_change().dropna()
returns_df[ticker] = returns
if returns_df.empty:
logger.warning("No valid price data for correlation calculation")
return pd.DataFrame()
# Calculate correlation matrix
corr_matrix = returns_df.corr()
logger.info(f"Correlation matrix calculated:\n{corr_matrix}")
return corr_matrix
except Exception as e:
logger.error(f"Error calculating correlation matrix: {str(e)}")
logger.error(traceback.format_exc())
return pd.DataFrame()
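# Illustrative input shape (names assumed): {"JEPI": jepi_prices_df, "QYLD": qyld_prices_df},
# where each DataFrame carries a date-aligned "close" column; the result is a ticker-by-ticker
# correlation matrix of daily returns.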
def calculate_etf_risk_score(etf: Dict[str, Any]) -> float:
"""
Calculate a comprehensive risk score for an ETF based on multiple metrics.
Args:
etf: Dictionary containing ETF metrics
Returns:
float: Risk score averaged over the metrics that were available (higher means higher risk)
"""
try:
score = 0
metrics_used = 0
# Primary Metrics (60% of total score)
# 1. Volatility (20%)
if 'volatility' in etf:
volatility = etf['volatility']
if volatility < 10:
score += 20
elif volatility < 15:
score += 15
elif volatility < 20:
score += 10
else:
score += 5
metrics_used += 1
# 2. Yield (20%)
if 'yield' in etf:
yield_value = etf['yield']
if yield_value < 3:
score += 5
elif yield_value < 6:
score += 10
elif yield_value < 10:
score += 15
else:
score += 20
metrics_used += 1
# 3. Sharpe/Sortino Ratio (20%)
if 'sharpe_ratio' in etf:
sharpe = etf['sharpe_ratio']
if sharpe > 1.5:
score += 5
elif sharpe > 1.0:
score += 10
elif sharpe > 0.8:
score += 15
else:
score += 20
metrics_used += 1
# Secondary Metrics (40% of total score)
# 1. Dividend Growth (10%)
if 'dividend_growth' in etf:
growth = etf['dividend_growth']
if growth > 10:
score += 5
elif growth > 5:
score += 7
elif growth > 0:
score += 10
else:
score += 15
metrics_used += 1
# 2. Payout Ratio (10%)
if 'payout_ratio' in etf:
ratio = etf['payout_ratio']
if ratio < 40:
score += 5
elif ratio < 60:
score += 7
elif ratio < 80:
score += 10
else:
score += 15
metrics_used += 1
# 3. Expense Ratio (10%)
if 'expense_ratio' in etf:
ratio = etf['expense_ratio']
if ratio < 0.2:
score += 5
elif ratio < 0.4:
score += 7
elif ratio < 0.6:
score += 10
else:
score += 15
metrics_used += 1
# 4. AUM/Volume (10%)
if 'aum' in etf:
aum = etf['aum']
if aum > 5e9: # > $5B
score += 5
elif aum > 1e9: # > $1B
score += 7
elif aum > 500e6: # > $500M
score += 10
else:
score += 15
metrics_used += 1
# Normalize score based on available metrics
if metrics_used > 0:
return score / metrics_used
return 50 # Default middle score if no metrics available
except Exception as e:
logger.error(f"Error calculating ETF risk score: {str(e)}")
return 50
def optimize_portfolio_allocation(etf_metrics: List[Dict[str, Any]], risk_tolerance: str, correlation_matrix: pd.DataFrame) -> List[Dict[str, Any]]:
"""
Optimize portfolio allocation based on risk tolerance and ETF metrics.
Args:
etf_metrics: List of ETF metrics dictionaries
risk_tolerance: Risk tolerance level ("Conservative", "Moderate", "Aggressive")
correlation_matrix: Correlation matrix between ETFs
Returns:
List of dictionaries with ETF tickers and their allocations
"""
try:
logger.info(f"Optimizing portfolio allocation for {risk_tolerance} risk tolerance")
logger.info(f"ETF metrics: {etf_metrics}")
# Sort ETFs by yield (higher yield = higher risk)
sorted_etfs = sorted(etf_metrics, key=lambda x: x.get('Yield (%)', 0), reverse=True)
logger.info(f"Sorted ETFs by yield: {[etf['Ticker'] for etf in sorted_etfs]}")
# Calculate base allocations based on risk tolerance
num_etfs = len(sorted_etfs)
if num_etfs == 0:
return []
# All three risk profiles use the same geometric cascade over the yield-sorted list; only the
# decay factor differs. A higher factor front-loads the highest-yield ETFs (Aggressive: higher
# weighted yield, so less capital is needed for a given income target), while a lower factor
# spreads weight more evenly down the list (Conservative: lower weighted yield, more capital).
decay = {"Conservative": 0.4, "Moderate": 0.5}.get(risk_tolerance, 0.6) # Aggressive and unknown values use 0.6
base_allocations = []
remaining_alloc = 100
for i in range(num_etfs):
if i < num_etfs - 1:
alloc = remaining_alloc * decay # Take a fixed share of whatever is still unallocated
base_allocations.append(alloc)
remaining_alloc -= alloc
else:
base_allocations.append(remaining_alloc) # Last ETF absorbs the remainder
# Create final allocation list
final_allocations = []
for etf, allocation in zip(sorted_etfs, base_allocations):
final_allocations.append({
"ticker": etf["Ticker"],
"allocation": allocation # Already in percentage
})
logger.info(f"Final allocations: {final_allocations}")
return final_allocations
except Exception as e:
logger.error(f"Error in optimize_portfolio_allocation: {str(e)}")
logger.error(traceback.format_exc())
return []
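# Illustrative cascade output for three ETFs (percent of portfolio, sorted from highest to
# lowest yield): Conservative 40 / 24 / 36, Moderate 50 / 25 / 25, Aggressive 60 / 24 / 16.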
def adjust_allocations_for_correlation(
allocations: Dict[str, float],
correlation_matrix: pd.DataFrame
) -> Dict[str, float]:
"""
Adjust allocations to reduce correlation between ETFs.
Args:
allocations: Dictionary with current allocations
correlation_matrix: Correlation matrix between ETFs
Returns:
Dictionary with adjusted allocations
"""
try:
adjusted_allocations = allocations.copy()
# Get highly correlated pairs (correlation > 0.7)
high_corr_pairs = []
for i in range(len(correlation_matrix.columns)):
for j in range(i + 1, len(correlation_matrix.columns)):
ticker1 = correlation_matrix.columns[i]
ticker2 = correlation_matrix.columns[j]
if abs(correlation_matrix.iloc[i, j]) > 0.7:
high_corr_pairs.append((ticker1, ticker2))
# Adjust allocations for highly correlated pairs
for ticker1, ticker2 in high_corr_pairs:
if ticker1 in adjusted_allocations and ticker2 in adjusted_allocations:
# Shift 10% of the larger position to the smaller one to reduce concentration in the correlated pair
if adjusted_allocations[ticker1] > adjusted_allocations[ticker2]:
reduction = adjusted_allocations[ticker1] * 0.1 # Reduce by 10%
adjusted_allocations[ticker1] -= reduction
adjusted_allocations[ticker2] += reduction
else:
reduction = adjusted_allocations[ticker2] * 0.1 # Reduce by 10%
adjusted_allocations[ticker2] -= reduction
adjusted_allocations[ticker1] += reduction
logger.info(f"Adjusted allocations for correlation: {adjusted_allocations}")
return adjusted_allocations
except Exception as e:
logger.error(f"Error adjusting allocations for correlation: {str(e)}")
logger.error(traceback.format_exc())
return allocations
def get_fmp_session():
"""Create a session with retry logic for FMP API calls."""
session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5)
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]:
"""
Fetch ETF data from Financial Modeling Prep API.
Args:
ticker: ETF ticker symbol
Returns:
Dictionary with ETF data or None if failed
"""
try:
if not FMP_API_KEY:
logger.warning("FMP API key not configured in environment variables")
st.warning("FMP API key not found in environment variables. Some features may be limited.")
return None
session = get_fmp_session()
# Get profile data for current price
profile_url = f"{FMP_BASE_URL}/profile/{ticker}?apikey={FMP_API_KEY}"
logger.info(f"[FMP API] Making profile request to: {profile_url}")
profile_response = session.get(profile_url)
st.session_state.api_calls += 1
logger.info(f"[FMP API] Profile response status: {profile_response.status_code}")
logger.info(f"[FMP API] Profile response content: {profile_response.text[:500]}...") # Log first 500 chars
if profile_response.status_code != 200:
logger.error(f"[FMP API] Error for {ticker}: {profile_response.status_code}")
logger.error(f"[FMP API] Response content: {profile_response.text}")
return None
profile_data = profile_response.json()
logger.info(f"[FMP API] Profile data for {ticker}: {profile_data}")
if not profile_data or not isinstance(profile_data, list) or len(profile_data) == 0:
logger.warning(f"[FMP API] No profile data found for {ticker}")
return None
profile = profile_data[0]
current_price = float(profile.get('price', 0))
if current_price <= 0:
logger.error(f"[FMP API] Invalid price for {ticker}: {current_price}")
return None
# Get dividend history
dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}"
logger.info(f"[FMP API] Making dividend request to: {dividend_url}")
dividend_response = session.get(dividend_url)
st.session_state.api_calls += 1
logger.info(f"[FMP API] Dividend response status: {dividend_response.status_code}")
logger.info(f"[FMP API] Dividend response content: {dividend_response.text[:500]}...") # Log first 500 chars
if dividend_response.status_code != 200:
logger.error(f"[FMP API] Error for dividend data: {dividend_response.status_code}")
logger.error(f"[FMP API] Response content: {dividend_response.text}")
return None
dividend_data = dividend_response.json()
logger.info(f"[FMP API] Dividend data for {ticker}: {dividend_data}")
if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]:
logger.warning(f"[FMP API] No dividend history found for {ticker}")
return None
# Calculate TTM dividend
dividends = pd.DataFrame(dividend_data["historical"])
dividends["date"] = pd.to_datetime(dividends["date"])
dividends = dividends.sort_values("date")
# Get dividends in the last 12 months
one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
recent_dividends = dividends[dividends["date"] >= one_year_ago]
if recent_dividends.empty:
logger.warning(f"[FMP API] No recent dividends found for {ticker}")
return None
# Calculate TTM dividend
ttm_dividend = recent_dividends["dividend"].sum()
# Calculate yield
yield_pct = (ttm_dividend / current_price) * 100
logger.info(f"[FMP API] Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})")
# For high-yield ETFs, verify the yield is reasonable
if ticker in HIGH_YIELD_ETFS:
expected_yield = HIGH_YIELD_ETFS[ticker]["expected_yield"]
if yield_pct < expected_yield * 0.5: # If yield is less than 50% of expected
logger.error(f"[FMP API] Calculated yield {yield_pct:.2f}% for {ticker} is much lower than expected {expected_yield}%")
logger.error(f"[FMP API] TTM dividend: ${ttm_dividend:.2f}")
logger.error(f"[FMP API] Current price: ${current_price:.2f}")
logger.error(f"[FMP API] Recent dividends:\n{recent_dividends}")
# Determine distribution period
if len(recent_dividends) >= 2:
intervals = recent_dividends["date"].diff().dt.days.dropna()
avg_interval = intervals.mean()
if avg_interval <= 45:
dist_period = "Monthly"
elif avg_interval <= 100:
dist_period = "Quarterly"
elif avg_interval <= 200:
dist_period = "Semi-Annually"
else:
dist_period = "Annually"
else:
dist_period = "Unknown"
etf_data = {
"Ticker": ticker,
"Price": current_price,
"Yield (%)": yield_pct,
"Distribution Period": dist_period,
"Risk Level": "High" if ticker in HIGH_YIELD_ETFS else "Moderate",
"data_source": "FMP API", # Add data source identifier
"raw_data": { # Store raw data for debugging
"profile": profile,
"dividend_history": dividend_data["historical"][:5] # Store first 5 dividend records
}
}
logger.info(f"[FMP API] Final data for {ticker}: {etf_data}")
return etf_data
except Exception as e:
logger.error(f"[FMP API] Error fetching data for {ticker}: {str(e)}")
logger.error(traceback.format_exc())
return None
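# The parsing above assumes the FMP v3 payloads look roughly like (values illustrative):
#   /profile/{ticker}                              -> [{"price": 56.1, ...}]
#   /historical-price-full/stock_dividend/{ticker} -> {"historical": [{"date": "YYYY-MM-DD", "dividend": 0.35, ...}, ...]}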
def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]:
"""
Fetch ETF data from yfinance as fallback.
Args:
ticker: ETF ticker symbol
Returns:
Dictionary with ETF data or None if failed
"""
try:
logger.info(f"Fetching yfinance data for {ticker}")
etf = yf.Ticker(ticker)
info = etf.info
# Get the most recent dividend yield
if 'dividendYield' in info and info['dividendYield'] is not None:
yield_pct = info['dividendYield'] * 100
logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%")
else:
# Try to calculate from dividend history
hist = etf.history(period="1y")
if not hist.empty and 'Dividends' in hist.columns:
annual_dividend = hist['Dividends'].sum()
current_price = info.get('regularMarketPrice', 0)
yield_pct = (annual_dividend / current_price) * 100 if current_price > 0 else 0
logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%")
else:
yield_pct = 0
logger.warning(f"No yield data found for {ticker} in yfinance")
# Get current price
current_price = info.get('regularMarketPrice', 0)
if current_price <= 0:
current_price = info.get('regularMarketPreviousClose', 0)
logger.warning(f"Using previous close price for {ticker}: {current_price}")
etf_data = {
"Ticker": ticker,
"Price": current_price,
"Yield (%)": yield_pct,
"Risk Level": "High", # Default for high-yield ETFs
"data_source": "yfinance" # Add data source identifier
}
logger.info(f"yfinance data for {ticker}: {etf_data}")
return etf_data
except Exception as e:
logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
return None
def fetch_etf_data(tickers: List[str]) -> pd.DataFrame:
"""
Fetch ETF data using FMP API with yfinance fallback.
Uses HIGH_YIELD_ETFS data only as a last resort.
Args:
tickers: List of ETF tickers
Returns:
DataFrame with ETF data
"""
try:
data = {}
cache_dir = Path("cache")
cache_dir.mkdir(exist_ok=True)
logger.info("=== Starting ETF data fetch ===")
logger.info(f"Force refresh enabled: {st.session_state.get('force_refresh_data', False)}")
logger.info(f"Cache directory: {cache_dir.absolute()}")
logger.info(f"FMP API enabled: {USE_FMP_API}")
logger.info(f"FMP API key available: {bool(FMP_API_KEY)}")
for ticker in tickers:
if not ticker: # Skip empty tickers
continue
logger.info(f"\n=== Processing {ticker} ===")
# Check cache first if not forcing refresh
cache_file = cache_dir / f"{ticker}_data.json"
logger.info(f"Cache file path: {cache_file.absolute()}")
logger.info(f"Cache file exists: {cache_file.exists()}")
if not st.session_state.get("force_refresh_data", False) and cache_file.exists():
try:
with open(cache_file, 'r') as f:
cached_data = json.load(f)
cache_time = datetime.fromisoformat(cached_data.get('timestamp', '2000-01-01'))
cache_age = datetime.now() - cache_time
logger.info(f"Cache age: {cache_age.total_seconds() / 3600:.2f} hours")
if cache_age < timedelta(hours=24):
logger.info(f"Using cached data for {ticker}")
data[ticker] = cached_data['data']
continue
else:
logger.info(f"Cache expired for {ticker} (age: {cache_age.total_seconds() / 3600:.2f} hours)")
except Exception as e:
logger.warning(f"Error reading cache for {ticker}: {str(e)}")
logger.warning(traceback.format_exc())
else:
logger.info(f"No cache found or force refresh enabled for {ticker}")
# Try FMP first if enabled
if USE_FMP_API and FMP_API_KEY:
logger.info(f"Attempting to fetch data from FMP API for {ticker}")
etf_data = fetch_etf_data_fmp(ticker)
if etf_data is not None:
logger.info(f"Successfully fetched data from FMP API for {ticker}")
# Cache the data
try:
cache_data = {
'timestamp': datetime.now().isoformat(),
'data': etf_data
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
logger.info(f"Cached FMP data for {ticker}")
except Exception as e:
logger.warning(f"Error caching FMP data for {ticker}: {str(e)}")
logger.warning(traceback.format_exc())
data[ticker] = etf_data
# fetch_etf_data_fmp() already counts its own requests, so don't increment api_calls again here
logger.info(f"Total API calls: {st.session_state.api_calls}")
continue
else:
logger.warning(f"FMP API fetch failed for {ticker}, falling back to yfinance")
# If FMP fails, try yfinance
logger.info(f"Fetching data from yfinance for {ticker}")
etf_data = fetch_etf_data_yfinance(ticker)
if etf_data is not None:
logger.info(f"Successfully fetched data from yfinance for {ticker}")
# Cache the data
try:
cache_data = {
'timestamp': datetime.now().isoformat(),
'data': etf_data
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
logger.info(f"Cached yfinance data for {ticker}")
except Exception as e:
logger.warning(f"Error caching yfinance data for {ticker}: {str(e)}")
logger.warning(traceback.format_exc())
data[ticker] = etf_data
continue
# Only use HIGH_YIELD_ETFS data if both FMP and yfinance failed
if ticker in HIGH_YIELD_ETFS:
logger.info(f"Using fallback data from HIGH_YIELD_ETFS for {ticker}")
etf_data = {
"Ticker": ticker,
"Price": 25.0, # Default price for fallback
"Yield (%)": HIGH_YIELD_ETFS[ticker]["expected_yield"],
"Distribution Period": HIGH_YIELD_ETFS[ticker]["frequency"],
"Risk Level": "High",
"data_source": "HIGH_YIELD_ETFS"
}
data[ticker] = etf_data
else:
logger.error(f"Failed to fetch data for {ticker} from all sources")
if not data:
st.error("No ETF data could be fetched")
return pd.DataFrame()
df = pd.DataFrame(data.values())
# Validate the data
if df.empty:
st.error("No ETF data could be fetched")
return pd.DataFrame()
if (df["Price"] <= 0).any():
st.error("Some ETFs have invalid prices")
return pd.DataFrame()
if (df["Yield (%)"] <= 0).any():
st.warning("Some ETFs have zero or negative yields")
# Log data sources used
if "data_source" in df.columns:
source_counts = df["data_source"].value_counts()
logger.info(f"Data sources used:\n{source_counts}")
logger.info(f"Final DataFrame:\n{df}")
return df
except Exception as e:
st.error(f"Error fetching ETF data: {str(e)}")
logger.error(f"Error in fetch_etf_data: {str(e)}")
logger.error(traceback.format_exc())
return pd.DataFrame()
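# Cache layout used above: one JSON file per ticker at cache/<TICKER>_data.json containing
#   {"timestamp": "<ISO-8601 fetch time>", "data": {...ETF record...}}
# Entries older than 24 hours are treated as expired and refetched.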
def run_portfolio_simulation(
tickers: List[str],
weights: List[float],
initial_investment: float,
start_date: str,
end_date: str,
rebalance_frequency: str = 'monthly',
use_fmp: bool = True
) -> Dict[str, Any]:
"""
Run portfolio simulation with the given parameters.
Args:
tickers: List of ETF tickers
weights: List of portfolio weights
initial_investment: Initial investment amount
start_date: Start date for simulation
end_date: End date for simulation
rebalance_frequency: Frequency of rebalancing
use_fmp: Whether to use FMP API for data
Returns:
Dictionary with simulation results
"""
try:
# Validate inputs
if not tickers or not weights:
raise ValueError("No tickers or weights provided")
if len(tickers) != len(weights):
raise ValueError("Number of tickers must match number of weights")
if not all(0 <= w <= 1 for w in weights):
raise ValueError("Weights must be between 0 and 1")
if abs(sum(weights) - 1.0) > 1e-6: # Tolerate floating-point rounding
raise ValueError("Weights must sum to 1")
# Get historical data
historical_data = {}
for ticker in tickers:
if use_fmp and FMP_API_KEY:
data = fetch_etf_data_fmp(ticker)
if data and 'historical' in data:
historical_data[ticker] = data['historical']
else:
logger.warning(f"Falling back to yfinance for {ticker}")
data = fetch_etf_data_yfinance(ticker)
if data and 'historical' in data:
historical_data[ticker] = data['historical']
else:
data = fetch_etf_data_yfinance(ticker)
if data and 'historical' in data:
historical_data[ticker] = data['historical']
if not historical_data:
raise ValueError("No historical data available for any tickers")
# Create portfolio DataFrame
portfolio = pd.DataFrame()
for ticker, data in historical_data.items():
portfolio[ticker] = data['close']
# Calculate portfolio returns
portfolio_returns = portfolio.pct_change()
portfolio_returns = portfolio_returns.fillna(0)
# Calculate weighted returns
weighted_returns = pd.DataFrame()
for i, ticker in enumerate(tickers):
weighted_returns[ticker] = portfolio_returns[ticker] * weights[i]
portfolio_returns['portfolio'] = weighted_returns.sum(axis=1)
# Calculate cumulative returns
cumulative_returns = (1 + portfolio_returns).cumprod()
# Calculate portfolio value
portfolio_value = initial_investment * cumulative_returns['portfolio']
# Calculate metrics
total_return = (portfolio_value.iloc[-1] / initial_investment) - 1
annual_return = (1 + total_return) ** (252 / len(portfolio_value)) - 1
volatility = portfolio_returns['portfolio'].std() * np.sqrt(252)
sharpe_ratio = annual_return / volatility if volatility != 0 else 0
# Calculate drawdown
rolling_max = portfolio_value.expanding().max()
drawdown = (portfolio_value - rolling_max) / rolling_max
max_drawdown = drawdown.min()
return {
'portfolio_value': portfolio_value,
'returns': portfolio_returns,
'cumulative_returns': cumulative_returns,
'total_return': total_return,
'annual_return': annual_return,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'drawdown': drawdown
}
except Exception as e:
logger.error(f"Error in portfolio simulation: {str(e)}")
st.error(f"Error running portfolio simulation: {str(e)}")
return None
def portfolio_summary(final_alloc: pd.DataFrame) -> None:
"""
Display a summary of the portfolio allocation.
Args:
final_alloc: DataFrame containing the portfolio allocation
"""
if final_alloc is None or final_alloc.empty:
st.warning("No portfolio data available.")
return
try:
# Calculate key metrics
total_capital = final_alloc["Capital Allocated ($)"].sum()
total_income = final_alloc["Income Contributed ($)"].sum()
# Calculate weighted average yield
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
# Display metrics in columns
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Capital", f"${total_capital:,.2f}")
with col2:
st.metric("Annual Income", f"${total_income:,.2f}")
st.metric("Monthly Income", f"${total_income/12:,.2f}")
with col3:
st.metric("Average Yield", f"{weighted_yield:.2f}%")
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
# Display allocation chart
fig = px.pie(
final_alloc,
values="Allocation (%)",
names="Ticker",
title="Portfolio Allocation by ETF",
hover_data={
"Ticker": True,
"Allocation (%)": ":.2f",
"Yield (%)": ":.2f",
"Capital Allocated ($)": ":,.2f",
"Income Contributed ($)": ":,.2f"
}
)
st.plotly_chart(fig, use_container_width=True)
# Display detailed allocation table
st.subheader("Detailed Allocation")
display_df = final_alloc.copy()
display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12
# Ensure data_source column exists and rename it for display
if "data_source" in display_df.columns:
display_df = display_df.rename(columns={"data_source": "Data Source"})
else:
display_df["Data Source"] = "Unknown"
# Select and order columns for display
display_columns = [
"Ticker",
"Allocation (%)",
"Yield (%)",
"Price",
"Shares",
"Capital Allocated ($)",
"Monthly Income",
"Income Contributed ($)",
"Risk Level",
"Data Source"
]
# Format the display
st.dataframe(
display_df[display_columns].style.format({
"Allocation (%)": "{:.2f}%",
"Yield (%)": "{:.2f}%",
"Price": "${:,.2f}",
"Shares": "{:,.4f}",
"Capital Allocated ($)": "${:,.2f}",
"Monthly Income": "${:,.2f}",
"Income Contributed ($)": "${:,.2f}"
}),
column_config={
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
"Allocation (%)": st.column_config.NumberColumn(
"Allocation (%)",
min_value=0.0,
max_value=100.0,
step=0.1,
format="%.1f",
required=True
),
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
"Price": st.column_config.TextColumn("Price", disabled=True),
"Shares": st.column_config.TextColumn("Shares", disabled=True),
"Capital Allocated ($)": st.column_config.TextColumn("Capital Allocated ($)", disabled=True),
"Monthly Income": st.column_config.TextColumn("Monthly Income", disabled=True),
"Income Contributed ($)": st.column_config.TextColumn("Income Contributed ($)", disabled=True),
"Risk Level": st.column_config.TextColumn("Risk Level", disabled=True),
"Data Source": st.column_config.TextColumn("Data Source", disabled=True)
},
hide_index=True,
use_container_width=True
)
except Exception as e:
st.error(f"Error calculating portfolio summary: {str(e)}")
logger.error(f"Error in portfolio_summary: {str(e)}")
logger.error(traceback.format_exc())
def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool:
"""
Save portfolio allocation to a JSON file.
Args:
portfolio_name: Name of the portfolio
final_alloc: DataFrame containing portfolio allocation
mode: Portfolio mode ("Income Target" or "Capital Target")
target: Target value (income or capital)
Returns:
bool: True if save was successful, False otherwise
"""
try:
# Create portfolios directory if it doesn't exist
portfolios_dir = Path("portfolios")
portfolios_dir.mkdir(exist_ok=True)
# Prepare portfolio data
portfolio_data = {
"name": portfolio_name,
"created_at": datetime.now().isoformat(),
"mode": mode,
"target": target,
"allocations": []
}
# Convert DataFrame to list of dictionaries
for _, row in final_alloc.iterrows():
allocation = {
"ticker": row["Ticker"],
"allocation": float(row["Allocation (%)"]),
"yield": float(row["Yield (%)"]),
"price": float(row["Price"]),
"risk_level": row["Risk Level"]
}
portfolio_data["allocations"].append(allocation)
# Save to JSON file
file_path = portfolios_dir / f"{portfolio_name}.json"
with open(file_path, 'w') as f:
json.dump(portfolio_data, f, indent=2)
return True
except Exception as e:
st.error(f"Error saving portfolio: {str(e)}")
return False
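# A saved file (portfolios/<name>.json) mirrors the dict built above, roughly (values illustrative):
#   {"name": "...", "created_at": "<ISO-8601>", "mode": "Income Target", "target": 1000.0,
#    "allocations": [{"ticker": "JEPI", "allocation": 40.0, "yield": 7.8, "price": 56.1, "risk_level": "High"}, ...]}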
def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]:
"""
Load portfolio allocation from a JSON file.
Args:
portfolio_name: Name of the portfolio to load
Returns:
Tuple containing:
- DataFrame with portfolio allocation
- Portfolio mode
- Target value
"""
try:
# Check if portfolio exists
file_path = Path("portfolios") / f"{portfolio_name}.json"
if not file_path.exists():
st.error(f"Portfolio '{portfolio_name}' not found.")
return None, None, None
# Load portfolio data
with open(file_path, 'r') as f:
portfolio_data = json.load(f)
# Convert allocations to DataFrame
allocations = portfolio_data["allocations"]
df = pd.DataFrame(allocations)
# Rename columns to match expected format
df = df.rename(columns={
"allocation": "Allocation (%)",
"yield": "Yield (%)",
"price": "Price"
})
return df, portfolio_data["mode"], portfolio_data["target"]
except Exception as e:
st.error(f"Error loading portfolio: {str(e)}")
return None, None, None
def list_saved_portfolios() -> List[str]:
"""
List all saved portfolios.
Returns:
List of portfolio names
"""
try:
portfolios_dir = Path("portfolios")
if not portfolios_dir.exists():
return []
# Get all JSON files in the portfolios directory
portfolio_files = list(portfolios_dir.glob("*.json"))
# Extract portfolio names from filenames
portfolio_names = [f.stem for f in portfolio_files]
return sorted(portfolio_names)
except Exception as e:
st.error(f"Error listing portfolios: {str(e)}")
return []
def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
"""
Allocate portfolio for income target.
Args:
df: DataFrame with ETF data
target: Monthly income target
etf_allocations: List of ETF allocations
Returns:
DataFrame with final allocation
"""
try:
# Create final allocation DataFrame
final_alloc = df.copy()
# Initialize allocation column if it doesn't exist
if "Allocation (%)" not in final_alloc.columns:
final_alloc["Allocation (%)"] = 0.0
# Set allocations
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
if mask.any():
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
else:
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
# Verify allocations are set
if final_alloc["Allocation (%)"].sum() == 0:
logger.error("No allocations were set")
return None
# Calculate required capital for income target
monthly_income = target
annual_income = monthly_income * 12
# Calculate weighted average yield
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
if weighted_yield == 0:
logger.error("Weighted yield is zero")
return None
# Calculate required capital based on weighted yield
required_capital = (annual_income / weighted_yield) * 100
# Calculate capital allocation and income
final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
# Verify calculations
total_income = final_alloc["Income Contributed ($)"].sum()
if abs(total_income - annual_income) > 1.0: # Allow for small rounding errors
logger.warning(f"Total income ({total_income}) does not match target ({annual_income})")
logger.info(f"Income allocation completed. Required capital: ${required_capital:,.2f}")
logger.info(f"Final allocations:\n{final_alloc}")
return final_alloc
except Exception as e:
logger.error(f"Error in income allocation: {str(e)}")
logger.error(traceback.format_exc())
return None
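# Worked example (illustrative values): a $1,000 monthly target is $12,000 a year; at a weighted
# average yield of 10% the required capital is (12,000 / 10) * 100 = $120,000, which is then
# split across the ETFs in proportion to their Allocation (%) values.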
def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
"""
Allocate portfolio for capital target.
Args:
df: DataFrame with ETF data
initial_capital: Initial capital amount
etf_allocations: List of ETF allocations
Returns:
DataFrame with final allocation
"""
try:
# Create final allocation DataFrame
final_alloc = df.copy()
# Initialize allocation column if it doesn't exist
if "Allocation (%)" not in final_alloc.columns:
final_alloc["Allocation (%)"] = 0.0
# Set allocations
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
if mask.any():
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
else:
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
# Verify allocations are set
if final_alloc["Allocation (%)"].sum() == 0:
logger.error("No allocations were set")
return None
# Calculate capital allocation and income
final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * initial_capital
final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
# Verify calculations
total_capital = final_alloc["Capital Allocated ($)"].sum()
if abs(total_capital - initial_capital) > 1.0: # Allow for small rounding errors
logger.warning(f"Total capital ({total_capital}) does not match initial capital ({initial_capital})")
logger.info(f"Capital allocation completed. Initial capital: ${initial_capital:,.2f}")
logger.info(f"Final allocations:\n{final_alloc}")
return final_alloc
except Exception as e:
logger.error(f"Error in capital allocation: {str(e)}")
logger.error(traceback.format_exc())
return None
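# Worked example (illustrative values): $100,000 split 60/40 between ETFs yielding 12% and 8%
# allocates $60,000 and $40,000 and contributes $7,200 + $3,200 = $10,400 of annual income.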
def reset_simulation():
"""Reset all simulation data and state."""
st.session_state.simulation_run = False
st.session_state.df_data = None
st.session_state.final_alloc = None
st.session_state.mode = 'Capital Target'
st.session_state.target = 0
st.session_state.initial_capital = 0
st.session_state.enable_drip = False
st.session_state.enable_erosion = False
st.rerun()
def test_fmp_connection() -> bool:
"""Test connection to FMP API."""
try:
if not FMP_API_KEY:
st.error("FMP API key not found in environment variables")
return False
session = get_fmp_session()
test_url = f"{FMP_BASE_URL}/profile/SPY?apikey={FMP_API_KEY}"
logger.info(f"Making FMP API test call to {test_url}")
response = session.get(test_url)
st.session_state.api_calls += 1
logger.info(f"FMP API call count: {st.session_state.api_calls}")
if response.status_code == 200:
st.success("Successfully connected to FMP API")
return True
else:
st.error(f"Failed to connect to FMP API: {response.status_code}")
logger.error(f"FMP API test failed: {response.text}")
return False
except Exception as e:
st.error(f"Error testing FMP connection: {str(e)}")
logger.error(f"FMP API test error: {str(e)}")
return False
def get_cache_stats() -> Dict[str, Any]:
"""
Get statistics about the cache usage.
Returns:
Dictionary containing cache statistics
"""
try:
cache_dir = Path("cache")
if not cache_dir.exists():
return {
"ticker_count": 0,
"file_count": 0,
"total_size_kb": 0
}
# Get all cache files
cache_files = list(cache_dir.glob("**/*.json"))
# Count unique tickers
tickers = set()
for file in cache_files:
# Extract ticker from filename (assuming format: ticker_data_type.json)
ticker = file.stem.split('_')[0]
tickers.add(ticker)
# Calculate total size
total_size = sum(file.stat().st_size for file in cache_files)
return {
"ticker_count": len(tickers),
"file_count": len(cache_files),
"total_size_kb": total_size / 1024 # Convert to KB
}
except Exception as e:
logger.error(f"Error getting cache stats: {str(e)}")
return {
"ticker_count": 0,
"file_count": 0,
"total_size_kb": 0
}
def clear_cache(ticker: Optional[str] = None) -> None:
"""
Clear cache files for a specific ticker or all tickers.
Args:
ticker: Optional ticker symbol to clear cache for. If None, clears all cache.
"""
try:
cache_dir = Path("cache")
if not cache_dir.exists():
return
if ticker:
# Clear cache for specific ticker
pattern = f"{ticker.upper()}_*.json"
cache_files = list(cache_dir.glob(f"**/{pattern}"))
else:
# Clear all cache files
cache_files = list(cache_dir.glob("**/*.json"))
# Delete cache files
for file in cache_files:
try:
file.unlink()
logger.info(f"Deleted cache file: {file}")
except Exception as e:
logger.error(f"Error deleting cache file {file}: {str(e)}")
except Exception as e:
logger.error(f"Error clearing cache: {str(e)}")
# Set page config
st.set_page_config(
page_title="ETF Portfolio Builder",
page_icon="📈",
layout="wide",
initial_sidebar_state="expanded"
)
# Initialize session state variables
if 'simulation_run' not in st.session_state:
st.session_state.simulation_run = False
logger.info("Initialized simulation_run in session state")
if 'df_data' not in st.session_state:
st.session_state.df_data = None
logger.info("Initialized df_data in session state")
if 'final_alloc' not in st.session_state:
st.session_state.final_alloc = None
logger.info("Initialized final_alloc in session state")
if 'mode' not in st.session_state:
st.session_state.mode = 'Capital Target'
logger.info("Initialized mode in session state")
if 'target' not in st.session_state:
st.session_state.target = 0
logger.info("Initialized target in session state")
if 'initial_capital' not in st.session_state:
st.session_state.initial_capital = 0
logger.info("Initialized initial_capital in session state")
if 'enable_drip' not in st.session_state:
st.session_state.enable_drip = False
logger.info("Initialized enable_drip in session state")
if 'enable_erosion' not in st.session_state:
st.session_state.enable_erosion = False
logger.info("Initialized enable_erosion in session state")
if 'api_calls' not in st.session_state:
st.session_state.api_calls = 0
logger.info("Initialized api_calls in session state")
if 'force_refresh_data' not in st.session_state:
st.session_state.force_refresh_data = False
logger.info("Initialized force_refresh_data in session state")
if 'etf_allocations' not in st.session_state:
st.session_state.etf_allocations = []
logger.info("Initialized empty etf_allocations in session state")
if 'risk_tolerance' not in st.session_state:
st.session_state.risk_tolerance = "Moderate"
logger.info("Initialized risk_tolerance in session state")
# Main title
st.title("📈 ETF Portfolio Builder")
# Function to remove ticker
def remove_ticker(ticker_to_remove: str) -> None:
"""Remove a ticker from the portfolio."""
try:
logger.info(f"Removing ticker: {ticker_to_remove}")
current_allocations = list(st.session_state.etf_allocations)
st.session_state.etf_allocations = [etf for etf in current_allocations if etf["ticker"] != ticker_to_remove]
logger.info(f"Updated allocations after removal: {st.session_state.etf_allocations}")
st.rerun()
except Exception as e:
logger.error(f"Error removing ticker: {str(e)}")
st.error(f"Error removing ticker: {str(e)}")
# Display current tickers in the main space
if st.session_state.etf_allocations:
st.subheader("Selected ETFs")
st.markdown("""
<style>
.ticker-container {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-bottom: 20px;
}
.ticker-item {
display: inline-flex;
align-items: center;
color: #2ecc71;
font-size: 1.1em;
font-weight: 500;
}
.ticker-close {
margin-left: 2px;
cursor: pointer;
font-size: 1.1em;
opacity: 0.7;
}
.ticker-close:hover {
opacity: 1;
}
</style>
""", unsafe_allow_html=True)
# Create a container for tickers
ticker_container = st.container()
with ticker_container:
# Display each ticker with a close button
for etf in st.session_state.etf_allocations:
col1, col2 = st.columns([0.05, 0.95]) # Adjusted column ratio
with col1:
if st.button("×", key=f"remove_{etf['ticker']}",
help=f"Remove {etf['ticker']} from portfolio"):
remove_ticker(etf['ticker'])
with col2:
st.markdown(f"<div class='ticker-item'>{etf['ticker']}</div>", unsafe_allow_html=True)
# Debug information
logger.info("=== Session State Debug ===")
logger.info(f"Full session state: {dict(st.session_state)}")
logger.info(f"ETF allocations type: {type(st.session_state.etf_allocations)}")
logger.info(f"ETF allocations content: {st.session_state.etf_allocations}")
logger.info("=== End Session State Debug ===")
def add_etf_to_portfolio(ticker: str) -> bool:
"""Add an ETF to the portfolio with proper validation and error handling."""
try:
logger.info("=== Adding ETF to Portfolio ===")
logger.info(f"Input ticker: {ticker}")
logger.info(f"Current allocations before adding: {st.session_state.etf_allocations}")
logger.info(f"Current allocations type: {type(st.session_state.etf_allocations)}")
# Validate ticker format
if not re.match(r'^[A-Z]{1,7}$', ticker.upper()):
logger.warning(f"Invalid ticker format: {ticker}")
st.error("Invalid ticker format. Must be 1-7 uppercase letters.")
return False
# Check if ticker already exists
if any(etf["ticker"] == ticker.upper() for etf in st.session_state.etf_allocations):
logger.warning(f"Ticker {ticker.upper()} already exists in portfolio")
st.warning(f"{ticker.upper()} is already in your portfolio.")
return False
# Verify ticker exists by fetching data
logger.info(f"Fetching data for ticker: {ticker.upper()}")
etf_data = fetch_etf_data([ticker.upper()])
logger.info(f"Fetched ETF data: {etf_data}")
if etf_data is None or etf_data.empty:
logger.warning(f"Unknown ticker: {ticker.upper()}")
st.error(f"Unknown ticker: {ticker.upper()}. Please enter a valid ETF ticker.")
return False
# Create new ETF entry
new_etf = {
"ticker": ticker.upper(),
"allocation": 0.0
}
logger.info(f"Created new ETF entry: {new_etf}")
# Update session state
current_allocations = list(st.session_state.etf_allocations)
current_allocations.append(new_etf)
st.session_state.etf_allocations = current_allocations
logger.info(f"Updated session state allocations: {st.session_state.etf_allocations}")
logger.info(f"Updated allocations type: {type(st.session_state.etf_allocations)}")
# Recalculate allocations based on risk tolerance
if len(st.session_state.etf_allocations) > 0:
risk_tolerance = st.session_state.risk_tolerance
tickers = [etf["ticker"] for etf in st.session_state.etf_allocations]
logger.info(f"Recalculating allocations for tickers: {tickers}")
df_data = fetch_etf_data(tickers)
logger.info(f"Fetched data for recalculation: {df_data}")
if df_data is not None and not df_data.empty:
etf_metrics = df_data.to_dict('records')
new_allocations = optimize_portfolio_allocation(
etf_metrics,
risk_tolerance,
pd.DataFrame()
)
logger.info(f"Calculated new allocations: {new_allocations}")
st.session_state.etf_allocations = new_allocations
logger.info(f"Updated session state with new allocations: {st.session_state.etf_allocations}")
logger.info("=== End Adding ETF to Portfolio ===")
return True
except Exception as e:
logger.error("=== Error Adding ETF to Portfolio ===")
logger.error(f"Error: {str(e)}")
logger.error(traceback.format_exc())
st.error(f"Error adding ETF: {str(e)}")
return False
def remove_etf_from_portfolio(index: int) -> bool:
"""Remove an ETF from the portfolio with proper validation and error handling."""
try:
logger.info(f"Attempting to remove ETF at index: {index}")
logger.info(f"Current allocations before removal: {st.session_state.etf_allocations}")
if not st.session_state.etf_allocations or index >= len(st.session_state.etf_allocations):
logger.warning(f"Invalid ETF index for removal: {index}")
return False
# Create new list without the removed ETF
current_allocations = list(st.session_state.etf_allocations)
removed_etf = current_allocations.pop(index)
st.session_state.etf_allocations = current_allocations
logger.info(f"Successfully removed ETF: {removed_etf}")
logger.info(f"Updated allocations: {st.session_state.etf_allocations}")
# Recalculate allocations if there are remaining ETFs
if st.session_state.etf_allocations:
risk_tolerance = st.session_state.risk_tolerance
tickers = [etf["ticker"] for etf in st.session_state.etf_allocations]
df_data = fetch_etf_data(tickers)
if df_data is not None and not df_data.empty:
etf_metrics = df_data.to_dict('records')
new_allocations = optimize_portfolio_allocation(
etf_metrics,
risk_tolerance,
pd.DataFrame()
)
st.session_state.etf_allocations = new_allocations
logger.info(f"Recalculated allocations: {new_allocations}")
return True
except Exception as e:
logger.error(f"Error removing ETF: {str(e)}")
logger.error(traceback.format_exc())
st.error(f"Error removing ETF: {str(e)}")
return False
# Sidebar for ETF input
with st.sidebar:
st.header("ETF Allocation")
# Create a container for ETF input
with st.container():
# Input field for ETF ticker only
new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., SCHD)")
# Add button to add ETF
add_etf_button = st.button("ADD ETF", use_container_width=True)
if add_etf_button:
logger.info("=== Add ETF Button Clicked ===")
logger.info(f"Input ticker: {new_ticker}")
logger.info(f"Current allocations: {st.session_state.etf_allocations}")
if not new_ticker:
st.error("Please enter an ETF ticker.")
logger.warning("No ticker provided")
elif len(st.session_state.etf_allocations) >= 10:
st.error("Maximum of 10 ETFs allowed in portfolio.")
logger.warning("Maximum ETF limit reached")
else:
if add_etf_to_portfolio(new_ticker):
st.success(f"Added {new_ticker.upper()} to portfolio.")
logger.info("Successfully added ETF, triggering rerun")
st.rerun()
# Display total allocation
if st.session_state.etf_allocations:
current_total = sum(etf["allocation"] for etf in st.session_state.etf_allocations)
st.metric("Total Allocation (%)", f"{current_total:.2f}")
# Add a warning if total is not 100%
if abs(current_total - 100) > 0.1:
st.warning("Total allocation should be 100%")
else:
st.info("No ETFs added yet. Please add ETFs to your portfolio.")
logger.info("No ETFs in portfolio")
# Mode selection
simulation_mode = st.radio(
"Select Simulation Mode",
["Capital Target", "Income Target"]
)
if simulation_mode == "Income Target":
monthly_target = st.number_input(
"Monthly Income Target ($)",
min_value=100.0,
max_value=100000.0,
value=1000.0,
step=100.0
)
ANNUAL_TARGET = monthly_target * 12
else:
initial_capital = st.number_input(
"Initial Capital ($)",
min_value=1000.0,
max_value=1000000.0,
value=100000.0,
step=1000.0
)
# Risk tolerance
risk_tolerance = st.select_slider(
"Risk Tolerance",
options=["Conservative", "Moderate", "Aggressive"],
value=st.session_state.get("risk_tolerance", "Moderate"),
key="risk_tolerance_slider"
)
# Check if risk tolerance changed
if risk_tolerance != st.session_state.get("risk_tolerance"):
logger.info("=== Risk Tolerance Change Detection ===")
logger.info(f"Current risk tolerance in session state: {st.session_state.get('risk_tolerance')}")
logger.info(f"New risk tolerance from slider: {risk_tolerance}")
# Update session state
st.session_state.risk_tolerance = risk_tolerance
# Recalculate allocations if we have ETFs
if st.session_state.etf_allocations:
logger.info("Recalculating allocations due to risk tolerance change")
tickers = [etf["ticker"] for etf in st.session_state.etf_allocations]
df_data = fetch_etf_data(tickers)
if df_data is not None and not df_data.empty:
etf_metrics = df_data.to_dict('records')
new_allocations = optimize_portfolio_allocation(
etf_metrics,
risk_tolerance,
pd.DataFrame()
)
st.session_state.etf_allocations = new_allocations
logger.info(f"New allocations after risk tolerance change: {new_allocations}")
# If simulation has been run, update final allocation
if st.session_state.simulation_run and st.session_state.df_data is not None:
if st.session_state.mode == "Income Target":
final_alloc = allocate_for_income(
st.session_state.df_data,
st.session_state.target,
new_allocations
)
else:
final_alloc = allocate_for_capital(
st.session_state.df_data,
st.session_state.initial_capital,
new_allocations
)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.rerun()
# Additional options
st.subheader("Additional Options")
# DRIP option
enable_drip = st.radio(
"Enable Dividend Reinvestment (DRIP)",
["Yes", "No"],
index=1
)
# Erosion options
enable_erosion = st.radio(
"Enable NAV & Yield Erosion",
["Yes", "No"],
index=1
)
# Run simulation button
if st.button("Run Portfolio Simulation", type="primary", use_container_width=True):
if not st.session_state.etf_allocations:
st.error("Please add at least one ETF to your portfolio.")
else:
try:
# Store parameters in session state
st.session_state.mode = simulation_mode
st.session_state.enable_drip = enable_drip == "Yes"
st.session_state.enable_erosion = enable_erosion == "Yes"
if simulation_mode == "Income Target":
st.session_state.target = monthly_target
else:
st.session_state.target = initial_capital
st.session_state.initial_capital = initial_capital
# Run simulation
logger.info("Starting portfolio simulation...")
logger.info(f"ETF allocations: {st.session_state.etf_allocations}")
tickers = [etf["ticker"] for etf in st.session_state.etf_allocations]
df_data = fetch_etf_data(tickers)
logger.info(f"Fetched ETF data:\n{df_data}")
if df_data is not None and not df_data.empty:
if simulation_mode == "Income Target":
logger.info(f"Allocating for income target: ${monthly_target}")
final_alloc = allocate_for_income(df_data, monthly_target, st.session_state.etf_allocations)
else:
logger.info(f"Allocating for capital target: ${initial_capital}")
final_alloc = allocate_for_capital(df_data, initial_capital, st.session_state.etf_allocations)
logger.info(f"Final allocation result:\n{final_alloc}")
if final_alloc is not None and not final_alloc.empty:
st.session_state.simulation_run = True
st.session_state.df_data = df_data
st.session_state.final_alloc = final_alloc
st.success("Portfolio simulation completed!")
st.rerun()
else:
st.error("Failed to generate portfolio allocation. Please check your inputs and try again.")
else:
st.error("Failed to fetch ETF data. Please check your tickers and try again.")
except Exception as e:
st.error(f"Error running simulation: {str(e)}")
logger.error(f"Error in simulation: {str(e)}")
logger.error(traceback.format_exc())
# Add reset simulation button at the bottom of sidebar
if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"):
reset_simulation()
# Add FMP connection status to the navigation bar
st.sidebar.markdown("---")
st.sidebar.subheader("FMP API Status")
connection_status = test_fmp_connection()
if connection_status:
st.sidebar.success("✅ FMP API: Connected")
else:
st.sidebar.error("❌ FMP API: Connection failed")
# Advanced Options section in sidebar
with st.sidebar.expander("Advanced Options"):
# Option to toggle FMP API usage
use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API,
help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs")
if use_fmp_api != USE_FMP_API:
# Update the module-level USE_FMP_API flag via globals() so the assignment is not treated as a new local name
globals()["USE_FMP_API"] = use_fmp_api
st.success("FMP API usage setting updated")
# Add cache controls
st.subheader("Cache Settings")
# Display cache statistics
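# get_cache_stats() is expected to return a dict with 'ticker_count', 'file_count'
# and 'total_size_kb' keys, as used in the summary line below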
cache_stats = get_cache_stats()
st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)")
# Force refresh option
st.session_state.force_refresh_data = st.checkbox(
"Force refresh data (ignore cache)",
value=st.session_state.get("force_refresh_data", False),
help="When enabled, always fetch fresh data from APIs"
)
# Cache clearing options
col1, col2 = st.columns(2)
with col1:
if st.button("Clear All Cache", key="clear_all_cache"):
clear_cache()
st.success("All cache files cleared!")
st.session_state.api_calls = 0
with col2:
ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker")
if st.button("Clear", key="clear_single_cache") and ticker_to_clear:
clear_cache(ticker_to_clear)
st.success(f"Cache cleared for {ticker_to_clear.upper()}")
# Show API call counter
st.write(f"API calls this session: {st.session_state.api_calls}")
# Add option for debug mode and parallel processing
debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.")
parallel_processing = st.checkbox("Enable Parallel Processing", value=True,
help="Fetch data for multiple ETFs simultaneously")
# Add FMP API test button
st.sidebar.subheader("FMP API Testing")
if st.sidebar.button("Test FMP API", key="test_fmp_api_button"):
test_fmp_data_fetching()
st.sidebar.success("Test completed. Check logs for details.")
# Display results and interactive allocation adjustment UI after simulation is run
if st.session_state.simulation_run and st.session_state.df_data is not None:
df = st.session_state.df_data
final_alloc = st.session_state.get("final_alloc")
# Validate final_alloc DataFrame
if final_alloc is None or final_alloc.empty:
st.error("No portfolio data available. Please run the simulation again.")
st.session_state.simulation_run = False
else:
# Verify required columns exist
required_columns = ["Capital Allocated ($)", "Yield (%)", "Price", "Ticker"]
missing_columns = [col for col in required_columns if col not in final_alloc.columns]
if missing_columns:
st.error(f"Missing required columns in portfolio data: {', '.join(missing_columns)}")
st.session_state.simulation_run = False
else:
# Create tabs for better organization
tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"])
with tab1:
st.subheader("💰 Portfolio Summary")
portfolio_summary(final_alloc)
# Display mode-specific information
if st.session_state.mode == "Income Target":
try:
monthly_target = st.session_state.target
annual_target = monthly_target * 12
total_capital = final_alloc["Capital Allocated ($)"].sum()
st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${annual_target:,.2f} annually).")
except Exception as e:
st.error(f"Error displaying income target information: {str(e)}")
else:
try:
initial_capital = st.session_state.initial_capital
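# "Income Contributed ($)" holds each ETF's annual income, so the portfolio total is divided by 12 for the monthly figure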
annual_income = final_alloc["Income Contributed ($)"].sum()
monthly_income = annual_income / 12
st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).")
except Exception as e:
st.error(f"Error displaying capital investment information: {str(e)}")
# Add save/load section
st.subheader("💾 Save/Load Portfolio")
# Create two columns for save and load
save_col, load_col = st.columns(2)
with save_col:
st.write("Save current portfolio")
portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name")
if st.button("Save Portfolio", key="save_portfolio"):
if portfolio_name:
if save_portfolio(portfolio_name, final_alloc,
st.session_state.mode,
st.session_state.target):
st.success(f"Portfolio '{portfolio_name}' saved successfully!")
else:
st.warning("Please enter a portfolio name.")
with load_col:
st.write("Load saved portfolio")
if st.button("Show Saved Portfolios", key="show_portfolios"):
saved_portfolios = list_saved_portfolios()
if saved_portfolios:
selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio")
if st.button("Load Portfolio", key="load_portfolio_btn"):
loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio)
if loaded_df is not None:
st.session_state.final_alloc = loaded_df
st.session_state.mode = loaded_mode
st.session_state.target = loaded_target
st.success(f"Portfolio '{selected_portfolio}' loaded successfully!")
st.rerun()
else:
st.info("No saved portfolios found.")
# Display full detailed allocation table
st.subheader("📊 Capital Allocation Details")
try:
# Format currencies for better readability
display_df = final_alloc.copy()
# Calculate shares for each ETF
display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"]
display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}")
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}")
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}")
display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%")
display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}")
# Create a form for the allocation table
with st.form("allocation_form"):
# Create an editable DataFrame
edited_df = st.data_editor(
display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]],
column_config={
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
"Allocation (%)": st.column_config.NumberColumn(
"Allocation (%)",
min_value=0.0,
max_value=100.0,
step=0.1,
format="%.1f",
required=True
),
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
"Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True),
"Risk Level": st.column_config.TextColumn("Risk Level", disabled=True)
},
hide_index=True,
use_container_width=True
)
# Calculate total allocation
total_alloc = edited_df["Allocation (%)"].sum()
# Validate individual allocations
invalid_allocations = edited_df[
(edited_df["Allocation (%)"] <= 0) |
(edited_df["Allocation (%)"] > 100)
]
if not invalid_allocations.empty:
for _, row in invalid_allocations.iterrows():
st.error(f"Invalid allocation for {row['Ticker']}: must be between 0% and 100%")
# Display total allocation; treat anything within 0.1 percentage points of 100% as balanced to allow for rounding
if abs(total_alloc - 100) <= 0.1:
st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None)
else:
st.metric("Total Allocation (%)", f"{total_alloc:.2f}",
delta=f"{total_alloc - 100:.2f}",
delta_color="off")
st.error("Total allocation must be exactly 100%")
# Create columns for quick actions
col1, col2, col3 = st.columns(3)
with col1:
equal_weight = st.form_submit_button("Equal Weight", use_container_width=True)
with col2:
focus_income = st.form_submit_button("Focus on Income", use_container_width=True)
with col3:
focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True)
# Submit button for manual edits
submitted = st.form_submit_button("Update Allocations",
disabled=abs(total_alloc - 100) > 0.1 or not invalid_allocations.empty,
type="primary",
use_container_width=True)
# Handle form submission
if submitted:
try:
# Convert the edited allocations to a dictionary
new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()}
# Convert to the format expected by allocation functions
etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio updated with new allocations!")
st.rerun()
else:
st.error("Failed to update portfolio. Please try again.")
except Exception as e:
st.error(f"Error updating allocations: {str(e)}")
# Handle quick actions
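# Each quick action rebuilds etf_allocations in the [{"ticker", "allocation"}] format and
# re-runs the same allocate_for_income / allocate_for_capital helpers as the manual update
# path, so results stay consistent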
if equal_weight:
try:
# Calculate equal weight allocation
num_etfs = len(edited_df)
equal_allocation = 100 / num_etfs
# Create new allocations in the format expected by allocation functions
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to equal weight!")
st.rerun()
except Exception as e:
st.error(f"Error applying equal weight: {str(e)}")
elif focus_income:
try:
# "Yield (%)" holds formatted strings (e.g. "12.34%"), so convert to numbers first;
# sorting the raw strings would order "9.50%" above "12.00%"
weights = edited_df.copy()
weights["Yield Value"] = weights["Yield (%)"].str.rstrip('%').astype(float)
weights = weights.sort_values("Yield Value", ascending=False)
total_yield = weights["Yield Value"].sum()
# Weight each ETF by its share of the total yield,
# e.g. yields of 10 / 20 / 30 -> allocations of 16.7% / 33.3% / 50%
etf_allocations = []
for _, row in weights.iterrows():
    allocation = (row["Yield Value"] / total_yield) * 100 if total_yield > 0 else 100 / len(weights)
    etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation})
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to focus on income!")
st.rerun()
except Exception as e:
st.error(f"Error focusing on income: {str(e)}")
elif focus_capital:
try:
# "Focus on Capital" currently falls back to an equal-weight split; a dedicated capital-preservation weighting is not implemented here
num_etfs = len(edited_df)
equal_allocation = 100 / num_etfs
# Create new allocations in the format expected by allocation functions
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to focus on capital!")
st.rerun()
except Exception as e:
st.error(f"Error focusing on capital: {str(e)}")
except Exception as e:
st.error(f"Error displaying allocation details: {str(e)}")
logger.error(f"Error in allocation display: {str(e)}")
logger.error(traceback.format_exc())