ETF_Suite_Portal/pages/ETF_Portfolio_Builder.py

1602 lines
66 KiB
Python

import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from pathlib import Path
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any, Callable, T
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import yfinance as yf
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import os
import sys
import logging
import traceback
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FMP API configuration
FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', ''))
FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
# High-yield ETFs reference data
HIGH_YIELD_ETFS = {
"MSTY": {"expected_yield": 125.0, "frequency": "Monthly"},
"SMCY": {"expected_yield": 100.0, "frequency": "Monthly"},
"TSLY": {"expected_yield": 85.0, "frequency": "Monthly"},
"NVDY": {"expected_yield": 75.0, "frequency": "Monthly"},
"ULTY": {"expected_yield": 70.0, "frequency": "Monthly"},
"JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"},
"JEPI": {"expected_yield": 7.8, "frequency": "Monthly"},
"XYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
"QYLD": {"expected_yield": 12.0, "frequency": "Monthly"},
"RYLD": {"expected_yield": 12.0, "frequency": "Monthly"}
}
def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate ETF metrics based on available data.
Args:
ticker: ETF ticker
price_data: DataFrame with price history
dividend_data: DataFrame with dividend history
Returns:
Dictionary with calculated metrics
"""
metrics = {
"Ticker": ticker,
"Yield (%)": 0.0,
"Price": 0.0,
"volatility": 0.0,
"sharpe_ratio": 0.0,
"sortino_ratio": 0.0,
"correlation": 0.0,
"payout_ratio": 0.0,
"score": 0.0,
"Risk Level": "Unknown",
"missing_metrics": []
}
try:
# Get current price from price data
if not price_data.empty:
metrics["Price"] = price_data["close"].iloc[-1]
else:
metrics["missing_metrics"].append("Price")
# Calculate yield if dividend data is available
if not dividend_data.empty and metrics["Price"] > 0:
# Convert date column to datetime if it's not already
dividend_data["date"] = pd.to_datetime(dividend_data["date"])
# Get dividends from the last 12 months
one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
recent_dividends = dividend_data[dividend_data["date"] >= one_year_ago]
if not recent_dividends.empty:
# Calculate TTM dividend
ttm_dividend = recent_dividends["dividend"].sum()
# Calculate annual yield
metrics["Yield (%)"] = (ttm_dividend / metrics["Price"]) * 100
logger.info(f"Calculated yield for {ticker}: {metrics['Yield (%)']:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${metrics['Price']:.2f})")
else:
logger.warning(f"No recent dividends found for {ticker}")
metrics["missing_metrics"].append("Yield (%)")
else:
metrics["missing_metrics"].append("Yield (%)")
# Calculate volatility if price data is available
if len(price_data) > 1:
returns = price_data["close"].pct_change().dropna()
metrics["volatility"] = returns.std() * np.sqrt(252) * 100 # Annualized volatility
else:
metrics["missing_metrics"].append("volatility")
# Calculate Sharpe ratio if we have returns and risk-free rate
if len(price_data) > 1:
risk_free_rate = 0.05 # Assuming 5% risk-free rate
excess_returns = returns - (risk_free_rate / 252)
if excess_returns.std() != 0:
metrics["sharpe_ratio"] = (excess_returns.mean() / excess_returns.std()) * np.sqrt(252)
else:
metrics["missing_metrics"].append("sharpe_ratio")
# Calculate Sortino ratio if we have returns
if len(price_data) > 1:
downside_returns = returns[returns < 0]
if len(downside_returns) > 0 and downside_returns.std() != 0:
metrics["sortino_ratio"] = (returns.mean() / downside_returns.std()) * np.sqrt(252)
else:
metrics["missing_metrics"].append("sortino_ratio")
# Categorize risk based on available metrics
metrics["Risk Level"] = categorize_etf_risk(metrics)
# Calculate overall score
metrics["score"] = calculate_etf_score(metrics)
logger.info(f"Calculated metrics for {ticker}: {metrics}")
return metrics
except Exception as e:
logger.error(f"Error calculating metrics for {ticker}: {str(e)}")
logger.error(traceback.format_exc())
return metrics
def categorize_etf_risk(metrics: Dict[str, Any]) -> str:
"""
Categorize ETF risk based on available metrics.
Args:
metrics: Dictionary with ETF metrics
Returns:
Risk category: "Low", "Medium", or "High"
"""
try:
# Initialize risk score
risk_score = 0
available_metrics = 0
# Yield-based risk (higher yield = higher risk)
if "Yield (%)" not in metrics["missing_metrics"]:
if metrics["Yield (%)"] > 10:
risk_score += 3
elif metrics["Yield (%)"] > 6:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Volatility-based risk
if "volatility" not in metrics["missing_metrics"]:
if metrics["volatility"] > 20:
risk_score += 3
elif metrics["volatility"] > 15:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Sharpe ratio-based risk (lower Sharpe = higher risk)
if "sharpe_ratio" not in metrics["missing_metrics"]:
if metrics["sharpe_ratio"] < 0.5:
risk_score += 3
elif metrics["sharpe_ratio"] < 1.0:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Sortino ratio-based risk (lower Sortino = higher risk)
if "sortino_ratio" not in metrics["missing_metrics"]:
if metrics["sortino_ratio"] < 0.5:
risk_score += 3
elif metrics["sortino_ratio"] < 1.0:
risk_score += 2
else:
risk_score += 1
available_metrics += 1
# Calculate average risk score
if available_metrics > 0:
avg_risk_score = risk_score / available_metrics
if avg_risk_score > 2.5:
return "High"
elif avg_risk_score > 1.5:
return "Medium"
else:
return "Low"
# If no metrics available, use yield as fallback
if metrics["Yield (%)"] > 10:
return "High"
elif metrics["Yield (%)"] > 6:
return "Medium"
else:
return "Low"
except Exception as e:
logger.error(f"Error categorizing ETF risk: {str(e)}")
return "Unknown"
def calculate_etf_score(metrics: Dict[str, Any]) -> float:
"""
Calculate overall ETF score based on available metrics.
Args:
metrics: Dictionary with ETF metrics
Returns:
Overall score (0-100)
"""
try:
score = 0
available_metrics = 0
# Yield score (0-25 points)
if "Yield (%)" not in metrics["missing_metrics"]:
if metrics["Yield (%)"] > 10:
score += 25
elif metrics["Yield (%)"] > 6:
score += 20
elif metrics["Yield (%)"] > 3:
score += 15
else:
score += 10
available_metrics += 1
# Volatility score (0-25 points)
if "volatility" not in metrics["missing_metrics"]:
if metrics["volatility"] < 10:
score += 25
elif metrics["volatility"] < 15:
score += 20
elif metrics["volatility"] < 20:
score += 15
else:
score += 10
available_metrics += 1
# Sharpe ratio score (0-25 points)
if "sharpe_ratio" not in metrics["missing_metrics"]:
if metrics["sharpe_ratio"] > 1.5:
score += 25
elif metrics["sharpe_ratio"] > 1.0:
score += 20
elif metrics["sharpe_ratio"] > 0.5:
score += 15
else:
score += 10
available_metrics += 1
# Sortino ratio score (0-25 points)
if "sortino_ratio" not in metrics["missing_metrics"]:
if metrics["sortino_ratio"] > 1.5:
score += 25
elif metrics["sortino_ratio"] > 1.0:
score += 20
elif metrics["sortino_ratio"] > 0.5:
score += 15
else:
score += 10
available_metrics += 1
# Calculate final score
if available_metrics > 0:
return score / available_metrics
return 0
except Exception as e:
logger.error(f"Error calculating ETF score: {str(e)}")
return 0
def calculate_correlation_matrix(price_data_dict: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
Calculate correlation matrix between ETFs.
Args:
price_data_dict: Dictionary of price DataFrames for each ETF
Returns:
DataFrame with correlation matrix
"""
try:
# Create a DataFrame with returns for all ETFs
returns_df = pd.DataFrame()
for ticker, price_data in price_data_dict.items():
if len(price_data) > 1:
returns = price_data["close"].pct_change().dropna()
returns_df[ticker] = returns
if returns_df.empty:
logger.warning("No valid price data for correlation calculation")
return pd.DataFrame()
# Calculate correlation matrix
corr_matrix = returns_df.corr()
logger.info(f"Correlation matrix calculated:\n{corr_matrix}")
return corr_matrix
except Exception as e:
logger.error(f"Error calculating correlation matrix: {str(e)}")
logger.error(traceback.format_exc())
return pd.DataFrame()
def optimize_portfolio_allocation(
etf_metrics: List[Dict[str, Any]],
risk_tolerance: str,
correlation_matrix: pd.DataFrame
) -> Dict[str, float]:
"""
Optimize portfolio allocation based on risk tolerance and ETF metrics.
Args:
etf_metrics: List of ETF metrics dictionaries
risk_tolerance: Risk tolerance level ("Conservative", "Moderate", "Aggressive")
correlation_matrix: Correlation matrix between ETFs
Returns:
Dictionary with ETF tickers and their allocations
"""
try:
# Group ETFs by risk category
low_risk = [etf for etf in etf_metrics if etf["Risk Level"] == "Low"]
medium_risk = [etf for etf in etf_metrics if etf["Risk Level"] == "Medium"]
high_risk = [etf for etf in etf_metrics if etf["Risk Level"] == "High"]
# Sort ETFs by score within each risk category
low_risk.sort(key=lambda x: x["score"], reverse=True)
medium_risk.sort(key=lambda x: x["score"], reverse=True)
high_risk.sort(key=lambda x: x["score"], reverse=True)
# Initialize allocations
allocations = {}
if risk_tolerance == "Conservative":
# Conservative allocation
if low_risk:
# Allocate 50% to low-risk ETFs
low_risk_alloc = 50.0 / len(low_risk)
for etf in low_risk:
allocations[etf["Ticker"]] = low_risk_alloc
if medium_risk:
# Allocate 30% to medium-risk ETFs
medium_risk_alloc = 30.0 / len(medium_risk)
for etf in medium_risk:
allocations[etf["Ticker"]] = medium_risk_alloc
if high_risk:
# Allocate 20% to high-risk ETFs
high_risk_alloc = 20.0 / len(high_risk)
for etf in high_risk:
allocations[etf["Ticker"]] = high_risk_alloc
elif risk_tolerance == "Moderate":
# Moderate allocation
if low_risk:
# Allocate 30% to low-risk ETFs
low_risk_alloc = 30.0 / len(low_risk)
for etf in low_risk:
allocations[etf["Ticker"]] = low_risk_alloc
if medium_risk:
# Allocate 40% to medium-risk ETFs
medium_risk_alloc = 40.0 / len(medium_risk)
for etf in medium_risk:
allocations[etf["Ticker"]] = medium_risk_alloc
if high_risk:
# Allocate 30% to high-risk ETFs
high_risk_alloc = 30.0 / len(high_risk)
for etf in high_risk:
allocations[etf["Ticker"]] = high_risk_alloc
else: # Aggressive
# Aggressive allocation
if low_risk:
# Allocate 20% to low-risk ETFs
low_risk_alloc = 20.0 / len(low_risk)
for etf in low_risk:
allocations[etf["Ticker"]] = low_risk_alloc
if medium_risk:
# Allocate 40% to medium-risk ETFs
medium_risk_alloc = 40.0 / len(medium_risk)
for etf in medium_risk:
allocations[etf["Ticker"]] = medium_risk_alloc
if high_risk:
# Allocate 40% to high-risk ETFs
high_risk_alloc = 40.0 / len(high_risk)
for etf in high_risk:
allocations[etf["Ticker"]] = high_risk_alloc
# Adjust allocations based on correlation
if not correlation_matrix.empty:
allocations = adjust_allocations_for_correlation(allocations, correlation_matrix)
# Normalize allocations to ensure they sum to 100%
total_alloc = sum(allocations.values())
if total_alloc > 0:
allocations = {k: (v / total_alloc) * 100 for k, v in allocations.items()}
logger.info(f"Optimized allocations for {risk_tolerance} risk tolerance: {allocations}")
return allocations
except Exception as e:
logger.error(f"Error optimizing portfolio allocation: {str(e)}")
logger.error(traceback.format_exc())
return {}
def adjust_allocations_for_correlation(
allocations: Dict[str, float],
correlation_matrix: pd.DataFrame
) -> Dict[str, float]:
"""
Adjust allocations to reduce correlation between ETFs.
Args:
allocations: Dictionary with current allocations
correlation_matrix: Correlation matrix between ETFs
Returns:
Dictionary with adjusted allocations
"""
try:
adjusted_allocations = allocations.copy()
# Get highly correlated pairs (correlation > 0.7)
high_corr_pairs = []
for i in range(len(correlation_matrix.columns)):
for j in range(i + 1, len(correlation_matrix.columns)):
ticker1 = correlation_matrix.columns[i]
ticker2 = correlation_matrix.columns[j]
if abs(correlation_matrix.iloc[i, j]) > 0.7:
high_corr_pairs.append((ticker1, ticker2))
# Adjust allocations for highly correlated pairs
for ticker1, ticker2 in high_corr_pairs:
if ticker1 in adjusted_allocations and ticker2 in adjusted_allocations:
# Reduce allocation to the ETF with lower score
if adjusted_allocations[ticker1] > adjusted_allocations[ticker2]:
reduction = adjusted_allocations[ticker1] * 0.1 # Reduce by 10%
adjusted_allocations[ticker1] -= reduction
adjusted_allocations[ticker2] += reduction
else:
reduction = adjusted_allocations[ticker2] * 0.1 # Reduce by 10%
adjusted_allocations[ticker2] -= reduction
adjusted_allocations[ticker1] += reduction
logger.info(f"Adjusted allocations for correlation: {adjusted_allocations}")
return adjusted_allocations
except Exception as e:
logger.error(f"Error adjusting allocations for correlation: {str(e)}")
logger.error(traceback.format_exc())
return allocations
def get_fmp_session():
"""Create a session with retry logic for FMP API calls."""
session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5)
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]:
"""
Fetch ETF data from Financial Modeling Prep API.
Args:
ticker: ETF ticker symbol
Returns:
Dictionary with ETF data or None if failed
"""
try:
if not FMP_API_KEY:
logger.warning("FMP API key not configured, skipping FMP data fetch")
return None
session = get_fmp_session()
# Get profile data for current price
profile_url = f"{FMP_BASE_URL}/profile/{ticker}?apikey={FMP_API_KEY}"
logger.info(f"Fetching FMP profile data for {ticker}")
profile_response = session.get(profile_url)
if profile_response.status_code != 200:
logger.error(f"FMP API error for {ticker}: {profile_response.status_code}")
logger.error(f"Response content: {profile_response.text}")
return None
profile_data = profile_response.json()
logger.info(f"FMP profile response for {ticker}: {profile_data}")
if not profile_data or not isinstance(profile_data, list) or len(profile_data) == 0:
logger.warning(f"No profile data found for {ticker} in FMP")
return None
profile = profile_data[0]
current_price = float(profile.get('price', 0))
if current_price <= 0:
logger.error(f"Invalid price for {ticker}: {current_price}")
return None
# Get dividend history
dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}"
logger.info(f"Fetching FMP dividend data for {ticker}")
dividend_response = session.get(dividend_url)
if dividend_response.status_code != 200:
logger.error(f"FMP API error for dividend data: {dividend_response.status_code}")
logger.error(f"Response content: {dividend_response.text}")
return None
dividend_data = dividend_response.json()
logger.info(f"FMP dividend response for {ticker}: {dividend_data}")
if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]:
logger.warning(f"No dividend history found for {ticker}")
return None
# Calculate TTM dividend
dividends = pd.DataFrame(dividend_data["historical"])
dividends["date"] = pd.to_datetime(dividends["date"])
dividends = dividends.sort_values("date")
# Get dividends in the last 12 months
one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
recent_dividends = dividends[dividends["date"] >= one_year_ago]
if recent_dividends.empty:
logger.warning(f"No recent dividends found for {ticker}")
return None
# Calculate TTM dividend
ttm_dividend = recent_dividends["dividend"].sum()
# Calculate yield
yield_pct = (ttm_dividend / current_price) * 100
logger.info(f"Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})")
# For high-yield ETFs, verify the yield is reasonable
if ticker in HIGH_YIELD_ETFS:
expected_yield = HIGH_YIELD_ETFS[ticker]["expected_yield"]
if yield_pct < expected_yield * 0.5: # If yield is less than 50% of expected
logger.error(f"Calculated yield {yield_pct:.2f}% for {ticker} is much lower than expected {expected_yield}%")
logger.error(f"TTM dividend: ${ttm_dividend:.2f}")
logger.error(f"Current price: ${current_price:.2f}")
logger.error(f"Recent dividends:\n{recent_dividends}")
# Determine distribution period
if len(recent_dividends) >= 2:
intervals = recent_dividends["date"].diff().dt.days.dropna()
avg_interval = intervals.mean()
if avg_interval <= 45:
dist_period = "Monthly"
elif avg_interval <= 100:
dist_period = "Quarterly"
elif avg_interval <= 200:
dist_period = "Semi-Annually"
else:
dist_period = "Annually"
else:
dist_period = "Unknown"
etf_data = {
"Ticker": ticker,
"Price": current_price,
"Yield (%)": yield_pct,
"Distribution Period": dist_period,
"Risk Level": "High" if ticker in HIGH_YIELD_ETFS else "Moderate"
}
logger.info(f"FMP data for {ticker}: {etf_data}")
return etf_data
except Exception as e:
logger.error(f"Error fetching FMP data for {ticker}: {str(e)}")
logger.error(traceback.format_exc())
return None
def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]:
"""
Fetch ETF data from yfinance as fallback.
Args:
ticker: ETF ticker symbol
Returns:
Dictionary with ETF data or None if failed
"""
try:
logger.info(f"Fetching yfinance data for {ticker}")
etf = yf.Ticker(ticker)
info = etf.info
# Get the most recent dividend yield
if 'dividendYield' in info and info['dividendYield'] is not None:
yield_pct = info['dividendYield'] * 100
logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%")
else:
# Try to calculate from dividend history
hist = etf.history(period="1y")
if not hist.empty and 'Dividends' in hist.columns:
annual_dividend = hist['Dividends'].sum()
current_price = info.get('regularMarketPrice', 0)
yield_pct = (annual_dividend / current_price) * 100 if current_price > 0 else 0
logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%")
else:
yield_pct = 0
logger.warning(f"No yield data found for {ticker} in yfinance")
# Get current price
current_price = info.get('regularMarketPrice', 0)
if current_price <= 0:
current_price = info.get('regularMarketPreviousClose', 0)
logger.warning(f"Using previous close price for {ticker}: {current_price}")
etf_data = {
"Ticker": ticker,
"Price": current_price,
"Yield (%)": yield_pct,
"Risk Level": "High" # Default for high-yield ETFs
}
logger.info(f"yfinance data for {ticker}: {etf_data}")
return etf_data
except Exception as e:
logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
return None
def fetch_etf_data(tickers: List[str]) -> pd.DataFrame:
"""
Fetch ETF data using FMP API with yfinance fallback.
Uses HIGH_YIELD_ETFS data only as a last resort.
Args:
tickers: List of ETF tickers
Returns:
DataFrame with ETF data
"""
try:
data = {}
for ticker in tickers:
if not ticker: # Skip empty tickers
continue
logger.info(f"Processing {ticker}")
# Try FMP first
etf_data = fetch_etf_data_fmp(ticker)
# If FMP fails, try yfinance
if etf_data is None:
logger.info(f"Falling back to yfinance for {ticker}")
etf_data = fetch_etf_data_yfinance(ticker)
# Only use HIGH_YIELD_ETFS data if both FMP and yfinance failed
if etf_data is None and ticker in HIGH_YIELD_ETFS:
logger.info(f"Using fallback data from HIGH_YIELD_ETFS for {ticker}")
etf_data = {
"Ticker": ticker,
"Price": 25.0, # Default price for fallback
"Yield (%)": HIGH_YIELD_ETFS[ticker]["expected_yield"],
"Distribution Period": HIGH_YIELD_ETFS[ticker]["frequency"],
"Risk Level": "High"
}
if etf_data is not None:
data[ticker] = etf_data
logger.info(f"Final data for {ticker}: {etf_data}")
else:
logger.error(f"Failed to fetch data for {ticker} from all sources")
if not data:
st.error("No ETF data could be fetched")
return pd.DataFrame()
df = pd.DataFrame(data.values())
# Validate the data
if df.empty:
st.error("No ETF data could be fetched")
return pd.DataFrame()
if (df["Price"] <= 0).any():
st.error("Some ETFs have invalid prices")
return pd.DataFrame()
if (df["Yield (%)"] <= 0).any():
st.warning("Some ETFs have zero or negative yields")
logger.info(f"Final DataFrame:\n{df}")
return df
except Exception as e:
st.error(f"Error fetching ETF data: {str(e)}")
logger.error(f"Error in fetch_etf_data: {str(e)}")
logger.error(traceback.format_exc())
return pd.DataFrame()
def run_portfolio_simulation(
mode: str,
target: float,
risk_tolerance: str,
etf_inputs: List[Dict[str, str]],
enable_drip: bool,
enable_erosion: bool
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Run the portfolio simulation using the new optimization system.
Args:
mode: Simulation mode ("income_target" or "capital_target")
target: Target value (monthly income or initial capital)
risk_tolerance: Risk tolerance level
etf_inputs: List of ETF inputs
enable_drip: Whether to enable dividend reinvestment
enable_erosion: Whether to enable NAV & yield erosion
Returns:
Tuple of (ETF data DataFrame, Final allocation DataFrame)
"""
try:
logger.info(f"Starting portfolio simulation with mode: {mode}, target: {target}")
logger.info(f"ETF inputs: {etf_inputs}")
# Fetch real ETF data
tickers = [input["ticker"] for input in etf_inputs if input["ticker"]] # Filter out empty tickers
logger.info(f"Processing tickers: {tickers}")
if not tickers:
st.error("No valid tickers provided")
return pd.DataFrame(), pd.DataFrame()
# Fetch price and dividend data for all ETFs
price_data_dict = {}
dividend_data_dict = {}
etf_metrics_list = []
for ticker in tickers:
try:
# Fetch price history
price_url = f"{FMP_BASE_URL}/historical-price-full/{ticker}?apikey={FMP_API_KEY}"
price_response = get_fmp_session().get(price_url)
if price_response.status_code == 200:
price_data = pd.DataFrame(price_response.json().get("historical", []))
if not price_data.empty:
price_data_dict[ticker] = price_data
# Fetch dividend history
dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}?apikey={FMP_API_KEY}"
dividend_response = get_fmp_session().get(dividend_url)
if dividend_response.status_code == 200:
dividend_data = pd.DataFrame(dividend_response.json().get("historical", []))
if not dividend_data.empty:
dividend_data_dict[ticker] = dividend_data
# Calculate metrics
if ticker in price_data_dict and ticker in dividend_data_dict:
metrics = calculate_etf_metrics(
ticker,
price_data_dict[ticker],
dividend_data_dict[ticker]
)
etf_metrics_list.append(metrics)
else:
logger.warning(f"Missing price or dividend data for {ticker}")
except Exception as e:
logger.error(f"Error processing {ticker}: {str(e)}")
continue
if not etf_metrics_list:
st.error("Failed to fetch ETF data")
return pd.DataFrame(), pd.DataFrame()
# Calculate correlation matrix
correlation_matrix = calculate_correlation_matrix(price_data_dict)
# Optimize portfolio allocation
allocations = optimize_portfolio_allocation(
etf_metrics_list,
risk_tolerance,
correlation_matrix
)
if not allocations:
st.error("Failed to optimize portfolio allocation")
return pd.DataFrame(), pd.DataFrame()
# Create final allocation DataFrame
final_alloc = pd.DataFrame(etf_metrics_list)
# Ensure all required columns exist
required_columns = [
"Ticker",
"Yield (%)",
"Price",
"Risk Level"
]
for col in required_columns:
if col not in final_alloc.columns:
logger.error(f"Missing required column: {col}")
st.error(f"Missing required column: {col}")
return pd.DataFrame(), pd.DataFrame()
# Add allocation column
final_alloc["Allocation (%)"] = final_alloc["Ticker"].map(allocations)
if mode == "income_target":
# Calculate required capital for income target
monthly_income = target
annual_income = monthly_income * 12
# Calculate weighted average yield
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
logger.info(f"Calculated weighted yield: {weighted_yield:.2f}%")
# Validate weighted yield
if weighted_yield <= 0:
st.error(f"Invalid weighted yield calculated: {weighted_yield:.2f}%")
return pd.DataFrame(), pd.DataFrame()
# Calculate required capital based on weighted yield
required_capital = (annual_income / weighted_yield) * 100
logger.info(f"Calculated required capital: ${required_capital:,.2f}")
else:
required_capital = target
logger.info(f"Using provided capital: ${required_capital:,.2f}")
# Calculate capital allocation and income
final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
logger.info(f"Final allocation calculated:\n{final_alloc}")
# Apply erosion if enabled
if enable_erosion:
# Apply a small erosion factor to yield and price
erosion_factor = 0.98 # 2% erosion per year
final_alloc["Yield (%)"] = final_alloc["Yield (%)"] * erosion_factor
final_alloc["Price"] = final_alloc["Price"] * erosion_factor
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
logger.info("Applied erosion factor to yield and price")
# Validate final calculations
total_capital = final_alloc["Capital Allocated ($)"].sum()
total_income = final_alloc["Income Contributed ($)"].sum()
effective_yield = (total_income / total_capital) * 100
logger.info(f"Final validation - Total Capital: ${total_capital:,.2f}, Total Income: ${total_income:,.2f}, Effective Yield: {effective_yield:.2f}%")
if effective_yield <= 0:
st.error(f"Invalid effective yield calculated: {effective_yield:.2f}%")
return pd.DataFrame(), pd.DataFrame()
# Create ETF data DataFrame for display
etf_data = pd.DataFrame(etf_metrics_list)
return etf_data, final_alloc
except Exception as e:
st.error(f"Error in portfolio simulation: {str(e)}")
logger.error(f"Error in run_portfolio_simulation: {str(e)}")
logger.error(traceback.format_exc())
return pd.DataFrame(), pd.DataFrame()
def portfolio_summary(final_alloc: pd.DataFrame) -> None:
"""
Display a summary of the portfolio allocation.
Args:
final_alloc: DataFrame containing the portfolio allocation
"""
if final_alloc is None or final_alloc.empty:
st.warning("No portfolio data available.")
return
try:
# Calculate key metrics
total_capital = final_alloc["Capital Allocated ($)"].sum()
total_income = final_alloc["Income Contributed ($)"].sum()
# Calculate weighted average yield
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
# Display metrics in columns
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Capital", f"${total_capital:,.2f}")
with col2:
st.metric("Annual Income", f"${total_income:,.2f}")
st.metric("Monthly Income", f"${total_income/12:,.2f}")
with col3:
st.metric("Average Yield", f"{weighted_yield:.2f}%")
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
# Display allocation chart
fig = px.pie(
final_alloc,
values="Allocation (%)",
names="Ticker",
title="Portfolio Allocation by ETF",
hover_data={
"Ticker": True,
"Allocation (%)": ":.2f",
"Yield (%)": ":.2f",
"Capital Allocated ($)": ":,.2f",
"Income Contributed ($)": ":,.2f"
}
)
st.plotly_chart(fig, use_container_width=True)
# Display detailed allocation table
st.subheader("Detailed Allocation")
display_df = final_alloc.copy()
display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12
# Format the display
st.dataframe(
display_df.style.format({
"Allocation (%)": "{:.2f}%",
"Yield (%)": "{:.2f}%",
"Price": "${:,.2f}",
"Shares": "{:,.4f}",
"Capital Allocated ($)": "${:,.2f}",
"Monthly Income": "${:,.2f}",
"Income Contributed ($)": "${:,.2f}"
}),
use_container_width=True
)
except Exception as e:
st.error(f"Error calculating portfolio summary: {str(e)}")
logger.error(f"Error in portfolio_summary: {str(e)}")
logger.error(traceback.format_exc())
def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool:
"""
Save portfolio allocation to a JSON file.
Args:
portfolio_name: Name of the portfolio
final_alloc: DataFrame containing portfolio allocation
mode: Portfolio mode ("Income Target" or "Capital Target")
target: Target value (income or capital)
Returns:
bool: True if save was successful, False otherwise
"""
try:
# Create portfolios directory if it doesn't exist
portfolios_dir = Path("portfolios")
portfolios_dir.mkdir(exist_ok=True)
# Prepare portfolio data
portfolio_data = {
"name": portfolio_name,
"created_at": datetime.now().isoformat(),
"mode": mode,
"target": target,
"allocations": []
}
# Convert DataFrame to list of dictionaries
for _, row in final_alloc.iterrows():
allocation = {
"ticker": row["Ticker"],
"allocation": float(row["Allocation (%)"]),
"yield": float(row["Yield (%)"]),
"price": float(row["Price"]),
"risk_level": row["Risk Level"]
}
portfolio_data["allocations"].append(allocation)
# Save to JSON file
file_path = portfolios_dir / f"{portfolio_name}.json"
with open(file_path, 'w') as f:
json.dump(portfolio_data, f, indent=2)
return True
except Exception as e:
st.error(f"Error saving portfolio: {str(e)}")
return False
def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]:
"""
Load portfolio allocation from a JSON file.
Args:
portfolio_name: Name of the portfolio to load
Returns:
Tuple containing:
- DataFrame with portfolio allocation
- Portfolio mode
- Target value
"""
try:
# Check if portfolio exists
file_path = Path("portfolios") / f"{portfolio_name}.json"
if not file_path.exists():
st.error(f"Portfolio '{portfolio_name}' not found.")
return None, None, None
# Load portfolio data
with open(file_path, 'r') as f:
portfolio_data = json.load(f)
# Convert allocations to DataFrame
allocations = portfolio_data["allocations"]
df = pd.DataFrame(allocations)
# Rename columns to match expected format
df = df.rename(columns={
"allocation": "Allocation (%)",
"yield": "Yield (%)",
"price": "Price"
})
return df, portfolio_data["mode"], portfolio_data["target"]
except Exception as e:
st.error(f"Error loading portfolio: {str(e)}")
return None, None, None
def list_saved_portfolios() -> List[str]:
"""
List all saved portfolios.
Returns:
List of portfolio names
"""
try:
portfolios_dir = Path("portfolios")
if not portfolios_dir.exists():
return []
# Get all JSON files in the portfolios directory
portfolio_files = list(portfolios_dir.glob("*.json"))
# Extract portfolio names from filenames
portfolio_names = [f.stem for f in portfolio_files]
return sorted(portfolio_names)
except Exception as e:
st.error(f"Error listing portfolios: {str(e)}")
return []
def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
"""
Allocate portfolio for income target.
Args:
df: DataFrame with ETF data
target: Monthly income target
etf_allocations: List of ETF allocations
Returns:
DataFrame with final allocation
"""
try:
# Create final allocation DataFrame
final_alloc = df.copy()
# Set allocations
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
# Calculate required capital for income target
monthly_income = target
annual_income = monthly_income * 12
avg_yield = final_alloc["Yield (%)"].mean()
required_capital = (annual_income / avg_yield) * 100
# Calculate capital allocation and income
final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
return final_alloc
except Exception as e:
st.error(f"Error in income allocation: {str(e)}")
return None
def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
"""
Allocate portfolio for capital target.
Args:
df: DataFrame with ETF data
initial_capital: Initial capital amount
etf_allocations: List of ETF allocations
Returns:
DataFrame with final allocation
"""
try:
# Create final allocation DataFrame
final_alloc = df.copy()
# Set allocations
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
# Calculate capital allocation and income
final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * initial_capital
final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100
return final_alloc
except Exception as e:
st.error(f"Error in capital allocation: {str(e)}")
return None
def reset_simulation():
"""Reset all simulation data and state."""
st.session_state.simulation_run = False
st.session_state.df_data = None
st.session_state.final_alloc = None
st.session_state.mode = 'Capital Target'
st.session_state.target = 0
st.session_state.initial_capital = 0
st.session_state.enable_drip = False
st.session_state.enable_erosion = False
st.rerun()
def test_fmp_connection():
"""Test the FMP API connection and display status."""
try:
if not FMP_API_KEY:
return False, "No API key found"
session = get_fmp_session()
test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}"
response = session.get(test_url)
if response.status_code == 200:
data = response.json()
if data and isinstance(data, list) and len(data) > 0:
return True, "Connected"
return False, f"Error: {response.status_code}"
except Exception as e:
return False, f"Error: {str(e)}"
# Set page config
st.set_page_config(
page_title="ETF Portfolio Builder",
page_icon="📈",
layout="wide",
initial_sidebar_state="expanded"
)
# Initialize session state variables
if 'simulation_run' not in st.session_state:
st.session_state.simulation_run = False
if 'df_data' not in st.session_state:
st.session_state.df_data = None
if 'final_alloc' not in st.session_state:
st.session_state.final_alloc = None
if 'mode' not in st.session_state:
st.session_state.mode = 'Capital Target'
if 'target' not in st.session_state:
st.session_state.target = 0
if 'initial_capital' not in st.session_state:
st.session_state.initial_capital = 0
if 'enable_drip' not in st.session_state:
st.session_state.enable_drip = False
if 'enable_erosion' not in st.session_state:
st.session_state.enable_erosion = False
# Main title
st.title("📈 ETF Portfolio Builder")
# Sidebar for simulation parameters
with st.sidebar:
st.header("Simulation Parameters")
# Add refresh data button at the top
if st.button("🔄 Refresh Data", use_container_width=True):
st.info("Refreshing ETF data...")
# Add your data refresh logic here
st.success("Data refreshed successfully!")
# Mode selection
simulation_mode = st.radio(
"Select Simulation Mode",
["Capital Target", "Income Target"]
)
if simulation_mode == "Income Target":
monthly_target = st.number_input(
"Monthly Income Target ($)",
min_value=100.0,
max_value=100000.0,
value=1000.0,
step=100.0
)
ANNUAL_TARGET = monthly_target * 12
else:
initial_capital = st.number_input(
"Initial Capital ($)",
min_value=1000.0,
max_value=1000000.0,
value=100000.0,
step=1000.0
)
# Risk tolerance
risk_tolerance = st.select_slider(
"Risk Tolerance",
options=["Conservative", "Moderate", "Aggressive"],
value="Moderate"
)
# Additional options
st.subheader("Additional Options")
# DRIP option
enable_drip = st.radio(
"Enable Dividend Reinvestment (DRIP)",
["Yes", "No"],
index=1
)
# Erosion options
enable_erosion = st.radio(
"Enable NAV & Yield Erosion",
["Yes", "No"],
index=1
)
# ETF Selection
st.subheader("ETF Selection")
# Create a form for ETF selection
with st.form("etf_selection_form"):
# Number of ETFs
num_etfs = st.number_input("Number of ETFs", min_value=1, max_value=10, value=3, step=1)
# Create columns for ETF inputs
etf_inputs = []
for i in range(num_etfs):
ticker = st.text_input(f"ETF {i+1} Ticker", key=f"ticker_{i}")
if ticker: # Only add non-empty tickers
etf_inputs.append({"ticker": ticker.upper().strip()})
# Submit button
submitted = st.form_submit_button("Run Portfolio Simulation", type="primary")
if submitted:
try:
if not etf_inputs:
st.error("Please enter at least one ETF ticker")
else:
logger.info(f"Form submitted with {len(etf_inputs)} ETFs: {etf_inputs}")
# Store parameters in session state
st.session_state.mode = simulation_mode
st.session_state.enable_drip = enable_drip == "Yes"
st.session_state.enable_erosion = enable_erosion == "Yes"
if simulation_mode == "Income Target":
st.session_state.target = monthly_target
else:
st.session_state.target = initial_capital
st.session_state.initial_capital = initial_capital
# Run simulation
df_data, final_alloc = run_portfolio_simulation(
simulation_mode.lower().replace(" ", "_"),
st.session_state.target,
risk_tolerance,
etf_inputs,
st.session_state.enable_drip,
st.session_state.enable_erosion
)
if df_data is not None and not df_data.empty and final_alloc is not None and not final_alloc.empty:
# Store results in session state
st.session_state.simulation_run = True
st.session_state.df_data = df_data
st.session_state.final_alloc = final_alloc
st.success("Portfolio simulation completed!")
st.rerun()
else:
st.error("Simulation failed to generate valid results. Please check your inputs and try again.")
logger.error("Simulation returned empty DataFrames")
logger.error(f"df_data: {df_data}")
logger.error(f"final_alloc: {final_alloc}")
except Exception as e:
st.error(f"Error running simulation: {str(e)}")
logger.error(f"Error in form submission: {str(e)}")
logger.error(traceback.format_exc())
# Add reset simulation button at the bottom of sidebar
if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"):
reset_simulation()
# Add FMP connection status to the navigation bar
st.sidebar.markdown("---")
st.sidebar.subheader("FMP API Status")
connection_status, message = test_fmp_connection()
if connection_status:
st.sidebar.success(f"✅ FMP API: {message}")
else:
st.sidebar.error(f"❌ FMP API: {message}")
# Display results and interactive allocation adjustment UI after simulation is run
if st.session_state.simulation_run and st.session_state.df_data is not None:
df = st.session_state.df_data
final_alloc = st.session_state.final_alloc if hasattr(st.session_state, 'final_alloc') else None
# Validate final_alloc DataFrame
if final_alloc is None or final_alloc.empty:
st.error("No portfolio data available. Please run the simulation again.")
st.session_state.simulation_run = False
else:
# Verify required columns exist
required_columns = ["Capital Allocated ($)", "Yield (%)", "Price", "Ticker"]
missing_columns = [col for col in required_columns if col not in final_alloc.columns]
if missing_columns:
st.error(f"Missing required columns in portfolio data: {', '.join(missing_columns)}")
st.session_state.simulation_run = False
else:
# Create tabs for better organization
tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"])
with tab1:
st.subheader("💰 Portfolio Summary")
portfolio_summary(final_alloc)
# Display mode-specific information
if st.session_state.mode == "Income Target":
try:
monthly_target = st.session_state.target
ANNUAL_TARGET = monthly_target * 12
total_capital = final_alloc["Capital Allocated ($)"].sum()
st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).")
except Exception as e:
st.error(f"Error displaying income target information: {str(e)}")
else:
try:
initial_capital = st.session_state.initial_capital
annual_income = final_alloc["Income Contributed ($)"].sum()
monthly_income = annual_income / 12
st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).")
except Exception as e:
st.error(f"Error displaying capital investment information: {str(e)}")
# Add save/load section
st.subheader("💾 Save/Load Portfolio")
# Create two columns for save and load
save_col, load_col = st.columns(2)
with save_col:
st.write("Save current portfolio")
portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name")
if st.button("Save Portfolio", key="save_portfolio"):
if portfolio_name:
if save_portfolio(portfolio_name, final_alloc,
st.session_state.mode,
st.session_state.target):
st.success(f"Portfolio '{portfolio_name}' saved successfully!")
else:
st.warning("Please enter a portfolio name.")
with load_col:
st.write("Load saved portfolio")
if st.button("Show Saved Portfolios", key="show_portfolios"):
saved_portfolios = list_saved_portfolios()
if saved_portfolios:
selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio")
if st.button("Load Portfolio", key="load_portfolio_btn"):
loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio)
if loaded_df is not None:
st.session_state.final_alloc = loaded_df
st.session_state.mode = loaded_mode
st.session_state.target = loaded_target
st.success(f"Portfolio '{selected_portfolio}' loaded successfully!")
st.rerun()
else:
st.info("No saved portfolios found.")
# Display full detailed allocation table
st.subheader("📊 Capital Allocation Details")
try:
# Format currencies for better readability
display_df = final_alloc.copy()
# Calculate shares for each ETF
display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"]
display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}")
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}")
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}")
display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%")
display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}")
# Create a form for the allocation table
with st.form("allocation_form"):
# Create an editable DataFrame
edited_df = st.data_editor(
display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]],
column_config={
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
"Allocation (%)": st.column_config.NumberColumn(
"Allocation (%)",
min_value=0.0,
max_value=100.0,
step=0.1,
format="%.1f"
),
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
"Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True),
"Risk Level": st.column_config.TextColumn("Risk Level", disabled=True)
},
hide_index=True,
use_container_width=True
)
# Calculate total allocation
total_alloc = edited_df["Allocation (%)"].sum()
# Display total allocation with color coding
if abs(total_alloc - 100) <= 0.1:
st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None)
else:
st.metric("Total Allocation (%)", f"{total_alloc:.2f}",
delta=f"{total_alloc - 100:.2f}",
delta_color="off")
if abs(total_alloc - 100) > 0.1:
st.warning("Total allocation should be 100%")
# Create columns for quick actions
col1, col2, col3 = st.columns(3)
with col1:
equal_weight = st.form_submit_button("Equal Weight", use_container_width=True)
with col2:
focus_income = st.form_submit_button("Focus on Income", use_container_width=True)
with col3:
focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True)
# Submit button for manual edits
submitted = st.form_submit_button("Update Allocations",
disabled=abs(total_alloc - 100) > 0.1,
type="primary",
use_container_width=True)
# Handle form submission
if submitted:
try:
# Convert the edited allocations to a dictionary
new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()}
# Convert to the format expected by allocation functions
etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio updated with new allocations!")
st.rerun()
else:
st.error("Failed to update portfolio. Please try again.")
except Exception as e:
st.error(f"Error updating allocations: {str(e)}")
# Handle quick actions
if equal_weight:
try:
# Calculate equal weight allocation
num_etfs = len(edited_df)
equal_allocation = 100 / num_etfs
# Create new allocations in the format expected by allocation functions
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to equal weight!")
st.rerun()
except Exception as e:
st.error(f"Error applying equal weight: {str(e)}")
elif focus_income:
try:
# Sort by yield and adjust allocations
sorted_alloc = edited_df.sort_values("Yield (%)", ascending=False)
total_yield = sorted_alloc["Yield (%)"].str.rstrip('%').astype('float').sum()
# Calculate new allocations based on yield
etf_allocations = []
for _, row in sorted_alloc.iterrows():
yield_val = float(row["Yield (%)"].rstrip('%'))
allocation = (yield_val / total_yield) * 100
etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation})
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to focus on income!")
st.rerun()
except Exception as e:
st.error(f"Error focusing on income: {str(e)}")
elif focus_capital:
try:
# Calculate equal weight allocation (same as equal weight)
num_etfs = len(edited_df)
equal_allocation = 100 / num_etfs
# Create new allocations in the format expected by allocation functions
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
# Get the mode and target from session state
mode = st.session_state.mode
target = st.session_state.target
initial_capital = st.session_state.initial_capital
# Use the same allocation functions as the main navigation
if mode == "Income Target":
final_alloc = allocate_for_income(df, target, etf_allocations)
else: # Capital Target
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
if final_alloc is not None:
st.session_state.final_alloc = final_alloc
st.success("Portfolio adjusted to focus on capital!")
st.rerun()
except Exception as e:
st.error(f"Error focusing on capital: {str(e)}")
except Exception as e:
st.error(f"Error displaying allocation details: {str(e)}")
logger.error(f"Error in allocation display: {str(e)}")
logger.error(traceback.format_exc())