Adding AI Suggestions tab

This commit is contained in:
Pascal BIBEHE 2025-06-04 02:36:55 +02:00
parent 19f713673e
commit f7cf624721
3 changed files with 639 additions and 4 deletions

View File

@ -7,7 +7,7 @@ import json
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, Optional from typing import Dict, Optional, List
import yfinance as yf import yfinance as yf
import logging import logging
from pathlib import Path from pathlib import Path
@ -28,6 +28,11 @@ class DataService:
self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings' self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings'
self.cache_timeout = timedelta(hours=1) self.cache_timeout = timedelta(hours=1)
# Create cache directories if they don't exist
for directory in [self.cache_dir, self.yf_cache_dir, self.fmp_cache_dir,
self.fmp_profiles_dir, self.fmp_historical_dir, self.fmp_holdings_dir]:
directory.mkdir(parents=True, exist_ok=True)
def get_etf_data(self, ticker: str) -> Dict: def get_etf_data(self, ticker: str) -> Dict:
"""Get ETF data using fallback logic: """Get ETF data using fallback logic:
1. Try FMP cache 1. Try FMP cache
@ -335,3 +340,62 @@ class DataService:
'is_new': False, 'is_new': False,
'is_estimated': True # Flag to indicate these are estimates 'is_estimated': True # Flag to indicate these are estimates
} }
def get_etf_list(self) -> List[str]:
    """Return the ticker symbols of the ETFs available for selection.

    Resolution order:
        1. The FMP API, when the ``FMP_API_KEY`` environment variable is set
           and the client returns at least one entry carrying a ``symbol``.
        2. A built-in default list.

    The former "yfinance" step has been removed on purpose: it downloaded
    ``^GSPC`` prices and returned the resulting date index, so callers
    received timestamps instead of tickers and the default list was never
    reached while yfinance was working.

    Returns:
        A non-empty list of ticker strings. This method never raises; any
        failure while talking to FMP is logged and the default list is used.
    """
    default_etfs = [
        # Core ETFs
        'VTI', 'VOO', 'VEA', 'VWO', 'BND',  # Vanguard
        'SPY', 'QQQ', 'DIA', 'IWM', 'EFA',  # Other broad index funds
        'AGG', 'TLT', 'LQD', 'HYG',  # Fixed Income
        # Income ETFs
        'JEPI', 'FEPI', 'MSTY', 'SCHD', 'VIG',
        # Sector ETFs
        'XLK', 'XLF', 'XLV', 'XLE', 'XLU',  # Tech, Financial, Healthcare, Energy, Utilities
        # Factor ETFs
        'MTUM', 'VLUE', 'QUAL', 'SIZE',  # Momentum, Value, Quality, Size
        # International
        'EWJ', 'EWU', 'EWZ', 'EWC',  # Japan, UK, Brazil, Canada
        # Alternative
        'GLD', 'VNQ', 'REM',  # Gold, REITs, Mortgage REITs
    ]

    try:
        api_key = os.environ.get("FMP_API_KEY")
        if api_key:
            # Imported lazily so the module still loads when the API
            # package is unavailable or no key is configured.
            from ..api import APIFactory
            fmp_client = APIFactory(api_key).get_client('fmp')
            etfs = fmp_client.get_all_etfs()
            symbols = [etf['symbol'] for etf in (etfs or []) if etf.get('symbol')]
            # Only trust the API answer when it yields usable symbols;
            # an empty result must fall through to the default list.
            if symbols:
                logger.info(f"Retrieved {len(symbols)} ETFs from FMP API")
                return symbols
    except Exception as e:
        # Best-effort lookup: a failing API must not break the caller.
        logger.error(f"Error getting ETF list: {str(e)}")

    logger.warning("Using default ETF list as fallback")
    return default_etfs

View File

@ -0,0 +1,491 @@
"""
ETF Selection Service for optimizing ETF selection based on investment goals
"""
import logging
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime
import os
logger = logging.getLogger(__name__)
class RiskTolerance(Enum):
    """Risk appetite levels an investor can choose from.

    Each member's value is the capitalised label of the level; members can
    be looked up by upper-case name (``RiskTolerance["MODERATE"]``) or by
    value (``RiskTolerance("Moderate")``).
    """

    # Filtered and scored towards low volatility and income.
    CONSERVATIVE = "Conservative"
    # Default level; no risk-specific filter or score adjustment is applied.
    MODERATE = "Moderate"
    # Filtered and scored towards higher Sharpe ratio and growth exposure.
    AGGRESSIVE = "Aggressive"
@dataclass
class InvestmentGoal:
    """Investor inputs that drive ETF selection.

    Only ``capital_target`` is required; the remaining fields have defaults.
    """

    # Amount of capital to allocate; the selection service requires it to be > 0.
    capital_target: float
    # Desired income in the same currency as capital_target; None means
    # "no income goal" and disables the income-specific filter and scoring.
    income_target: Optional[float] = None
    # Risk appetite used for filtering, scoring and weight adjustment.
    risk_tolerance: RiskTolerance = RiskTolerance.MODERATE
    # Planned holding period in years.
    investment_horizon: int = 5
class ETFSelectionService:
    """Suggest an ETF portfolio for an ``InvestmentGoal``.

    The public entry point is ``select_etfs``. It loads ETF records through
    the injected data service, filters them (red flags, income and risk
    rules, tier thresholds), scores the survivors, and turns the five best
    scores into allocation weights.
    """

    def __init__(self, data_service):
        """Store the data service and the tier thresholds.

        Args:
            data_service: Object providing ``get_etf_list()`` and
                ``get_etf_data(ticker)``.
        """
        self.data_service = data_service
        # NOTE(review): the inline comments below describe expense_ratio and
        # tracking_error thresholds in percent, whereas _process_etf_data
        # stores expense_ratio as "percent / 100". The comparison in
        # _get_etf_tier therefore looks 100x more permissive than the
        # comments say. Left unchanged because tightening it could filter
        # out every ETF - confirm the intended units before changing.
        self.selection_criteria = {
            'tier1': {
                'expense_ratio': 0.10,  # 0.10% or less
                'aum': 5_000_000_000,  # $5B or more
                'tracking_error': 0.05,  # 0.05% or less
                'avg_volume': 100_000  # 100K shares/day
            },
            'tier2': {
                'expense_ratio': 0.25,  # 0.25% or less
                'aum': 1_000_000_000,  # $1B or more
                'tracking_error': 0.10,  # 0.10% or less
                'avg_volume': 50_000  # 50K shares/day
            }
        }

    def select_etfs(self, goal: InvestmentGoal) -> List[Dict]:
        """Select ETFs for ``goal`` and return them with allocations.

        Args:
            goal: Investment goal (capital, optional income, risk tolerance).

        Returns:
            A list of dicts with ``ticker``, ``name``, ``allocation``
            (fraction of the portfolio), ``amount``, ``score``, ``tier`` and
            a ``metrics`` dict.

        Raises:
            ValueError: If the goal is invalid, if any pipeline stage yields
                nothing, or if an unexpected error occurs (wrapped).
        """
        try:
            logger.info(f"Starting ETF selection with goal: {goal}")

            # Validate investment goal
            self._validate_investment_goal(goal)

            # Get all available ETFs
            etfs = self._get_available_etfs()
            if not etfs:
                error_msg = "No ETFs available for selection. Please check your data source connection."
                logger.error(error_msg)
                raise ValueError(error_msg)
            logger.info(f"Found {len(etfs)} available ETFs")

            # Filter ETFs based on criteria
            filtered_etfs = self._filter_etfs(etfs, goal)
            if not filtered_etfs:
                error_msg = (
                    "No ETFs passed the filtering criteria. This could be due to:\n"
                    "1. High expense ratios (>0.25%)\n"
                    "2. Low assets under management (<$1B)\n"
                    "3. High tracking error (>0.10%)\n"
                    "4. Low trading volume (<50K shares/day)\n"
                    "Please try adjusting your risk tolerance or investment goals."
                )
                logger.error(error_msg)
                raise ValueError(error_msg)
            logger.info(f"{len(filtered_etfs)} ETFs passed filtering criteria")

            # Score ETFs based on criteria
            scored_etfs = self._score_etfs(filtered_etfs, goal)
            if not scored_etfs:
                error_msg = (
                    "No ETFs passed the scoring criteria. This could be due to:\n"
                    "1. Poor risk-adjusted returns (low Sharpe ratio)\n"
                    "2. High volatility\n"
                    "3. Large drawdowns\n"
                    "4. Insufficient dividend yield for income goals\n"
                    "Please try adjusting your risk tolerance or investment goals."
                )
                logger.error(error_msg)
                raise ValueError(error_msg)
            logger.info(f"{len(scored_etfs)} ETFs passed scoring criteria")

            # Optimize portfolio allocation
            portfolio = self._optimize_portfolio(scored_etfs, goal)
            if not portfolio:
                error_msg = (
                    "Failed to optimize portfolio allocation. This could be due to:\n"
                    "1. Insufficient diversification opportunities\n"
                    "2. Conflicting investment goals\n"
                    "3. Risk tolerance constraints\n"
                    "Please try adjusting your investment goals or risk tolerance."
                )
                logger.error(error_msg)
                raise ValueError(error_msg)

            logger.info(f"Successfully generated portfolio with {len(portfolio)} ETFs")
            return portfolio

        except ValueError:
            # Already carries a user-facing message; keep the traceback intact.
            raise
        except Exception as e:
            error_msg = f"Unexpected error during ETF selection: {str(e)}"
            logger.error(error_msg, exc_info=True)
            raise ValueError(f"Unable to generate portfolio suggestions: {str(e)}") from e

    def _validate_investment_goal(self, goal: InvestmentGoal) -> None:
        """Validate investment goal parameters.

        A falsy ``income_target`` (``None`` or ``0``) means "no income goal"
        and is accepted.

        Raises:
            ValueError: On a non-positive capital target, a negative income
                target, or a risk tolerance that is not a ``RiskTolerance``.
        """
        if goal.capital_target <= 0:
            raise ValueError("Capital target must be greater than 0")

        if goal.income_target and goal.income_target <= 0:
            raise ValueError("Income target must be greater than 0")

        if not isinstance(goal.risk_tolerance, RiskTolerance):
            raise ValueError("Risk tolerance must be a valid RiskTolerance enum value")

    @staticmethod
    def _as_float(value, default: float = 0.0) -> float:
        """Coerce ``value`` to float; use ``default`` for None, NaN or junk."""
        if value is None:
            return default
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        # NaN is the only float that differs from itself.
        return default if number != number else number

    def _process_etf_data(self, ticker: str, etf_data: Dict) -> Dict:
        """Flatten one raw data-service payload into the record used here.

        Missing or ``None`` fields fall back to defaults instead of raising,
        so one incomplete provider field no longer drops the whole ETF.

        Args:
            ticker: Symbol the payload belongs to.
            etf_data: Payload from ``data_service.get_etf_data``; read keys
                are ``info``, ``volatility``, ``max_drawdown``,
                ``sharpe_ratio`` and ``dividend_yield``.

        Returns:
            A dict with the keys consumed by the filter, scoring and
            optimisation steps.
        """
        info = etf_data.get('info') or {}
        as_float = self._as_float
        return {
            'ticker': ticker,
            'name': info.get('longName') or ticker,
            'expense_ratio': as_float(info.get('annualReportExpenseRatio'), 0.5) / 100,
            'aum': as_float(info.get('totalAssets')),
            'avg_volume': as_float(info.get('averageVolume')),
            'tracking_error': 0.0,  # Not available from yfinance
            'volatility': as_float(etf_data.get('volatility')),
            'max_drawdown': as_float(etf_data.get('max_drawdown')),
            'sharpe_ratio': as_float(etf_data.get('sharpe_ratio')),
            'top_holding_weight': 0.0,  # Not available from yfinance
            'dividend_yield': as_float(etf_data.get('dividend_yield')) / 100,
            'category': info.get('category') or 'Unknown',
            'asset_class': info.get('assetClass') or 'Unknown',
            'sector': info.get('sector') or 'Unknown',
            'region': info.get('region') or 'Unknown',
            'strategy': info.get('strategy') or 'Unknown'
        }

    def _get_available_etfs(self) -> List[Dict]:
        """Get list of available ETFs with their data.

        Returns:
            One processed record per ticker that could be loaded.

        Raises:
            ValueError: If the data service returns no tickers, if no ticker
                could be processed, or if any other error occurs (wrapped).
        """
        try:
            # Get list of ETFs
            etf_list = self.data_service.get_etf_list()
            if not etf_list:
                error_msg = "No ETFs available from data service. Please check your data source connection."
                logger.error(error_msg)
                raise ValueError(error_msg)

            logger.info(f"Retrieved {len(etf_list)} ETFs from data service")

            # Process each ETF; one bad ticker must not abort the whole run.
            processed_etfs = []
            for ticker in etf_list:
                try:
                    etf_data = self.data_service.get_etf_data(ticker)
                    if not etf_data:
                        logger.warning(f"No data available for {ticker}")
                        continue
                    processed_etfs.append(self._process_etf_data(ticker, etf_data))
                except Exception as e:
                    logger.warning(f"Error processing ETF {ticker}: {str(e)}")
                    continue

            if not processed_etfs:
                error_msg = "No ETFs could be processed. Please check your data source connection."
                logger.error(error_msg)
                raise ValueError(error_msg)

            logger.info(f"Successfully processed {len(processed_etfs)} ETFs")
            return processed_etfs

        except Exception as e:
            error_msg = f"Error fetching available ETFs: {str(e)}"
            logger.error(error_msg)
            raise ValueError(error_msg) from e

    def _get_etfs_from_yfinance(self) -> List[Dict]:
        """Get ETF data from yfinance as fallback.

        Unlike ``_get_available_etfs`` this variant never raises: it returns
        an empty list when nothing can be loaded.
        """
        try:
            # Get list of ETFs from data service
            etf_list = self.data_service.get_etf_list()
            if not etf_list:
                logger.error("No ETF list available from data service")
                return []

            processed_etfs = []
            for ticker in etf_list:
                try:
                    etf_data = self.data_service.get_etf_data(ticker)
                    if not etf_data:
                        continue
                    processed_etfs.append(self._process_etf_data(ticker, etf_data))
                except Exception as e:
                    logger.warning(f"Error processing ETF {ticker} from yfinance: {str(e)}")
                    continue

            return processed_etfs

        except Exception as e:
            logger.error(f"Error fetching ETFs from yfinance: {str(e)}")
            return []

    @staticmethod
    def _minimum_dividend_yield(goal) -> Optional[float]:
        """Return the dividend-yield floor implied by the income goal.

        Returns ``None`` when the goal has no income target. Otherwise the
        floor is 2%, raised to 4% when the target yield
        (``income_target / capital_target``) exceeds 5%. The ratio is used,
        rather than the raw income amount, so this step agrees with the
        target yield computed in ``_score_etfs``.
        """
        if not goal.income_target:
            return None
        if goal.capital_target > 0 and goal.income_target / goal.capital_target > 0.05:
            return 0.04  # High income target: require higher yield
        return 0.02  # 2% minimum yield

    def _filter_etfs(self, etfs: List[Dict], goal: InvestmentGoal) -> List[Dict]:
        """Filter ETFs based on selection criteria and investment goals.

        An ETF is kept when it has no red flags, satisfies the income and
        risk-tolerance rules, and reaches tier 1 or tier 2. Kept records get
        a ``tier`` key added in place.
        """
        min_yield = self._minimum_dividend_yield(goal)
        filtered = []

        for etf in etfs:
            try:
                # Skip ETFs with red flags
                if self._has_red_flags(etf):
                    continue

                # For income-focused goals, ensure minimum dividend yield
                if min_yield is not None and etf.get('dividend_yield', 0) < min_yield:
                    continue

                # Apply risk tolerance specific filters
                if goal.risk_tolerance == RiskTolerance.CONSERVATIVE:
                    # Conservative: low volatility, limited drawdown,
                    # established funds, no leveraged/inverse products.
                    if (etf.get('volatility', 1.0) > 0.15 or
                            etf.get('max_drawdown', 1.0) > 0.20 or
                            etf.get('aum', 0) < 1_000_000_000 or  # $1B minimum
                            (etf.get('asset_class') or '').lower() in ['leveraged', 'inverse']):
                        continue
                elif goal.risk_tolerance == RiskTolerance.AGGRESSIVE:
                    # Aggressive: decent Sharpe ratio, no bond funds,
                    # smaller funds allowed.
                    if (etf.get('sharpe_ratio', 0) < 0.8 or
                            (etf.get('category') or '').lower() in ['bond', 'fixed income'] or
                            etf.get('aum', 0) < 500_000_000):  # $500M minimum
                        continue

                # Only include ETFs that meet at least tier 2 criteria
                tier = self._get_etf_tier(etf)
                if tier > 0:
                    etf['tier'] = tier
                    filtered.append(etf)

            except Exception as e:
                logger.warning(f"Error filtering ETF {etf.get('ticker', 'unknown')}: {str(e)}")
                continue

        return filtered

    def _has_red_flags(self, etf: Dict) -> bool:
        """Check if ETF has any red flags.

        Returns ``True`` for a high expense ratio, small AUM, high tracking
        error or concentrated top holding, and also when the check itself
        fails (an ETF that cannot be inspected is rejected).
        """
        try:
            # High expense ratio
            if etf.get('expense_ratio', 1.0) > 0.50:
                logger.debug(f"ETF {etf.get('ticker')} rejected: High expense ratio")
                return True

            # Small AUM
            if etf.get('aum', 0) < 100_000_000:  # $100M
                logger.debug(f"ETF {etf.get('ticker')} rejected: Small AUM")
                return True

            # High tracking error
            if etf.get('tracking_error', 1.0) > 0.50:
                logger.debug(f"ETF {etf.get('ticker')} rejected: High tracking error")
                return True

            # Concentrated holdings
            if etf.get('top_holding_weight', 0) > 0.20:  # 20%
                logger.debug(f"ETF {etf.get('ticker')} rejected: Concentrated holdings")
                return True

            return False

        except Exception as e:
            logger.warning(f"Error checking red flags for ETF {etf.get('ticker', 'unknown')}: {str(e)}")
            return True

    def _get_etf_tier(self, etf: Dict) -> int:
        """Determine ETF tier based on criteria.

        Returns:
            ``1`` or ``2`` for the first tier in ``selection_criteria`` whose
            thresholds are all met, otherwise ``0`` (also on error).
        """
        try:
            for tier_number, tier_name in ((1, 'tier1'), (2, 'tier2')):
                limits = self.selection_criteria[tier_name]
                if (etf.get('expense_ratio', 1.0) <= limits['expense_ratio'] and
                        etf.get('aum', 0) >= limits['aum'] and
                        etf.get('tracking_error', 1.0) <= limits['tracking_error'] and
                        etf.get('avg_volume', 0) >= limits['avg_volume']):
                    return tier_number
            return 0

        except Exception as e:
            logger.warning(f"Error determining tier for ETF {etf.get('ticker', 'unknown')}: {str(e)}")
            return 0

    def _score_etfs(self, etfs: List[Dict], goal: InvestmentGoal) -> List[Dict]:
        """Score ETFs based on investment goals and risk tolerance.

        Each record gets ``score`` and ``score_components`` keys added in
        place. Records that fail to score are logged and left out.

        Returns:
            The scored records, highest score first.
        """
        scored = []

        for etf in etfs:
            try:
                score = 0
                score_components = {}

                # Base score from tier: 20 points for tier 1, 10 for tier 2
                tier_score = (3 - etf['tier']) * 10
                score += tier_score
                score_components['tier'] = tier_score

                # Risk-adjusted return score
                if etf.get('sharpe_ratio'):
                    sharpe_score = min(etf['sharpe_ratio'] * 5, 20)  # Max 20 points
                    score += sharpe_score
                    score_components['sharpe'] = sharpe_score

                # Volatility score (lower is better)
                if etf.get('volatility'):
                    vol_score = max(0, 20 - (etf['volatility'] * 100))  # Max 20 points
                    score += vol_score
                    score_components['volatility'] = vol_score

                # Income goal specific scoring
                if goal.income_target:
                    if etf.get('dividend_yield'):
                        # Higher score for ETFs with yield closer to target
                        target_yield = goal.income_target / goal.capital_target
                        yield_diff = abs(etf['dividend_yield'] - target_yield)
                        div_score = max(0, 20 - (yield_diff * 100))  # Max 20 points
                        score += div_score
                        score_components['dividend'] = div_score

                    # Bonus for stable dividends
                    if etf.get('dividend_trend', 0) > 0:
                        score += 5
                        score_components['dividend_stability'] = 5

                # AUM score: one point per $1B
                aum_billions = etf.get('aum', 0) / 1_000_000_000
                aum_score = min(aum_billions, 10)  # Max 10 points
                score += aum_score
                score_components['aum'] = aum_score

                # Risk tolerance specific scoring
                if goal.risk_tolerance == RiskTolerance.CONSERVATIVE:
                    # Favor stability and income
                    if etf.get('dividend_yield', 0) > 0.03:  # 3% yield
                        score += 10
                        score_components['income_focus'] = 10
                    if etf.get('volatility', 1.0) < 0.12:  # Low volatility
                        score += 10
                        score_components['stability'] = 10
                    if (etf.get('asset_class') or '').lower() in ['equity', 'fixed income']:
                        score += 5
                        score_components['asset_class'] = 5

                elif goal.risk_tolerance == RiskTolerance.AGGRESSIVE:
                    # Favor growth and momentum
                    if etf.get('sharpe_ratio', 0) > 1.2:  # High Sharpe
                        score += 10
                        score_components['risk_adjusted_return'] = 10
                    if (etf.get('category') or '').lower() in ['technology', 'growth']:
                        score += 10
                        score_components['growth_potential'] = 10
                    if (etf.get('strategy') or '').lower() in ['momentum', 'growth']:
                        score += 5
                        score_components['strategy'] = 5

                # Add score and components to ETF data
                etf['score'] = score
                etf['score_components'] = score_components
                scored.append(etf)

            except Exception as e:
                logger.warning(f"Error scoring ETF {etf.get('ticker', 'unknown')}: {str(e)}")
                continue

        # Sort by score in descending order
        scored.sort(key=lambda x: x.get('score', 0), reverse=True)
        return scored

    def _optimize_portfolio(self, scored_etfs: List[Dict], goal: InvestmentGoal) -> List[Dict]:
        """Optimize portfolio allocation based on investment goals.

        Takes the five highest-scored ETFs, weights them by score, adjusts
        the weights for the risk tolerance and renormalises them to sum
        to 1. A ``weight`` key is added to the selected records in place.

        Raises:
            ValueError: If the scores or adjusted weights sum to zero, or on
                any other failure (wrapped).
        """
        try:
            # Select top ETFs based on score
            top_etfs = scored_etfs[:5]  # Limit to top 5 ETFs

            # Calculate initial weights based on scores
            total_score = sum(etf['score'] for etf in top_etfs)
            if total_score == 0:
                raise ValueError("No valid scores for portfolio optimization")

            for etf in top_etfs:
                etf['weight'] = etf['score'] / total_score

            # Adjust weights based on risk tolerance
            if goal.risk_tolerance == RiskTolerance.CONSERVATIVE:
                # Halve the weight of volatile or low-Sharpe ETFs
                for etf in top_etfs:
                    if etf.get('volatility', 0) > 0.15 or etf.get('sharpe_ratio', 0) < 1.0:
                        etf['weight'] *= 0.5
            elif goal.risk_tolerance == RiskTolerance.AGGRESSIVE:
                # Boost ETFs with higher potential returns
                for etf in top_etfs:
                    if etf.get('sharpe_ratio', 0) > 1.5:
                        etf['weight'] *= 1.5

            # Normalize weights; guard the division so a degenerate mix of
            # scores surfaces as a clear error instead of ZeroDivisionError.
            total_weight = sum(etf['weight'] for etf in top_etfs)
            if total_weight == 0:
                raise ValueError("Adjusted weights sum to zero")
            for etf in top_etfs:
                etf['weight'] /= total_weight

            # Format portfolio output
            return [
                {
                    'ticker': etf['ticker'],
                    'name': etf['name'],
                    'allocation': etf['weight'],
                    'amount': goal.capital_target * etf['weight'],
                    'score': etf['score'],
                    'tier': etf['tier'],
                    'metrics': {
                        'expense_ratio': etf.get('expense_ratio'),
                        'aum': etf.get('aum'),
                        'tracking_error': etf.get('tracking_error'),
                        'volatility': etf.get('volatility'),
                        'sharpe_ratio': etf.get('sharpe_ratio'),
                        'dividend_yield': etf.get('dividend_yield'),
                        'max_drawdown': etf.get('max_drawdown')
                    }
                }
                for etf in top_etfs
            ]

        except Exception as e:
            logger.error(f"Error optimizing portfolio: {str(e)}")
            raise ValueError(f"Failed to optimize portfolio allocation: {str(e)}") from e

View File

@ -2848,8 +2848,88 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
st.error("Unable to analyze ETF erosion risk. Please try again.") st.error("Unable to analyze ETF erosion risk. Please try again.")
with tab4: with tab4:
st.subheader("🤖 AI Suggestions") st.subheader("🤖 AI Portfolio Suggestions")
st.write("This tab will contain AI suggestions for portfolio optimization.")
# AI Portfolio Suggestions tab: build an InvestmentGoal from the Streamlit
# session state, ask ETFSelectionService for a portfolio and render it as
# two tables and a pie chart. Errors are reported through st.error.
try:
# Get values from session state
# NOTE(review): outside "Capital Target" mode the capital falls back to a
# fixed 3000.0 - confirm this placeholder is intended, since the selection
# service derives a target yield from income_target / capital_target.
capital_target = st.session_state.initial_capital if st.session_state.mode == "Capital Target" else 3000.0
# The session "target" is multiplied by 12 here (presumably monthly -> annual; verify).
income_target = st.session_state.target * 12 if st.session_state.mode == "Income Target" else 0.0
risk_tolerance = st.session_state.risk_tolerance
investment_horizon = 5 # Default to 5 years if not specified
# Initialize services
from ETF_Portal.services.data_service import DataService
from ETF_Portal.services.etf_selection_service import ETFSelectionService, InvestmentGoal, RiskTolerance
data_service = DataService()
selection_service = ETFSelectionService(data_service)
# Create investment goal
# An income target of 0 is passed as None, i.e. "no income goal".
# RiskTolerance is looked up by member name, so the session value must
# upper-case to CONSERVATIVE, MODERATE or AGGRESSIVE (KeyError otherwise,
# reported by the outer handler below).
goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target if income_target > 0 else None,
risk_tolerance=RiskTolerance[risk_tolerance.upper()],
investment_horizon=investment_horizon
)
# Get AI suggestions
with st.spinner("Analyzing ETFs and generating portfolio suggestions..."):
try:
portfolio = selection_service.select_etfs(goal)
if portfolio:
# Display portfolio suggestions
st.success("Portfolio suggestions generated successfully!")
# Create a DataFrame for better display
# 'allocation' is a fraction of the portfolio; convert to percent for display.
portfolio_df = pd.DataFrame(portfolio)
portfolio_df['Allocation (%)'] = portfolio_df['allocation'] * 100
portfolio_df['Amount ($)'] = portfolio_df['amount']
# Display portfolio summary
st.write("### Portfolio Summary")
st.dataframe(
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
hide_index=True
)
# Display detailed metrics
# Ratio-style metrics are scaled by 100 and AUM by 1e9 to match the column labels.
st.write("### Detailed Metrics")
metrics_df = pd.DataFrame([
{
'Ticker': etf['ticker'],
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
'AUM ($B)': etf['metrics']['aum'] / 1e9,
'Volatility (%)': etf['metrics']['volatility'] * 100,
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
'Dividend Yield (%)': etf['metrics']['dividend_yield'] * 100
}
for etf in portfolio
])
st.dataframe(metrics_df, hide_index=True)
# Display portfolio allocation chart
st.write("### Portfolio Allocation")
fig = px.pie(
portfolio_df,
values='Allocation (%)',
names='ticker',
title='Portfolio Allocation by ETF'
)
st.plotly_chart(fig)
else:
st.error("No portfolio suggestions could be generated. Please try different parameters.")
# ValueError carries the selection service's user-facing message; show it verbatim.
except ValueError as e:
st.error(str(e))
except Exception as e:
st.error(f"An unexpected error occurred: {str(e)}")
logger.error(f"Error generating portfolio suggestions: {str(e)}", exc_info=True)
# Outer handler: failures while reading session state, importing or
# constructing the services and the goal.
except Exception as e:
st.error(f"Error initializing services: {str(e)}")
logger.error(f"Error initializing services: {str(e)}", exc_info=True)
with tab5: with tab5:
st.subheader("📊 ETF Details") st.subheader("📊 ETF Details")