1833 lines
76 KiB
Python
1833 lines
76 KiB
Python
import streamlit as st
|
|
import pandas as pd
|
|
import numpy as np
|
|
import plotly.express as px
|
|
import plotly.graph_objects as go
|
|
from pathlib import Path
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Tuple, Optional, Any, Callable, T
|
|
import time
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import yfinance as yf
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
import os
|
|
import sys
|
|
import logging
|
|
import traceback
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
load_dotenv(override=True)  # Force reload of environment variables

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global settings
USE_FMP_API = True  # Default to using FMP API if available

# FMP API configuration
FMP_API_KEY = os.getenv('FMP_API_KEY')
if not FMP_API_KEY:
    logger.warning("FMP_API_KEY not found in environment variables")
    # Diagnostics are deliberately limited to non-secret facts. The full
    # process environment and the raw contents of .env are never logged,
    # because both routinely contain credentials.
    logger.warning("Current working directory: %s", os.getcwd())
    logger.warning("Files in current directory: %s", os.listdir('.'))
    if os.path.exists('.env'):
        logger.warning(".env file exists")
    else:
        logger.warning(".env file does not exist")
else:
    logger.info("FMP_API_KEY loaded successfully")
    # Mask the API key for security in logs. Only keys longer than eight
    # characters show their first and last four characters; a shorter key
    # would otherwise be printed in full, so it is masked entirely.
    if len(FMP_API_KEY) > 8:
        masked_key = FMP_API_KEY[:4] + '*' * (len(FMP_API_KEY) - 8) + FMP_API_KEY[-4:]
    else:
        masked_key = '*' * len(FMP_API_KEY)
    logger.info("FMP_API_KEY (masked): %s", masked_key)

FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
|
|
|
|
# High-yield ETFs reference data.
# Maps ticker -> expected yield (in percent, compared against computed yield
# percentages elsewhere in this file) and payout frequency. Every entry listed
# here pays monthly, so only the yield varies per ticker.
HIGH_YIELD_ETFS = {
    symbol: {"expected_yield": annual_yield, "frequency": "Monthly"}
    for symbol, annual_yield in (
        ("MSTY", 125.0),
        ("SMCY", 100.0),
        ("TSLY", 85.0),
        ("NVDY", 75.0),
        ("ULTY", 70.0),
        ("JEPQ", 9.5),
        ("JEPI", 7.8),
        ("XYLD", 12.0),
        ("QYLD", 12.0),
        ("RYLD", 12.0),
    )
}
|
|
|
|
def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Calculate ETF metrics based on available data.

    Every metric that cannot be computed keeps its default of 0.0 and has its
    key appended to ``missing_metrics`` so downstream scoring can skip it.
    The input frames are only read; neither is modified.

    Args:
        ticker: ETF ticker
        price_data: DataFrame with price history; the "close" column is read
            and its last row is taken as the current price
        dividend_data: DataFrame with dividend history; the "date" and
            "dividend" columns are read

    Returns:
        Dictionary with calculated metrics. If an unexpected error occurs the
        error is logged and the metrics gathered so far are returned
        (best effort), so the result may be partially filled.
    """
    metrics = {
        "Ticker": ticker,
        "Yield (%)": 0.0,
        "Price": 0.0,
        "volatility": 0.0,
        "sharpe_ratio": 0.0,
        "sortino_ratio": 0.0,
        "correlation": 0.0,
        "payout_ratio": 0.0,
        "score": 0.0,
        "Risk Level": "Unknown",
        "missing_metrics": []
    }
    missing = metrics["missing_metrics"]
    trading_days = 252

    try:
        # Get current price from price data
        if not price_data.empty:
            metrics["Price"] = price_data["close"].iloc[-1]
        else:
            missing.append("Price")

        # Trailing-twelve-month yield, only meaningful with a positive price
        ttm_dividend = None
        if not dividend_data.empty and metrics["Price"] > 0:
            # Parse dates into a local Series instead of writing the parsed
            # column back, so the caller's DataFrame is left untouched.
            payment_dates = pd.to_datetime(dividend_data["date"])
            one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
            recent_dividends = dividend_data[payment_dates >= one_year_ago]
            if not recent_dividends.empty:
                ttm_dividend = recent_dividends["dividend"].sum()
            else:
                logger.warning(f"No recent dividends found for {ticker}")

        if ttm_dividend is not None:
            metrics["Yield (%)"] = (ttm_dividend / metrics["Price"]) * 100
            logger.info(f"Calculated yield for {ticker}: {metrics['Yield (%)']:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${metrics['Price']:.2f})")
        else:
            missing.append("Yield (%)")

        # Daily returns drive all three risk statistics below
        if len(price_data) > 1:
            returns = price_data["close"].pct_change().dropna()
        else:
            returns = pd.Series(dtype=float)

        # Annualized volatility (percent). The sample standard deviation is
        # NaN with fewer than two returns; report that as missing rather than
        # letting NaN flow into the risk category and score.
        daily_std = returns.std()
        if np.isfinite(daily_std):
            metrics["volatility"] = daily_std * np.sqrt(trading_days) * 100
        else:
            missing.append("volatility")

        # Sharpe ratio against an assumed 5% annual risk-free rate. The ratio
        # is undefined when the deviation is zero or NaN, so it is marked
        # missing in that case instead of being reported as 0.0 or NaN.
        risk_free_rate = 0.05
        excess_returns = returns - (risk_free_rate / trading_days)
        excess_std = excess_returns.std()
        if np.isfinite(excess_std) and excess_std != 0:
            metrics["sharpe_ratio"] = (excess_returns.mean() / excess_std) * np.sqrt(trading_days)
        else:
            missing.append("sharpe_ratio")

        # Sortino ratio: mean return over the deviation of negative returns.
        # Undefined (and marked missing) without at least two negative
        # returns or when their deviation is zero.
        downside_std = returns[returns < 0].std()
        if np.isfinite(downside_std) and downside_std != 0:
            metrics["sortino_ratio"] = (returns.mean() / downside_std) * np.sqrt(trading_days)
        else:
            missing.append("sortino_ratio")

        # Categorize risk based on available metrics
        metrics["Risk Level"] = categorize_etf_risk(metrics)

        # Calculate overall score
        metrics["score"] = calculate_etf_score(metrics)

        logger.info(f"Calculated metrics for {ticker}: {metrics}")
        return metrics

    except Exception as e:
        # Best effort: keep whatever was computed before the failure.
        logger.error(f"Error calculating metrics for {ticker}: {str(e)}")
        logger.error(traceback.format_exc())
        return metrics
|
|
|
|
def categorize_etf_risk(metrics: Dict[str, Any]) -> str:
    """
    Categorize ETF risk based on available metrics.

    Each metric not listed in ``metrics["missing_metrics"]`` earns 1 to 3 risk
    points; the category comes from the mean of those points. When every
    metric is missing, the stored yield value alone decides the category.

    Args:
        metrics: Dictionary with ETF metrics

    Returns:
        Risk category: "Low", "Medium", or "High"; "Unknown" if the lookup or
        comparison raises (the error is logged).
    """
    # (metric key, 3-point threshold, 2-point threshold, larger value = riskier?)
    rules = (
        ("Yield (%)", 10, 6, True),
        ("volatility", 20, 15, True),
        ("sharpe_ratio", 0.5, 1.0, False),
        ("sortino_ratio", 0.5, 1.0, False),
    )

    try:
        points = []
        for key, severe, moderate, rising in rules:
            if key in metrics["missing_metrics"]:
                continue
            reading = metrics[key]
            if (reading > severe) if rising else (reading < severe):
                points.append(3)
            elif (reading > moderate) if rising else (reading < moderate):
                points.append(2)
            else:
                points.append(1)

        if points:
            mean_points = sum(points) / len(points)
            if mean_points > 2.5:
                return "High"
            return "Medium" if mean_points > 1.5 else "Low"

        # Nothing was scorable: fall back to the raw yield value
        fallback_yield = metrics["Yield (%)"]
        if fallback_yield > 10:
            return "High"
        if fallback_yield > 6:
            return "Medium"
        return "Low"

    except Exception as e:
        logger.error(f"Error categorizing ETF risk: {str(e)}")
        return "Unknown"
|
|
|
|
def calculate_etf_score(metrics: Dict[str, Any]) -> float:
    """
    Calculate overall ETF score based on available metrics.

    Each metric not listed in ``metrics["missing_metrics"]`` earns 25, 20, 15
    or 10 points depending on which threshold it clears. The result is the
    mean of the earned points, so it lies between 10 and 25 (it is not
    rescaled to a 0-100 range).

    Args:
        metrics: Dictionary with ETF metrics

    Returns:
        Mean points over the available metrics; 0 when no metric is available
        or when an error occurs (the error is logged).
    """
    # (metric key, thresholds for 25/20/15 points, larger value = better?)
    bands = (
        ("Yield (%)", (10, 6, 3), True),
        ("volatility", (10, 15, 20), False),
        ("sharpe_ratio", (1.5, 1.0, 0.5), True),
        ("sortino_ratio", (1.5, 1.0, 0.5), True),
    )
    awards = (25, 20, 15)

    try:
        total = 0
        counted = 0
        for key, cutoffs, prefers_high in bands:
            if key in metrics["missing_metrics"]:
                continue
            reading = metrics[key]
            earned = 10  # floor when no threshold is cleared
            for cutoff, award in zip(cutoffs, awards):
                if (reading > cutoff) if prefers_high else (reading < cutoff):
                    earned = award
                    break
            total += earned
            counted += 1

        if counted > 0:
            return total / counted
        return 0

    except Exception as e:
        logger.error(f"Error calculating ETF score: {str(e)}")
        return 0
|
|
|
|
def calculate_correlation_matrix(price_data_dict: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Calculate correlation matrix between ETFs.

    Daily returns are derived from each frame's "close" column and joined on
    the union of their indexes, so the result does not depend on the order of
    the dictionary and no ticker's observations are discarded because another
    ticker has a shorter history.

    Args:
        price_data_dict: Dictionary of price DataFrames for each ETF; frames
            with fewer than two rows are skipped

    Returns:
        DataFrame with correlation matrix; an empty DataFrame when no ticker
        has usable data or when an error occurs (the error is logged).
    """
    try:
        returns_by_ticker = {
            ticker: price_data["close"].pct_change().dropna()
            for ticker, price_data in price_data_dict.items()
            if len(price_data) > 1
        }

        if not returns_by_ticker:
            logger.warning("No valid price data for correlation calculation")
            return pd.DataFrame()

        # One outer join over all series. Assigning columns one by one into an
        # empty frame would align every series to the first ticker's index.
        returns_df = pd.concat(returns_by_ticker, axis=1)

        if returns_df.empty:
            logger.warning("No valid price data for correlation calculation")
            return pd.DataFrame()

        # Calculate correlation matrix
        corr_matrix = returns_df.corr()
        logger.info(f"Correlation matrix calculated:\n{corr_matrix}")
        return corr_matrix

    except Exception as e:
        logger.error(f"Error calculating correlation matrix: {str(e)}")
        logger.error(traceback.format_exc())
        return pd.DataFrame()
|
|
|
|
def optimize_portfolio_allocation(
    etf_metrics: List[Dict[str, Any]],
    risk_tolerance: str,
    correlation_matrix: pd.DataFrame
) -> List[Dict[str, Any]]:
    """
    Optimize portfolio allocation based on risk tolerance and ETF metrics.

    ETFs are grouped by their "Risk Level" ("Low", "Medium", "High"; the label
    "Moderate", which ``fetch_etf_data_fmp`` emits, is treated as "Medium").
    Each group receives a share set by the risk tolerance, and that share is
    split equally between the group's ETFs. Shares of groups that have no
    ETFs are redistributed proportionally, so the returned allocations always
    sum to 100. ETFs with any other risk label are left out, unless no ETF
    has a recognised label, in which case all ETFs are weighted equally.

    Args:
        etf_metrics: List of ETF metrics dictionaries; "Ticker" is required,
            "Risk Level" and "score" are optional
        risk_tolerance: Risk tolerance level ("Conservative", "Moderate",
            "Aggressive"); any other value is handled as "Aggressive"
        correlation_matrix: Correlation matrix between ETFs (accepted for
            interface compatibility; not used by this function)

    Returns:
        List of dictionaries with ETF tickers and their allocations in
        percent, ordered Low, Medium, High and by descending score within a
        group. An empty list for empty input or when an error occurs (the
        error is logged).
    """
    try:
        logger.info(f"Optimizing portfolio allocation for {risk_tolerance} risk tolerance")
        logger.info(f"ETF metrics: {etf_metrics}")

        if not etf_metrics:
            logger.warning("No ETF metrics provided, nothing to allocate")
            return []

        # Target share (percent) of each risk group per tolerance level
        group_targets = {
            "Conservative": {"Low": 50.0, "Medium": 30.0, "High": 20.0},
            "Moderate": {"Low": 30.0, "Medium": 40.0, "High": 30.0},
        }.get(risk_tolerance, {"Low": 20.0, "Medium": 40.0, "High": 40.0})

        # Group ETFs by risk category
        groups = {"Low": [], "Medium": [], "High": []}
        for etf in etf_metrics:
            level = etf.get("Risk Level", "Unknown")
            if level == "Moderate":
                level = "Medium"
            if level in groups:
                groups[level].append(etf)

        logger.info(f"Risk groups - Low: {len(groups['Low'])}, Medium: {len(groups['Medium'])}, High: {len(groups['High'])}")

        # Sort ETFs by score within each risk category
        for members in groups.values():
            members.sort(key=lambda x: x.get("score", 0), reverse=True)

        populated = {level: members for level, members in groups.items() if members}
        allocations = []

        if populated:
            # Rescale so the populated groups share the full 100%. With all
            # three groups present the divisor is 100 and the targets apply
            # unchanged.
            target_total = sum(group_targets[level] for level in populated)
            for level, members in populated.items():
                per_etf = group_targets[level] * 100.0 / target_total / len(members)
                allocations.extend(
                    {"ticker": etf["Ticker"], "allocation": per_etf} for etf in members
                )
        else:
            # If no allocations were made, use equal weighting
            logger.warning("No risk-based allocations made, using equal weighting")
            equal_alloc = 100.0 / len(etf_metrics)
            allocations = [{"ticker": etf["Ticker"], "allocation": equal_alloc} for etf in etf_metrics]

        logger.info(f"Final allocations: {allocations}")
        return allocations

    except Exception as e:
        logger.error(f"Error optimizing portfolio allocation: {str(e)}")
        logger.error(traceback.format_exc())
        return []
|
|
|
|
def adjust_allocations_for_correlation(
    allocations: Dict[str, float],
    correlation_matrix: pd.DataFrame
) -> Dict[str, float]:
    """
    Adjust allocations to reduce correlation between ETFs.

    For every pair of columns whose absolute correlation exceeds 0.7, and
    whose tickers are both present in ``allocations``, 10% of the larger
    position is moved to the other one (on a tie the second ticker of the
    pair gives up the weight). Pairs are processed in column order and each
    adjustment sees the result of the previous ones.

    Args:
        allocations: Dictionary with current allocations (not modified)
        correlation_matrix: Correlation matrix between ETFs

    Returns:
        Dictionary with adjusted allocations; the original ``allocations``
        object if an error occurs (the error is logged).
    """
    try:
        rebalanced = allocations.copy()
        symbols = correlation_matrix.columns

        # Upper-triangle scan for strongly correlated pairs (|corr| > 0.7)
        correlated_pairs = [
            (symbols[row], symbols[col])
            for row in range(len(symbols))
            for col in range(row + 1, len(symbols))
            if abs(correlation_matrix.iloc[row, col]) > 0.7
        ]

        for first, second in correlated_pairs:
            if first not in rebalanced or second not in rebalanced:
                continue

            # The larger position donates; equal positions make `second` the donor
            if rebalanced[first] > rebalanced[second]:
                donor, receiver = first, second
            else:
                donor, receiver = second, first

            shift = rebalanced[donor] * 0.1  # Reduce by 10%
            rebalanced[donor] -= shift
            rebalanced[receiver] += shift

        logger.info(f"Adjusted allocations for correlation: {rebalanced}")
        return rebalanced

    except Exception as e:
        logger.error(f"Error adjusting allocations for correlation: {str(e)}")
        logger.error(traceback.format_exc())
        return allocations
|
|
|
|
def get_fmp_session():
    """
    Create a session with retry logic for FMP API calls.

    Returns:
        A ``requests.Session`` whose ``https://`` adapter is configured with
        ``Retry(total=3, backoff_factor=0.5)``.
    """
    retry_policy = Retry(total=3, backoff_factor=0.5)
    fmp_session = requests.Session()
    fmp_session.mount('https://', HTTPAdapter(max_retries=retry_policy))
    return fmp_session
|
|
|
|
# Upper bound in seconds for a single FMP HTTP request, so a stalled
# connection cannot block the app indefinitely.
_FMP_REQUEST_TIMEOUT_SECONDS = 10


def _fmp_count_api_call() -> None:
    """Increment the session's FMP request counter, creating it on first use."""
    st.session_state.api_calls = st.session_state.get("api_calls", 0) + 1
    logger.info(f"FMP API call count: {st.session_state.api_calls}")


def _fmp_redact_key(text: str) -> str:
    """Return ``text`` with any occurrence of the FMP API key replaced by ``***``."""
    return text.replace(FMP_API_KEY, "***") if FMP_API_KEY else text


def _fmp_distribution_period(payment_dates: pd.Series) -> str:
    """Classify payout cadence from the mean gap in days between sorted payment dates."""
    if len(payment_dates) < 2:
        return "Unknown"
    avg_interval = payment_dates.diff().dt.days.dropna().mean()
    if avg_interval <= 45:
        return "Monthly"
    if avg_interval <= 100:
        return "Quarterly"
    if avg_interval <= 200:
        return "Semi-Annually"
    return "Annually"


def fetch_etf_data_fmp(ticker: str) -> Optional[Dict[str, Any]]:
    """
    Fetch ETF data from Financial Modeling Prep API.

    Two requests are made: the profile endpoint for the current price and the
    dividend-history endpoint, from which the trailing-twelve-month yield and
    the distribution period are derived. The API key is sent as a query
    parameter and is kept out of log messages.

    Args:
        ticker: ETF ticker symbol

    Returns:
        Dictionary with "Ticker", "Price", "Yield (%)", "Distribution Period"
        and "Risk Level", or None if the key is not configured, a request
        fails, the price is not positive, or no dividends were paid in the
        last 365 days. Errors are logged, not raised.
    """
    try:
        if not FMP_API_KEY:
            logger.warning("FMP API key not configured in environment variables")
            st.warning("FMP API key not found in environment variables. Some features may be limited.")
            return None

        auth_params = {"apikey": FMP_API_KEY}

        # The context manager closes the session's connections on every exit path.
        with get_fmp_session() as session:
            # Get profile data for current price
            profile_url = f"{FMP_BASE_URL}/profile/{ticker}"
            logger.info(f"Making FMP API call to {profile_url}")
            profile_response = session.get(
                profile_url, params=auth_params, timeout=_FMP_REQUEST_TIMEOUT_SECONDS
            )
            _fmp_count_api_call()

            if profile_response.status_code != 200:
                logger.error(f"FMP API error for {ticker}: {profile_response.status_code}")
                logger.error(f"Response content: {profile_response.text}")
                return None

            profile_data = profile_response.json()
            logger.info(f"FMP profile response for {ticker}: {profile_data}")

            if not profile_data or not isinstance(profile_data, list):
                logger.warning(f"No profile data found for {ticker} in FMP")
                return None

            # "price" may be absent or null in the payload; both count as invalid
            current_price = float(profile_data[0].get('price') or 0)
            if current_price <= 0:
                logger.error(f"Invalid price for {ticker}: {current_price}")
                return None

            # Get dividend history
            dividend_url = f"{FMP_BASE_URL}/historical-price-full/stock_dividend/{ticker}"
            logger.info(f"Making FMP API call to {dividend_url}")
            dividend_response = session.get(
                dividend_url, params=auth_params, timeout=_FMP_REQUEST_TIMEOUT_SECONDS
            )
            _fmp_count_api_call()

            if dividend_response.status_code != 200:
                logger.error(f"FMP API error for dividend data: {dividend_response.status_code}")
                logger.error(f"Response content: {dividend_response.text}")
                return None

            dividend_data = dividend_response.json()
            logger.info(f"FMP dividend response for {ticker}: {dividend_data}")

        if not dividend_data or "historical" not in dividend_data or not dividend_data["historical"]:
            logger.warning(f"No dividend history found for {ticker}")
            return None

        dividends = pd.DataFrame(dividend_data["historical"])
        dividends["date"] = pd.to_datetime(dividends["date"])
        dividends = dividends.sort_values("date")

        # Get dividends in the last 12 months
        one_year_ago = pd.Timestamp.now() - pd.Timedelta(days=365)
        recent_dividends = dividends[dividends["date"] >= one_year_ago]

        if recent_dividends.empty:
            logger.warning(f"No recent dividends found for {ticker}")
            return None

        # Calculate TTM dividend and yield
        ttm_dividend = recent_dividends["dividend"].sum()
        yield_pct = (ttm_dividend / current_price) * 100
        logger.info(f"Calculated yield for {ticker}: {yield_pct:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${current_price:.2f})")

        # For high-yield ETFs, verify the yield is reasonable. This only logs
        # diagnostics; the computed yield is still returned.
        if ticker in HIGH_YIELD_ETFS:
            expected_yield = HIGH_YIELD_ETFS[ticker]["expected_yield"]
            if yield_pct < expected_yield * 0.5:  # If yield is less than 50% of expected
                logger.error(f"Calculated yield {yield_pct:.2f}% for {ticker} is much lower than expected {expected_yield}%")
                logger.error(f"TTM dividend: ${ttm_dividend:.2f}")
                logger.error(f"Current price: ${current_price:.2f}")
                logger.error(f"Recent dividends:\n{recent_dividends}")

        etf_data = {
            "Ticker": ticker,
            "Price": current_price,
            "Yield (%)": yield_pct,
            "Distribution Period": _fmp_distribution_period(recent_dividends["date"]),
            "Risk Level": "High" if ticker in HIGH_YIELD_ETFS else "Moderate"
        }
        logger.info(f"FMP data for {ticker}: {etf_data}")
        return etf_data

    except Exception as e:
        # Request exceptions can embed the full URL including the query
        # string, so the key is scrubbed before anything is logged.
        logger.error(_fmp_redact_key(f"Error fetching FMP data for {ticker}: {str(e)}"))
        logger.error(_fmp_redact_key(traceback.format_exc()))
        return None
|
|
|
|
def fetch_etf_data_yfinance(ticker: str) -> Optional[Dict[str, Any]]:
    """
    Fetch ETF data from yfinance as fallback.

    The price is taken from ``regularMarketPrice``, falling back to
    ``regularMarketPreviousClose``. The yield comes from ``dividendYield``
    when present, otherwise from the dividends of the last year divided by
    that price.

    Args:
        ticker: ETF ticker symbol

    Returns:
        Dictionary with "Ticker", "Price", "Yield (%)" and "Risk Level", or
        None if no positive price is available or an error occurs (the error
        is logged). Returning None for a bad price matches
        ``fetch_etf_data_fmp``.
    """
    try:
        logger.info(f"Fetching yfinance data for {ticker}")
        etf = yf.Ticker(ticker)
        info = etf.info

        # Resolve the price first so the yield calculation below can use the
        # same value. A field that is present but None counts as 0.
        current_price = info.get('regularMarketPrice') or 0
        if current_price <= 0:
            current_price = info.get('regularMarketPreviousClose') or 0
            logger.warning(f"Using previous close price for {ticker}: {current_price}")

        if current_price <= 0:
            logger.error(f"Invalid price for {ticker}: {current_price}")
            return None

        # Get the most recent dividend yield
        dividend_yield = info.get('dividendYield')
        if dividend_yield is not None:
            # NOTE(review): assumes yfinance reports dividendYield as a
            # fraction (0.05 for 5%) - confirm for the installed version.
            yield_pct = dividend_yield * 100
            logger.info(f"Found dividend yield in yfinance for {ticker}: {yield_pct:.2f}%")
        else:
            # Try to calculate from dividend history
            hist = etf.history(period="1y")
            if not hist.empty and 'Dividends' in hist.columns:
                annual_dividend = hist['Dividends'].sum()
                yield_pct = (annual_dividend / current_price) * 100
                logger.info(f"Calculated yield from history for {ticker}: {yield_pct:.2f}%")
            else:
                yield_pct = 0
                logger.warning(f"No yield data found for {ticker} in yfinance")

        etf_data = {
            "Ticker": ticker,
            "Price": current_price,
            "Yield (%)": yield_pct,
            "Risk Level": "High"  # Default for high-yield ETFs
        }
        logger.info(f"yfinance data for {ticker}: {etf_data}")
        return etf_data

    except Exception as e:
        logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
|
|
|
# Cached entries older than this are ignored and refetched.
_ETF_CACHE_MAX_AGE = timedelta(hours=24)


def _etf_cache_read(cache_file: Path, ticker: str) -> Optional[Dict[str, Any]]:
    """Return the cached payload for ``ticker`` if it is readable and fresh, else None."""
    try:
        with open(cache_file, 'r', encoding="utf-8") as f:
            cached_data = json.load(f)
        cache_time = datetime.fromisoformat(cached_data.get('timestamp', '2000-01-01'))
        cache_age = datetime.now() - cache_time
        age_hours = cache_age.total_seconds() / 3600
        logger.info(f"Cache age: {age_hours:.2f} hours")

        if cache_age < _ETF_CACHE_MAX_AGE:
            logger.info(f"Using cached data for {ticker}")
            return cached_data['data']
        logger.info(f"Cache expired for {ticker} (age: {age_hours:.2f} hours)")
    except Exception as e:
        # An unreadable cache entry is treated as a miss, not as a failure.
        logger.warning(f"Error reading cache for {ticker}: {str(e)}")
        logger.warning(traceback.format_exc())
    return None


def _etf_cache_write(cache_file: Path, ticker: str, etf_data: Dict[str, Any], source: str) -> None:
    """Store ``etf_data`` with the current timestamp; failures are logged and ignored."""
    try:
        cache_data = {
            'timestamp': datetime.now().isoformat(),
            'data': etf_data
        }
        with open(cache_file, 'w', encoding="utf-8") as f:
            json.dump(cache_data, f)
        logger.info(f"Cached {source} data for {ticker}")
    except Exception as e:
        logger.warning(f"Error caching {source} data for {ticker}: {str(e)}")
        logger.warning(traceback.format_exc())


def fetch_etf_data(tickers: List[str]) -> pd.DataFrame:
    """
    Fetch ETF data using FMP API with yfinance fallback.
    Uses HIGH_YIELD_ETFS data only as a last resort.

    Per ticker the sources are tried in this order: the JSON cache under
    ``cache/`` (entries younger than 24 hours, skipped when the session flag
    ``force_refresh_data`` is set), FMP (when enabled and a key is
    configured), yfinance, and finally the static HIGH_YIELD_ETFS table with
    a placeholder price. FMP and yfinance results are written back to the
    cache; the static fallback is not.

    Args:
        tickers: List of ETF tickers; empty entries are skipped

    Returns:
        DataFrame with one row per ticker that could be resolved. An empty
        DataFrame when nothing could be fetched, when any row has a
        non-positive price, or when an unexpected error occurs; these cases
        are reported through ``st.error``.
    """
    try:
        data = {}
        cache_dir = Path("cache")
        cache_dir.mkdir(exist_ok=True)
        force_refresh = st.session_state.get("force_refresh_data", False)

        logger.info("=== Starting ETF data fetch ===")
        logger.info(f"Force refresh enabled: {force_refresh}")
        logger.info(f"Cache directory: {cache_dir.absolute()}")

        for ticker in tickers:
            if not ticker:  # Skip empty tickers
                continue

            logger.info(f"\n=== Processing {ticker} ===")

            # Check cache first if not forcing refresh
            cache_file = cache_dir / f"{ticker}_data.json"
            logger.info(f"Cache file path: {cache_file.absolute()}")
            logger.info(f"Cache file exists: {cache_file.exists()}")

            if not force_refresh and cache_file.exists():
                cached = _etf_cache_read(cache_file, ticker)
                if cached is not None:
                    data[ticker] = cached
                    continue
            else:
                logger.info(f"No cache found or force refresh enabled for {ticker}")

            # Try FMP first if enabled
            if USE_FMP_API and FMP_API_KEY:
                logger.info(f"Making FMP API call for {ticker}")
                etf_data = fetch_etf_data_fmp(ticker)
                if etf_data is not None:
                    _etf_cache_write(cache_file, ticker, etf_data, "FMP")
                    data[ticker] = etf_data
                    # Requests are counted inside fetch_etf_data_fmp, one per
                    # HTTP call; counting again here would inflate the total.
                    logger.info(f"Total API calls: {st.session_state.get('api_calls', 0)}")
                    continue

            # If FMP fails, try yfinance
            logger.info(f"Falling back to yfinance for {ticker}")
            etf_data = fetch_etf_data_yfinance(ticker)
            if etf_data is not None:
                _etf_cache_write(cache_file, ticker, etf_data, "yfinance")
                data[ticker] = etf_data
                continue

            # Only use HIGH_YIELD_ETFS data if both FMP and yfinance failed
            if ticker in HIGH_YIELD_ETFS:
                logger.info(f"Using fallback data from HIGH_YIELD_ETFS for {ticker}")
                data[ticker] = {
                    "Ticker": ticker,
                    "Price": 25.0,  # Default price for fallback
                    "Yield (%)": HIGH_YIELD_ETFS[ticker]["expected_yield"],
                    "Distribution Period": HIGH_YIELD_ETFS[ticker]["frequency"],
                    "Risk Level": "High"
                }
            else:
                logger.error(f"Failed to fetch data for {ticker} from all sources")

        if not data:
            st.error("No ETF data could be fetched")
            return pd.DataFrame()

        df = pd.DataFrame(data.values())

        # Validate the data
        if df.empty:
            st.error("No ETF data could be fetched")
            return pd.DataFrame()

        if (df["Price"] <= 0).any():
            st.error("Some ETFs have invalid prices")
            return pd.DataFrame()

        if (df["Yield (%)"] <= 0).any():
            st.warning("Some ETFs have zero or negative yields")

        logger.info(f"Final DataFrame:\n{df}")
        return df

    except Exception as e:
        st.error(f"Error fetching ETF data: {str(e)}")
        logger.error(f"Error in fetch_etf_data: {str(e)}")
        logger.error(traceback.format_exc())
        return pd.DataFrame()
|
|
|
|
def run_portfolio_simulation(
    tickers: List[str],
    weights: List[float],
    initial_investment: float,
    start_date: str,
    end_date: str,
    rebalance_frequency: str = 'monthly',
    use_fmp: bool = True
) -> Dict[str, Any]:
    """
    Run portfolio simulation with the given parameters.

    Portfolio returns are the weighted sum of each ticker's period-over-period
    close-price returns; the portfolio value is the initial investment
    compounded by those returns.

    NOTE(review): price history is read from a 'historical' entry of the
    dictionaries returned by fetch_etf_data_fmp / fetch_etf_data_yfinance.
    Neither function, as visible in this file, returns such a key, in which
    case this function always ends with "No historical data available" -
    confirm where the history is meant to come from.

    Args:
        tickers: List of ETF tickers
        weights: List of portfolio weights, each in [0, 1], summing to 1
            (within floating-point tolerance)
        initial_investment: Initial investment amount
        start_date: Start date for simulation (accepted but not used by this
            implementation)
        end_date: End date for simulation (accepted but not used by this
            implementation)
        rebalance_frequency: Frequency of rebalancing (accepted but not used
            by this implementation)
        use_fmp: Whether to use FMP API for data

    Returns:
        Dictionary with simulation results ('portfolio_value', 'returns',
        'cumulative_returns', 'total_return', 'annual_return', 'volatility',
        'sharpe_ratio', 'max_drawdown', 'drawdown'), or None when validation
        or the simulation fails; the failure is logged and shown via
        ``st.error``.
    """
    try:
        # Validate inputs
        if not tickers or not weights:
            raise ValueError("No tickers or weights provided")
        if len(tickers) != len(weights):
            raise ValueError("Number of tickers must match number of weights")
        if not all(0 <= w <= 1 for w in weights):
            raise ValueError("Weights must be between 0 and 1")
        # Tolerant comparison: exact equality rejects valid inputs such as
        # [0.1, 0.2, 0.7], whose float sum is 0.9999999999999999.
        if not np.isclose(sum(weights), 1.0):
            raise ValueError("Weights must sum to 1")

        # Get historical data
        historical_data = {}
        for ticker in tickers:
            data = None
            if use_fmp and FMP_API_KEY:
                data = fetch_etf_data_fmp(ticker)
                if not (data and 'historical' in data):
                    logger.warning(f"Falling back to yfinance for {ticker}")
                    data = fetch_etf_data_yfinance(ticker)
            else:
                data = fetch_etf_data_yfinance(ticker)

            if data and 'historical' in data:
                historical_data[ticker] = data['historical']

        if not historical_data:
            raise ValueError("No historical data available for any tickers")

        # Fail with a readable message instead of a bare KeyError further down
        # when only some tickers have history.
        missing_tickers = [ticker for ticker in tickers if ticker not in historical_data]
        if missing_tickers:
            raise ValueError(f"No historical data available for: {', '.join(missing_tickers)}")

        # Create portfolio DataFrame
        portfolio = pd.DataFrame()
        for ticker, data in historical_data.items():
            portfolio[ticker] = data['close']

        # Calculate portfolio returns (first row has no predecessor -> 0)
        portfolio_returns = portfolio.pct_change()
        portfolio_returns = portfolio_returns.fillna(0)

        # Calculate weighted returns
        weighted_returns = pd.DataFrame()
        for ticker, weight in zip(tickers, weights):
            weighted_returns[ticker] = portfolio_returns[ticker] * weight

        portfolio_returns['portfolio'] = weighted_returns.sum(axis=1)

        # Calculate cumulative returns
        cumulative_returns = (1 + portfolio_returns).cumprod()

        # Calculate portfolio value
        portfolio_value = initial_investment * cumulative_returns['portfolio']

        # Calculate metrics (252 periods per year is assumed for annualizing)
        total_return = (portfolio_value.iloc[-1] / initial_investment) - 1
        annual_return = (1 + total_return) ** (252 / len(portfolio_value)) - 1
        volatility = portfolio_returns['portfolio'].std() * np.sqrt(252)
        sharpe_ratio = annual_return / volatility if volatility != 0 else 0

        # Calculate drawdown relative to the running peak
        rolling_max = portfolio_value.expanding().max()
        drawdown = (portfolio_value - rolling_max) / rolling_max
        max_drawdown = drawdown.min()

        return {
            'portfolio_value': portfolio_value,
            'returns': portfolio_returns,
            'cumulative_returns': cumulative_returns,
            'total_return': total_return,
            'annual_return': annual_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'drawdown': drawdown
        }

    except Exception as e:
        logger.error(f"Error in portfolio simulation: {str(e)}")
        st.error(f"Error running portfolio simulation: {str(e)}")
        return None
|
|
|
|
def portfolio_summary(final_alloc: pd.DataFrame) -> None:
    """
    Display a summary of the portfolio allocation.

    Renders three columns of headline metrics, a pie chart of the allocation
    and a formatted per-ETF table. Any failure while computing or rendering is
    reported through ``st.error`` and logged with its traceback.

    Args:
        final_alloc: DataFrame containing the portfolio allocation. The columns
            "Ticker", "Allocation (%)", "Yield (%)", "Capital Allocated ($)"
            and "Income Contributed ($)" are read here.
    """
    if final_alloc is None or final_alloc.empty:
        st.warning("No portfolio data available.")
        return

    try:
        # Calculate key metrics
        total_capital = final_alloc["Capital Allocated ($)"].sum()
        total_income = final_alloc["Income Contributed ($)"].sum()

        # Allocation-weighted average of the per-ETF yields
        weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100

        # Income as a percentage of capital. With zero capital the ratio is
        # undefined, so report 0 instead of letting the division yield nan/inf.
        effective_yield = (total_income / total_capital * 100) if total_capital else 0.0

        # Display metrics in columns
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("Total Capital", f"${total_capital:,.2f}")

        with col2:
            st.metric("Annual Income", f"${total_income:,.2f}")
            st.metric("Monthly Income", f"${total_income/12:,.2f}")

        with col3:
            st.metric("Average Yield", f"{weighted_yield:.2f}%")
            st.metric("Effective Yield", f"{effective_yield:.2f}%")

        # Display allocation chart
        fig = px.pie(
            final_alloc,
            values="Allocation (%)",
            names="Ticker",
            title="Portfolio Allocation by ETF",
            hover_data={
                "Ticker": True,
                "Allocation (%)": ":.2f",
                "Yield (%)": ":.2f",
                "Capital Allocated ($)": ":,.2f",
                "Income Contributed ($)": ":,.2f"
            }
        )
        st.plotly_chart(fig, use_container_width=True)

        # Display detailed allocation table
        st.subheader("Detailed Allocation")
        # Work on a copy so the extra display column never leaks to the caller
        display_df = final_alloc.copy()
        display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12

        # Format the display
        st.dataframe(
            display_df.style.format({
                "Allocation (%)": "{:.2f}%",
                "Yield (%)": "{:.2f}%",
                "Price": "${:,.2f}",
                "Shares": "{:,.4f}",
                "Capital Allocated ($)": "${:,.2f}",
                "Monthly Income": "${:,.2f}",
                "Income Contributed ($)": "${:,.2f}"
            }),
            use_container_width=True
        )

    except Exception as e:
        st.error(f"Error calculating portfolio summary: {str(e)}")
        logger.error(f"Error in portfolio_summary: {str(e)}")
        logger.error(traceback.format_exc())
|
|
|
|
def save_portfolio(portfolio_name: str, final_alloc: pd.DataFrame, mode: str, target: float) -> bool:
    """
    Save portfolio allocation to a JSON file.

    The file is written to ``portfolios/<name>.json`` relative to the current
    working directory. The file name is derived from ``portfolio_name`` with
    every character other than letters, digits, space, ``.``, ``_`` and ``-``
    replaced by ``_`` and leading/trailing spaces and dots removed, so a name
    such as ``../x`` cannot place the file outside the ``portfolios``
    directory. The original, unmodified name is stored inside the JSON.

    Args:
        portfolio_name: Name of the portfolio
        final_alloc: DataFrame containing portfolio allocation; the columns
            "Ticker", "Allocation (%)", "Yield (%)", "Price" and "Risk Level"
            are persisted
        mode: Portfolio mode ("Income Target" or "Capital Target")
        target: Target value (income or capital)

    Returns:
        bool: True if save was successful, False otherwise
    """
    try:
        # Build a file-system-safe stem from the user-supplied name
        safe_name = "".join(
            ch if ch.isalnum() or ch in " ._-" else "_"
            for ch in portfolio_name
        ).strip(" .")
        if not safe_name:
            raise ValueError("Portfolio name contains no usable characters")

        # Create portfolios directory if it doesn't exist
        portfolios_dir = Path("portfolios")
        portfolios_dir.mkdir(exist_ok=True)

        # Prepare portfolio data
        portfolio_data = {
            "name": portfolio_name,
            "created_at": datetime.now().isoformat(),
            "mode": mode,
            "target": target,
            # float() turns numpy scalars into JSON-serialisable floats
            "allocations": [
                {
                    "ticker": row["Ticker"],
                    "allocation": float(row["Allocation (%)"]),
                    "yield": float(row["Yield (%)"]),
                    "price": float(row["Price"]),
                    "risk_level": row["Risk Level"]
                }
                for _, row in final_alloc.iterrows()
            ]
        }

        # Save to JSON file
        file_path = portfolios_dir / f"{safe_name}.json"
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(portfolio_data, f, indent=2)

        return True

    except Exception as e:
        st.error(f"Error saving portfolio: {str(e)}")
        logger.error(f"Error saving portfolio: {str(e)}")
        return False
|
|
|
|
def load_portfolio(portfolio_name: str) -> Tuple[Optional[pd.DataFrame], Optional[str], Optional[float]]:
    """
    Load portfolio allocation from a JSON file.

    Reads ``portfolios/<portfolio_name>.json`` and rebuilds the allocation
    DataFrame with display column names ("Ticker", "Allocation (%)",
    "Yield (%)", "Price", "Risk Level"). The saved file only holds percentages,
    yields and prices, so "Capital Allocated ($)", "Shares" and
    "Income Contributed ($)" are recomputed from the saved mode and target with
    the same formulas used by allocate_for_income / allocate_for_capital.

    Args:
        portfolio_name: Name of the portfolio to load

    Returns:
        Tuple containing:
        - DataFrame with portfolio allocation
        - Portfolio mode
        - Target value
        All three are None when the file is missing or cannot be read.
    """
    try:
        # Check if portfolio exists
        file_path = Path("portfolios") / f"{portfolio_name}.json"
        if not file_path.exists():
            st.error(f"Portfolio '{portfolio_name}' not found.")
            return None, None, None

        # Load portfolio data
        with open(file_path, 'r', encoding='utf-8') as f:
            portfolio_data = json.load(f)

        mode = portfolio_data["mode"]
        target = portfolio_data["target"]

        # Convert allocations to DataFrame and restore the column names used
        # everywhere else in the app
        df = pd.DataFrame(portfolio_data["allocations"])
        df = df.rename(columns={
            "ticker": "Ticker",
            "allocation": "Allocation (%)",
            "yield": "Yield (%)",
            "price": "Price",
            "risk_level": "Risk Level"
        })

        if not df.empty:
            if mode == "Income Target":
                # target is a monthly income: derive the capital needed to
                # produce it at the allocation-weighted yield
                weighted_yield = (df["Allocation (%)"] * df["Yield (%)"]).sum() / 100
                if weighted_yield > 0:
                    total_capital = (target * 12 / weighted_yield) * 100
                else:
                    total_capital = 0.0
            else:
                # target is the capital to invest
                total_capital = target

            df["Capital Allocated ($)"] = (df["Allocation (%)"] / 100) * total_capital
            df["Shares"] = df["Capital Allocated ($)"] / df["Price"]
            df["Income Contributed ($)"] = (df["Capital Allocated ($)"] * df["Yield (%)"]) / 100

        return df, mode, target

    except Exception as e:
        st.error(f"Error loading portfolio: {str(e)}")
        return None, None, None
|
|
|
|
def list_saved_portfolios() -> List[str]:
    """
    Return the names of every portfolio saved on disk.

    A portfolio name is the file name, without its ``.json`` suffix, of a file
    directly inside the ``portfolios`` directory.

    Returns:
        Alphabetically sorted portfolio names; an empty list when the directory
        is missing or listing fails (the failure is shown via ``st.error``).
    """
    try:
        storage = Path("portfolios")
        if storage.exists():
            # The stem of each JSON file is the portfolio name
            return sorted(entry.stem for entry in storage.glob("*.json"))
        return []

    except Exception as e:
        st.error(f"Error listing portfolios: {str(e)}")
        return []
|
|
|
|
def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[Dict[str, Any]]) -> Optional[pd.DataFrame]:
    """
    Allocate portfolio for income target.

    Works out how much capital is needed for the weighted portfolio yield to
    produce ``target`` dollars per month, then splits that capital across the
    ETFs according to their allocation percentages.

    Args:
        df: DataFrame with ETF data; must provide "Ticker", "Yield (%)" and
            "Price" columns. It is copied, never modified.
        target: Monthly income target
        etf_allocations: List of ETF allocations, each a dict with "ticker"
            and "allocation" (percentage) keys

    Returns:
        DataFrame with final allocation (input columns plus "Allocation (%)",
        "Capital Allocated ($)", "Shares" and "Income Contributed ($)"), or
        None when no allocation could be applied, the weighted yield is not
        positive, or an error occurs.
    """
    try:
        # Create final allocation DataFrame
        final_alloc = df.copy()

        # Initialize allocation column if it doesn't exist
        if "Allocation (%)" not in final_alloc.columns:
            final_alloc["Allocation (%)"] = 0.0

        # Set allocations
        for alloc in etf_allocations:
            mask = final_alloc["Ticker"] == alloc["ticker"]
            if mask.any():
                final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
            else:
                logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")

        # Verify allocations are set
        if final_alloc["Allocation (%)"].sum() == 0:
            logger.error("No allocations were set")
            return None

        # Calculate required capital for income target
        monthly_income = target
        annual_income = monthly_income * 12

        # Calculate weighted average yield
        weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
        # "not > 0" also rejects negative and NaN yields, which would otherwise
        # turn into a negative or NaN capital requirement
        if not weighted_yield > 0:
            logger.error("Weighted yield is zero")
            return None

        # Calculate required capital
        required_capital = (annual_income / weighted_yield) * 100

        # Calculate capital allocation and income
        final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital
        final_alloc["Shares"] = final_alloc["Capital Allocated ($)"] / final_alloc["Price"]
        final_alloc["Income Contributed ($)"] = (final_alloc["Capital Allocated ($)"] * final_alloc["Yield (%)"]) / 100

        # Verify calculations
        total_income = final_alloc["Income Contributed ($)"].sum()
        if abs(total_income - annual_income) > 1.0:  # Allow for small rounding errors
            logger.warning(f"Total income ({total_income}) does not match target ({annual_income})")

        logger.info(f"Income allocation completed. Required capital: ${required_capital:,.2f}")
        logger.info(f"Final allocations:\n{final_alloc}")

        return final_alloc

    except Exception as e:
        logger.error(f"Error in income allocation: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
|
|
|
def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocations: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Split a fixed amount of capital across ETFs by allocation percentage.

    Args:
        df: DataFrame with ETF data ("Ticker", "Yield (%)" and "Price" columns
            are read); a copy is modified, the input is left untouched
        initial_capital: Amount of capital to distribute
        etf_allocations: Dicts with "ticker" and "allocation" (percentage) keys

    Returns:
        The copied DataFrame extended with "Allocation (%)",
        "Capital Allocated ($)", "Shares" and "Income Contributed ($)";
        None if no allocation was applied or an error occurred.
    """
    pct_col = "Allocation (%)"
    capital_col = "Capital Allocated ($)"

    try:
        result = df.copy()

        # Start from zero allocations unless the data already carries them
        if pct_col not in result.columns:
            result[pct_col] = 0.0

        # Apply each requested percentage to the matching ticker row(s)
        for entry in etf_allocations:
            selector = result["Ticker"] == entry["ticker"]
            if not selector.any():
                logger.warning(f"Ticker {entry['ticker']} not found in DataFrame")
                continue
            result.loc[selector, pct_col] = entry["allocation"]

        # Nothing matched: there is no portfolio to build
        if result[pct_col].sum() == 0:
            logger.error("No allocations were set")
            return None

        # Dollars, share counts and annual income per ETF
        fraction = result[pct_col] / 100
        result[capital_col] = fraction * initial_capital
        result["Shares"] = result[capital_col] / result["Price"]
        result["Income Contributed ($)"] = (result[capital_col] * result["Yield (%)"]) / 100

        # Sanity check: the parts should add back up to the whole
        total_capital = result[capital_col].sum()
        if abs(total_capital - initial_capital) > 1.0:
            logger.warning(f"Total capital ({total_capital}) does not match initial capital ({initial_capital})")

        logger.info(f"Capital allocation completed. Initial capital: ${initial_capital:,.2f}")
        logger.info(f"Final allocations:\n{result}")

        return result

    except Exception as e:
        logger.error(f"Error in capital allocation: {str(e)}")
        logger.error(traceback.format_exc())
        return None
|
|
|
|
def reset_simulation():
    """Restore every simulation-related session value to its default, then rerun the app."""
    baseline = (
        ("simulation_run", False),
        ("df_data", None),
        ("final_alloc", None),
        ("mode", 'Capital Target'),
        ("target", 0),
        ("initial_capital", 0),
        ("enable_drip", False),
        ("enable_erosion", False),
    )
    for field, value in baseline:
        setattr(st.session_state, field, value)

    # Re-execute the script so the UI reflects the cleared state
    st.rerun()
|
|
|
|
def test_fmp_connection() -> bool:
    """
    Test connection to FMP API.

    Requests the SPY profile endpoint once, increments the per-session API
    call counter, and reports the outcome through ``st.success`` / ``st.error``.

    Returns:
        True when the endpoint answers with HTTP 200; False when the API key is
        missing, the status code is anything else, or the request raises.
    """
    try:
        if not FMP_API_KEY:
            st.error("FMP API key not found in environment variables")
            return False

        session = get_fmp_session()
        endpoint = f"{FMP_BASE_URL}/profile/SPY"
        test_url = f"{endpoint}?apikey={FMP_API_KEY}"
        # Log the endpoint only: the full URL carries the API key, which must
        # not end up in log files
        logger.info(f"Making FMP API test call to {endpoint}")
        # Bound the wait so an unresponsive API cannot hang the page.
        # NOTE(review): assumes get_fmp_session() returns a requests.Session
        # (or compatible object accepting ``timeout``) — confirm.
        response = session.get(test_url, timeout=10)
        st.session_state.api_calls += 1
        logger.info(f"FMP API call count: {st.session_state.api_calls}")

        if response.status_code == 200:
            st.success("Successfully connected to FMP API")
            return True
        else:
            st.error(f"Failed to connect to FMP API: {response.status_code}")
            logger.error(f"FMP API test failed: {response.text}")
            return False
    except Exception as e:
        st.error(f"Error testing FMP connection: {str(e)}")
        logger.error(f"FMP API test error: {str(e)}")
        return False
|
|
|
|
def get_cache_stats() -> Dict[str, Any]:
    """
    Summarise what is stored under the ``cache`` directory.

    Every ``*.json`` file at any depth is counted. The ticker of a file is the
    part of its name before the first underscore.

    Returns:
        Dictionary with "ticker_count" (distinct tickers), "file_count" and
        "total_size_kb". All three are 0 when the directory does not exist or
        the scan fails.
    """
    try:
        root = Path("cache")
        if not root.exists():
            return {"ticker_count": 0, "file_count": 0, "total_size_kb": 0}

        found = list(root.glob("**/*.json"))

        # File names are assumed to look like <ticker>_<data type>.json
        symbols = {entry.stem.split('_')[0] for entry in found}

        byte_total = sum(entry.stat().st_size for entry in found)

        return {
            "ticker_count": len(symbols),
            "file_count": len(found),
            "total_size_kb": byte_total / 1024,  # bytes -> KB
        }

    except Exception as e:
        logger.error(f"Error getting cache stats: {str(e)}")
        return {"ticker_count": 0, "file_count": 0, "total_size_kb": 0}
|
|
|
|
def clear_cache(ticker: Optional[str] = None) -> None:
    """
    Delete cached JSON files, either for one ticker or for all of them.

    Args:
        ticker: Symbol whose files (``<TICKER>_*.json``, matched in upper case
            at any depth under ``cache``) should be removed. When omitted or
            empty, every ``*.json`` file under ``cache`` is removed.
    """
    try:
        root = Path("cache")
        if not root.exists():
            return

        # Pick the glob: one ticker's files, or everything
        glob_expr = f"**/{ticker.upper()}_*.json" if ticker else "**/*.json"

        # Materialise the matches first, then delete them one by one so a
        # single failure does not stop the rest
        for target in list(root.glob(glob_expr)):
            try:
                target.unlink()
                logger.info(f"Deleted cache file: {target}")
            except Exception as e:
                logger.error(f"Error deleting cache file {target}: {str(e)}")

    except Exception as e:
        logger.error(f"Error clearing cache: {str(e)}")
|
|
|
|
# Page-level configuration for the Streamlit app
st.set_page_config(
    page_title="ETF Portfolio Builder",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Seed each session-state entry with its default, leaving values that already
# exist untouched
_SESSION_DEFAULTS = {
    'simulation_run': False,
    'df_data': None,
    'final_alloc': None,
    'mode': 'Capital Target',
    'target': 0,
    'initial_capital': 0,
    'enable_drip': False,
    'enable_erosion': False,
    'api_calls': 0,
    'force_refresh_data': False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        setattr(st.session_state, _state_key, _state_default)

# Page heading
st.title("📈 ETF Portfolio Builder")
|
|
|
|
# Sidebar for simulation parameters
#
# Collects the simulation inputs (mode, target, risk tolerance, DRIP/erosion
# switches, ETF tickers). When the ETF form is submitted it fetches data,
# computes allocations and stores the result in st.session_state.
# NOTE(review): nesting below was reconstructed from a source whose
# indentation was lost — in particular, confirm that the submit handler sits
# outside the form and that all of this belongs inside `with st.sidebar`.
with st.sidebar:
    st.header("Simulation Parameters")

    # Add refresh data button at the top
    # NOTE(review): the handler only shows two messages; no refresh happens yet.
    if st.button("🔄 Refresh Data", use_container_width=True):
        st.info("Refreshing ETF data...")
        # Add your data refresh logic here
        st.success("Data refreshed successfully!")

    # Mode selection
    simulation_mode = st.radio(
        "Select Simulation Mode",
        ["Capital Target", "Income Target"]
    )

    # Only the input matching the chosen mode is rendered, so only one of
    # monthly_target / initial_capital is assigned by this branch.
    if simulation_mode == "Income Target":
        monthly_target = st.number_input(
            "Monthly Income Target ($)",
            min_value=100.0,
            max_value=100000.0,
            value=1000.0,
            step=100.0
        )
        ANNUAL_TARGET = monthly_target * 12
    else:
        initial_capital = st.number_input(
            "Initial Capital ($)",
            min_value=1000.0,
            max_value=1000000.0,
            value=100000.0,
            step=1000.0
        )

    # Risk tolerance
    risk_tolerance = st.select_slider(
        "Risk Tolerance",
        options=["Conservative", "Moderate", "Aggressive"],
        value="Moderate"
    )

    # Additional options
    st.subheader("Additional Options")

    # DRIP option (index=1 preselects "No")
    enable_drip = st.radio(
        "Enable Dividend Reinvestment (DRIP)",
        ["Yes", "No"],
        index=1
    )

    # Erosion options (index=1 preselects "No")
    enable_erosion = st.radio(
        "Enable NAV & Yield Erosion",
        ["Yes", "No"],
        index=1
    )

    # ETF Selection
    st.subheader("ETF Selection")

    # Create a form for ETF selection
    with st.form("etf_selection_form"):
        # Number of ETFs
        num_etfs = st.number_input("Number of ETFs", min_value=1, max_value=10, value=3, step=1)

        # Create columns for ETF inputs
        etf_inputs = []
        for i in range(num_etfs):
            ticker = st.text_input(f"ETF {i+1} Ticker", key=f"ticker_{i}")
            if ticker:  # Only add non-empty tickers
                # Tickers are normalised to upper case without surrounding spaces
                etf_inputs.append({"ticker": ticker.upper().strip()})

        # Submit button
        submitted = st.form_submit_button("Run Portfolio Simulation", type="primary")

    if submitted:
        try:
            if not etf_inputs:
                st.error("Please enter at least one ETF ticker")
            else:
                logger.info(f"Form submitted with {len(etf_inputs)} ETFs: {etf_inputs}")

                # Store parameters in session state
                st.session_state.mode = simulation_mode
                st.session_state.enable_drip = enable_drip == "Yes"
                st.session_state.enable_erosion = enable_erosion == "Yes"

                # "target" holds the monthly income in Income Target mode and
                # the invested capital in Capital Target mode
                if simulation_mode == "Income Target":
                    st.session_state.target = monthly_target
                else:
                    st.session_state.target = initial_capital
                    st.session_state.initial_capital = initial_capital

                # Run simulation
                logger.info("Starting portfolio simulation...")
                logger.info(f"ETF inputs: {etf_inputs}")

                df_data = fetch_etf_data([etf["ticker"] for etf in etf_inputs])
                logger.info(f"Fetched ETF data:\n{df_data}")

                if df_data is not None and not df_data.empty:
                    logger.info("Calculating optimal allocations...")
                    # Calculate allocations based on risk tolerance
                    etf_allocations = optimize_portfolio_allocation(
                        df_data.to_dict('records'),
                        risk_tolerance,
                        pd.DataFrame()  # Empty correlation matrix for now
                    )
                    logger.info(f"Optimal allocations: {etf_allocations}")

                    if simulation_mode == "Income Target":
                        logger.info(f"Allocating for income target: ${monthly_target}")
                        final_alloc = allocate_for_income(df_data, monthly_target, etf_allocations)
                    else:
                        logger.info(f"Allocating for capital target: ${initial_capital}")
                        final_alloc = allocate_for_capital(df_data, initial_capital, etf_allocations)

                    logger.info(f"Final allocation result:\n{final_alloc}")

                    if final_alloc is not None and not final_alloc.empty:
                        # Store results in session state
                        st.session_state.simulation_run = True
                        st.session_state.df_data = df_data
                        st.session_state.final_alloc = final_alloc
                        st.success("Portfolio simulation completed!")
                        # Rerun so the results section renders from session state
                        st.rerun()
                    else:
                        st.error("Failed to generate portfolio allocation. Please check your inputs and try again.")
                        # Dump the fetched data to the log to help diagnose the failure
                        logger.error("Allocation returned empty DataFrame")
                        logger.error(f"df_data columns: {df_data.columns}")
                        logger.error(f"df_data shape: {df_data.shape}")
                        logger.error(f"df_data:\n{df_data}")
                else:
                    st.error("Failed to fetch ETF data. Please check your tickers and try again.")
                    logger.error("ETF data fetch returned empty DataFrame")

        except Exception as e:
            st.error(f"Error running simulation: {str(e)}")
            logger.error(f"Error in form submission: {str(e)}")
            logger.error(traceback.format_exc())

    # Add reset simulation button at the bottom of sidebar
    if st.button("🔄 Reset Simulation", use_container_width=True, type="secondary"):
        reset_simulation()
|
|
|
|
# Add FMP connection status to the navigation bar
# test_fmp_connection() issues a real request and increments
# st.session_state.api_calls each time this top-level code executes; it also
# emits its own st.success / st.error message in addition to the sidebar
# status shown below.
st.sidebar.markdown("---")
st.sidebar.subheader("FMP API Status")
connection_status = test_fmp_connection()
if connection_status:
    st.sidebar.success("✅ FMP API: Connected")
else:
    st.sidebar.error("❌ FMP API: Connection failed")
|
|
|
|
# Advanced Options section in sidebar
# NOTE(review): nesting below was reconstructed from a source whose
# indentation was lost — confirm which statements belong inside the expander.
with st.sidebar.expander("Advanced Options"):
    # Option to toggle FMP API usage
    use_fmp_api = st.checkbox("Use FMP API for high-yield ETFs", value=USE_FMP_API,
                              help="Use Financial Modeling Prep API for more accurate yield data on high-yield ETFs")
    if use_fmp_api != USE_FMP_API:
        # Update global setting if changed
        # NOTE(review): USE_FMP_API is assigned True near the top of this file,
        # so this override presumably lasts only until that assignment runs
        # again — confirm whether the choice should live in session state.
        globals()["USE_FMP_API"] = use_fmp_api
        st.success("FMP API usage setting updated")

    # Add cache controls
    st.subheader("Cache Settings")

    # Display cache statistics
    cache_stats = get_cache_stats()
    st.write(f"Cache contains data for {cache_stats['ticker_count']} tickers ({cache_stats['file_count']} files, {cache_stats['total_size_kb']:.1f} KB)")

    # Force refresh option (the checkbox value is written back to session state)
    st.session_state.force_refresh_data = st.checkbox(
        "Force refresh data (ignore cache)",
        value=st.session_state.get("force_refresh_data", False),
        help="When enabled, always fetch fresh data from APIs"
    )

    # Cache clearing options
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Clear All Cache"):
            clear_cache()
            st.success("All cache files cleared!")
            # Clearing everything also resets the API call counter
            st.session_state.api_calls = 0

    with col2:
        ticker_to_clear = st.text_input("Clear cache for ticker:", key="cache_ticker")
        # Nothing is cleared unless a ticker has been typed
        if st.button("Clear") and ticker_to_clear:
            clear_cache(ticker_to_clear)
            st.success(f"Cache cleared for {ticker_to_clear.upper()}")

    # Show API call counter
    st.write(f"API calls this session: {st.session_state.api_calls}")

    # Add option for debug mode and parallel processing
    debug_mode = st.checkbox("Enable Debug Mode", help="Show detailed error logs.")
    parallel_processing = st.checkbox("Enable Parallel Processing", value=True,
                                      help="Fetch data for multiple ETFs simultaneously")
|
|
|
|
# Display results and interactive allocation adjustment UI after simulation is run
|
|
if st.session_state.simulation_run and st.session_state.df_data is not None:
|
|
df = st.session_state.df_data
|
|
final_alloc = st.session_state.final_alloc if hasattr(st.session_state, 'final_alloc') else None
|
|
|
|
# Validate final_alloc DataFrame
|
|
if final_alloc is None or final_alloc.empty:
|
|
st.error("No portfolio data available. Please run the simulation again.")
|
|
st.session_state.simulation_run = False
|
|
else:
|
|
# Verify required columns exist
|
|
required_columns = ["Capital Allocated ($)", "Yield (%)", "Price", "Ticker"]
|
|
missing_columns = [col for col in required_columns if col not in final_alloc.columns]
|
|
|
|
if missing_columns:
|
|
st.error(f"Missing required columns in portfolio data: {', '.join(missing_columns)}")
|
|
st.session_state.simulation_run = False
|
|
else:
|
|
# Create tabs for better organization
|
|
tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"])
|
|
|
|
with tab1:
|
|
st.subheader("💰 Portfolio Summary")
|
|
portfolio_summary(final_alloc)
|
|
|
|
# Display mode-specific information
|
|
if st.session_state.mode == "Income Target":
|
|
try:
|
|
monthly_target = st.session_state.target
|
|
ANNUAL_TARGET = monthly_target * 12
|
|
total_capital = final_alloc["Capital Allocated ($)"].sum()
|
|
st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).")
|
|
except Exception as e:
|
|
st.error(f"Error displaying income target information: {str(e)}")
|
|
else:
|
|
try:
|
|
initial_capital = st.session_state.initial_capital
|
|
annual_income = final_alloc["Income Contributed ($)"].sum()
|
|
monthly_income = annual_income / 12
|
|
st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).")
|
|
except Exception as e:
|
|
st.error(f"Error displaying capital investment information: {str(e)}")
|
|
|
|
# Add save/load section
|
|
st.subheader("💾 Save/Load Portfolio")
|
|
|
|
# Create two columns for save and load
|
|
save_col, load_col = st.columns(2)
|
|
|
|
with save_col:
|
|
st.write("Save current portfolio")
|
|
portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name")
|
|
if st.button("Save Portfolio", key="save_portfolio"):
|
|
if portfolio_name:
|
|
if save_portfolio(portfolio_name, final_alloc,
|
|
st.session_state.mode,
|
|
st.session_state.target):
|
|
st.success(f"Portfolio '{portfolio_name}' saved successfully!")
|
|
else:
|
|
st.warning("Please enter a portfolio name.")
|
|
|
|
with load_col:
|
|
st.write("Load saved portfolio")
|
|
if st.button("Show Saved Portfolios", key="show_portfolios"):
|
|
saved_portfolios = list_saved_portfolios()
|
|
if saved_portfolios:
|
|
selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio")
|
|
if st.button("Load Portfolio", key="load_portfolio_btn"):
|
|
loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio)
|
|
if loaded_df is not None:
|
|
st.session_state.final_alloc = loaded_df
|
|
st.session_state.mode = loaded_mode
|
|
st.session_state.target = loaded_target
|
|
st.success(f"Portfolio '{selected_portfolio}' loaded successfully!")
|
|
st.rerun()
|
|
else:
|
|
st.info("No saved portfolios found.")
|
|
|
|
# Display full detailed allocation table
|
|
st.subheader("📊 Capital Allocation Details")
|
|
|
|
try:
|
|
# Format currencies for better readability
|
|
display_df = final_alloc.copy()
|
|
# Calculate shares for each ETF
|
|
display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"]
|
|
display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}")
|
|
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}")
|
|
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}")
|
|
display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%")
|
|
display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}")
|
|
|
|
# Create a form for the allocation table
|
|
with st.form("allocation_form"):
|
|
# Create an editable DataFrame
|
|
edited_df = st.data_editor(
|
|
display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]],
|
|
column_config={
|
|
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
|
|
"Allocation (%)": st.column_config.NumberColumn(
|
|
"Allocation (%)",
|
|
min_value=0.0,
|
|
max_value=100.0,
|
|
step=0.1,
|
|
format="%.1f"
|
|
),
|
|
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
|
|
"Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True),
|
|
"Risk Level": st.column_config.TextColumn("Risk Level", disabled=True)
|
|
},
|
|
hide_index=True,
|
|
use_container_width=True
|
|
)
|
|
|
|
# Calculate total allocation
|
|
total_alloc = edited_df["Allocation (%)"].sum()
|
|
|
|
# Display total allocation with color coding
|
|
if abs(total_alloc - 100) <= 0.1:
|
|
st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None)
|
|
else:
|
|
st.metric("Total Allocation (%)", f"{total_alloc:.2f}",
|
|
delta=f"{total_alloc - 100:.2f}",
|
|
delta_color="off")
|
|
|
|
if abs(total_alloc - 100) > 0.1:
|
|
st.warning("Total allocation should be 100%")
|
|
|
|
# Create columns for quick actions
|
|
col1, col2, col3 = st.columns(3)
|
|
|
|
with col1:
|
|
equal_weight = st.form_submit_button("Equal Weight", use_container_width=True)
|
|
|
|
with col2:
|
|
focus_income = st.form_submit_button("Focus on Income", use_container_width=True)
|
|
|
|
with col3:
|
|
focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True)
|
|
|
|
# Submit button for manual edits
|
|
submitted = st.form_submit_button("Update Allocations",
|
|
disabled=abs(total_alloc - 100) > 0.1,
|
|
type="primary",
|
|
use_container_width=True)
|
|
|
|
# Handle form submission
|
|
if submitted:
|
|
try:
|
|
# Convert the edited allocations to a dictionary
|
|
new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()}
|
|
|
|
# Convert to the format expected by allocation functions
|
|
etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()]
|
|
|
|
# Get the mode and target from session state
|
|
mode = st.session_state.mode
|
|
target = st.session_state.target
|
|
initial_capital = st.session_state.initial_capital
|
|
|
|
# Use the same allocation functions as the main navigation
|
|
if mode == "Income Target":
|
|
final_alloc = allocate_for_income(df, target, etf_allocations)
|
|
else: # Capital Target
|
|
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
|
|
|
|
if final_alloc is not None:
|
|
st.session_state.final_alloc = final_alloc
|
|
st.success("Portfolio updated with new allocations!")
|
|
st.rerun()
|
|
else:
|
|
st.error("Failed to update portfolio. Please try again.")
|
|
except Exception as e:
|
|
st.error(f"Error updating allocations: {str(e)}")
|
|
|
|
# Handle quick actions
|
|
if equal_weight:
|
|
try:
|
|
# Calculate equal weight allocation
|
|
num_etfs = len(edited_df)
|
|
equal_allocation = 100 / num_etfs
|
|
|
|
# Create new allocations in the format expected by allocation functions
|
|
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
|
|
|
|
# Get the mode and target from session state
|
|
mode = st.session_state.mode
|
|
target = st.session_state.target
|
|
initial_capital = st.session_state.initial_capital
|
|
|
|
# Use the same allocation functions as the main navigation
|
|
if mode == "Income Target":
|
|
final_alloc = allocate_for_income(df, target, etf_allocations)
|
|
else: # Capital Target
|
|
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
|
|
|
|
if final_alloc is not None:
|
|
st.session_state.final_alloc = final_alloc
|
|
st.success("Portfolio adjusted to equal weight!")
|
|
st.rerun()
|
|
except Exception as e:
|
|
st.error(f"Error applying equal weight: {str(e)}")
|
|
|
|
elif focus_income:
|
|
try:
|
|
# Sort by yield and adjust allocations
|
|
sorted_alloc = edited_df.sort_values("Yield (%)", ascending=False)
|
|
total_yield = sorted_alloc["Yield (%)"].str.rstrip('%').astype('float').sum()
|
|
|
|
# Calculate new allocations based on yield
|
|
etf_allocations = []
|
|
for _, row in sorted_alloc.iterrows():
|
|
yield_val = float(row["Yield (%)"].rstrip('%'))
|
|
allocation = (yield_val / total_yield) * 100
|
|
etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation})
|
|
|
|
# Get the mode and target from session state
|
|
mode = st.session_state.mode
|
|
target = st.session_state.target
|
|
initial_capital = st.session_state.initial_capital
|
|
|
|
# Use the same allocation functions as the main navigation
|
|
if mode == "Income Target":
|
|
final_alloc = allocate_for_income(df, target, etf_allocations)
|
|
else: # Capital Target
|
|
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
|
|
|
|
if final_alloc is not None:
|
|
st.session_state.final_alloc = final_alloc
|
|
st.success("Portfolio adjusted to focus on income!")
|
|
st.rerun()
|
|
except Exception as e:
|
|
st.error(f"Error focusing on income: {str(e)}")
|
|
|
|
elif focus_capital:
|
|
try:
|
|
# Calculate equal weight allocation (same as equal weight)
|
|
num_etfs = len(edited_df)
|
|
equal_allocation = 100 / num_etfs
|
|
|
|
# Create new allocations in the format expected by allocation functions
|
|
etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()]
|
|
|
|
# Get the mode and target from session state
|
|
mode = st.session_state.mode
|
|
target = st.session_state.target
|
|
initial_capital = st.session_state.initial_capital
|
|
|
|
# Use the same allocation functions as the main navigation
|
|
if mode == "Income Target":
|
|
final_alloc = allocate_for_income(df, target, etf_allocations)
|
|
else: # Capital Target
|
|
final_alloc = allocate_for_capital(df, initial_capital, etf_allocations)
|
|
|
|
if final_alloc is not None:
|
|
st.session_state.final_alloc = final_alloc
|
|
st.success("Portfolio adjusted to focus on capital!")
|
|
st.rerun()
|
|
except Exception as e:
|
|
st.error(f"Error focusing on capital: {str(e)}")
|
|
except Exception as e:
|
|
st.error(f"Error displaying allocation details: {str(e)}")
|
|
logger.error(f"Error in allocation display: {str(e)}")
|
|
logger.error(traceback.format_exc()) |