Compare commits

..

8 Commits
master ... Dev

18 changed files with 1265 additions and 888 deletions

9
.gitignore vendored
View File

@ -2,9 +2,14 @@
.env
#.env.*
# Cache directories
# Ignore all runtime cache data (created at runtime, not code)
cache/
**/cache/
# But DO track the cache manager Python code in the package
!ETF_Portal/ETF_Portal/cache/*.py
!ETF_Portal/ETF_Portal/cache/__init__.py
!ETF_Portal/ETF_Portal/cache/
# Python
__pycache__/

View File

View File

@ -7,10 +7,11 @@ import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Optional
from typing import Dict, Optional, List
import yfinance as yf
import logging
from pathlib import Path
from ..api.factory import APIFactory
logger = logging.getLogger(__name__)
@ -28,6 +29,14 @@ class DataService:
self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings'
self.cache_timeout = timedelta(hours=1)
# Create cache directories if they don't exist
for directory in [self.cache_dir, self.yf_cache_dir, self.fmp_cache_dir,
self.fmp_profiles_dir, self.fmp_historical_dir, self.fmp_holdings_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Initialize API factory
self.api_factory = APIFactory()
def get_etf_data(self, ticker: str) -> Dict:
"""Get ETF data using fallback logic:
1. Try FMP cache
@ -191,8 +200,81 @@ class DataService:
def _get_from_fmp(self, ticker: str) -> Optional[Dict]:
"""Get data from FMP API"""
# TODO: Implement FMP API integration
return None
try:
# Get FMP client
fmp_client = self.api_factory.get_client('fmp')
# Get ETF profile
profile = fmp_client.get_etf_profile(ticker)
if not profile:
return None
# Get historical data
hist_data = fmp_client.get_etf_historical_data(ticker)
if hist_data.empty:
return None
# Get holdings
holdings = fmp_client.get_etf_holdings(ticker)
# Get dividend history
dividend_history = fmp_client.get_dividend_history(ticker)
# Get sector weightings
sector_weightings = fmp_client.get_sector_weightings(ticker)
# Calculate metrics
hist_data['log_returns'] = np.log(hist_data['close'] / hist_data['close'].shift(1))
returns = hist_data['log_returns'].dropna()
# Calculate annualized volatility using daily log returns
volatility = returns.std() * np.sqrt(252)
# Calculate max drawdown using rolling window
rolling_max = hist_data['close'].rolling(window=252, min_periods=1).max()
daily_drawdown = hist_data['close'] / rolling_max - 1.0
max_drawdown = abs(daily_drawdown.min())
# Calculate Sharpe ratio (assuming risk-free rate of 0.02)
risk_free_rate = 0.02
excess_returns = returns - risk_free_rate/252
sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std()
# Calculate Sortino ratio
downside_returns = returns[returns < 0]
sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_returns.std()
# Calculate dividend trend
if not dividend_history.empty:
dividend_history['date'] = pd.to_datetime(dividend_history['date'])
dividend_history = dividend_history.sort_values('date')
dividend_trend = dividend_history['dividend'].pct_change().mean() * 100
else:
dividend_trend = 0.0
# Calculate age in years
if 'inceptionDate' in profile:
inception_date = pd.to_datetime(profile['inceptionDate'])
age_years = (pd.Timestamp.now() - inception_date).days / 365.25
else:
age_years = 0.0
return {
'info': profile,
'hist': hist_data.to_dict('records'),
'holdings': holdings,
'volatility': volatility,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'dividend_trend': dividend_trend,
'age_years': age_years,
'is_new': age_years < 2
}
except Exception as e:
logger.error(f"Error fetching FMP data for {ticker}: {str(e)}")
return None
def _get_from_yfinance(self, ticker: str) -> Optional[Dict]:
"""Get data from yfinance"""
@ -335,3 +417,21 @@ class DataService:
'is_new': False,
'is_estimated': True # Flag to indicate these are estimates
}
def get_etf_list(self) -> List[str]:
"""Get list of available ETFs"""
try:
# Define a list of high-yield ETFs to track
etf_list = [
'JEPI', 'JEPQ', 'FEPI', 'CONY', 'MSTY', 'SDIV', 'DIV', 'VIGI',
'VYM', 'VIG', 'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM',
'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD',
'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD', 'DGRO'
]
# Remove duplicates while preserving order
return list(dict.fromkeys(etf_list))
except Exception as e:
logger.error(f"Error getting ETF list: {str(e)}")
return []

View File

@ -0,0 +1,23 @@
"""
ETF Selection Service package initialization
"""
from .service import ETFSelectionService
from .models import InvestmentGoal, RiskTolerance, ETF, ETFUniverse
from .exceptions import (
ETFSelectionError, ETFDataError, ETFNotFoundError,
ValidationError, PortfolioOptimizationError
)
__all__ = [
'ETFSelectionService',
'InvestmentGoal',
'RiskTolerance',
'ETF',
'ETFUniverse',
'ETFSelectionError',
'ETFDataError',
'ETFNotFoundError',
'ValidationError',
'PortfolioOptimizationError'
]

View File

@ -0,0 +1,179 @@
"""
Database operations for ETF Selection Service
"""
import logging
import sqlite3
from typing import Dict, List, Optional
from datetime import datetime
import json
from .models import ETF, ETFUniverse
from .exceptions import DatabaseError, DataUpdateError
logger = logging.getLogger(__name__)
class ETFDatabase:
def __init__(self, db_path: str = "etf_data.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize database tables"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Create ETFs table
cursor.execute("""
CREATE TABLE IF NOT EXISTS etfs (
ticker TEXT PRIMARY KEY,
name TEXT,
expense_ratio REAL,
aum REAL,
avg_volume INTEGER,
tracking_error REAL,
volatility REAL,
max_drawdown REAL,
sharpe_ratio REAL,
top_holding_weight REAL,
dividend_yield REAL,
category TEXT,
asset_class TEXT,
sector TEXT,
region TEXT,
strategy TEXT,
last_updated TEXT
)
""")
# Create categories table
cursor.execute("""
CREATE TABLE IF NOT EXISTS categories (
category TEXT PRIMARY KEY,
etfs TEXT,
last_updated TEXT
)
""")
conn.commit()
except sqlite3.Error as e:
raise DatabaseError(f"Failed to initialize database: {str(e)}")
def save_etf(self, etf: ETF):
"""Save ETF data to database"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO etfs (
ticker, name, expense_ratio, aum, avg_volume,
tracking_error, volatility, max_drawdown,
sharpe_ratio, top_holding_weight, dividend_yield,
category, asset_class, sector, region, strategy,
last_updated
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
etf.ticker, etf.name, etf.expense_ratio, etf.aum,
etf.avg_volume, etf.tracking_error, etf.volatility,
etf.max_drawdown, etf.sharpe_ratio, etf.top_holding_weight,
etf.dividend_yield, etf.category, etf.asset_class,
etf.sector, etf.region, etf.strategy,
datetime.now().isoformat()
))
conn.commit()
except sqlite3.Error as e:
raise DataUpdateError(f"Failed to save ETF data: {str(e)}")
def get_etf(self, ticker: str) -> Optional[ETF]:
"""Get ETF data from database"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM etfs WHERE ticker = ?", (ticker,))
row = cursor.fetchone()
if row:
return ETF(
ticker=row[0],
name=row[1],
expense_ratio=row[2],
aum=row[3],
avg_volume=row[4],
tracking_error=row[5],
volatility=row[6],
max_drawdown=row[7],
sharpe_ratio=row[8],
top_holding_weight=row[9],
dividend_yield=row[10],
category=row[11],
asset_class=row[12],
sector=row[13],
region=row[14],
strategy=row[15]
)
return None
except sqlite3.Error as e:
raise DatabaseError(f"Failed to get ETF data: {str(e)}")
def save_universe(self, universe: ETFUniverse):
"""Save ETF universe to database"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Save categories
for category, etfs in universe.categories.items():
cursor.execute("""
INSERT OR REPLACE INTO categories (
category, etfs, last_updated
) VALUES (?, ?, ?)
""", (
category,
json.dumps(etfs),
datetime.now().isoformat()
))
conn.commit()
except sqlite3.Error as e:
raise DataUpdateError(f"Failed to save ETF universe: {str(e)}")
def get_universe(self) -> ETFUniverse:
"""Get ETF universe from database"""
try:
universe = ETFUniverse()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Get all ETFs
cursor.execute("SELECT * FROM etfs")
for row in cursor.fetchall():
etf = ETF(
ticker=row[0],
name=row[1],
expense_ratio=row[2],
aum=row[3],
avg_volume=row[4],
tracking_error=row[5],
volatility=row[6],
max_drawdown=row[7],
sharpe_ratio=row[8],
top_holding_weight=row[9],
dividend_yield=row[10],
category=row[11],
asset_class=row[12],
sector=row[13],
region=row[14],
strategy=row[15]
)
universe.etfs[etf.ticker] = etf
# Get categories
cursor.execute("SELECT category, etfs FROM categories")
for row in cursor.fetchall():
universe.categories[row[0]] = json.loads(row[1])
return universe
except sqlite3.Error as e:
raise DatabaseError(f"Failed to get ETF universe: {str(e)}")

View File

@ -0,0 +1,31 @@
"""
Custom exceptions for ETF Selection Service
"""
class ETFSelectionError(Exception):
"""Base exception for ETF selection errors"""
pass
class ETFDataError(ETFSelectionError):
"""Exception raised when ETF data is invalid or missing"""
pass
class ETFNotFoundError(ETFSelectionError):
"""Exception raised when ETF is not found"""
pass
class DatabaseError(ETFSelectionError):
"""Exception raised for database-related errors"""
pass
class DataUpdateError(ETFSelectionError):
"""Exception raised when data update fails"""
pass
class ValidationError(ETFSelectionError):
"""Exception raised when input validation fails"""
pass
class PortfolioOptimizationError(ETFSelectionError):
"""Exception raised when portfolio optimization fails"""
pass

View File

@ -0,0 +1,52 @@
"""
Data models and enums for ETF Selection Service
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, List
class RiskTolerance(Enum):
CONSERVATIVE = "Conservative"
MODERATE = "Moderate"
AGGRESSIVE = "Aggressive"
@dataclass
class InvestmentGoal:
capital_target: float
income_target: Optional[float] = None
risk_tolerance: RiskTolerance = RiskTolerance.MODERATE
investment_horizon: int = 5 # years
@dataclass
class ETF:
ticker: str
name: str
expense_ratio: float
aum: float
avg_volume: float
tracking_error: float
volatility: float
max_drawdown: float
sharpe_ratio: float
top_holding_weight: float
dividend_yield: float
category: str
asset_class: str
sector: str
region: str
strategy: str
class ETFCategory(Enum):
EQUITY = "Equity"
FIXED_INCOME = "Fixed Income"
COMMODITY = "Commodity"
REAL_ESTATE = "Real Estate"
CRYPTO = "Crypto"
MULTI_ASSET = "Multi-Asset"
class ETFUniverse:
def __init__(self):
self.etfs: Dict[str, ETF] = {}
self.categories: Dict[str, List[str]] = {}
self.last_updated: Optional[str] = None

View File

@ -0,0 +1,418 @@
"""
ETF Selection Service for optimizing ETF selection based on investment goals
"""
import logging
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from datetime import datetime
import os
from .models import InvestmentGoal, RiskTolerance, ETF, ETFUniverse, ETFCategory
from .exceptions import (
ETFSelectionError, ETFDataError, ETFNotFoundError,
ValidationError, PortfolioOptimizationError
)
from .database import ETFDatabase
from .utils import (
calculate_volatility, calculate_max_drawdown,
calculate_sharpe_ratio, calculate_sortino_ratio,
calculate_portfolio_metrics, is_cache_valid
)
logger = logging.getLogger(__name__)
class ETFSelectionService:
def __init__(self, data_service):
"""
Initialize ETF Selection Service
Args:
data_service: Service that provides ETF data (must implement get_etf_list() and get_etf_data())
"""
self.data_service = data_service
self.db = ETFDatabase()
self.cache = {}
self.cache_ttl = 3600 # 1 hour cache TTL
self.selection_criteria = {
'tier1': {
'expense_ratio': 0.10, # 0.10% or less
'aum': 5_000_000_000, # $5B or more
'tracking_error': 0.05, # 0.05% or less
'avg_volume': 100_000 # 100K shares/day
},
'tier2': {
'expense_ratio': 0.25, # 0.25% or less
'aum': 1_000_000_000, # $1B or more
'tracking_error': 0.10, # 0.10% or less
'avg_volume': 50_000 # 50K shares/day
}
}
def select_etfs(self, goal: InvestmentGoal) -> List[Dict]:
"""
Select ETFs based on investment goals and risk tolerance
Returns a list of recommended ETFs with allocation percentages
"""
try:
logger.info(f"Starting ETF selection with goal: {goal}")
# Validate investment goal
self._validate_investment_goal(goal)
# Get ETF data as DataFrame
etfs_df = self._get_etfs_dataframe()
if etfs_df.empty:
raise ETFDataError("No ETFs available for selection")
logger.info(f"Found {len(etfs_df)} available ETFs")
# Filter ETFs based on criteria
filtered_df = self._filter_etfs_vectorized(etfs_df, goal)
if filtered_df.empty:
raise ETFSelectionError(
"No ETFs passed the filtering criteria. Please try adjusting your risk tolerance or investment goals."
)
logger.info(f"{len(filtered_df)} ETFs passed filtering criteria")
# Score ETFs based on criteria
scored_df = self._score_etfs_vectorized(filtered_df, goal)
if scored_df.empty:
raise ETFSelectionError(
"No ETFs passed the scoring criteria. Please try adjusting your risk tolerance or investment goals."
)
logger.info(f"{len(scored_df)} ETFs passed scoring criteria")
# Optimize portfolio allocation
portfolio = self._optimize_portfolio_vectorized(scored_df, goal)
if portfolio.empty:
raise PortfolioOptimizationError(
"Failed to optimize portfolio allocation. Please try adjusting your investment goals or risk tolerance."
)
logger.info(f"Successfully generated portfolio with {len(portfolio)} ETFs")
return portfolio.to_dict('records')
except ETFSelectionError as e:
logger.error(str(e))
raise
except Exception as e:
error_msg = f"Unexpected error during ETF selection: {str(e)}"
logger.error(error_msg, exc_info=True)
raise ETFSelectionError(error_msg)
def _validate_investment_goal(self, goal: InvestmentGoal) -> None:
"""Validate investment goal parameters"""
if goal.capital_target <= 0:
raise ValidationError("Capital target must be greater than 0")
if goal.income_target and goal.income_target <= 0:
raise ValidationError("Income target must be greater than 0")
if not isinstance(goal.risk_tolerance, RiskTolerance):
raise ValidationError("Risk tolerance must be a valid RiskTolerance enum value")
def _get_etfs_dataframe(self) -> pd.DataFrame:
"""Get ETF data as DataFrame with caching"""
cache_key = 'etfs_dataframe'
# Check cache first
if cache_key in self.cache:
cache_data, timestamp = self.cache[cache_key]
if is_cache_valid(timestamp, self.cache_ttl):
return cache_data
try:
# Get list of ETFs
etf_list = self.data_service.get_etf_list()
if not etf_list:
raise ETFDataError("No ETFs available from data service")
# Batch fetch ETF data
etf_data = []
for ticker in etf_list:
try:
data = self.data_service.get_etf_data(ticker)
if data:
etf_data.append({
'ticker': ticker,
'name': data.get('info', {}).get('longName', ticker),
'expense_ratio': data.get('info', {}).get('annualReportExpenseRatio', 0.5) / 100,
'aum': data.get('info', {}).get('totalAssets', 0),
'avg_volume': data.get('info', {}).get('averageVolume', 0),
'tracking_error': 0.0, # Not available from yfinance
'volatility': float(data.get('volatility', 0)),
'max_drawdown': float(data.get('max_drawdown', 0)),
'sharpe_ratio': float(data.get('sharpe_ratio', 0)),
'top_holding_weight': 0.0, # Not available from yfinance
'dividend_yield': float(data.get('dividend_yield', 0)) / 100,
'category': data.get('info', {}).get('category', 'Unknown'),
'asset_class': data.get('info', {}).get('assetClass', 'Unknown'),
'sector': data.get('info', {}).get('sector', 'Unknown'),
'region': data.get('info', {}).get('region', 'Unknown'),
'strategy': data.get('info', {}).get('strategy', 'Unknown')
})
except Exception as e:
logger.warning(f"Error processing ETF {ticker}: {str(e)}")
continue
if not etf_data:
raise ETFDataError("No ETFs could be processed")
# Convert to DataFrame
df = pd.DataFrame(etf_data)
# Cache the result
self.cache[cache_key] = (df, datetime.now().isoformat())
return df
except Exception as e:
raise ETFDataError(f"Error fetching ETF data: {str(e)}")
def _filter_etfs_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
"""Filter ETFs using vectorized operations"""
# Get criteria based on risk tolerance
criteria = self.selection_criteria['tier1' if goal.risk_tolerance == RiskTolerance.CONSERVATIVE else 'tier2']
# Create boolean masks for filtering
mask = (
(df['expense_ratio'] <= criteria['expense_ratio']) &
(df['aum'] >= criteria['aum']) &
(df['tracking_error'] <= criteria['tracking_error']) &
(df['avg_volume'] >= criteria['avg_volume'])
)
return df[mask]
def _score_etfs_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
"""Score ETFs using vectorized operations"""
# Calculate risk score
df['risk_score'] = (
df['volatility'] * 0.3 +
df['max_drawdown'] * 0.3 +
(1 - df['sharpe_ratio']) * 0.4
)
# Calculate income score if income target is specified
if goal.income_target:
df['income_score'] = df['dividend_yield'] / goal.income_target
# Calculate final score
if goal.income_target:
df['final_score'] = (
(1 - df['risk_score']) * 0.6 +
df['income_score'] * 0.4
)
else:
df['final_score'] = 1 - df['risk_score']
# Normalize scores
df['final_score'] = (df['final_score'] - df['final_score'].min()) / (df['final_score'].max() - df['final_score'].min())
# Filter out ETFs with low scores
return df[df['final_score'] >= 0.5]
def _optimize_portfolio_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
"""Optimize portfolio allocation using vectorized operations"""
# Sort by final score
df = df.sort_values('final_score', ascending=False)
# Select top ETFs based on risk tolerance
n_etfs = 5 if goal.risk_tolerance == RiskTolerance.CONSERVATIVE else 10
df = df.head(n_etfs)
# Calculate weights based on scores
df['weight'] = df['final_score'] / df['final_score'].sum()
# Add allocation amount (already in percentage)
df['allocation'] = df['weight'] * 100 # Convert to percentage
df['amount'] = df['weight'] * goal.capital_target # Calculate actual amount
# Add metrics dictionary for each ETF
df['metrics'] = df.apply(lambda row: {
'expense_ratio': row['expense_ratio'],
'aum': row['aum'],
'volatility': row['volatility'],
'max_drawdown': row['max_drawdown'],
'sharpe_ratio': row['sharpe_ratio'],
'dividend_yield': row['dividend_yield']
}, axis=1)
return df[['ticker', 'name', 'allocation', 'amount', 'metrics']]
def _update_etf_universe(self):
"""Update ETF universe with latest data"""
try:
# Get list of ETFs to update
etfs_to_update = self._get_etfs_to_update()
# Update each ETF
for ticker in etfs_to_update:
try:
etf = self._fetch_etf_data(ticker)
if etf:
self.db.save_etf(etf)
except Exception as e:
logger.warning(f"Failed to update ETF {ticker}: {str(e)}")
continue
# Update universe metadata
self._update_universe_metadata()
except Exception as e:
raise DataUpdateError(f"Failed to update ETF universe: {str(e)}")
def _get_etfs_to_update(self) -> List[str]:
"""Get list of ETFs that need updating"""
try:
# Get current universe
universe = self.db.get_universe()
# Get list of all ETFs
all_etfs = self.data_service.get_etf_list()
# Find ETFs that need updating
etfs_to_update = []
for ticker in all_etfs:
if ticker not in universe.etfs:
etfs_to_update.append(ticker)
else:
# Check if data is stale
etf = universe.etfs[ticker]
if not is_cache_valid(etf.last_updated, self.cache_ttl):
etfs_to_update.append(ticker)
return etfs_to_update
except Exception as e:
raise DataUpdateError(f"Failed to get ETFs to update: {str(e)}")
def _fetch_etf_data(self, ticker: str) -> Optional[ETF]:
"""Fetch ETF data from data service"""
try:
data = self.data_service.get_etf_data(ticker)
if not data:
return None
info = data.get('info', {})
return ETF(
ticker=ticker,
name=info.get('longName', ticker),
expense_ratio=info.get('annualReportExpenseRatio', 0.5) / 100,
aum=info.get('totalAssets', 0),
avg_volume=info.get('averageVolume', 0),
tracking_error=0.0, # Not available from yfinance
volatility=float(data.get('volatility', 0)),
max_drawdown=float(data.get('max_drawdown', 0)),
sharpe_ratio=float(data.get('sharpe_ratio', 0)),
top_holding_weight=0.0, # Not available from yfinance
dividend_yield=float(data.get('dividend_yield', 0)) / 100,
category=info.get('category', 'Unknown'),
asset_class=info.get('assetClass', 'Unknown'),
sector=info.get('sector', 'Unknown'),
region=info.get('region', 'Unknown'),
strategy=info.get('strategy', 'Unknown')
)
except Exception as e:
logger.warning(f"Failed to fetch ETF data for {ticker}: {str(e)}")
return None
def _determine_category(self, info: Dict) -> ETFCategory:
"""Determine ETF category from info"""
category = info.get('category', '').lower()
if 'equity' in category:
return ETFCategory.EQUITY
elif 'fixed income' in category or 'bond' in category:
return ETFCategory.FIXED_INCOME
elif 'commodity' in category:
return ETFCategory.COMMODITY
elif 'real estate' in category:
return ETFCategory.REAL_ESTATE
elif 'crypto' in category:
return ETFCategory.CRYPTO
else:
return ETFCategory.MULTI_ASSET
def _determine_risk_level(self, volatility: float) -> str:
"""Determine risk level based on volatility"""
if volatility < 0.1:
return 'Low'
elif volatility < 0.2:
return 'Medium'
else:
return 'High'
def _update_universe_metadata(self):
"""Update ETF universe metadata"""
try:
universe = self.db.get_universe()
# Group ETFs by category
categories = {}
for etf in universe.etfs.values():
if etf.category not in categories:
categories[etf.category] = []
categories[etf.category].append(etf.ticker)
# Update categories
universe.categories = categories
universe.last_updated = datetime.now().isoformat()
# Save to database
self.db.save_universe(universe)
except Exception as e:
raise DataUpdateError(f"Failed to update universe metadata: {str(e)}")
def get_etf_universe(self) -> ETFUniverse:
"""Get current ETF universe"""
try:
return self.db.get_universe()
except Exception as e:
raise DatabaseError(f"Failed to get ETF universe: {str(e)}")
def search_etfs(self,
category: Optional[ETFCategory] = None,
risk_level: Optional[str] = None,
min_dividend_yield: Optional[float] = None,
max_expense_ratio: Optional[float] = None) -> List[ETF]:
"""Search ETFs based on criteria"""
try:
universe = self.get_etf_universe()
# Filter ETFs
filtered_etfs = []
for etf in universe.etfs.values():
if category and etf.category != category.value:
continue
if risk_level and self._determine_risk_level(etf.volatility) != risk_level:
continue
if min_dividend_yield and etf.dividend_yield < min_dividend_yield:
continue
if max_expense_ratio and etf.expense_ratio > max_expense_ratio:
continue
filtered_etfs.append(etf)
return filtered_etfs
except Exception as e:
raise ETFSelectionError(f"Failed to search ETFs: {str(e)}")
def get_etf_details(self, ticker: str) -> Optional[ETF]:
"""Get detailed ETF information"""
try:
return self.db.get_etf(ticker)
except Exception as e:
raise ETFNotFoundError(f"Failed to get ETF details for {ticker}: {str(e)}")
def clear_cache(self):
"""Clear service cache"""
self.cache.clear()

View File

@ -0,0 +1,49 @@
"""
Utility functions for ETF selection service
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
def calculate_volatility(returns: pd.Series) -> float:
"""Calculate annualized volatility from returns series."""
return returns.std() * np.sqrt(252)
def calculate_max_drawdown(prices: pd.Series) -> float:
"""Calculate maximum drawdown from price series."""
return (prices / prices.cummax() - 1).min()
def calculate_sharpe_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float:
"""Calculate Sharpe ratio from returns series."""
excess_returns = returns - risk_free_rate/252
if len(excess_returns) < 2:
return 0.0
return np.sqrt(252) * excess_returns.mean() / excess_returns.std()
def calculate_sortino_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float:
"""Calculate Sortino ratio from returns series."""
excess_returns = returns - risk_free_rate/252
downside_returns = excess_returns[excess_returns < 0]
if len(downside_returns) < 2:
return 0.0
return np.sqrt(252) * excess_returns.mean() / downside_returns.std()
def calculate_portfolio_metrics(weights: np.ndarray, returns: pd.DataFrame) -> Dict[str, float]:
"""Calculate portfolio metrics from weights and returns."""
portfolio_returns = returns.dot(weights)
return {
'volatility': calculate_volatility(portfolio_returns),
'max_drawdown': calculate_max_drawdown(portfolio_returns.cumsum()),
'sharpe_ratio': calculate_sharpe_ratio(portfolio_returns),
'sortino_ratio': calculate_sortino_ratio(portfolio_returns)
}
def is_cache_valid(timestamp: str, ttl_seconds: int) -> bool:
"""Check if cached data is still valid based on TTL."""
try:
cache_time = datetime.fromisoformat(timestamp)
return datetime.now() - cache_time < timedelta(seconds=ttl_seconds)
except (ValueError, TypeError):
return False

BIN
etf_data.db Normal file

Binary file not shown.

View File

@ -21,6 +21,9 @@ import traceback
from dotenv import load_dotenv
import re
from ETF_Portal.services.drip_service import DRIPService, DripConfig
from ETF_Portal.services.etf_selection_service import ETFSelectionService, InvestmentGoal, RiskTolerance
from ETF_Portal.services.nav_erosion_service import NavErosionService
from ETF_Portal.services.data_service import DataService
# Load environment variables
load_dotenv(override=True) # Force reload of environment variables
@ -1132,60 +1135,20 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
return
try:
# Calculate key metrics
total_capital = final_alloc["Capital Allocated ($)"].sum()
total_income = final_alloc["Income Contributed ($)"].sum()
# Calculate weighted average yield
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
# Display metrics in columns
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Capital", format_large_number(total_capital))
with col2:
st.metric("Annual Income", format_large_number(total_income))
st.metric("Monthly Income", format_large_number(total_income/12))
with col3:
st.metric("Average Yield", f"{weighted_yield:.2f}%")
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
# Display allocation chart
fig = px.pie(
final_alloc,
values="Allocation (%)",
names="Ticker",
title="Portfolio Allocation by ETF",
hover_data={
"Ticker": True,
"Allocation (%)": ":.2f",
"Yield (%)": ":.2f",
"Capital Allocated ($)": ":,.2f",
"Income Contributed ($)": ":,.2f"
}
)
st.plotly_chart(fig, use_container_width=True)
# Display detailed allocation table
st.subheader("Detailed Allocation")
# Use the allocation from session state for all calculations and display
display_df = final_alloc.copy()
numeric_columns = ["Allocation (%)", "Yield (%)", "Price", "Shares", "Capital Allocated ($)", "Income Contributed ($)"]
for col in numeric_columns:
if col in display_df.columns:
display_df[col] = pd.to_numeric(display_df[col], errors='coerce')
display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12
# Format large numbers in the display DataFrame
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(format_large_number)
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(format_large_number)
display_df["Monthly Income"] = display_df["Monthly Income"].apply(format_large_number)
# Ensure data_source column exists and rename it for display
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${float(x):,.2f}")
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${float(x):,.2f}")
display_df["Monthly Income"] = display_df["Monthly Income"].apply(lambda x: f"${float(x):,.2f}")
if "data_source" in display_df.columns:
display_df = display_df.rename(columns={"data_source": "Data Source"})
else:
display_df["Data Source"] = "Unknown"
# Select and order columns for display
display_columns = [
"Ticker",
"Allocation (%)",
@ -1198,15 +1161,43 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
"Risk Level",
"Data Source"
]
# Format the display
st.dataframe(
display_df[display_columns].style.format({
"Allocation (%)": "{:.2f}%",
"Yield (%)": "{:.2f}%",
"Price": "${:,.2f}",
"Shares": "{:,.4f}"
}),
editor_key = "allocation_editor"
def parse_currency(val):
if isinstance(val, str):
return float(val.replace('$', '').replace(',', ''))
return float(val)
# --- Portfolio Summary Metrics (use session state allocation) ---
total_capital = display_df["Capital Allocated ($)"].apply(parse_currency).sum()
total_income = display_df["Income Contributed ($)"].apply(parse_currency).sum()
weighted_yield = (display_df["Allocation (%)"] * display_df["Yield (%)"]).sum() / 100
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Capital", f"${total_capital:,.2f}")
with col2:
st.metric("Annual Income", f"${total_income:,.2f}")
st.metric("Monthly Income", f"${total_income/12:,.2f}")
with col3:
st.metric("Average Yield", f"{weighted_yield:.2f}%")
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
# --- Pie chart (use session state allocation) ---
fig = px.pie(
display_df,
values="Allocation (%)",
names="Ticker",
title="Portfolio Allocation by ETF",
hover_data={
"Ticker": True,
"Allocation (%)": ":.2f",
"Yield (%)": ":.2f",
"Capital Allocated ($)": ":,.2f",
"Income Contributed ($)": ":,.2f"
}
)
st.plotly_chart(fig, use_container_width=True)
# --- Editable allocation table below the pie chart ---
st.subheader("Detailed Allocation")
edited_df = st.data_editor(
display_df[display_columns],
column_config={
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
"Allocation (%)": st.column_config.NumberColumn(
@ -1217,9 +1208,21 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
format="%.1f",
required=True
),
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
"Price": st.column_config.TextColumn("Price", disabled=True),
"Shares": st.column_config.TextColumn("Shares", disabled=True),
"Yield (%)": st.column_config.NumberColumn(
"Yield (%)",
format="%.2f",
disabled=True
),
"Price": st.column_config.NumberColumn(
"Price",
format="%.2f",
disabled=True
),
"Shares": st.column_config.NumberColumn(
"Shares",
format="%.2f",
disabled=True
),
"Capital Allocated ($)": st.column_config.TextColumn("Capital Allocated ($)", disabled=True),
"Monthly Income": st.column_config.TextColumn("Monthly Income", disabled=True),
"Income Contributed ($)": st.column_config.TextColumn("Income Contributed ($)", disabled=True),
@ -1227,9 +1230,35 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
"Data Source": st.column_config.TextColumn("Data Source", disabled=True)
},
hide_index=True,
use_container_width=True
use_container_width=True,
key=editor_key
)
edited_df = edited_df.reset_index(drop=True)
# --- Update Allocations Button ---
if st.button("Update Allocations", type="primary"):
try:
edited_df["Allocation (%)"] = pd.to_numeric(edited_df["Allocation (%)"], errors='coerce')
total_allocation = edited_df["Allocation (%)"].sum()
if abs(total_allocation - 100.0) > 0.01:
st.error(f"Total allocation must be 100%. Current total: {total_allocation:.2f}%")
else:
edited_df["Capital Allocated ($)"] = edited_df["Capital Allocated ($)"].apply(parse_currency)
edited_df["Income Contributed ($)"] = edited_df["Income Contributed ($)"].apply(parse_currency)
edited_df["Monthly Income"] = edited_df["Monthly Income"].apply(parse_currency)
total_capital = edited_df["Capital Allocated ($)"].sum()
for idx, row in edited_df.iterrows():
edited_df.at[idx, "Capital Allocated ($)"] = total_capital * (row["Allocation (%)"] / 100)
edited_df.at[idx, "Income Contributed ($)"] = edited_df.at[idx, "Capital Allocated ($)"] * (row["Yield (%)"] / 100)
edited_df.at[idx, "Monthly Income"] = edited_df.at[idx, "Income Contributed ($)"] / 12
edited_df.at[idx, "Shares"] = edited_df.at[idx, "Capital Allocated ($)"] / row["Price"]
# Update the session state with new allocations and trigger rerun
st.session_state.final_alloc = edited_df
st.success("Allocations updated successfully! Click 'Run Portfolio Simulation' to recalculate.")
st.rerun()
except Exception as e:
st.error(f"Error updating allocations: {str(e)}")
logger.error(f"Error in allocation update: {str(e)}")
logger.error(traceback.format_exc())
except Exception as e:
st.error(f"Error calculating portfolio summary: {str(e)}")
logger.error(f"Error in portfolio_summary: {str(e)}")
@ -1373,7 +1402,7 @@ def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[D
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
if mask.any():
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] # Already in percentage
else:
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
@ -1392,7 +1421,7 @@ def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[D
logger.error("Weighted yield is zero")
return None
# Calculate required capital based on weighted yield (convert percentage to decimal)
# Calculate required capital based on weighted yield
required_capital = annual_income / (weighted_yield / 100)
# Calculate capital allocation and income
@ -1439,7 +1468,7 @@ def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocatio
for alloc in etf_allocations:
mask = final_alloc["Ticker"] == alloc["ticker"]
if mask.any():
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] # Already in percentage
else:
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
@ -1659,45 +1688,28 @@ def remove_ticker(ticker_to_remove: str) -> None:
# Display current tickers in the main space
if st.session_state.etf_allocations:
st.subheader("Selected ETFs")
st.markdown("""
<style>
.ticker-container {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin-bottom: 20px;
}
.ticker-item {
display: inline-flex;
align-items: center;
color: #2ecc71;
font-size: 1.1em;
font-weight: 500;
}
.ticker-close {
margin-left: 2px;
cursor: pointer;
font-size: 1.1em;
opacity: 0.7;
}
.ticker-close:hover {
opacity: 1;
}
</style>
""", unsafe_allow_html=True)
# Create a container for tickers
ticker_container = st.container()
with ticker_container:
# Display each ticker with a close button
for etf in st.session_state.etf_allocations:
col1, col2 = st.columns([0.05, 0.95]) # Adjusted column ratio
with col1:
# Create four columns for the tables with smaller widths
col1, col2, col3, col4 = st.columns([1, 1, 1, 1])
# Split the ETFs into four groups
etf_groups = [[] for _ in range(4)]
for i, etf in enumerate(st.session_state.etf_allocations):
group_index = i % 4
etf_groups[group_index].append(etf)
# Display each group in its own column
for i, (col, etf_group) in enumerate(zip([col1, col2, col3, col4], etf_groups)):
with col:
for etf in etf_group:
st.markdown(f"""
<div style="display: flex; align-items: center; gap: 2px; margin-bottom: -15px;">
<span style="color: #2ecc71; font-weight: 500;">{etf['ticker']}</span>
</div>
""", unsafe_allow_html=True)
if st.button("×", key=f"remove_{etf['ticker']}",
help=f"Remove {etf['ticker']} from portfolio"):
remove_ticker(etf['ticker'])
with col2:
st.markdown(f"<div class='ticker-item'>{etf['ticker']}</div>", unsafe_allow_html=True)
# Debug information
logger.info("=== Session State Debug ===")
@ -1829,7 +1841,15 @@ with st.sidebar:
# Create a container for ETF input
with st.container():
# Input field for ETF ticker only
# Input field for ETF ticker with improved visibility
st.markdown("""
<style>
.stTextInput input {
color: #2ecc71 !important;
font-weight: 500 !important;
}
</style>
""", unsafe_allow_html=True)
new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., SCHD)")
# Add button to add ETF
@ -2116,15 +2136,24 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
st.subheader("📈 Strategy Performance Summary")
col1, col2, col3, col4 = st.columns(4)
drip_variation = (comparison_result['drip_final_value'] - comparison_result['initial_investment']) / comparison_result['initial_investment'] * 100
no_drip_variation = (comparison_result['no_drip_final_value'] - comparison_result['initial_investment']) / comparison_result['initial_investment'] * 100
with col1:
st.metric(
"DRIP Final Value",
f"${comparison_result['drip_final_value']:,.2f}"
f"${comparison_result['drip_final_value']:,.2f}",
delta=f"{drip_variation:+.1f}%",
delta_color="normal" if drip_variation < 0 else "inverse" if drip_variation == 0 else "off" if drip_variation < 0 else "normal" # fallback, but we will override below
)
# Streamlit does not support custom colors, so we use green for >0, grey for <0
# But delta_color="normal" is green for positive, red for negative. We'll use normal for green, off for grey.
with col2:
st.metric(
"No-DRIP Final Value",
f"${comparison_result['no_drip_final_value']:,.2f}"
f"${comparison_result['no_drip_final_value']:,.2f}",
delta=f"{no_drip_variation:+.1f}%",
delta_color="normal" if no_drip_variation > 0 else "off"
)
with col3:
winner = comparison_result['winner']
@ -2848,8 +2877,275 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
st.error("Unable to analyze ETF erosion risk. Please try again.")
with tab4:
st.subheader("🤖 AI Suggestions")
st.write("This tab will contain AI suggestions for portfolio optimization.")
st.subheader("🤖 AI Portfolio Suggestions")
try:
# Get values from session state
capital_target = st.session_state.initial_capital if st.session_state.mode == "Capital Target" else 3000.0
income_target = st.session_state.target * 12 if st.session_state.mode == "Income Target" else 0.0
risk_tolerance = st.session_state.risk_tolerance
investment_horizon = 5 # Default to 5 years if not specified
# Initialize services
from ETF_Portal.services.data_service import DataService
from ETF_Portal.services.etf_selection_service import ETFSelectionService
from ETF_Portal.services.etf_selection_service import InvestmentGoal, RiskTolerance
data_service = DataService()
selection_service = ETFSelectionService(data_service)
# Create investment goal
goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target if income_target > 0 else None,
risk_tolerance=RiskTolerance[risk_tolerance.upper()],
investment_horizon=investment_horizon
)
# Get AI suggestions for different strategies
with st.spinner("Analyzing ETFs and generating portfolio suggestions..."):
try:
# Strategy 1: Balanced Growth
balanced_goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target if income_target > 0 else None,
risk_tolerance=RiskTolerance.MODERATE,
investment_horizon=investment_horizon
)
balanced_portfolio = selection_service.select_etfs(balanced_goal)
# Strategy 2: Income Focus
income_goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target * 1.2 if income_target > 0 else capital_target * 0.06, # 6% target yield
risk_tolerance=RiskTolerance.CONSERVATIVE,
investment_horizon=investment_horizon
)
income_portfolio = selection_service.select_etfs(income_goal)
# Strategy 3: Growth Focus
growth_goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target * 0.8 if income_target > 0 else capital_target * 0.03, # 3% target yield
risk_tolerance=RiskTolerance.AGGRESSIVE,
investment_horizon=investment_horizon
)
growth_portfolio = selection_service.select_etfs(growth_goal)
# Strategy 4: Risk-Adjusted (uses user's risk tolerance)
risk_adjusted_goal = InvestmentGoal(
capital_target=capital_target,
income_target=income_target if income_target > 0 else None, # No default yield target
risk_tolerance=RiskTolerance[risk_tolerance.upper()],
investment_horizon=investment_horizon
)
risk_adjusted_portfolio = selection_service.select_etfs(risk_adjusted_goal)
# Create tabs for each strategy
strategy_tabs = st.tabs([
"🔄 Balanced Growth",
"💰 Income Focus",
"📈 Growth Focus",
"⚖️ Risk-Adjusted"
])
# Display Balanced Growth Strategy
with strategy_tabs[0]:
st.write("### Balanced Growth Strategy")
st.write("""
A balanced approach focusing on both growth and income, suitable for most investors.
- Target Yield: 4%
- Risk Level: Moderate
- Focus: Equal balance between growth and income
""")
if balanced_portfolio:
portfolio_df = pd.DataFrame(balanced_portfolio)
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
portfolio_df['Amount ($)'] = portfolio_df['amount']
st.dataframe(
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
hide_index=True
)
# Display metrics
metrics_df = pd.DataFrame([
{
'Ticker': etf['ticker'],
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
'AUM ($B)': etf['metrics']['aum'] / 1e9,
'Volatility (%)': etf['metrics']['volatility'] * 100,
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
'Dividend Yield (%)': etf['metrics']['dividend_yield']
}
for etf in balanced_portfolio
])
st.dataframe(metrics_df, hide_index=True)
else:
st.error("Could not generate balanced growth portfolio.")
# Display Income Focus Strategy
with strategy_tabs[1]:
st.write("### Income Focus Strategy")
st.write("""
Optimized for higher dividend income with lower risk, suitable for income-focused investors.
- Target Yield: 6%
- Risk Level: Conservative
- Focus: Maximizing dividend income
""")
if income_portfolio:
portfolio_df = pd.DataFrame(income_portfolio)
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
portfolio_df['Amount ($)'] = portfolio_df['amount']
st.dataframe(
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
hide_index=True
)
# Display metrics
metrics_df = pd.DataFrame([
{
'Ticker': etf['ticker'],
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
'AUM ($B)': etf['metrics']['aum'] / 1e9,
'Volatility (%)': etf['metrics']['volatility'] * 100,
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
'Dividend Yield (%)': etf['metrics']['dividend_yield']
}
for etf in income_portfolio
])
st.dataframe(metrics_df, hide_index=True)
else:
st.error("Could not generate income focus portfolio.")
# Display Growth Focus Strategy
with strategy_tabs[2]:
st.write("### Growth Focus Strategy")
st.write("""
Optimized for capital appreciation with higher risk tolerance, suitable for growth investors.
- Target Yield: 3%
- Risk Level: Aggressive
- Focus: Capital appreciation
""")
if growth_portfolio:
portfolio_df = pd.DataFrame(growth_portfolio)
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
portfolio_df['Amount ($)'] = portfolio_df['amount']
st.dataframe(
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
hide_index=True
)
# Display metrics
metrics_df = pd.DataFrame([
{
'Ticker': etf['ticker'],
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
'AUM ($B)': etf['metrics']['aum'] / 1e9,
'Volatility (%)': etf['metrics']['volatility'] * 100,
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
'Dividend Yield (%)': etf['metrics']['dividend_yield']
}
for etf in growth_portfolio
])
st.dataframe(metrics_df, hide_index=True)
else:
st.error("Could not generate growth focus portfolio.")
# Display Risk-Adjusted Strategy
with strategy_tabs[3]:
st.write("### Risk-Adjusted Strategy")
st.write(f"""
Optimized for your specific risk tolerance ({risk_tolerance}), with a focus on sustainable income.
- Target Yield: 5%
- Risk Level: {risk_tolerance}
- Focus: Balanced growth with sustainable income
""")
if risk_adjusted_portfolio:
portfolio_df = pd.DataFrame(risk_adjusted_portfolio)
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
portfolio_df['Amount ($)'] = portfolio_df['amount']
st.dataframe(
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
hide_index=True
)
# Display metrics
metrics_df = pd.DataFrame([
{
'Ticker': etf['ticker'],
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
'AUM ($B)': etf['metrics']['aum'] / 1e9,
'Volatility (%)': etf['metrics']['volatility'] * 100,
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
'Dividend Yield (%)': etf['metrics']['dividend_yield']
}
for etf in risk_adjusted_portfolio
])
st.dataframe(metrics_df, hide_index=True)
else:
st.error("Could not generate risk-adjusted portfolio.")
# Add buttons to apply each strategy
st.write("### Apply Strategy")
col1, col2, col3, col4 = st.columns(4)
with col1:
if st.button("Apply Balanced Growth", key="apply_balanced"):
if balanced_portfolio:
st.session_state.etf_allocations = [
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
for etf in balanced_portfolio
]
st.success("Applied Balanced Growth strategy!")
st.rerun()
with col2:
if st.button("Apply Income Focus", key="apply_income"):
if income_portfolio:
st.session_state.etf_allocations = [
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
for etf in income_portfolio
]
st.success("Applied Income Focus strategy!")
st.rerun()
with col3:
if st.button("Apply Growth Focus", key="apply_growth"):
if growth_portfolio:
st.session_state.etf_allocations = [
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
for etf in growth_portfolio
]
st.success("Applied Growth Focus strategy!")
st.rerun()
with col4:
if st.button("Apply Risk-Adjusted", key="apply_risk_adjusted"):
if risk_adjusted_portfolio:
st.session_state.etf_allocations = [
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
for etf in risk_adjusted_portfolio
]
st.success("Applied Risk-Adjusted strategy!")
st.rerun()
except ValueError as e:
st.error(str(e))
except Exception as e:
st.error(f"An unexpected error occurred: {str(e)}")
logger.error(f"Error generating portfolio suggestions: {str(e)}", exc_info=True)
except Exception as e:
st.error(f"Error initializing services: {str(e)}")
logger.error(f"Error initializing services: {str(e)}", exc_info=True)
with tab5:
st.subheader("📊 ETF Details")

View File

@ -1,4 +0,0 @@
from .service import DripService
from .models import DripConfig, DripResult, MonthlyData, PortfolioAllocation
__all__ = ['DripService', 'DripConfig', 'DripResult', 'MonthlyData', 'PortfolioAllocation']

View File

@ -1,23 +0,0 @@
import logging
import sys
def setup_logger():
# Create logger
logger = logging.getLogger('drip_service')
logger.setLevel(logging.DEBUG)
# Create console handler with formatting
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
# Create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(console_handler)
return logger
# Create logger instance
logger = setup_logger()

View File

@ -1,46 +0,0 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
@dataclass
class PortfolioAllocation:
ticker: str
price: float
yield_annual: float
initial_shares: float
initial_allocation: float
distribution: str
@dataclass
class MonthlyData:
month: int
total_value: float
monthly_income: float
cumulative_income: float
shares: Dict[str, float]
prices: Dict[str, float]
yields: Dict[str, float]
@dataclass
class DripConfig:
months: int
erosion_type: str
erosion_level: Dict
dividend_frequency: Dict[str, int] = None
def __post_init__(self):
if self.dividend_frequency is None:
self.dividend_frequency = {
"Monthly": 12,
"Quarterly": 4,
"Semi-Annually": 2,
"Annually": 1,
"Unknown": 12 # Default to monthly if unknown
}
@dataclass
class DripResult:
monthly_data: List[MonthlyData]
final_portfolio_value: float
total_income: float
total_shares: Dict[str, float]

View File

@ -1,455 +0,0 @@
from typing import Dict, List, Optional, Tuple, Any
import pandas as pd
import numpy as np
import traceback
from dataclasses import dataclass, field
from enum import Enum
from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult
from ..nav_erosion_service import NavErosionService
from .logger import logger
class DistributionFrequency(Enum):
"""Enum for distribution frequencies"""
MONTHLY = ("Monthly", 12)
QUARTERLY = ("Quarterly", 4)
SEMI_ANNUALLY = ("Semi-Annually", 2)
ANNUALLY = ("Annually", 1)
UNKNOWN = ("Unknown", 12)
def __init__(self, name: str, payments_per_year: int):
self.display_name = name
self.payments_per_year = payments_per_year
@dataclass
class TickerData:
"""Data structure for individual ticker information"""
ticker: str
price: float
annual_yield: float
shares: float
allocation_pct: float
distribution_freq: DistributionFrequency
@property
def market_value(self) -> float:
return self.price * self.shares
@property
def monthly_yield(self) -> float:
return self.annual_yield / 12
@property
def distribution_yield(self) -> float:
return self.annual_yield / self.distribution_freq.payments_per_year
class DripService:
"""Enhanced DRIP calculation service with improved performance and accuracy"""
def __init__(self) -> None:
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
self.nav_erosion_service = NavErosionService()
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
"""
Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy.
Args:
portfolio_df: DataFrame containing portfolio allocation
config: DripConfig object with simulation parameters
Returns:
DripResult object containing the simulation results
"""
try:
# Validate inputs
self._validate_inputs(portfolio_df, config)
# Get erosion data from nav_erosion_service
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
logger.info(f"Erosion data results: {erosion_data.results}")
# Initialize erosion rates dictionary
erosion_rates = {}
# Use erosion rates from nav_erosion_service
for ticker in portfolio_df["Ticker"]:
# Find the result for this ticker in erosion_data.results
result = next((r for r in erosion_data.results if r.ticker == ticker), None)
if result:
erosion_rates[ticker] = {
"nav": result.monthly_nav_erosion_rate,
"yield": result.monthly_yield_erosion_rate
}
logger.info(f"=== EROSION RATE DEBUG ===")
logger.info(f"Ticker: {ticker}")
logger.info(f"Erosion rates from nav_erosion_service:")
logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}")
logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}")
logger.info(f"=== END EROSION RATE DEBUG ===\n")
else:
# Use default erosion rates if not found
erosion_rates[ticker] = {
"nav": 0.05, # 5% per month (very high, for test)
"yield": 0.07 # 7% per month (very high, for test)
}
logger.info(f"=== EROSION RATE DEBUG ===")
logger.info(f"Ticker: {ticker}")
logger.info(f"Using default erosion rates:")
logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}")
logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}")
logger.info(f"=== END EROSION RATE DEBUG ===\n")
# Log the final erosion rates dictionary
logger.info(f"Final erosion rates dictionary: {erosion_rates}")
# Initialize portfolio data
ticker_data = self._initialize_ticker_data(portfolio_df)
# Pre-calculate distribution schedule for performance
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
# Initialize simulation state
simulation_state = self._initialize_simulation_state(ticker_data)
monthly_data: List[MonthlyData] = []
# Run monthly simulation
for month in range(1, config.months + 1):
logger.info(f"\n=== Starting Month {month} ===")
logger.info(f"Initial state for month {month}:")
for ticker in ticker_data.keys():
logger.info(f" {ticker}:")
logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}")
logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}")
logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}")
month_result = self._simulate_month(
month,
simulation_state,
ticker_data,
erosion_rates,
distribution_schedule
)
monthly_data.append(month_result)
logger.info(f"Final state for month {month}:")
for ticker in ticker_data.keys():
logger.info(f" {ticker}:")
logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}")
logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}")
logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}")
logger.info(f"=== End Month {month} ===\n")
# Calculate final results
return self._create_drip_result(monthly_data, simulation_state)
except Exception as e:
logger.error(f"Error calculating DRIP growth: {str(e)}")
logger.error(traceback.format_exc())
raise
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
"""Validate input parameters"""
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
if config.months <= 0:
raise ValueError("Months must be positive")
if portfolio_df.empty:
raise ValueError("Portfolio DataFrame is empty")
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
"""Initialize ticker data with validation"""
ticker_data = {}
for _, row in portfolio_df.iterrows():
ticker = row["Ticker"]
# Handle distribution frequency
dist_period = row.get("Distribution Period", "Monthly")
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
ticker_data[ticker] = TickerData(
ticker=ticker,
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
shares=max(0.0, float(row["Shares"])),
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
distribution_freq=dist_freq
)
return ticker_data
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
"""Pre-calculate which months each ticker pays distributions"""
schedule = {}
for ticker, data in ticker_data.items():
distribution_months = []
freq = data.distribution_freq
for month in range(1, total_months + 1):
if self._is_distribution_month(month, freq):
distribution_months.append(month)
schedule[ticker] = distribution_months
return schedule
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
"""Initialize simulation state variables"""
return {
'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()},
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
'cumulative_income': 0.0
}
def _simulate_month(
self,
month: int,
state: Dict[str, Any],
ticker_data: Dict[str, TickerData],
erosion_rates: Dict[str, Dict[str, float]],
distribution_schedule: Dict[str, List[int]]
) -> MonthlyData:
"""Simulate a single month with improved accuracy"""
# Debug logging for erosion rates
logger.info(f"\n=== EROSION RATES DEBUG ===")
logger.info(f"Erosion rates dictionary: {erosion_rates}")
for ticker, rates in erosion_rates.items():
logger.info(f" {ticker}:")
logger.info(f" nav: {rates['nav']:.4%}")
logger.info(f" yield: {rates['yield']:.4%}")
logger.info(f"=== END EROSION RATES DEBUG ===\n")
# Apply erosion first
for ticker, rates in erosion_rates.items():
if ticker in state['current_prices']:
# Get monthly erosion rates (already in decimal form)
monthly_nav_erosion = rates['nav']
monthly_yield_erosion = rates['yield']
# Get current values
old_price = state['current_prices'][ticker]
old_yield = state['current_yields'][ticker]
# Debug logging
logger.info(f"\n=== EROSION CALCULATION DEBUG ===")
logger.info(f"Ticker: {ticker}")
logger.info(f"Raw erosion rates from nav_erosion_service:")
logger.info(f" monthly_nav_erosion: {monthly_nav_erosion:.4%}")
logger.info(f" monthly_yield_erosion: {monthly_yield_erosion:.4%}")
logger.info(f"Current values:")
logger.info(f" old_price: ${old_price:.4f}")
logger.info(f" old_yield: {old_yield:.4%}")
# Calculate new values
new_price = old_price * (1 - monthly_nav_erosion)
new_yield = old_yield * (1 - monthly_yield_erosion)
logger.info(f"Calculated new values:")
logger.info(f" new_price = ${old_price:.4f} * (1 - {monthly_nav_erosion:.4%})")
logger.info(f" new_price = ${old_price:.4f} * {1 - monthly_nav_erosion:.4f}")
logger.info(f" new_price = ${new_price:.4f}")
logger.info(f" new_yield = {old_yield:.4%} * (1 - {monthly_yield_erosion:.4%})")
logger.info(f" new_yield = {old_yield:.4%} * {1 - monthly_yield_erosion:.4f}")
logger.info(f" new_yield = {new_yield:.4%}")
# Apply the new values with bounds checking
state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices
state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields
logger.info(f"Final values after bounds checking:")
logger.info(f" final_price: ${state['current_prices'][ticker]:.4f}")
logger.info(f" final_yield: {state['current_yields'][ticker]:.4%}")
logger.info(f"=== END EROSION CALCULATION DEBUG ===\n")
# Log the actual erosion being applied
logger.info(f"Applied erosion to {ticker}:")
logger.info(f" NAV: {monthly_nav_erosion:.4%} -> New price: ${state['current_prices'][ticker]:.2f}")
logger.info(f" Yield: {monthly_yield_erosion:.4%} -> New yield: {state['current_yields'][ticker]:.2%}")
# Calculate monthly income from distributions using eroded values
monthly_income = self._calculate_monthly_distributions(
month, state, ticker_data, distribution_schedule
)
# Update cumulative income
state['cumulative_income'] += monthly_income
# Reinvest dividends (DRIP)
self._reinvest_dividends(month, state, distribution_schedule)
# Calculate total portfolio value with bounds checking
total_value = 0.0
for ticker in ticker_data.keys():
shares = state['current_shares'][ticker]
price = state['current_prices'][ticker]
if shares > 0 and price > 0:
total_value += shares * price
return MonthlyData(
month=month,
total_value=total_value,
monthly_income=monthly_income,
cumulative_income=state['cumulative_income'],
shares=state['current_shares'].copy(),
prices=state['current_prices'].copy(),
yields=state['current_yields'].copy()
)
def _calculate_monthly_distributions(
self,
month: int,
state: Dict[str, Any],
ticker_data: Dict[str, TickerData],
distribution_schedule: Dict[str, List[int]]
) -> float:
"""Calculate distributions for the current month"""
monthly_income = 0.0
for ticker, data in ticker_data.items():
if month in distribution_schedule[ticker]:
shares = state['current_shares'][ticker]
price = state['current_prices'][ticker]
yield_rate = state['current_yields'][ticker]
# Calculate distribution amount using annual yield divided by payments per year
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
distribution_amount = shares * price * distribution_yield
monthly_income += distribution_amount
return monthly_income
def _reinvest_dividends(
self,
month: int,
state: Dict[str, Any],
distribution_schedule: Dict[str, List[int]]
) -> None:
"""Reinvest dividends for tickers that distributed in this month"""
for ticker, distribution_months in distribution_schedule.items():
if month in distribution_months:
shares = state['current_shares'][ticker]
price = state['current_prices'][ticker]
yield_rate = state['current_yields'][ticker]
# Calculate dividend income using the correct distribution frequency
freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY)
dividend_income = shares * price * (yield_rate / freq.payments_per_year)
# Purchase additional shares
if price > 0:
new_shares = dividend_income / price
state['current_shares'][ticker] += new_shares
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
"""Check if current month is a distribution month"""
if frequency == DistributionFrequency.MONTHLY:
return True
elif frequency == DistributionFrequency.QUARTERLY:
return month % 3 == 0
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
return month % 6 == 0
elif frequency == DistributionFrequency.ANNUALLY:
return month % 12 == 0
else:
return True # Default to monthly for unknown
def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult:
"""Create final DRIP result object"""
if not monthly_data:
raise ValueError("No monthly data generated")
final_data = monthly_data[-1]
return DripResult(
monthly_data=monthly_data,
final_portfolio_value=final_data.total_value,
total_income=final_data.cumulative_income,
total_shares=state['current_shares'].copy()
)
# Utility methods for analysis and comparison
def calculate_drip_vs_no_drip_comparison(
self,
portfolio_df: pd.DataFrame,
config: DripConfig
) -> Dict[str, Any]:
"""Calculate comparison between DRIP and no-DRIP scenarios"""
# Calculate DRIP scenario
drip_result = self.calculate_drip_growth(portfolio_df, config)
# Calculate no-DRIP scenario (dividends not reinvested)
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
# Calculate comparison metrics
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
return {
'drip_final_value': drip_result.final_portfolio_value,
'no_drip_final_value': no_drip_result['final_value'],
'drip_advantage': drip_advantage,
'percentage_advantage': percentage_advantage,
'total_dividends_reinvested': drip_result.total_income,
'cash_dividends_no_drip': no_drip_result['total_dividends']
}
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
"""Calculate scenario where dividends are not reinvested"""
ticker_data = self._initialize_ticker_data(portfolio_df)
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
erosion_rates = {
result.ticker: {
"nav": result.monthly_nav_erosion_rate,
"yield": result.monthly_yield_erosion_rate
}
for result in erosion_data.results
}
state = self._initialize_simulation_state(ticker_data)
total_dividends = 0.0
for month in range(1, config.months + 1):
# Calculate dividends but don't reinvest
monthly_dividends = self._calculate_monthly_distributions(
month, state, ticker_data,
self._create_distribution_schedule(ticker_data, config.months)
)
total_dividends += monthly_dividends
# Apply erosion
for ticker, rates in erosion_rates.items():
if ticker in state['current_prices']:
# Get monthly erosion rates (already in decimal form)
monthly_nav_erosion = rates['nav']
monthly_yield_erosion = rates['yield']
# Apply NAV erosion (decrease price)
old_price = state['current_prices'][ticker]
new_price = old_price * (1 - monthly_nav_erosion)
state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices
# Apply yield erosion (decrease yield)
old_yield = state['current_yields'][ticker]
new_yield = old_yield * (1 - monthly_yield_erosion)
state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields
final_value = sum(
state['current_shares'][ticker] * state['current_prices'][ticker]
for ticker in ticker_data.keys()
)
return {
'final_value': final_value,
'total_dividends': total_dividends
}

View File

@ -1,8 +0,0 @@
"""
Nav Erosion Service package
"""
from .service import NavErosionService
from .models import NavErosionResult
__all__ = ['NavErosionService', 'NavErosionResult']

View File

@ -1,31 +0,0 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
@dataclass
class NavErosionConfig:
max_erosion_level: int = 9
max_monthly_erosion: float = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
use_per_ticker: bool = False
global_nav_rate: float = 0
per_ticker_rates: Dict[str, float] = None
@dataclass
class NavErosionResult:
ticker: str
nav_erosion_rate: float
monthly_erosion_rate: float
annual_erosion_rate: float
risk_level: int # 0-9 scale
risk_explanation: str
max_drawdown: float
volatility: float
is_new_etf: bool
etf_age_years: Optional[float]
@dataclass
class NavErosionAnalysis:
results: List[NavErosionResult]
portfolio_nav_risk: float # Average risk level
portfolio_erosion_rate: float # Weighted average erosion rate
risk_summary: str

View File

@ -1,209 +0,0 @@
from typing import Dict, List, Tuple
from .models import NavErosionConfig, NavErosionResult, NavErosionAnalysis
from enum import Enum
from dataclasses import dataclass
import streamlit as st
class ETFType(Enum):
INCOME = "Income"
GROWTH = "Growth"
BALANCED = "Balanced"
@dataclass
class NavErosionResult:
"""Result of NAV erosion analysis for a single ETF"""
ticker: str
nav_erosion_rate: float # Annual NAV erosion rate
yield_erosion_rate: float # Annual yield erosion rate
monthly_nav_erosion_rate: float # Monthly NAV erosion rate
monthly_yield_erosion_rate: float # Monthly yield erosion rate
risk_level: int
risk_explanation: str
max_drawdown: float
volatility: float
is_new_etf: bool
etf_age_years: float
@dataclass
class NavErosionAnalysis:
"""Complete NAV erosion analysis results"""
results: List[NavErosionResult]
portfolio_nav_risk: float = 0.0
portfolio_erosion_rate: float = 0.0
risk_summary: str = ""
class NavErosionService:
def __init__(self):
self.NAV_COMPONENT_WEIGHTS = {
'drawdown': 0.4,
'volatility': 0.3,
'sharpe': 0.15,
'sortino': 0.15
}
# Default erosion rates based on risk level (0-9)
self.RISK_TO_EROSION = {
0: 0.01, # 1% annual
1: 0.02, # 2% annual
2: 0.03, # 3% annual
3: 0.04, # 4% annual
4: 0.05, # 5% annual
5: 0.06, # 6% annual
6: 0.07, # 7% annual
7: 0.08, # 8% annual
8: 0.09, # 9% annual
9: 0.10 # 10% annual
}
def analyze_etf_erosion_risk(self, tickers: List[str]) -> NavErosionAnalysis:
"""Analyze NAV erosion risk for a list of ETFs"""
results = []
print("\n=== NAV EROSION SERVICE DEBUG ===")
print(f"Session state keys: {st.session_state.keys()}")
print(f"Erosion level from session state: {st.session_state.get('erosion_level')}")
for ticker in tickers:
# Get erosion rates from session state
erosion_level = st.session_state.get('erosion_level', {'nav': 5.0, 'yield': 5.0})
annual_nav_erosion = erosion_level['nav'] / 100 # Convert from percentage to decimal
annual_yield_erosion = erosion_level['yield'] / 100 # Convert from percentage to decimal
# Convert annual rates to monthly
monthly_nav_erosion = 1 - (1 - annual_nav_erosion) ** (1/12)
monthly_yield_erosion = 1 - (1 - annual_yield_erosion) ** (1/12)
print(f"\n=== NAV EROSION SERVICE DEBUG ===")
print(f"Ticker: {ticker}")
print(f"Session State Values:")
print(f" Annual NAV Erosion: {annual_nav_erosion:.4%}")
print(f" Annual Yield Erosion: {annual_yield_erosion:.4%}")
print(f" Monthly NAV Erosion: {monthly_nav_erosion:.4%}")
print(f" Monthly Yield Erosion: {monthly_yield_erosion:.4%}")
print(f"=== END NAV EROSION SERVICE DEBUG ===\n")
result = NavErosionResult(
ticker=ticker,
nav_erosion_rate=annual_nav_erosion,
yield_erosion_rate=annual_yield_erosion,
monthly_nav_erosion_rate=monthly_nav_erosion,
monthly_yield_erosion_rate=monthly_yield_erosion,
risk_level=5, # Arbitrary risk level
risk_explanation="Using erosion rates from session state",
max_drawdown=0.2,
volatility=0.25,
is_new_etf=False,
etf_age_years=1.0
)
results.append(result)
print(f"Created NavErosionResult for {ticker}:")
print(f" monthly_nav_erosion_rate: {result.monthly_nav_erosion_rate:.4%}")
print(f" monthly_yield_erosion_rate: {result.monthly_yield_erosion_rate:.4%}")
# Calculate portfolio-level metrics
portfolio_nav_risk = sum(r.risk_level for r in results) / len(results)
portfolio_erosion_rate = sum(r.nav_erosion_rate for r in results) / len(results)
analysis = NavErosionAnalysis(
results=results,
portfolio_nav_risk=portfolio_nav_risk,
portfolio_erosion_rate=portfolio_erosion_rate,
risk_summary="Portfolio has moderate NAV erosion risk"
)
print("\nFinal NavErosionAnalysis:")
for r in analysis.results:
print(f" {r.ticker}:")
print(f" monthly_nav_erosion_rate: {r.monthly_nav_erosion_rate:.4%}")
print(f" monthly_yield_erosion_rate: {r.monthly_yield_erosion_rate:.4%}")
print("=== END NAV EROSION SERVICE DEBUG ===\n")
return analysis
def _calculate_nav_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]:
"""Calculate NAV risk components with ETF-type specific adjustments"""
components = {}
# Base risk calculation with ETF-type specific thresholds
if etf_data.get('max_drawdown') is not None:
if etf_type == ETFType.INCOME:
# Income ETFs typically have lower drawdowns
if etf_data['max_drawdown'] > 0.25:
components['drawdown'] = 7
elif etf_data['max_drawdown'] > 0.15:
components['drawdown'] = 5
elif etf_data['max_drawdown'] > 0.10:
components['drawdown'] = 3
else:
components['drawdown'] = 2
elif etf_type == ETFType.GROWTH:
# Growth ETFs typically have higher drawdowns
if etf_data['max_drawdown'] > 0.35:
components['drawdown'] = 7
elif etf_data['max_drawdown'] > 0.25:
components['drawdown'] = 5
elif etf_data['max_drawdown'] > 0.15:
components['drawdown'] = 3
else:
components['drawdown'] = 2
else: # BALANCED
# Balanced ETFs have moderate drawdowns
if etf_data['max_drawdown'] > 0.30:
components['drawdown'] = 7
elif etf_data['max_drawdown'] > 0.20:
components['drawdown'] = 5
elif etf_data['max_drawdown'] > 0.12:
components['drawdown'] = 3
else:
components['drawdown'] = 2
else:
components['drawdown'] = 4 # Default medium risk if no data
# Rest of the method remains unchanged
if etf_data.get('volatility') is not None:
if etf_data['volatility'] > 0.40:
components['volatility'] = 7
elif etf_data['volatility'] > 0.25:
components['volatility'] = 5
elif etf_data['volatility'] > 0.15:
components['volatility'] = 3
else:
components['volatility'] = 2
else:
components['volatility'] = 4
if etf_data.get('sharpe_ratio') is not None:
if etf_data['sharpe_ratio'] >= 2.0:
components['sharpe'] = 1
elif etf_data['sharpe_ratio'] >= 1.5:
components['sharpe'] = 2
elif etf_data['sharpe_ratio'] >= 1.0:
components['sharpe'] = 3
elif etf_data['sharpe_ratio'] >= 0.5:
components['sharpe'] = 4
else:
components['sharpe'] = 5
else:
components['sharpe'] = 4
if etf_data.get('sortino_ratio') is not None:
if etf_data['sortino_ratio'] >= 2.0:
components['sortino'] = 1
elif etf_data['sortino_ratio'] >= 1.5:
components['sortino'] = 2
elif etf_data['sortino_ratio'] >= 1.0:
components['sortino'] = 3
elif etf_data['sortino_ratio'] >= 0.5:
components['sortino'] = 4
else:
components['sortino'] = 5
else:
components['sortino'] = 4
# Calculate weighted NAV risk
nav_risk = sum(
components[component] * weight
for component, weight in self.NAV_COMPONENT_WEIGHTS.items()
)
return nav_risk, components