Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 783b2580d8 | |||
| b339dff0d7 | |||
| d364f072fb | |||
| 1bd98153a8 | |||
| 9d25a01082 | |||
| c30e89f82c | |||
| 27ef418f84 | |||
| f7cf624721 |
9
.gitignore
vendored
9
.gitignore
vendored
@ -2,9 +2,14 @@
|
||||
.env
|
||||
#.env.*
|
||||
|
||||
# Cache directories
|
||||
# Ignore all runtime cache data (created at runtime, not code)
|
||||
cache/
|
||||
**/cache/
|
||||
|
||||
# But DO track the cache manager Python code in the package
|
||||
!ETF_Portal/ETF_Portal/cache/*.py
|
||||
!ETF_Portal/ETF_Portal/cache/__init__.py
|
||||
!ETF_Portal/ETF_Portal/cache/
|
||||
|
||||
|
||||
# Python
|
||||
__pycache__/
|
||||
|
||||
0
ETF_Portal/ETF_Portal/cache/__init__.py
vendored
Normal file
0
ETF_Portal/ETF_Portal/cache/__init__.py
vendored
Normal file
@ -7,10 +7,11 @@ import json
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict, Optional, List
|
||||
import yfinance as yf
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from ..api.factory import APIFactory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -28,6 +29,14 @@ class DataService:
|
||||
self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings'
|
||||
self.cache_timeout = timedelta(hours=1)
|
||||
|
||||
# Create cache directories if they don't exist
|
||||
for directory in [self.cache_dir, self.yf_cache_dir, self.fmp_cache_dir,
|
||||
self.fmp_profiles_dir, self.fmp_historical_dir, self.fmp_holdings_dir]:
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Initialize API factory
|
||||
self.api_factory = APIFactory()
|
||||
|
||||
def get_etf_data(self, ticker: str) -> Dict:
|
||||
"""Get ETF data using fallback logic:
|
||||
1. Try FMP cache
|
||||
@ -191,7 +200,80 @@ class DataService:
|
||||
|
||||
def _get_from_fmp(self, ticker: str) -> Optional[Dict]:
|
||||
"""Get data from FMP API"""
|
||||
# TODO: Implement FMP API integration
|
||||
try:
|
||||
# Get FMP client
|
||||
fmp_client = self.api_factory.get_client('fmp')
|
||||
|
||||
# Get ETF profile
|
||||
profile = fmp_client.get_etf_profile(ticker)
|
||||
if not profile:
|
||||
return None
|
||||
|
||||
# Get historical data
|
||||
hist_data = fmp_client.get_etf_historical_data(ticker)
|
||||
if hist_data.empty:
|
||||
return None
|
||||
|
||||
# Get holdings
|
||||
holdings = fmp_client.get_etf_holdings(ticker)
|
||||
|
||||
# Get dividend history
|
||||
dividend_history = fmp_client.get_dividend_history(ticker)
|
||||
|
||||
# Get sector weightings
|
||||
sector_weightings = fmp_client.get_sector_weightings(ticker)
|
||||
|
||||
# Calculate metrics
|
||||
hist_data['log_returns'] = np.log(hist_data['close'] / hist_data['close'].shift(1))
|
||||
returns = hist_data['log_returns'].dropna()
|
||||
|
||||
# Calculate annualized volatility using daily log returns
|
||||
volatility = returns.std() * np.sqrt(252)
|
||||
|
||||
# Calculate max drawdown using rolling window
|
||||
rolling_max = hist_data['close'].rolling(window=252, min_periods=1).max()
|
||||
daily_drawdown = hist_data['close'] / rolling_max - 1.0
|
||||
max_drawdown = abs(daily_drawdown.min())
|
||||
|
||||
# Calculate Sharpe ratio (assuming risk-free rate of 0.02)
|
||||
risk_free_rate = 0.02
|
||||
excess_returns = returns - risk_free_rate/252
|
||||
sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std()
|
||||
|
||||
# Calculate Sortino ratio
|
||||
downside_returns = returns[returns < 0]
|
||||
sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_returns.std()
|
||||
|
||||
# Calculate dividend trend
|
||||
if not dividend_history.empty:
|
||||
dividend_history['date'] = pd.to_datetime(dividend_history['date'])
|
||||
dividend_history = dividend_history.sort_values('date')
|
||||
dividend_trend = dividend_history['dividend'].pct_change().mean() * 100
|
||||
else:
|
||||
dividend_trend = 0.0
|
||||
|
||||
# Calculate age in years
|
||||
if 'inceptionDate' in profile:
|
||||
inception_date = pd.to_datetime(profile['inceptionDate'])
|
||||
age_years = (pd.Timestamp.now() - inception_date).days / 365.25
|
||||
else:
|
||||
age_years = 0.0
|
||||
|
||||
return {
|
||||
'info': profile,
|
||||
'hist': hist_data.to_dict('records'),
|
||||
'holdings': holdings,
|
||||
'volatility': volatility,
|
||||
'max_drawdown': max_drawdown,
|
||||
'sharpe_ratio': sharpe_ratio,
|
||||
'sortino_ratio': sortino_ratio,
|
||||
'dividend_trend': dividend_trend,
|
||||
'age_years': age_years,
|
||||
'is_new': age_years < 2
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching FMP data for {ticker}: {str(e)}")
|
||||
return None
|
||||
|
||||
def _get_from_yfinance(self, ticker: str) -> Optional[Dict]:
|
||||
@ -335,3 +417,21 @@ class DataService:
|
||||
'is_new': False,
|
||||
'is_estimated': True # Flag to indicate these are estimates
|
||||
}
|
||||
|
||||
def get_etf_list(self) -> List[str]:
|
||||
"""Get list of available ETFs"""
|
||||
try:
|
||||
# Define a list of high-yield ETFs to track
|
||||
etf_list = [
|
||||
'JEPI', 'JEPQ', 'FEPI', 'CONY', 'MSTY', 'SDIV', 'DIV', 'VIGI',
|
||||
'VYM', 'VIG', 'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM',
|
||||
'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD',
|
||||
'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD', 'DGRO'
|
||||
]
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
return list(dict.fromkeys(etf_list))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting ETF list: {str(e)}")
|
||||
return []
|
||||
23
ETF_Portal/services/etf_selection_service/__init__.py
Normal file
23
ETF_Portal/services/etf_selection_service/__init__.py
Normal file
@ -0,0 +1,23 @@
|
||||
"""
|
||||
ETF Selection Service package initialization
|
||||
"""
|
||||
|
||||
from .service import ETFSelectionService
|
||||
from .models import InvestmentGoal, RiskTolerance, ETF, ETFUniverse
|
||||
from .exceptions import (
|
||||
ETFSelectionError, ETFDataError, ETFNotFoundError,
|
||||
ValidationError, PortfolioOptimizationError
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
'ETFSelectionService',
|
||||
'InvestmentGoal',
|
||||
'RiskTolerance',
|
||||
'ETF',
|
||||
'ETFUniverse',
|
||||
'ETFSelectionError',
|
||||
'ETFDataError',
|
||||
'ETFNotFoundError',
|
||||
'ValidationError',
|
||||
'PortfolioOptimizationError'
|
||||
]
|
||||
179
ETF_Portal/services/etf_selection_service/database.py
Normal file
179
ETF_Portal/services/etf_selection_service/database.py
Normal file
@ -0,0 +1,179 @@
|
||||
"""
|
||||
Database operations for ETF Selection Service
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from .models import ETF, ETFUniverse
|
||||
from .exceptions import DatabaseError, DataUpdateError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ETFDatabase:
|
||||
def __init__(self, db_path: str = "etf_data.db"):
|
||||
self.db_path = db_path
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self):
|
||||
"""Initialize database tables"""
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create ETFs table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS etfs (
|
||||
ticker TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
expense_ratio REAL,
|
||||
aum REAL,
|
||||
avg_volume INTEGER,
|
||||
tracking_error REAL,
|
||||
volatility REAL,
|
||||
max_drawdown REAL,
|
||||
sharpe_ratio REAL,
|
||||
top_holding_weight REAL,
|
||||
dividend_yield REAL,
|
||||
category TEXT,
|
||||
asset_class TEXT,
|
||||
sector TEXT,
|
||||
region TEXT,
|
||||
strategy TEXT,
|
||||
last_updated TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
# Create categories table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS categories (
|
||||
category TEXT PRIMARY KEY,
|
||||
etfs TEXT,
|
||||
last_updated TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to initialize database: {str(e)}")
|
||||
|
||||
def save_etf(self, etf: ETF):
|
||||
"""Save ETF data to database"""
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
INSERT OR REPLACE INTO etfs (
|
||||
ticker, name, expense_ratio, aum, avg_volume,
|
||||
tracking_error, volatility, max_drawdown,
|
||||
sharpe_ratio, top_holding_weight, dividend_yield,
|
||||
category, asset_class, sector, region, strategy,
|
||||
last_updated
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
etf.ticker, etf.name, etf.expense_ratio, etf.aum,
|
||||
etf.avg_volume, etf.tracking_error, etf.volatility,
|
||||
etf.max_drawdown, etf.sharpe_ratio, etf.top_holding_weight,
|
||||
etf.dividend_yield, etf.category, etf.asset_class,
|
||||
etf.sector, etf.region, etf.strategy,
|
||||
datetime.now().isoformat()
|
||||
))
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
raise DataUpdateError(f"Failed to save ETF data: {str(e)}")
|
||||
|
||||
def get_etf(self, ticker: str) -> Optional[ETF]:
|
||||
"""Get ETF data from database"""
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT * FROM etfs WHERE ticker = ?", (ticker,))
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row:
|
||||
return ETF(
|
||||
ticker=row[0],
|
||||
name=row[1],
|
||||
expense_ratio=row[2],
|
||||
aum=row[3],
|
||||
avg_volume=row[4],
|
||||
tracking_error=row[5],
|
||||
volatility=row[6],
|
||||
max_drawdown=row[7],
|
||||
sharpe_ratio=row[8],
|
||||
top_holding_weight=row[9],
|
||||
dividend_yield=row[10],
|
||||
category=row[11],
|
||||
asset_class=row[12],
|
||||
sector=row[13],
|
||||
region=row[14],
|
||||
strategy=row[15]
|
||||
)
|
||||
return None
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to get ETF data: {str(e)}")
|
||||
|
||||
def save_universe(self, universe: ETFUniverse):
|
||||
"""Save ETF universe to database"""
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Save categories
|
||||
for category, etfs in universe.categories.items():
|
||||
cursor.execute("""
|
||||
INSERT OR REPLACE INTO categories (
|
||||
category, etfs, last_updated
|
||||
) VALUES (?, ?, ?)
|
||||
""", (
|
||||
category,
|
||||
json.dumps(etfs),
|
||||
datetime.now().isoformat()
|
||||
))
|
||||
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
raise DataUpdateError(f"Failed to save ETF universe: {str(e)}")
|
||||
|
||||
def get_universe(self) -> ETFUniverse:
|
||||
"""Get ETF universe from database"""
|
||||
try:
|
||||
universe = ETFUniverse()
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get all ETFs
|
||||
cursor.execute("SELECT * FROM etfs")
|
||||
for row in cursor.fetchall():
|
||||
etf = ETF(
|
||||
ticker=row[0],
|
||||
name=row[1],
|
||||
expense_ratio=row[2],
|
||||
aum=row[3],
|
||||
avg_volume=row[4],
|
||||
tracking_error=row[5],
|
||||
volatility=row[6],
|
||||
max_drawdown=row[7],
|
||||
sharpe_ratio=row[8],
|
||||
top_holding_weight=row[9],
|
||||
dividend_yield=row[10],
|
||||
category=row[11],
|
||||
asset_class=row[12],
|
||||
sector=row[13],
|
||||
region=row[14],
|
||||
strategy=row[15]
|
||||
)
|
||||
universe.etfs[etf.ticker] = etf
|
||||
|
||||
# Get categories
|
||||
cursor.execute("SELECT category, etfs FROM categories")
|
||||
for row in cursor.fetchall():
|
||||
universe.categories[row[0]] = json.loads(row[1])
|
||||
|
||||
return universe
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to get ETF universe: {str(e)}")
|
||||
31
ETF_Portal/services/etf_selection_service/exceptions.py
Normal file
31
ETF_Portal/services/etf_selection_service/exceptions.py
Normal file
@ -0,0 +1,31 @@
|
||||
"""
|
||||
Custom exceptions for ETF Selection Service
|
||||
"""
|
||||
|
||||
class ETFSelectionError(Exception):
|
||||
"""Base exception for ETF selection errors"""
|
||||
pass
|
||||
|
||||
class ETFDataError(ETFSelectionError):
|
||||
"""Exception raised when ETF data is invalid or missing"""
|
||||
pass
|
||||
|
||||
class ETFNotFoundError(ETFSelectionError):
|
||||
"""Exception raised when ETF is not found"""
|
||||
pass
|
||||
|
||||
class DatabaseError(ETFSelectionError):
|
||||
"""Exception raised for database-related errors"""
|
||||
pass
|
||||
|
||||
class DataUpdateError(ETFSelectionError):
|
||||
"""Exception raised when data update fails"""
|
||||
pass
|
||||
|
||||
class ValidationError(ETFSelectionError):
|
||||
"""Exception raised when input validation fails"""
|
||||
pass
|
||||
|
||||
class PortfolioOptimizationError(ETFSelectionError):
|
||||
"""Exception raised when portfolio optimization fails"""
|
||||
pass
|
||||
52
ETF_Portal/services/etf_selection_service/models.py
Normal file
52
ETF_Portal/services/etf_selection_service/models.py
Normal file
@ -0,0 +1,52 @@
|
||||
"""
|
||||
Data models and enums for ETF Selection Service
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional, Dict, List
|
||||
|
||||
class RiskTolerance(Enum):
|
||||
CONSERVATIVE = "Conservative"
|
||||
MODERATE = "Moderate"
|
||||
AGGRESSIVE = "Aggressive"
|
||||
|
||||
@dataclass
|
||||
class InvestmentGoal:
|
||||
capital_target: float
|
||||
income_target: Optional[float] = None
|
||||
risk_tolerance: RiskTolerance = RiskTolerance.MODERATE
|
||||
investment_horizon: int = 5 # years
|
||||
|
||||
@dataclass
|
||||
class ETF:
|
||||
ticker: str
|
||||
name: str
|
||||
expense_ratio: float
|
||||
aum: float
|
||||
avg_volume: float
|
||||
tracking_error: float
|
||||
volatility: float
|
||||
max_drawdown: float
|
||||
sharpe_ratio: float
|
||||
top_holding_weight: float
|
||||
dividend_yield: float
|
||||
category: str
|
||||
asset_class: str
|
||||
sector: str
|
||||
region: str
|
||||
strategy: str
|
||||
|
||||
class ETFCategory(Enum):
|
||||
EQUITY = "Equity"
|
||||
FIXED_INCOME = "Fixed Income"
|
||||
COMMODITY = "Commodity"
|
||||
REAL_ESTATE = "Real Estate"
|
||||
CRYPTO = "Crypto"
|
||||
MULTI_ASSET = "Multi-Asset"
|
||||
|
||||
class ETFUniverse:
|
||||
def __init__(self):
|
||||
self.etfs: Dict[str, ETF] = {}
|
||||
self.categories: Dict[str, List[str]] = {}
|
||||
self.last_updated: Optional[str] = None
|
||||
418
ETF_Portal/services/etf_selection_service/service.py
Normal file
418
ETF_Portal/services/etf_selection_service/service.py
Normal file
@ -0,0 +1,418 @@
|
||||
"""
|
||||
ETF Selection Service for optimizing ETF selection based on investment goals
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime
|
||||
import os
|
||||
|
||||
from .models import InvestmentGoal, RiskTolerance, ETF, ETFUniverse, ETFCategory
|
||||
from .exceptions import (
|
||||
ETFSelectionError, ETFDataError, ETFNotFoundError,
|
||||
ValidationError, PortfolioOptimizationError
|
||||
)
|
||||
from .database import ETFDatabase
|
||||
from .utils import (
|
||||
calculate_volatility, calculate_max_drawdown,
|
||||
calculate_sharpe_ratio, calculate_sortino_ratio,
|
||||
calculate_portfolio_metrics, is_cache_valid
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ETFSelectionService:
|
||||
def __init__(self, data_service):
|
||||
"""
|
||||
Initialize ETF Selection Service
|
||||
|
||||
Args:
|
||||
data_service: Service that provides ETF data (must implement get_etf_list() and get_etf_data())
|
||||
"""
|
||||
self.data_service = data_service
|
||||
self.db = ETFDatabase()
|
||||
self.cache = {}
|
||||
self.cache_ttl = 3600 # 1 hour cache TTL
|
||||
|
||||
self.selection_criteria = {
|
||||
'tier1': {
|
||||
'expense_ratio': 0.10, # 0.10% or less
|
||||
'aum': 5_000_000_000, # $5B or more
|
||||
'tracking_error': 0.05, # 0.05% or less
|
||||
'avg_volume': 100_000 # 100K shares/day
|
||||
},
|
||||
'tier2': {
|
||||
'expense_ratio': 0.25, # 0.25% or less
|
||||
'aum': 1_000_000_000, # $1B or more
|
||||
'tracking_error': 0.10, # 0.10% or less
|
||||
'avg_volume': 50_000 # 50K shares/day
|
||||
}
|
||||
}
|
||||
|
||||
def select_etfs(self, goal: InvestmentGoal) -> List[Dict]:
|
||||
"""
|
||||
Select ETFs based on investment goals and risk tolerance
|
||||
Returns a list of recommended ETFs with allocation percentages
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Starting ETF selection with goal: {goal}")
|
||||
|
||||
# Validate investment goal
|
||||
self._validate_investment_goal(goal)
|
||||
|
||||
# Get ETF data as DataFrame
|
||||
etfs_df = self._get_etfs_dataframe()
|
||||
if etfs_df.empty:
|
||||
raise ETFDataError("No ETFs available for selection")
|
||||
|
||||
logger.info(f"Found {len(etfs_df)} available ETFs")
|
||||
|
||||
# Filter ETFs based on criteria
|
||||
filtered_df = self._filter_etfs_vectorized(etfs_df, goal)
|
||||
if filtered_df.empty:
|
||||
raise ETFSelectionError(
|
||||
"No ETFs passed the filtering criteria. Please try adjusting your risk tolerance or investment goals."
|
||||
)
|
||||
|
||||
logger.info(f"{len(filtered_df)} ETFs passed filtering criteria")
|
||||
|
||||
# Score ETFs based on criteria
|
||||
scored_df = self._score_etfs_vectorized(filtered_df, goal)
|
||||
if scored_df.empty:
|
||||
raise ETFSelectionError(
|
||||
"No ETFs passed the scoring criteria. Please try adjusting your risk tolerance or investment goals."
|
||||
)
|
||||
|
||||
logger.info(f"{len(scored_df)} ETFs passed scoring criteria")
|
||||
|
||||
# Optimize portfolio allocation
|
||||
portfolio = self._optimize_portfolio_vectorized(scored_df, goal)
|
||||
if portfolio.empty:
|
||||
raise PortfolioOptimizationError(
|
||||
"Failed to optimize portfolio allocation. Please try adjusting your investment goals or risk tolerance."
|
||||
)
|
||||
|
||||
logger.info(f"Successfully generated portfolio with {len(portfolio)} ETFs")
|
||||
return portfolio.to_dict('records')
|
||||
|
||||
except ETFSelectionError as e:
|
||||
logger.error(str(e))
|
||||
raise
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during ETF selection: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
raise ETFSelectionError(error_msg)
|
||||
|
||||
def _validate_investment_goal(self, goal: InvestmentGoal) -> None:
|
||||
"""Validate investment goal parameters"""
|
||||
if goal.capital_target <= 0:
|
||||
raise ValidationError("Capital target must be greater than 0")
|
||||
|
||||
if goal.income_target and goal.income_target <= 0:
|
||||
raise ValidationError("Income target must be greater than 0")
|
||||
|
||||
if not isinstance(goal.risk_tolerance, RiskTolerance):
|
||||
raise ValidationError("Risk tolerance must be a valid RiskTolerance enum value")
|
||||
|
||||
def _get_etfs_dataframe(self) -> pd.DataFrame:
|
||||
"""Get ETF data as DataFrame with caching"""
|
||||
cache_key = 'etfs_dataframe'
|
||||
|
||||
# Check cache first
|
||||
if cache_key in self.cache:
|
||||
cache_data, timestamp = self.cache[cache_key]
|
||||
if is_cache_valid(timestamp, self.cache_ttl):
|
||||
return cache_data
|
||||
|
||||
try:
|
||||
# Get list of ETFs
|
||||
etf_list = self.data_service.get_etf_list()
|
||||
if not etf_list:
|
||||
raise ETFDataError("No ETFs available from data service")
|
||||
|
||||
# Batch fetch ETF data
|
||||
etf_data = []
|
||||
for ticker in etf_list:
|
||||
try:
|
||||
data = self.data_service.get_etf_data(ticker)
|
||||
if data:
|
||||
etf_data.append({
|
||||
'ticker': ticker,
|
||||
'name': data.get('info', {}).get('longName', ticker),
|
||||
'expense_ratio': data.get('info', {}).get('annualReportExpenseRatio', 0.5) / 100,
|
||||
'aum': data.get('info', {}).get('totalAssets', 0),
|
||||
'avg_volume': data.get('info', {}).get('averageVolume', 0),
|
||||
'tracking_error': 0.0, # Not available from yfinance
|
||||
'volatility': float(data.get('volatility', 0)),
|
||||
'max_drawdown': float(data.get('max_drawdown', 0)),
|
||||
'sharpe_ratio': float(data.get('sharpe_ratio', 0)),
|
||||
'top_holding_weight': 0.0, # Not available from yfinance
|
||||
'dividend_yield': float(data.get('dividend_yield', 0)) / 100,
|
||||
'category': data.get('info', {}).get('category', 'Unknown'),
|
||||
'asset_class': data.get('info', {}).get('assetClass', 'Unknown'),
|
||||
'sector': data.get('info', {}).get('sector', 'Unknown'),
|
||||
'region': data.get('info', {}).get('region', 'Unknown'),
|
||||
'strategy': data.get('info', {}).get('strategy', 'Unknown')
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning(f"Error processing ETF {ticker}: {str(e)}")
|
||||
continue
|
||||
|
||||
if not etf_data:
|
||||
raise ETFDataError("No ETFs could be processed")
|
||||
|
||||
# Convert to DataFrame
|
||||
df = pd.DataFrame(etf_data)
|
||||
|
||||
# Cache the result
|
||||
self.cache[cache_key] = (df, datetime.now().isoformat())
|
||||
|
||||
return df
|
||||
|
||||
except Exception as e:
|
||||
raise ETFDataError(f"Error fetching ETF data: {str(e)}")
|
||||
|
||||
def _filter_etfs_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
|
||||
"""Filter ETFs using vectorized operations"""
|
||||
# Get criteria based on risk tolerance
|
||||
criteria = self.selection_criteria['tier1' if goal.risk_tolerance == RiskTolerance.CONSERVATIVE else 'tier2']
|
||||
|
||||
# Create boolean masks for filtering
|
||||
mask = (
|
||||
(df['expense_ratio'] <= criteria['expense_ratio']) &
|
||||
(df['aum'] >= criteria['aum']) &
|
||||
(df['tracking_error'] <= criteria['tracking_error']) &
|
||||
(df['avg_volume'] >= criteria['avg_volume'])
|
||||
)
|
||||
|
||||
return df[mask]
|
||||
|
||||
def _score_etfs_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
|
||||
"""Score ETFs using vectorized operations"""
|
||||
# Calculate risk score
|
||||
df['risk_score'] = (
|
||||
df['volatility'] * 0.3 +
|
||||
df['max_drawdown'] * 0.3 +
|
||||
(1 - df['sharpe_ratio']) * 0.4
|
||||
)
|
||||
|
||||
# Calculate income score if income target is specified
|
||||
if goal.income_target:
|
||||
df['income_score'] = df['dividend_yield'] / goal.income_target
|
||||
|
||||
# Calculate final score
|
||||
if goal.income_target:
|
||||
df['final_score'] = (
|
||||
(1 - df['risk_score']) * 0.6 +
|
||||
df['income_score'] * 0.4
|
||||
)
|
||||
else:
|
||||
df['final_score'] = 1 - df['risk_score']
|
||||
|
||||
# Normalize scores
|
||||
df['final_score'] = (df['final_score'] - df['final_score'].min()) / (df['final_score'].max() - df['final_score'].min())
|
||||
|
||||
# Filter out ETFs with low scores
|
||||
return df[df['final_score'] >= 0.5]
|
||||
|
||||
def _optimize_portfolio_vectorized(self, df: pd.DataFrame, goal: InvestmentGoal) -> pd.DataFrame:
|
||||
"""Optimize portfolio allocation using vectorized operations"""
|
||||
# Sort by final score
|
||||
df = df.sort_values('final_score', ascending=False)
|
||||
|
||||
# Select top ETFs based on risk tolerance
|
||||
n_etfs = 5 if goal.risk_tolerance == RiskTolerance.CONSERVATIVE else 10
|
||||
df = df.head(n_etfs)
|
||||
|
||||
# Calculate weights based on scores
|
||||
df['weight'] = df['final_score'] / df['final_score'].sum()
|
||||
|
||||
# Add allocation amount (already in percentage)
|
||||
df['allocation'] = df['weight'] * 100 # Convert to percentage
|
||||
df['amount'] = df['weight'] * goal.capital_target # Calculate actual amount
|
||||
|
||||
# Add metrics dictionary for each ETF
|
||||
df['metrics'] = df.apply(lambda row: {
|
||||
'expense_ratio': row['expense_ratio'],
|
||||
'aum': row['aum'],
|
||||
'volatility': row['volatility'],
|
||||
'max_drawdown': row['max_drawdown'],
|
||||
'sharpe_ratio': row['sharpe_ratio'],
|
||||
'dividend_yield': row['dividend_yield']
|
||||
}, axis=1)
|
||||
|
||||
return df[['ticker', 'name', 'allocation', 'amount', 'metrics']]
|
||||
|
||||
def _update_etf_universe(self):
|
||||
"""Update ETF universe with latest data"""
|
||||
try:
|
||||
# Get list of ETFs to update
|
||||
etfs_to_update = self._get_etfs_to_update()
|
||||
|
||||
# Update each ETF
|
||||
for ticker in etfs_to_update:
|
||||
try:
|
||||
etf = self._fetch_etf_data(ticker)
|
||||
if etf:
|
||||
self.db.save_etf(etf)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to update ETF {ticker}: {str(e)}")
|
||||
continue
|
||||
|
||||
# Update universe metadata
|
||||
self._update_universe_metadata()
|
||||
|
||||
except Exception as e:
|
||||
raise DataUpdateError(f"Failed to update ETF universe: {str(e)}")
|
||||
|
||||
def _get_etfs_to_update(self) -> List[str]:
|
||||
"""Get list of ETFs that need updating"""
|
||||
try:
|
||||
# Get current universe
|
||||
universe = self.db.get_universe()
|
||||
|
||||
# Get list of all ETFs
|
||||
all_etfs = self.data_service.get_etf_list()
|
||||
|
||||
# Find ETFs that need updating
|
||||
etfs_to_update = []
|
||||
for ticker in all_etfs:
|
||||
if ticker not in universe.etfs:
|
||||
etfs_to_update.append(ticker)
|
||||
else:
|
||||
# Check if data is stale
|
||||
etf = universe.etfs[ticker]
|
||||
if not is_cache_valid(etf.last_updated, self.cache_ttl):
|
||||
etfs_to_update.append(ticker)
|
||||
|
||||
return etfs_to_update
|
||||
|
||||
except Exception as e:
|
||||
raise DataUpdateError(f"Failed to get ETFs to update: {str(e)}")
|
||||
|
||||
def _fetch_etf_data(self, ticker: str) -> Optional[ETF]:
|
||||
"""Fetch ETF data from data service"""
|
||||
try:
|
||||
data = self.data_service.get_etf_data(ticker)
|
||||
if not data:
|
||||
return None
|
||||
|
||||
info = data.get('info', {})
|
||||
|
||||
return ETF(
|
||||
ticker=ticker,
|
||||
name=info.get('longName', ticker),
|
||||
expense_ratio=info.get('annualReportExpenseRatio', 0.5) / 100,
|
||||
aum=info.get('totalAssets', 0),
|
||||
avg_volume=info.get('averageVolume', 0),
|
||||
tracking_error=0.0, # Not available from yfinance
|
||||
volatility=float(data.get('volatility', 0)),
|
||||
max_drawdown=float(data.get('max_drawdown', 0)),
|
||||
sharpe_ratio=float(data.get('sharpe_ratio', 0)),
|
||||
top_holding_weight=0.0, # Not available from yfinance
|
||||
dividend_yield=float(data.get('dividend_yield', 0)) / 100,
|
||||
category=info.get('category', 'Unknown'),
|
||||
asset_class=info.get('assetClass', 'Unknown'),
|
||||
sector=info.get('sector', 'Unknown'),
|
||||
region=info.get('region', 'Unknown'),
|
||||
strategy=info.get('strategy', 'Unknown')
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch ETF data for {ticker}: {str(e)}")
|
||||
return None
|
||||
|
||||
def _determine_category(self, info: Dict) -> ETFCategory:
|
||||
"""Determine ETF category from info"""
|
||||
category = info.get('category', '').lower()
|
||||
|
||||
if 'equity' in category:
|
||||
return ETFCategory.EQUITY
|
||||
elif 'fixed income' in category or 'bond' in category:
|
||||
return ETFCategory.FIXED_INCOME
|
||||
elif 'commodity' in category:
|
||||
return ETFCategory.COMMODITY
|
||||
elif 'real estate' in category:
|
||||
return ETFCategory.REAL_ESTATE
|
||||
elif 'crypto' in category:
|
||||
return ETFCategory.CRYPTO
|
||||
else:
|
||||
return ETFCategory.MULTI_ASSET
|
||||
|
||||
def _determine_risk_level(self, volatility: float) -> str:
|
||||
"""Determine risk level based on volatility"""
|
||||
if volatility < 0.1:
|
||||
return 'Low'
|
||||
elif volatility < 0.2:
|
||||
return 'Medium'
|
||||
else:
|
||||
return 'High'
|
||||
|
||||
def _update_universe_metadata(self):
|
||||
"""Update ETF universe metadata"""
|
||||
try:
|
||||
universe = self.db.get_universe()
|
||||
|
||||
# Group ETFs by category
|
||||
categories = {}
|
||||
for etf in universe.etfs.values():
|
||||
if etf.category not in categories:
|
||||
categories[etf.category] = []
|
||||
categories[etf.category].append(etf.ticker)
|
||||
|
||||
# Update categories
|
||||
universe.categories = categories
|
||||
universe.last_updated = datetime.now().isoformat()
|
||||
|
||||
# Save to database
|
||||
self.db.save_universe(universe)
|
||||
|
||||
except Exception as e:
|
||||
raise DataUpdateError(f"Failed to update universe metadata: {str(e)}")
|
||||
|
||||
def get_etf_universe(self) -> ETFUniverse:
|
||||
"""Get current ETF universe"""
|
||||
try:
|
||||
return self.db.get_universe()
|
||||
except Exception as e:
|
||||
raise DatabaseError(f"Failed to get ETF universe: {str(e)}")
|
||||
|
||||
def search_etfs(self,
|
||||
category: Optional[ETFCategory] = None,
|
||||
risk_level: Optional[str] = None,
|
||||
min_dividend_yield: Optional[float] = None,
|
||||
max_expense_ratio: Optional[float] = None) -> List[ETF]:
|
||||
"""Search ETFs based on criteria"""
|
||||
try:
|
||||
universe = self.get_etf_universe()
|
||||
|
||||
# Filter ETFs
|
||||
filtered_etfs = []
|
||||
for etf in universe.etfs.values():
|
||||
if category and etf.category != category.value:
|
||||
continue
|
||||
if risk_level and self._determine_risk_level(etf.volatility) != risk_level:
|
||||
continue
|
||||
if min_dividend_yield and etf.dividend_yield < min_dividend_yield:
|
||||
continue
|
||||
if max_expense_ratio and etf.expense_ratio > max_expense_ratio:
|
||||
continue
|
||||
filtered_etfs.append(etf)
|
||||
|
||||
return filtered_etfs
|
||||
|
||||
except Exception as e:
|
||||
raise ETFSelectionError(f"Failed to search ETFs: {str(e)}")
|
||||
|
||||
def get_etf_details(self, ticker: str) -> Optional[ETF]:
|
||||
"""Get detailed ETF information"""
|
||||
try:
|
||||
return self.db.get_etf(ticker)
|
||||
except Exception as e:
|
||||
raise ETFNotFoundError(f"Failed to get ETF details for {ticker}: {str(e)}")
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clear service cache"""
|
||||
self.cache.clear()
|
||||
49
ETF_Portal/services/etf_selection_service/utils.py
Normal file
49
ETF_Portal/services/etf_selection_service/utils.py
Normal file
@ -0,0 +1,49 @@
|
||||
"""
|
||||
Utility functions for ETF selection service
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
def calculate_volatility(returns: pd.Series) -> float:
|
||||
"""Calculate annualized volatility from returns series."""
|
||||
return returns.std() * np.sqrt(252)
|
||||
|
||||
def calculate_max_drawdown(prices: pd.Series) -> float:
|
||||
"""Calculate maximum drawdown from price series."""
|
||||
return (prices / prices.cummax() - 1).min()
|
||||
|
||||
def calculate_sharpe_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float:
|
||||
"""Calculate Sharpe ratio from returns series."""
|
||||
excess_returns = returns - risk_free_rate/252
|
||||
if len(excess_returns) < 2:
|
||||
return 0.0
|
||||
return np.sqrt(252) * excess_returns.mean() / excess_returns.std()
|
||||
|
||||
def calculate_sortino_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float:
|
||||
"""Calculate Sortino ratio from returns series."""
|
||||
excess_returns = returns - risk_free_rate/252
|
||||
downside_returns = excess_returns[excess_returns < 0]
|
||||
if len(downside_returns) < 2:
|
||||
return 0.0
|
||||
return np.sqrt(252) * excess_returns.mean() / downside_returns.std()
|
||||
|
||||
def calculate_portfolio_metrics(weights: np.ndarray, returns: pd.DataFrame) -> Dict[str, float]:
|
||||
"""Calculate portfolio metrics from weights and returns."""
|
||||
portfolio_returns = returns.dot(weights)
|
||||
return {
|
||||
'volatility': calculate_volatility(portfolio_returns),
|
||||
'max_drawdown': calculate_max_drawdown(portfolio_returns.cumsum()),
|
||||
'sharpe_ratio': calculate_sharpe_ratio(portfolio_returns),
|
||||
'sortino_ratio': calculate_sortino_ratio(portfolio_returns)
|
||||
}
|
||||
|
||||
def is_cache_valid(timestamp: str, ttl_seconds: int) -> bool:
|
||||
"""Check if cached data is still valid based on TTL."""
|
||||
try:
|
||||
cache_time = datetime.fromisoformat(timestamp)
|
||||
return datetime.now() - cache_time < timedelta(seconds=ttl_seconds)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
BIN
etf_data.db
Normal file
BIN
etf_data.db
Normal file
Binary file not shown.
@ -21,6 +21,9 @@ import traceback
|
||||
from dotenv import load_dotenv
|
||||
import re
|
||||
from ETF_Portal.services.drip_service import DRIPService, DripConfig
|
||||
from ETF_Portal.services.etf_selection_service import ETFSelectionService, InvestmentGoal, RiskTolerance
|
||||
from ETF_Portal.services.nav_erosion_service import NavErosionService
|
||||
from ETF_Portal.services.data_service import DataService
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True) # Force reload of environment variables
|
||||
@ -1132,60 +1135,20 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Calculate key metrics
|
||||
total_capital = final_alloc["Capital Allocated ($)"].sum()
|
||||
total_income = final_alloc["Income Contributed ($)"].sum()
|
||||
|
||||
# Calculate weighted average yield
|
||||
weighted_yield = (final_alloc["Allocation (%)"] * final_alloc["Yield (%)"]).sum() / 100
|
||||
|
||||
# Display metrics in columns
|
||||
col1, col2, col3 = st.columns(3)
|
||||
|
||||
with col1:
|
||||
st.metric("Total Capital", format_large_number(total_capital))
|
||||
|
||||
with col2:
|
||||
st.metric("Annual Income", format_large_number(total_income))
|
||||
st.metric("Monthly Income", format_large_number(total_income/12))
|
||||
|
||||
with col3:
|
||||
st.metric("Average Yield", f"{weighted_yield:.2f}%")
|
||||
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
|
||||
|
||||
# Display allocation chart
|
||||
fig = px.pie(
|
||||
final_alloc,
|
||||
values="Allocation (%)",
|
||||
names="Ticker",
|
||||
title="Portfolio Allocation by ETF",
|
||||
hover_data={
|
||||
"Ticker": True,
|
||||
"Allocation (%)": ":.2f",
|
||||
"Yield (%)": ":.2f",
|
||||
"Capital Allocated ($)": ":,.2f",
|
||||
"Income Contributed ($)": ":,.2f"
|
||||
}
|
||||
)
|
||||
st.plotly_chart(fig, use_container_width=True)
|
||||
|
||||
# Display detailed allocation table
|
||||
st.subheader("Detailed Allocation")
|
||||
# Use the allocation from session state for all calculations and display
|
||||
display_df = final_alloc.copy()
|
||||
numeric_columns = ["Allocation (%)", "Yield (%)", "Price", "Shares", "Capital Allocated ($)", "Income Contributed ($)"]
|
||||
for col in numeric_columns:
|
||||
if col in display_df.columns:
|
||||
display_df[col] = pd.to_numeric(display_df[col], errors='coerce')
|
||||
display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12
|
||||
|
||||
# Format large numbers in the display DataFrame
|
||||
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(format_large_number)
|
||||
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(format_large_number)
|
||||
display_df["Monthly Income"] = display_df["Monthly Income"].apply(format_large_number)
|
||||
|
||||
# Ensure data_source column exists and rename it for display
|
||||
display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${float(x):,.2f}")
|
||||
display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${float(x):,.2f}")
|
||||
display_df["Monthly Income"] = display_df["Monthly Income"].apply(lambda x: f"${float(x):,.2f}")
|
||||
if "data_source" in display_df.columns:
|
||||
display_df = display_df.rename(columns={"data_source": "Data Source"})
|
||||
else:
|
||||
display_df["Data Source"] = "Unknown"
|
||||
|
||||
# Select and order columns for display
|
||||
display_columns = [
|
||||
"Ticker",
|
||||
"Allocation (%)",
|
||||
@ -1198,15 +1161,43 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
|
||||
"Risk Level",
|
||||
"Data Source"
|
||||
]
|
||||
|
||||
# Format the display
|
||||
st.dataframe(
|
||||
display_df[display_columns].style.format({
|
||||
"Allocation (%)": "{:.2f}%",
|
||||
"Yield (%)": "{:.2f}%",
|
||||
"Price": "${:,.2f}",
|
||||
"Shares": "{:,.4f}"
|
||||
}),
|
||||
editor_key = "allocation_editor"
|
||||
def parse_currency(val):
|
||||
if isinstance(val, str):
|
||||
return float(val.replace('$', '').replace(',', ''))
|
||||
return float(val)
|
||||
# --- Portfolio Summary Metrics (use session state allocation) ---
|
||||
total_capital = display_df["Capital Allocated ($)"].apply(parse_currency).sum()
|
||||
total_income = display_df["Income Contributed ($)"].apply(parse_currency).sum()
|
||||
weighted_yield = (display_df["Allocation (%)"] * display_df["Yield (%)"]).sum() / 100
|
||||
col1, col2, col3 = st.columns(3)
|
||||
with col1:
|
||||
st.metric("Total Capital", f"${total_capital:,.2f}")
|
||||
with col2:
|
||||
st.metric("Annual Income", f"${total_income:,.2f}")
|
||||
st.metric("Monthly Income", f"${total_income/12:,.2f}")
|
||||
with col3:
|
||||
st.metric("Average Yield", f"{weighted_yield:.2f}%")
|
||||
st.metric("Effective Yield", f"{(total_income/total_capital*100):.2f}%")
|
||||
# --- Pie chart (use session state allocation) ---
|
||||
fig = px.pie(
|
||||
display_df,
|
||||
values="Allocation (%)",
|
||||
names="Ticker",
|
||||
title="Portfolio Allocation by ETF",
|
||||
hover_data={
|
||||
"Ticker": True,
|
||||
"Allocation (%)": ":.2f",
|
||||
"Yield (%)": ":.2f",
|
||||
"Capital Allocated ($)": ":,.2f",
|
||||
"Income Contributed ($)": ":,.2f"
|
||||
}
|
||||
)
|
||||
st.plotly_chart(fig, use_container_width=True)
|
||||
# --- Editable allocation table below the pie chart ---
|
||||
st.subheader("Detailed Allocation")
|
||||
edited_df = st.data_editor(
|
||||
display_df[display_columns],
|
||||
column_config={
|
||||
"Ticker": st.column_config.TextColumn("Ticker", disabled=True),
|
||||
"Allocation (%)": st.column_config.NumberColumn(
|
||||
@ -1217,9 +1208,21 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
|
||||
format="%.1f",
|
||||
required=True
|
||||
),
|
||||
"Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True),
|
||||
"Price": st.column_config.TextColumn("Price", disabled=True),
|
||||
"Shares": st.column_config.TextColumn("Shares", disabled=True),
|
||||
"Yield (%)": st.column_config.NumberColumn(
|
||||
"Yield (%)",
|
||||
format="%.2f",
|
||||
disabled=True
|
||||
),
|
||||
"Price": st.column_config.NumberColumn(
|
||||
"Price",
|
||||
format="%.2f",
|
||||
disabled=True
|
||||
),
|
||||
"Shares": st.column_config.NumberColumn(
|
||||
"Shares",
|
||||
format="%.2f",
|
||||
disabled=True
|
||||
),
|
||||
"Capital Allocated ($)": st.column_config.TextColumn("Capital Allocated ($)", disabled=True),
|
||||
"Monthly Income": st.column_config.TextColumn("Monthly Income", disabled=True),
|
||||
"Income Contributed ($)": st.column_config.TextColumn("Income Contributed ($)", disabled=True),
|
||||
@ -1227,9 +1230,35 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None:
|
||||
"Data Source": st.column_config.TextColumn("Data Source", disabled=True)
|
||||
},
|
||||
hide_index=True,
|
||||
use_container_width=True
|
||||
use_container_width=True,
|
||||
key=editor_key
|
||||
)
|
||||
|
||||
edited_df = edited_df.reset_index(drop=True)
|
||||
# --- Update Allocations Button ---
|
||||
if st.button("Update Allocations", type="primary"):
|
||||
try:
|
||||
edited_df["Allocation (%)"] = pd.to_numeric(edited_df["Allocation (%)"], errors='coerce')
|
||||
total_allocation = edited_df["Allocation (%)"].sum()
|
||||
if abs(total_allocation - 100.0) > 0.01:
|
||||
st.error(f"Total allocation must be 100%. Current total: {total_allocation:.2f}%")
|
||||
else:
|
||||
edited_df["Capital Allocated ($)"] = edited_df["Capital Allocated ($)"].apply(parse_currency)
|
||||
edited_df["Income Contributed ($)"] = edited_df["Income Contributed ($)"].apply(parse_currency)
|
||||
edited_df["Monthly Income"] = edited_df["Monthly Income"].apply(parse_currency)
|
||||
total_capital = edited_df["Capital Allocated ($)"].sum()
|
||||
for idx, row in edited_df.iterrows():
|
||||
edited_df.at[idx, "Capital Allocated ($)"] = total_capital * (row["Allocation (%)"] / 100)
|
||||
edited_df.at[idx, "Income Contributed ($)"] = edited_df.at[idx, "Capital Allocated ($)"] * (row["Yield (%)"] / 100)
|
||||
edited_df.at[idx, "Monthly Income"] = edited_df.at[idx, "Income Contributed ($)"] / 12
|
||||
edited_df.at[idx, "Shares"] = edited_df.at[idx, "Capital Allocated ($)"] / row["Price"]
|
||||
# Update the session state with new allocations and trigger rerun
|
||||
st.session_state.final_alloc = edited_df
|
||||
st.success("Allocations updated successfully! Click 'Run Portfolio Simulation' to recalculate.")
|
||||
st.rerun()
|
||||
except Exception as e:
|
||||
st.error(f"Error updating allocations: {str(e)}")
|
||||
logger.error(f"Error in allocation update: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
except Exception as e:
|
||||
st.error(f"Error calculating portfolio summary: {str(e)}")
|
||||
logger.error(f"Error in portfolio_summary: {str(e)}")
|
||||
@ -1373,7 +1402,7 @@ def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[D
|
||||
for alloc in etf_allocations:
|
||||
mask = final_alloc["Ticker"] == alloc["ticker"]
|
||||
if mask.any():
|
||||
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
|
||||
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] # Already in percentage
|
||||
else:
|
||||
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
|
||||
|
||||
@ -1392,7 +1421,7 @@ def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[D
|
||||
logger.error("Weighted yield is zero")
|
||||
return None
|
||||
|
||||
# Calculate required capital based on weighted yield (convert percentage to decimal)
|
||||
# Calculate required capital based on weighted yield
|
||||
required_capital = annual_income / (weighted_yield / 100)
|
||||
|
||||
# Calculate capital allocation and income
|
||||
@ -1439,7 +1468,7 @@ def allocate_for_capital(df: pd.DataFrame, initial_capital: float, etf_allocatio
|
||||
for alloc in etf_allocations:
|
||||
mask = final_alloc["Ticker"] == alloc["ticker"]
|
||||
if mask.any():
|
||||
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"]
|
||||
final_alloc.loc[mask, "Allocation (%)"] = alloc["allocation"] # Already in percentage
|
||||
else:
|
||||
logger.warning(f"Ticker {alloc['ticker']} not found in DataFrame")
|
||||
|
||||
@ -1659,45 +1688,28 @@ def remove_ticker(ticker_to_remove: str) -> None:
|
||||
# Display current tickers in the main space
|
||||
if st.session_state.etf_allocations:
|
||||
st.subheader("Selected ETFs")
|
||||
st.markdown("""
|
||||
<style>
|
||||
.ticker-container {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 8px;
|
||||
margin-bottom: 20px;
|
||||
}
|
||||
.ticker-item {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
color: #2ecc71;
|
||||
font-size: 1.1em;
|
||||
font-weight: 500;
|
||||
}
|
||||
.ticker-close {
|
||||
margin-left: 2px;
|
||||
cursor: pointer;
|
||||
font-size: 1.1em;
|
||||
opacity: 0.7;
|
||||
}
|
||||
.ticker-close:hover {
|
||||
opacity: 1;
|
||||
}
|
||||
</style>
|
||||
""", unsafe_allow_html=True)
|
||||
|
||||
# Create a container for tickers
|
||||
ticker_container = st.container()
|
||||
with ticker_container:
|
||||
# Display each ticker with a close button
|
||||
for etf in st.session_state.etf_allocations:
|
||||
col1, col2 = st.columns([0.05, 0.95]) # Adjusted column ratio
|
||||
with col1:
|
||||
# Create four columns for the tables with smaller widths
|
||||
col1, col2, col3, col4 = st.columns([1, 1, 1, 1])
|
||||
|
||||
# Split the ETFs into four groups
|
||||
etf_groups = [[] for _ in range(4)]
|
||||
for i, etf in enumerate(st.session_state.etf_allocations):
|
||||
group_index = i % 4
|
||||
etf_groups[group_index].append(etf)
|
||||
|
||||
# Display each group in its own column
|
||||
for i, (col, etf_group) in enumerate(zip([col1, col2, col3, col4], etf_groups)):
|
||||
with col:
|
||||
for etf in etf_group:
|
||||
st.markdown(f"""
|
||||
<div style="display: flex; align-items: center; gap: 2px; margin-bottom: -15px;">
|
||||
<span style="color: #2ecc71; font-weight: 500;">{etf['ticker']}</span>
|
||||
</div>
|
||||
""", unsafe_allow_html=True)
|
||||
if st.button("×", key=f"remove_{etf['ticker']}",
|
||||
help=f"Remove {etf['ticker']} from portfolio"):
|
||||
remove_ticker(etf['ticker'])
|
||||
with col2:
|
||||
st.markdown(f"<div class='ticker-item'>{etf['ticker']}</div>", unsafe_allow_html=True)
|
||||
|
||||
# Debug information
|
||||
logger.info("=== Session State Debug ===")
|
||||
@ -1829,7 +1841,15 @@ with st.sidebar:
|
||||
|
||||
# Create a container for ETF input
|
||||
with st.container():
|
||||
# Input field for ETF ticker only
|
||||
# Input field for ETF ticker with improved visibility
|
||||
st.markdown("""
|
||||
<style>
|
||||
.stTextInput input {
|
||||
color: #2ecc71 !important;
|
||||
font-weight: 500 !important;
|
||||
}
|
||||
</style>
|
||||
""", unsafe_allow_html=True)
|
||||
new_ticker = st.text_input("ETF Ticker", help="Enter a valid ETF ticker (e.g., SCHD)")
|
||||
|
||||
# Add button to add ETF
|
||||
@ -2116,15 +2136,24 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
|
||||
st.subheader("📈 Strategy Performance Summary")
|
||||
col1, col2, col3, col4 = st.columns(4)
|
||||
|
||||
drip_variation = (comparison_result['drip_final_value'] - comparison_result['initial_investment']) / comparison_result['initial_investment'] * 100
|
||||
no_drip_variation = (comparison_result['no_drip_final_value'] - comparison_result['initial_investment']) / comparison_result['initial_investment'] * 100
|
||||
|
||||
with col1:
|
||||
st.metric(
|
||||
"DRIP Final Value",
|
||||
f"${comparison_result['drip_final_value']:,.2f}"
|
||||
f"${comparison_result['drip_final_value']:,.2f}",
|
||||
delta=f"{drip_variation:+.1f}%",
|
||||
delta_color="normal" if drip_variation < 0 else "inverse" if drip_variation == 0 else "off" if drip_variation < 0 else "normal" # fallback, but we will override below
|
||||
)
|
||||
# Streamlit does not support custom colors, so we use green for >0, grey for <0
|
||||
# But delta_color="normal" is green for positive, red for negative. We'll use normal for green, off for grey.
|
||||
with col2:
|
||||
st.metric(
|
||||
"No-DRIP Final Value",
|
||||
f"${comparison_result['no_drip_final_value']:,.2f}"
|
||||
f"${comparison_result['no_drip_final_value']:,.2f}",
|
||||
delta=f"{no_drip_variation:+.1f}%",
|
||||
delta_color="normal" if no_drip_variation > 0 else "off"
|
||||
)
|
||||
with col3:
|
||||
winner = comparison_result['winner']
|
||||
@ -2848,8 +2877,275 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
|
||||
st.error("Unable to analyze ETF erosion risk. Please try again.")
|
||||
|
||||
with tab4:
|
||||
st.subheader("🤖 AI Suggestions")
|
||||
st.write("This tab will contain AI suggestions for portfolio optimization.")
|
||||
st.subheader("🤖 AI Portfolio Suggestions")
|
||||
|
||||
try:
|
||||
# Get values from session state
|
||||
capital_target = st.session_state.initial_capital if st.session_state.mode == "Capital Target" else 3000.0
|
||||
income_target = st.session_state.target * 12 if st.session_state.mode == "Income Target" else 0.0
|
||||
risk_tolerance = st.session_state.risk_tolerance
|
||||
investment_horizon = 5 # Default to 5 years if not specified
|
||||
|
||||
# Initialize services
|
||||
from ETF_Portal.services.data_service import DataService
|
||||
from ETF_Portal.services.etf_selection_service import ETFSelectionService
|
||||
from ETF_Portal.services.etf_selection_service import InvestmentGoal, RiskTolerance
|
||||
|
||||
data_service = DataService()
|
||||
selection_service = ETFSelectionService(data_service)
|
||||
|
||||
# Create investment goal
|
||||
goal = InvestmentGoal(
|
||||
capital_target=capital_target,
|
||||
income_target=income_target if income_target > 0 else None,
|
||||
risk_tolerance=RiskTolerance[risk_tolerance.upper()],
|
||||
investment_horizon=investment_horizon
|
||||
)
|
||||
|
||||
# Get AI suggestions for different strategies
|
||||
with st.spinner("Analyzing ETFs and generating portfolio suggestions..."):
|
||||
try:
|
||||
# Strategy 1: Balanced Growth
|
||||
balanced_goal = InvestmentGoal(
|
||||
capital_target=capital_target,
|
||||
income_target=income_target if income_target > 0 else None,
|
||||
risk_tolerance=RiskTolerance.MODERATE,
|
||||
investment_horizon=investment_horizon
|
||||
)
|
||||
balanced_portfolio = selection_service.select_etfs(balanced_goal)
|
||||
|
||||
# Strategy 2: Income Focus
|
||||
income_goal = InvestmentGoal(
|
||||
capital_target=capital_target,
|
||||
income_target=income_target * 1.2 if income_target > 0 else capital_target * 0.06, # 6% target yield
|
||||
risk_tolerance=RiskTolerance.CONSERVATIVE,
|
||||
investment_horizon=investment_horizon
|
||||
)
|
||||
income_portfolio = selection_service.select_etfs(income_goal)
|
||||
|
||||
# Strategy 3: Growth Focus
|
||||
growth_goal = InvestmentGoal(
|
||||
capital_target=capital_target,
|
||||
income_target=income_target * 0.8 if income_target > 0 else capital_target * 0.03, # 3% target yield
|
||||
risk_tolerance=RiskTolerance.AGGRESSIVE,
|
||||
investment_horizon=investment_horizon
|
||||
)
|
||||
growth_portfolio = selection_service.select_etfs(growth_goal)
|
||||
|
||||
# Strategy 4: Risk-Adjusted (uses user's risk tolerance)
|
||||
risk_adjusted_goal = InvestmentGoal(
|
||||
capital_target=capital_target,
|
||||
income_target=income_target if income_target > 0 else None, # No default yield target
|
||||
risk_tolerance=RiskTolerance[risk_tolerance.upper()],
|
||||
investment_horizon=investment_horizon
|
||||
)
|
||||
risk_adjusted_portfolio = selection_service.select_etfs(risk_adjusted_goal)
|
||||
|
||||
# Create tabs for each strategy
|
||||
strategy_tabs = st.tabs([
|
||||
"🔄 Balanced Growth",
|
||||
"💰 Income Focus",
|
||||
"📈 Growth Focus",
|
||||
"⚖️ Risk-Adjusted"
|
||||
])
|
||||
|
||||
# Display Balanced Growth Strategy
|
||||
with strategy_tabs[0]:
|
||||
st.write("### Balanced Growth Strategy")
|
||||
st.write("""
|
||||
A balanced approach focusing on both growth and income, suitable for most investors.
|
||||
- Target Yield: 4%
|
||||
- Risk Level: Moderate
|
||||
- Focus: Equal balance between growth and income
|
||||
""")
|
||||
if balanced_portfolio:
|
||||
portfolio_df = pd.DataFrame(balanced_portfolio)
|
||||
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
|
||||
portfolio_df['Amount ($)'] = portfolio_df['amount']
|
||||
|
||||
st.dataframe(
|
||||
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
|
||||
hide_index=True
|
||||
)
|
||||
|
||||
# Display metrics
|
||||
metrics_df = pd.DataFrame([
|
||||
{
|
||||
'Ticker': etf['ticker'],
|
||||
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
|
||||
'AUM ($B)': etf['metrics']['aum'] / 1e9,
|
||||
'Volatility (%)': etf['metrics']['volatility'] * 100,
|
||||
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
|
||||
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
|
||||
'Dividend Yield (%)': etf['metrics']['dividend_yield']
|
||||
}
|
||||
for etf in balanced_portfolio
|
||||
])
|
||||
st.dataframe(metrics_df, hide_index=True)
|
||||
else:
|
||||
st.error("Could not generate balanced growth portfolio.")
|
||||
|
||||
# Display Income Focus Strategy
|
||||
with strategy_tabs[1]:
|
||||
st.write("### Income Focus Strategy")
|
||||
st.write("""
|
||||
Optimized for higher dividend income with lower risk, suitable for income-focused investors.
|
||||
- Target Yield: 6%
|
||||
- Risk Level: Conservative
|
||||
- Focus: Maximizing dividend income
|
||||
""")
|
||||
if income_portfolio:
|
||||
portfolio_df = pd.DataFrame(income_portfolio)
|
||||
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
|
||||
portfolio_df['Amount ($)'] = portfolio_df['amount']
|
||||
|
||||
st.dataframe(
|
||||
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
|
||||
hide_index=True
|
||||
)
|
||||
|
||||
# Display metrics
|
||||
metrics_df = pd.DataFrame([
|
||||
{
|
||||
'Ticker': etf['ticker'],
|
||||
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
|
||||
'AUM ($B)': etf['metrics']['aum'] / 1e9,
|
||||
'Volatility (%)': etf['metrics']['volatility'] * 100,
|
||||
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
|
||||
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
|
||||
'Dividend Yield (%)': etf['metrics']['dividend_yield']
|
||||
}
|
||||
for etf in income_portfolio
|
||||
])
|
||||
st.dataframe(metrics_df, hide_index=True)
|
||||
else:
|
||||
st.error("Could not generate income focus portfolio.")
|
||||
|
||||
# Display Growth Focus Strategy
|
||||
with strategy_tabs[2]:
|
||||
st.write("### Growth Focus Strategy")
|
||||
st.write("""
|
||||
Optimized for capital appreciation with higher risk tolerance, suitable for growth investors.
|
||||
- Target Yield: 3%
|
||||
- Risk Level: Aggressive
|
||||
- Focus: Capital appreciation
|
||||
""")
|
||||
if growth_portfolio:
|
||||
portfolio_df = pd.DataFrame(growth_portfolio)
|
||||
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
|
||||
portfolio_df['Amount ($)'] = portfolio_df['amount']
|
||||
|
||||
st.dataframe(
|
||||
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
|
||||
hide_index=True
|
||||
)
|
||||
|
||||
# Display metrics
|
||||
metrics_df = pd.DataFrame([
|
||||
{
|
||||
'Ticker': etf['ticker'],
|
||||
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
|
||||
'AUM ($B)': etf['metrics']['aum'] / 1e9,
|
||||
'Volatility (%)': etf['metrics']['volatility'] * 100,
|
||||
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
|
||||
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
|
||||
'Dividend Yield (%)': etf['metrics']['dividend_yield']
|
||||
}
|
||||
for etf in growth_portfolio
|
||||
])
|
||||
st.dataframe(metrics_df, hide_index=True)
|
||||
else:
|
||||
st.error("Could not generate growth focus portfolio.")
|
||||
|
||||
# Display Risk-Adjusted Strategy
|
||||
with strategy_tabs[3]:
|
||||
st.write("### Risk-Adjusted Strategy")
|
||||
st.write(f"""
|
||||
Optimized for your specific risk tolerance ({risk_tolerance}), with a focus on sustainable income.
|
||||
- Target Yield: 5%
|
||||
- Risk Level: {risk_tolerance}
|
||||
- Focus: Balanced growth with sustainable income
|
||||
""")
|
||||
if risk_adjusted_portfolio:
|
||||
portfolio_df = pd.DataFrame(risk_adjusted_portfolio)
|
||||
portfolio_df['Allocation (%)'] = portfolio_df['allocation'].round().astype(int)
|
||||
portfolio_df['Amount ($)'] = portfolio_df['amount']
|
||||
|
||||
st.dataframe(
|
||||
portfolio_df[['ticker', 'name', 'Allocation (%)', 'Amount ($)']],
|
||||
hide_index=True
|
||||
)
|
||||
|
||||
# Display metrics
|
||||
metrics_df = pd.DataFrame([
|
||||
{
|
||||
'Ticker': etf['ticker'],
|
||||
'Expense Ratio (%)': etf['metrics']['expense_ratio'] * 100,
|
||||
'AUM ($B)': etf['metrics']['aum'] / 1e9,
|
||||
'Volatility (%)': etf['metrics']['volatility'] * 100,
|
||||
'Max Drawdown (%)': etf['metrics']['max_drawdown'] * 100,
|
||||
'Sharpe Ratio': etf['metrics']['sharpe_ratio'],
|
||||
'Dividend Yield (%)': etf['metrics']['dividend_yield']
|
||||
}
|
||||
for etf in risk_adjusted_portfolio
|
||||
])
|
||||
st.dataframe(metrics_df, hide_index=True)
|
||||
else:
|
||||
st.error("Could not generate risk-adjusted portfolio.")
|
||||
|
||||
# Add buttons to apply each strategy
|
||||
st.write("### Apply Strategy")
|
||||
col1, col2, col3, col4 = st.columns(4)
|
||||
|
||||
with col1:
|
||||
if st.button("Apply Balanced Growth", key="apply_balanced"):
|
||||
if balanced_portfolio:
|
||||
st.session_state.etf_allocations = [
|
||||
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
|
||||
for etf in balanced_portfolio
|
||||
]
|
||||
st.success("Applied Balanced Growth strategy!")
|
||||
st.rerun()
|
||||
|
||||
with col2:
|
||||
if st.button("Apply Income Focus", key="apply_income"):
|
||||
if income_portfolio:
|
||||
st.session_state.etf_allocations = [
|
||||
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
|
||||
for etf in income_portfolio
|
||||
]
|
||||
st.success("Applied Income Focus strategy!")
|
||||
st.rerun()
|
||||
|
||||
with col3:
|
||||
if st.button("Apply Growth Focus", key="apply_growth"):
|
||||
if growth_portfolio:
|
||||
st.session_state.etf_allocations = [
|
||||
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
|
||||
for etf in growth_portfolio
|
||||
]
|
||||
st.success("Applied Growth Focus strategy!")
|
||||
st.rerun()
|
||||
|
||||
with col4:
|
||||
if st.button("Apply Risk-Adjusted", key="apply_risk_adjusted"):
|
||||
if risk_adjusted_portfolio:
|
||||
st.session_state.etf_allocations = [
|
||||
{"ticker": etf['ticker'], "allocation": int(round(etf['allocation']))}
|
||||
for etf in risk_adjusted_portfolio
|
||||
]
|
||||
st.success("Applied Risk-Adjusted strategy!")
|
||||
st.rerun()
|
||||
|
||||
except ValueError as e:
|
||||
st.error(str(e))
|
||||
except Exception as e:
|
||||
st.error(f"An unexpected error occurred: {str(e)}")
|
||||
logger.error(f"Error generating portfolio suggestions: {str(e)}", exc_info=True)
|
||||
|
||||
except Exception as e:
|
||||
st.error(f"Error initializing services: {str(e)}")
|
||||
logger.error(f"Error initializing services: {str(e)}", exc_info=True)
|
||||
|
||||
with tab5:
|
||||
st.subheader("📊 ETF Details")
|
||||
|
||||
@ -1,4 +0,0 @@
|
||||
from .service import DripService
|
||||
from .models import DripConfig, DripResult, MonthlyData, PortfolioAllocation
|
||||
|
||||
__all__ = ['DripService', 'DripConfig', 'DripResult', 'MonthlyData', 'PortfolioAllocation']
|
||||
@ -1,23 +0,0 @@
|
||||
import logging
|
||||
import sys
|
||||
|
||||
def setup_logger():
|
||||
# Create logger
|
||||
logger = logging.getLogger('drip_service')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
# Create console handler with formatting
|
||||
console_handler = logging.StreamHandler(sys.stdout)
|
||||
console_handler.setLevel(logging.DEBUG)
|
||||
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
console_handler.setFormatter(formatter)
|
||||
|
||||
# Add handler to logger
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
return logger
|
||||
|
||||
# Create logger instance
|
||||
logger = setup_logger()
|
||||
@ -1,46 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
|
||||
@dataclass
|
||||
class PortfolioAllocation:
|
||||
ticker: str
|
||||
price: float
|
||||
yield_annual: float
|
||||
initial_shares: float
|
||||
initial_allocation: float
|
||||
distribution: str
|
||||
|
||||
@dataclass
|
||||
class MonthlyData:
|
||||
month: int
|
||||
total_value: float
|
||||
monthly_income: float
|
||||
cumulative_income: float
|
||||
shares: Dict[str, float]
|
||||
prices: Dict[str, float]
|
||||
yields: Dict[str, float]
|
||||
|
||||
@dataclass
|
||||
class DripConfig:
|
||||
months: int
|
||||
erosion_type: str
|
||||
erosion_level: Dict
|
||||
dividend_frequency: Dict[str, int] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.dividend_frequency is None:
|
||||
self.dividend_frequency = {
|
||||
"Monthly": 12,
|
||||
"Quarterly": 4,
|
||||
"Semi-Annually": 2,
|
||||
"Annually": 1,
|
||||
"Unknown": 12 # Default to monthly if unknown
|
||||
}
|
||||
|
||||
@dataclass
|
||||
class DripResult:
|
||||
monthly_data: List[MonthlyData]
|
||||
final_portfolio_value: float
|
||||
total_income: float
|
||||
total_shares: Dict[str, float]
|
||||
@ -1,455 +0,0 @@
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import traceback
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult
|
||||
from ..nav_erosion_service import NavErosionService
|
||||
from .logger import logger
|
||||
|
||||
class DistributionFrequency(Enum):
|
||||
"""Enum for distribution frequencies"""
|
||||
MONTHLY = ("Monthly", 12)
|
||||
QUARTERLY = ("Quarterly", 4)
|
||||
SEMI_ANNUALLY = ("Semi-Annually", 2)
|
||||
ANNUALLY = ("Annually", 1)
|
||||
UNKNOWN = ("Unknown", 12)
|
||||
|
||||
def __init__(self, name: str, payments_per_year: int):
|
||||
self.display_name = name
|
||||
self.payments_per_year = payments_per_year
|
||||
|
||||
@dataclass
|
||||
class TickerData:
|
||||
"""Data structure for individual ticker information"""
|
||||
ticker: str
|
||||
price: float
|
||||
annual_yield: float
|
||||
shares: float
|
||||
allocation_pct: float
|
||||
distribution_freq: DistributionFrequency
|
||||
|
||||
@property
|
||||
def market_value(self) -> float:
|
||||
return self.price * self.shares
|
||||
|
||||
@property
|
||||
def monthly_yield(self) -> float:
|
||||
return self.annual_yield / 12
|
||||
|
||||
@property
|
||||
def distribution_yield(self) -> float:
|
||||
return self.annual_yield / self.distribution_freq.payments_per_year
|
||||
|
||||
class DripService:
|
||||
"""Enhanced DRIP calculation service with improved performance and accuracy"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
||||
self.nav_erosion_service = NavErosionService()
|
||||
|
||||
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
||||
"""
|
||||
Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy.
|
||||
|
||||
Args:
|
||||
portfolio_df: DataFrame containing portfolio allocation
|
||||
config: DripConfig object with simulation parameters
|
||||
|
||||
Returns:
|
||||
DripResult object containing the simulation results
|
||||
"""
|
||||
try:
|
||||
# Validate inputs
|
||||
self._validate_inputs(portfolio_df, config)
|
||||
|
||||
# Get erosion data from nav_erosion_service
|
||||
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
|
||||
logger.info(f"Erosion data results: {erosion_data.results}")
|
||||
|
||||
# Initialize erosion rates dictionary
|
||||
erosion_rates = {}
|
||||
|
||||
# Use erosion rates from nav_erosion_service
|
||||
for ticker in portfolio_df["Ticker"]:
|
||||
# Find the result for this ticker in erosion_data.results
|
||||
result = next((r for r in erosion_data.results if r.ticker == ticker), None)
|
||||
|
||||
if result:
|
||||
erosion_rates[ticker] = {
|
||||
"nav": result.monthly_nav_erosion_rate,
|
||||
"yield": result.monthly_yield_erosion_rate
|
||||
}
|
||||
logger.info(f"=== EROSION RATE DEBUG ===")
|
||||
logger.info(f"Ticker: {ticker}")
|
||||
logger.info(f"Erosion rates from nav_erosion_service:")
|
||||
logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}")
|
||||
logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}")
|
||||
logger.info(f"=== END EROSION RATE DEBUG ===\n")
|
||||
else:
|
||||
# Use default erosion rates if not found
|
||||
erosion_rates[ticker] = {
|
||||
"nav": 0.05, # 5% per month (very high, for test)
|
||||
"yield": 0.07 # 7% per month (very high, for test)
|
||||
}
|
||||
logger.info(f"=== EROSION RATE DEBUG ===")
|
||||
logger.info(f"Ticker: {ticker}")
|
||||
logger.info(f"Using default erosion rates:")
|
||||
logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}")
|
||||
logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}")
|
||||
logger.info(f"=== END EROSION RATE DEBUG ===\n")
|
||||
|
||||
# Log the final erosion rates dictionary
|
||||
logger.info(f"Final erosion rates dictionary: {erosion_rates}")
|
||||
|
||||
# Initialize portfolio data
|
||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
||||
|
||||
# Pre-calculate distribution schedule for performance
|
||||
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
|
||||
|
||||
# Initialize simulation state
|
||||
simulation_state = self._initialize_simulation_state(ticker_data)
|
||||
monthly_data: List[MonthlyData] = []
|
||||
|
||||
# Run monthly simulation
|
||||
for month in range(1, config.months + 1):
|
||||
logger.info(f"\n=== Starting Month {month} ===")
|
||||
logger.info(f"Initial state for month {month}:")
|
||||
for ticker in ticker_data.keys():
|
||||
logger.info(f" {ticker}:")
|
||||
logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}")
|
||||
logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}")
|
||||
logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}")
|
||||
|
||||
month_result = self._simulate_month(
|
||||
month,
|
||||
simulation_state,
|
||||
ticker_data,
|
||||
erosion_rates,
|
||||
distribution_schedule
|
||||
)
|
||||
monthly_data.append(month_result)
|
||||
|
||||
logger.info(f"Final state for month {month}:")
|
||||
for ticker in ticker_data.keys():
|
||||
logger.info(f" {ticker}:")
|
||||
logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}")
|
||||
logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}")
|
||||
logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}")
|
||||
logger.info(f"=== End Month {month} ===\n")
|
||||
|
||||
# Calculate final results
|
||||
return self._create_drip_result(monthly_data, simulation_state)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating DRIP growth: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
raise
|
||||
|
||||
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
|
||||
"""Validate input parameters"""
|
||||
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
|
||||
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
|
||||
|
||||
if missing_columns:
|
||||
raise ValueError(f"Missing required columns: {missing_columns}")
|
||||
|
||||
if config.months <= 0:
|
||||
raise ValueError("Months must be positive")
|
||||
|
||||
if portfolio_df.empty:
|
||||
raise ValueError("Portfolio DataFrame is empty")
|
||||
|
||||
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
|
||||
"""Initialize ticker data with validation"""
|
||||
ticker_data = {}
|
||||
|
||||
for _, row in portfolio_df.iterrows():
|
||||
ticker = row["Ticker"]
|
||||
|
||||
# Handle distribution frequency
|
||||
dist_period = row.get("Distribution Period", "Monthly")
|
||||
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
|
||||
|
||||
ticker_data[ticker] = TickerData(
|
||||
ticker=ticker,
|
||||
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
|
||||
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
|
||||
shares=max(0.0, float(row["Shares"])),
|
||||
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
|
||||
distribution_freq=dist_freq
|
||||
)
|
||||
|
||||
return ticker_data
|
||||
|
||||
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
|
||||
"""Pre-calculate which months each ticker pays distributions"""
|
||||
schedule = {}
|
||||
|
||||
for ticker, data in ticker_data.items():
|
||||
distribution_months = []
|
||||
freq = data.distribution_freq
|
||||
|
||||
for month in range(1, total_months + 1):
|
||||
if self._is_distribution_month(month, freq):
|
||||
distribution_months.append(month)
|
||||
|
||||
schedule[ticker] = distribution_months
|
||||
|
||||
return schedule
|
||||
|
||||
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
|
||||
"""Initialize simulation state variables"""
|
||||
return {
|
||||
'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()},
|
||||
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
|
||||
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
|
||||
'cumulative_income': 0.0
|
||||
}
|
||||
|
||||
def _simulate_month(
|
||||
self,
|
||||
month: int,
|
||||
state: Dict[str, Any],
|
||||
ticker_data: Dict[str, TickerData],
|
||||
erosion_rates: Dict[str, Dict[str, float]],
|
||||
distribution_schedule: Dict[str, List[int]]
|
||||
) -> MonthlyData:
|
||||
"""Simulate a single month with improved accuracy"""
|
||||
|
||||
# Debug logging for erosion rates
|
||||
logger.info(f"\n=== EROSION RATES DEBUG ===")
|
||||
logger.info(f"Erosion rates dictionary: {erosion_rates}")
|
||||
for ticker, rates in erosion_rates.items():
|
||||
logger.info(f" {ticker}:")
|
||||
logger.info(f" nav: {rates['nav']:.4%}")
|
||||
logger.info(f" yield: {rates['yield']:.4%}")
|
||||
logger.info(f"=== END EROSION RATES DEBUG ===\n")
|
||||
|
||||
# Apply erosion first
|
||||
for ticker, rates in erosion_rates.items():
|
||||
if ticker in state['current_prices']:
|
||||
# Get monthly erosion rates (already in decimal form)
|
||||
monthly_nav_erosion = rates['nav']
|
||||
monthly_yield_erosion = rates['yield']
|
||||
|
||||
# Get current values
|
||||
old_price = state['current_prices'][ticker]
|
||||
old_yield = state['current_yields'][ticker]
|
||||
|
||||
# Debug logging
|
||||
logger.info(f"\n=== EROSION CALCULATION DEBUG ===")
|
||||
logger.info(f"Ticker: {ticker}")
|
||||
logger.info(f"Raw erosion rates from nav_erosion_service:")
|
||||
logger.info(f" monthly_nav_erosion: {monthly_nav_erosion:.4%}")
|
||||
logger.info(f" monthly_yield_erosion: {monthly_yield_erosion:.4%}")
|
||||
logger.info(f"Current values:")
|
||||
logger.info(f" old_price: ${old_price:.4f}")
|
||||
logger.info(f" old_yield: {old_yield:.4%}")
|
||||
|
||||
# Calculate new values
|
||||
new_price = old_price * (1 - monthly_nav_erosion)
|
||||
new_yield = old_yield * (1 - monthly_yield_erosion)
|
||||
|
||||
logger.info(f"Calculated new values:")
|
||||
logger.info(f" new_price = ${old_price:.4f} * (1 - {monthly_nav_erosion:.4%})")
|
||||
logger.info(f" new_price = ${old_price:.4f} * {1 - monthly_nav_erosion:.4f}")
|
||||
logger.info(f" new_price = ${new_price:.4f}")
|
||||
logger.info(f" new_yield = {old_yield:.4%} * (1 - {monthly_yield_erosion:.4%})")
|
||||
logger.info(f" new_yield = {old_yield:.4%} * {1 - monthly_yield_erosion:.4f}")
|
||||
logger.info(f" new_yield = {new_yield:.4%}")
|
||||
|
||||
# Apply the new values with bounds checking
|
||||
state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices
|
||||
state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields
|
||||
|
||||
logger.info(f"Final values after bounds checking:")
|
||||
logger.info(f" final_price: ${state['current_prices'][ticker]:.4f}")
|
||||
logger.info(f" final_yield: {state['current_yields'][ticker]:.4%}")
|
||||
logger.info(f"=== END EROSION CALCULATION DEBUG ===\n")
|
||||
|
||||
# Log the actual erosion being applied
|
||||
logger.info(f"Applied erosion to {ticker}:")
|
||||
logger.info(f" NAV: {monthly_nav_erosion:.4%} -> New price: ${state['current_prices'][ticker]:.2f}")
|
||||
logger.info(f" Yield: {monthly_yield_erosion:.4%} -> New yield: {state['current_yields'][ticker]:.2%}")
|
||||
|
||||
# Calculate monthly income from distributions using eroded values
|
||||
monthly_income = self._calculate_monthly_distributions(
|
||||
month, state, ticker_data, distribution_schedule
|
||||
)
|
||||
|
||||
# Update cumulative income
|
||||
state['cumulative_income'] += monthly_income
|
||||
|
||||
# Reinvest dividends (DRIP)
|
||||
self._reinvest_dividends(month, state, distribution_schedule)
|
||||
|
||||
# Calculate total portfolio value with bounds checking
|
||||
total_value = 0.0
|
||||
for ticker in ticker_data.keys():
|
||||
shares = state['current_shares'][ticker]
|
||||
price = state['current_prices'][ticker]
|
||||
if shares > 0 and price > 0:
|
||||
total_value += shares * price
|
||||
|
||||
return MonthlyData(
|
||||
month=month,
|
||||
total_value=total_value,
|
||||
monthly_income=monthly_income,
|
||||
cumulative_income=state['cumulative_income'],
|
||||
shares=state['current_shares'].copy(),
|
||||
prices=state['current_prices'].copy(),
|
||||
yields=state['current_yields'].copy()
|
||||
)
|
||||
|
||||
def _calculate_monthly_distributions(
|
||||
self,
|
||||
month: int,
|
||||
state: Dict[str, Any],
|
||||
ticker_data: Dict[str, TickerData],
|
||||
distribution_schedule: Dict[str, List[int]]
|
||||
) -> float:
|
||||
"""Calculate distributions for the current month"""
|
||||
monthly_income = 0.0
|
||||
|
||||
for ticker, data in ticker_data.items():
|
||||
if month in distribution_schedule[ticker]:
|
||||
shares = state['current_shares'][ticker]
|
||||
price = state['current_prices'][ticker]
|
||||
yield_rate = state['current_yields'][ticker]
|
||||
|
||||
# Calculate distribution amount using annual yield divided by payments per year
|
||||
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
|
||||
distribution_amount = shares * price * distribution_yield
|
||||
monthly_income += distribution_amount
|
||||
|
||||
return monthly_income
|
||||
|
||||
def _reinvest_dividends(
|
||||
self,
|
||||
month: int,
|
||||
state: Dict[str, Any],
|
||||
distribution_schedule: Dict[str, List[int]]
|
||||
) -> None:
|
||||
"""Reinvest dividends for tickers that distributed in this month"""
|
||||
|
||||
for ticker, distribution_months in distribution_schedule.items():
|
||||
if month in distribution_months:
|
||||
shares = state['current_shares'][ticker]
|
||||
price = state['current_prices'][ticker]
|
||||
yield_rate = state['current_yields'][ticker]
|
||||
|
||||
# Calculate dividend income using the correct distribution frequency
|
||||
freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY)
|
||||
dividend_income = shares * price * (yield_rate / freq.payments_per_year)
|
||||
|
||||
# Purchase additional shares
|
||||
if price > 0:
|
||||
new_shares = dividend_income / price
|
||||
state['current_shares'][ticker] += new_shares
|
||||
|
||||
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
|
||||
"""Check if current month is a distribution month"""
|
||||
if frequency == DistributionFrequency.MONTHLY:
|
||||
return True
|
||||
elif frequency == DistributionFrequency.QUARTERLY:
|
||||
return month % 3 == 0
|
||||
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
|
||||
return month % 6 == 0
|
||||
elif frequency == DistributionFrequency.ANNUALLY:
|
||||
return month % 12 == 0
|
||||
else:
|
||||
return True # Default to monthly for unknown
|
||||
|
||||
def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult:
|
||||
"""Create final DRIP result object"""
|
||||
if not monthly_data:
|
||||
raise ValueError("No monthly data generated")
|
||||
|
||||
final_data = monthly_data[-1]
|
||||
|
||||
return DripResult(
|
||||
monthly_data=monthly_data,
|
||||
final_portfolio_value=final_data.total_value,
|
||||
total_income=final_data.cumulative_income,
|
||||
total_shares=state['current_shares'].copy()
|
||||
)
|
||||
|
||||
# Utility methods for analysis and comparison
|
||||
def calculate_drip_vs_no_drip_comparison(
|
||||
self,
|
||||
portfolio_df: pd.DataFrame,
|
||||
config: DripConfig
|
||||
) -> Dict[str, Any]:
|
||||
"""Calculate comparison between DRIP and no-DRIP scenarios"""
|
||||
|
||||
# Calculate DRIP scenario
|
||||
drip_result = self.calculate_drip_growth(portfolio_df, config)
|
||||
|
||||
# Calculate no-DRIP scenario (dividends not reinvested)
|
||||
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
|
||||
|
||||
# Calculate comparison metrics
|
||||
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
|
||||
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
|
||||
|
||||
return {
|
||||
'drip_final_value': drip_result.final_portfolio_value,
|
||||
'no_drip_final_value': no_drip_result['final_value'],
|
||||
'drip_advantage': drip_advantage,
|
||||
'percentage_advantage': percentage_advantage,
|
||||
'total_dividends_reinvested': drip_result.total_income,
|
||||
'cash_dividends_no_drip': no_drip_result['total_dividends']
|
||||
}
|
||||
|
||||
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
|
||||
"""Calculate scenario where dividends are not reinvested"""
|
||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
||||
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
|
||||
erosion_rates = {
|
||||
result.ticker: {
|
||||
"nav": result.monthly_nav_erosion_rate,
|
||||
"yield": result.monthly_yield_erosion_rate
|
||||
}
|
||||
for result in erosion_data.results
|
||||
}
|
||||
state = self._initialize_simulation_state(ticker_data)
|
||||
|
||||
total_dividends = 0.0
|
||||
|
||||
for month in range(1, config.months + 1):
|
||||
# Calculate dividends but don't reinvest
|
||||
monthly_dividends = self._calculate_monthly_distributions(
|
||||
month, state, ticker_data,
|
||||
self._create_distribution_schedule(ticker_data, config.months)
|
||||
)
|
||||
total_dividends += monthly_dividends
|
||||
|
||||
# Apply erosion
|
||||
for ticker, rates in erosion_rates.items():
|
||||
if ticker in state['current_prices']:
|
||||
# Get monthly erosion rates (already in decimal form)
|
||||
monthly_nav_erosion = rates['nav']
|
||||
monthly_yield_erosion = rates['yield']
|
||||
|
||||
# Apply NAV erosion (decrease price)
|
||||
old_price = state['current_prices'][ticker]
|
||||
new_price = old_price * (1 - monthly_nav_erosion)
|
||||
state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices
|
||||
|
||||
# Apply yield erosion (decrease yield)
|
||||
old_yield = state['current_yields'][ticker]
|
||||
new_yield = old_yield * (1 - monthly_yield_erosion)
|
||||
state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields
|
||||
|
||||
final_value = sum(
|
||||
state['current_shares'][ticker] * state['current_prices'][ticker]
|
||||
for ticker in ticker_data.keys()
|
||||
)
|
||||
|
||||
return {
|
||||
'final_value': final_value,
|
||||
'total_dividends': total_dividends
|
||||
}
|
||||
@ -1,8 +0,0 @@
|
||||
"""
|
||||
Nav Erosion Service package
|
||||
"""
|
||||
|
||||
from .service import NavErosionService
|
||||
from .models import NavErosionResult
|
||||
|
||||
__all__ = ['NavErosionService', 'NavErosionResult']
|
||||
@ -1,31 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
|
||||
@dataclass
|
||||
class NavErosionConfig:
|
||||
max_erosion_level: int = 9
|
||||
max_monthly_erosion: float = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
|
||||
use_per_ticker: bool = False
|
||||
global_nav_rate: float = 0
|
||||
per_ticker_rates: Dict[str, float] = None
|
||||
|
||||
@dataclass
|
||||
class NavErosionResult:
|
||||
ticker: str
|
||||
nav_erosion_rate: float
|
||||
monthly_erosion_rate: float
|
||||
annual_erosion_rate: float
|
||||
risk_level: int # 0-9 scale
|
||||
risk_explanation: str
|
||||
max_drawdown: float
|
||||
volatility: float
|
||||
is_new_etf: bool
|
||||
etf_age_years: Optional[float]
|
||||
|
||||
@dataclass
|
||||
class NavErosionAnalysis:
|
||||
results: List[NavErosionResult]
|
||||
portfolio_nav_risk: float # Average risk level
|
||||
portfolio_erosion_rate: float # Weighted average erosion rate
|
||||
risk_summary: str
|
||||
@ -1,209 +0,0 @@
|
||||
from typing import Dict, List, Tuple
|
||||
from .models import NavErosionConfig, NavErosionResult, NavErosionAnalysis
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
import streamlit as st
|
||||
|
||||
class ETFType(Enum):
|
||||
INCOME = "Income"
|
||||
GROWTH = "Growth"
|
||||
BALANCED = "Balanced"
|
||||
|
||||
@dataclass
|
||||
class NavErosionResult:
|
||||
"""Result of NAV erosion analysis for a single ETF"""
|
||||
ticker: str
|
||||
nav_erosion_rate: float # Annual NAV erosion rate
|
||||
yield_erosion_rate: float # Annual yield erosion rate
|
||||
monthly_nav_erosion_rate: float # Monthly NAV erosion rate
|
||||
monthly_yield_erosion_rate: float # Monthly yield erosion rate
|
||||
risk_level: int
|
||||
risk_explanation: str
|
||||
max_drawdown: float
|
||||
volatility: float
|
||||
is_new_etf: bool
|
||||
etf_age_years: float
|
||||
|
||||
@dataclass
|
||||
class NavErosionAnalysis:
|
||||
"""Complete NAV erosion analysis results"""
|
||||
results: List[NavErosionResult]
|
||||
portfolio_nav_risk: float = 0.0
|
||||
portfolio_erosion_rate: float = 0.0
|
||||
risk_summary: str = ""
|
||||
|
||||
class NavErosionService:
|
||||
def __init__(self):
|
||||
self.NAV_COMPONENT_WEIGHTS = {
|
||||
'drawdown': 0.4,
|
||||
'volatility': 0.3,
|
||||
'sharpe': 0.15,
|
||||
'sortino': 0.15
|
||||
}
|
||||
|
||||
# Default erosion rates based on risk level (0-9)
|
||||
self.RISK_TO_EROSION = {
|
||||
0: 0.01, # 1% annual
|
||||
1: 0.02, # 2% annual
|
||||
2: 0.03, # 3% annual
|
||||
3: 0.04, # 4% annual
|
||||
4: 0.05, # 5% annual
|
||||
5: 0.06, # 6% annual
|
||||
6: 0.07, # 7% annual
|
||||
7: 0.08, # 8% annual
|
||||
8: 0.09, # 9% annual
|
||||
9: 0.10 # 10% annual
|
||||
}
|
||||
|
||||
def analyze_etf_erosion_risk(self, tickers: List[str]) -> NavErosionAnalysis:
|
||||
"""Analyze NAV erosion risk for a list of ETFs"""
|
||||
results = []
|
||||
|
||||
print("\n=== NAV EROSION SERVICE DEBUG ===")
|
||||
print(f"Session state keys: {st.session_state.keys()}")
|
||||
print(f"Erosion level from session state: {st.session_state.get('erosion_level')}")
|
||||
|
||||
for ticker in tickers:
|
||||
# Get erosion rates from session state
|
||||
erosion_level = st.session_state.get('erosion_level', {'nav': 5.0, 'yield': 5.0})
|
||||
annual_nav_erosion = erosion_level['nav'] / 100 # Convert from percentage to decimal
|
||||
annual_yield_erosion = erosion_level['yield'] / 100 # Convert from percentage to decimal
|
||||
|
||||
# Convert annual rates to monthly
|
||||
monthly_nav_erosion = 1 - (1 - annual_nav_erosion) ** (1/12)
|
||||
monthly_yield_erosion = 1 - (1 - annual_yield_erosion) ** (1/12)
|
||||
|
||||
print(f"\n=== NAV EROSION SERVICE DEBUG ===")
|
||||
print(f"Ticker: {ticker}")
|
||||
print(f"Session State Values:")
|
||||
print(f" Annual NAV Erosion: {annual_nav_erosion:.4%}")
|
||||
print(f" Annual Yield Erosion: {annual_yield_erosion:.4%}")
|
||||
print(f" Monthly NAV Erosion: {monthly_nav_erosion:.4%}")
|
||||
print(f" Monthly Yield Erosion: {monthly_yield_erosion:.4%}")
|
||||
print(f"=== END NAV EROSION SERVICE DEBUG ===\n")
|
||||
|
||||
result = NavErosionResult(
|
||||
ticker=ticker,
|
||||
nav_erosion_rate=annual_nav_erosion,
|
||||
yield_erosion_rate=annual_yield_erosion,
|
||||
monthly_nav_erosion_rate=monthly_nav_erosion,
|
||||
monthly_yield_erosion_rate=monthly_yield_erosion,
|
||||
risk_level=5, # Arbitrary risk level
|
||||
risk_explanation="Using erosion rates from session state",
|
||||
max_drawdown=0.2,
|
||||
volatility=0.25,
|
||||
is_new_etf=False,
|
||||
etf_age_years=1.0
|
||||
)
|
||||
results.append(result)
|
||||
print(f"Created NavErosionResult for {ticker}:")
|
||||
print(f" monthly_nav_erosion_rate: {result.monthly_nav_erosion_rate:.4%}")
|
||||
print(f" monthly_yield_erosion_rate: {result.monthly_yield_erosion_rate:.4%}")
|
||||
|
||||
# Calculate portfolio-level metrics
|
||||
portfolio_nav_risk = sum(r.risk_level for r in results) / len(results)
|
||||
portfolio_erosion_rate = sum(r.nav_erosion_rate for r in results) / len(results)
|
||||
|
||||
analysis = NavErosionAnalysis(
|
||||
results=results,
|
||||
portfolio_nav_risk=portfolio_nav_risk,
|
||||
portfolio_erosion_rate=portfolio_erosion_rate,
|
||||
risk_summary="Portfolio has moderate NAV erosion risk"
|
||||
)
|
||||
|
||||
print("\nFinal NavErosionAnalysis:")
|
||||
for r in analysis.results:
|
||||
print(f" {r.ticker}:")
|
||||
print(f" monthly_nav_erosion_rate: {r.monthly_nav_erosion_rate:.4%}")
|
||||
print(f" monthly_yield_erosion_rate: {r.monthly_yield_erosion_rate:.4%}")
|
||||
print("=== END NAV EROSION SERVICE DEBUG ===\n")
|
||||
|
||||
return analysis
|
||||
|
||||
def _calculate_nav_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]:
|
||||
"""Calculate NAV risk components with ETF-type specific adjustments"""
|
||||
components = {}
|
||||
|
||||
# Base risk calculation with ETF-type specific thresholds
|
||||
if etf_data.get('max_drawdown') is not None:
|
||||
if etf_type == ETFType.INCOME:
|
||||
# Income ETFs typically have lower drawdowns
|
||||
if etf_data['max_drawdown'] > 0.25:
|
||||
components['drawdown'] = 7
|
||||
elif etf_data['max_drawdown'] > 0.15:
|
||||
components['drawdown'] = 5
|
||||
elif etf_data['max_drawdown'] > 0.10:
|
||||
components['drawdown'] = 3
|
||||
else:
|
||||
components['drawdown'] = 2
|
||||
elif etf_type == ETFType.GROWTH:
|
||||
# Growth ETFs typically have higher drawdowns
|
||||
if etf_data['max_drawdown'] > 0.35:
|
||||
components['drawdown'] = 7
|
||||
elif etf_data['max_drawdown'] > 0.25:
|
||||
components['drawdown'] = 5
|
||||
elif etf_data['max_drawdown'] > 0.15:
|
||||
components['drawdown'] = 3
|
||||
else:
|
||||
components['drawdown'] = 2
|
||||
else: # BALANCED
|
||||
# Balanced ETFs have moderate drawdowns
|
||||
if etf_data['max_drawdown'] > 0.30:
|
||||
components['drawdown'] = 7
|
||||
elif etf_data['max_drawdown'] > 0.20:
|
||||
components['drawdown'] = 5
|
||||
elif etf_data['max_drawdown'] > 0.12:
|
||||
components['drawdown'] = 3
|
||||
else:
|
||||
components['drawdown'] = 2
|
||||
else:
|
||||
components['drawdown'] = 4 # Default medium risk if no data
|
||||
|
||||
# Rest of the method remains unchanged
|
||||
if etf_data.get('volatility') is not None:
|
||||
if etf_data['volatility'] > 0.40:
|
||||
components['volatility'] = 7
|
||||
elif etf_data['volatility'] > 0.25:
|
||||
components['volatility'] = 5
|
||||
elif etf_data['volatility'] > 0.15:
|
||||
components['volatility'] = 3
|
||||
else:
|
||||
components['volatility'] = 2
|
||||
else:
|
||||
components['volatility'] = 4
|
||||
|
||||
if etf_data.get('sharpe_ratio') is not None:
|
||||
if etf_data['sharpe_ratio'] >= 2.0:
|
||||
components['sharpe'] = 1
|
||||
elif etf_data['sharpe_ratio'] >= 1.5:
|
||||
components['sharpe'] = 2
|
||||
elif etf_data['sharpe_ratio'] >= 1.0:
|
||||
components['sharpe'] = 3
|
||||
elif etf_data['sharpe_ratio'] >= 0.5:
|
||||
components['sharpe'] = 4
|
||||
else:
|
||||
components['sharpe'] = 5
|
||||
else:
|
||||
components['sharpe'] = 4
|
||||
|
||||
if etf_data.get('sortino_ratio') is not None:
|
||||
if etf_data['sortino_ratio'] >= 2.0:
|
||||
components['sortino'] = 1
|
||||
elif etf_data['sortino_ratio'] >= 1.5:
|
||||
components['sortino'] = 2
|
||||
elif etf_data['sortino_ratio'] >= 1.0:
|
||||
components['sortino'] = 3
|
||||
elif etf_data['sortino_ratio'] >= 0.5:
|
||||
components['sortino'] = 4
|
||||
else:
|
||||
components['sortino'] = 5
|
||||
else:
|
||||
components['sortino'] = 4
|
||||
|
||||
# Calculate weighted NAV risk
|
||||
nav_risk = sum(
|
||||
components[component] * weight
|
||||
for component, weight in self.NAV_COMPONENT_WEIGHTS.items()
|
||||
)
|
||||
|
||||
return nav_risk, components
|
||||
Loading…
Reference in New Issue
Block a user