Fix: Adjusted ETF age calculation and risk scoring

This commit is contained in:
Pascal BIBEHE 2025-05-29 23:37:13 +02:00
parent 65209331f5
commit 93d93277b1

View File

@ -4,12 +4,22 @@ NAV Erosion Service implementation
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from datetime import datetime from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple from typing import List, Dict, Optional, Tuple
import yfinance as yf import yfinance as yf
from .models import NavErosionResult, NavErosionAnalysis from .models import NavErosionResult, NavErosionAnalysis
from .exceptions import NavErosionError, DataFetchError, CalculationError from .exceptions import NavErosionError, DataFetchError, CalculationError
from .logger import ErosionRiskLogger from .logger import get_logger
from enum import Enum
from ETF_Portal.services.data_service import DataService # Our existing data service
logger = get_logger(__name__)
class ETFType(Enum):
    """Categories of ETF distinguished by the NAV erosion service.

    Each member's value is the lowercase string form of its category.
    """

    # Index funds, e.g. VTI, VOO
    INDEX = "index"
    # Covered-call funds, e.g. JEPI
    COVERED_CALL = "covered_call"
    # High-yield funds, e.g. FEPI
    HIGH_YIELD = "high_yield"
    # Sector funds, e.g. MSTY
    SECTOR = "sector"
class NavErosionService: class NavErosionService:
"""Service for calculating NAV erosion risk""" """Service for calculating NAV erosion risk"""
@ -41,45 +51,149 @@ class NavErosionService:
} }
def __init__(self): def __init__(self):
self.logger = ErosionRiskLogger() self.data_service = DataService() # Use our existing data service
self.etf_types = {
'VTI': ETFType.INDEX,
'VOO': ETFType.INDEX,
'JEPI': ETFType.COVERED_CALL,
'FEPI': ETFType.HIGH_YIELD,
'MSTY': ETFType.SECTOR,
# Add more ETFs as needed
}
def analyze_etf_erosion_risk(self, tickers: List[str], debug: bool = False) -> NavErosionAnalysis: # ETF-type specific weights for risk calculation
"""Analyze erosion risk for a list of ETFs""" self.risk_weights = {
ETFType.INDEX: {'stability': 0.4, 'growth': 0.4, 'payout': 0.2},
ETFType.COVERED_CALL: {'stability': 0.3, 'growth': 0.2, 'payout': 0.5},
ETFType.HIGH_YIELD: {'stability': 0.5, 'growth': 0.3, 'payout': 0.2},
ETFType.SECTOR: {'stability': 0.4, 'growth': 0.3, 'payout': 0.3}
}
# Maximum annual erosion by ETF type
self.max_erosion = {
ETFType.INDEX: 0.15, # 15% max for index ETFs
ETFType.COVERED_CALL: 0.30, # 30% max for covered call
ETFType.HIGH_YIELD: 0.25, # 25% max for high yield
ETFType.SECTOR: 0.20 # 20% max for sector
}
def get_etf_type(self, ticker: str) -> ETFType:
    """Return the ETF type for *ticker*, defaulting to ``ETFType.INDEX``.

    The lookup is done against ``self.etf_types``. String tickers are
    stripped and upper-cased first, so ``'jepi'`` and ``' JEPI '`` resolve
    to the same entry as ``'JEPI'`` instead of silently falling back to
    the default.

    Args:
        ticker: ETF ticker symbol.

    Returns:
        The mapped ``ETFType``, or ``ETFType.INDEX`` when the ticker is
        not in the mapping.
    """
    # Non-string input keeps the original behaviour (plain dict lookup).
    lookup_key = ticker.strip().upper() if isinstance(ticker, str) else ticker
    return self.etf_types.get(lookup_key, ETFType.INDEX)
def calculate_stability(self, etf_data: Dict) -> float:
    """Calculate dividend stability (0-1).

    Stability is ``1 / (1 + cv)`` where ``cv`` is the coefficient of
    variation (sample standard deviation divided by mean) of the dividend
    amounts, so perfectly constant payments score 1.0.

    Args:
        etf_data: Mapping with an optional ``'dividends'`` entry that maps
            a date key to a dividend amount.

    Returns:
        A float in ``[0, 1]``. 0.0 when there are no dividends; 0.5 when
        the variation cannot be measured (a single payment, or a
        non-positive mean).
    """
    payments = etf_data.get('dividends')
    if not payments:
        return 0.0

    # The payment dates are irrelevant here; only the amounts are used.
    amounts = pd.DataFrame.from_dict(
        payments, orient='index', columns=['Dividends']
    )['Dividends']

    mean = amounts.mean()
    std = amounts.std()

    # std() is NaN for a single observation. Letting NaN through would
    # propagate past the clamp below (comparisons with NaN are False) and
    # break integer conversion downstream, so fall back to cv = 1.0, the
    # same neutral value used for a non-positive mean.
    if mean > 0 and pd.notna(std):
        cv = std / mean
    else:
        cv = 1.0

    # Convert to stability score (0-1)
    stability = 1 / (1 + cv)
    return float(min(max(stability, 0.0), 1.0))
def calculate_growth(self, etf_data: Dict) -> float:
    """Calculate dividend growth (0-1).

    Dividends are summed per calendar year and the mean year-over-year
    change is mapped linearly onto a score: 0% growth = 0.5, +10% or more
    = 1.0, -10% or less = 0.0.

    Args:
        etf_data: Mapping with an optional ``'dividends'`` entry that maps
            a date key (anything ``pd.to_datetime`` accepts) to a dividend
            amount.

    Returns:
        A float in ``[0, 1]``. 0.0 when there are no dividends; 0.5
        (neutral) when fewer than two calendar years are covered or no
        year-over-year rate can be computed.
    """
    payments = etf_data.get('dividends')
    if not payments:
        return 0.0

    dividends = pd.DataFrame.from_dict(
        payments, orient='index', columns=['Dividends']
    )
    # utc=True yields a proper DatetimeIndex even when the keys carry
    # different UTC offsets (e.g. summer/winter time).
    dividends.index = pd.to_datetime(dividends.index, utc=True)
    amounts = dividends['Dividends']

    # Yearly totals. Grouping by calendar year avoids the resample('Y')
    # alias, which newer pandas versions deprecate; re-indexing over the
    # full year range keeps years without any payment as 0, exactly as
    # resampling would.
    per_year = amounts.groupby(amounts.index.year).sum()
    if per_year.empty:
        return 0.5
    year_span = range(int(per_year.index.min()), int(per_year.index.max()) + 1)
    yearly_divs = per_year.reindex(year_span, fill_value=0.0)

    if len(yearly_divs) < 2:
        return 0.5  # Neutral if not enough data

    # 0 -> 0 transitions give NaN and are dropped; a restart from a zero
    # year gives +inf, which saturates the score at 1.0 after clamping.
    growth_rates = yearly_divs.pct_change().dropna()
    avg_growth = growth_rates.mean()
    if pd.isna(avg_growth):
        # No usable rate at all (e.g. every year paid zero): stay neutral
        # rather than returning NaN.
        return 0.5

    # Convert to growth score (0-1)
    growth_score = 0.5 + (avg_growth * 5)
    return float(min(max(growth_score, 0.0), 1.0))
def calculate_payout_ratio(self, etf_data: Dict) -> float:
    """Calculate payout ratio (0-1).

    The ratio is the trailing one-year dividend yield, normalised against
    an assumed maximum yield of 20% and capped at 1.0. The one-year window
    ends at the most recent date in the price history; both the dividend
    total and the average close are taken from that window, so multi-year
    input no longer inflates the "annual" yield by the number of years it
    covers.

    Args:
        etf_data: Mapping with ``'dividends'`` (date key -> amount) and
            ``'hist'`` (column name -> {date key -> value}; a ``'Close'``
            column is required).

    Returns:
        A float, at most 1.0. 0.0 when dividends or price history are
        missing, or when no positive average close is available.
    """
    payments = etf_data.get('dividends')
    if not payments:
        return 0.0
    price_history = etf_data.get('hist')
    if not price_history:
        return 0.0

    dividends = pd.DataFrame.from_dict(
        payments, orient='index', columns=['Dividends']
    )
    # utc=True puts both indexes on the same (tz-aware) footing so they
    # can be compared against the same cut-off.
    dividends.index = pd.to_datetime(dividends.index, utc=True)

    hist = pd.DataFrame.from_dict(price_history)
    hist.index = pd.to_datetime(hist.index, utc=True)

    # Trailing twelve months, anchored at the latest price date.
    window_start = hist.index.max() - pd.DateOffset(years=1)
    annual_div = dividends.loc[dividends.index > window_start, 'Dividends'].sum()
    avg_price = hist.loc[hist.index > window_start, 'Close'].mean()

    # A NaN average (no prices in the window) fails the comparison and
    # falls back to a zero yield.
    yield_ratio = annual_div / avg_price if avg_price > 0 else 0.0

    # Normalize to 0-1 range (assuming max yield of 20%)
    return float(min(yield_ratio / 0.20, 1.0))
def analyze_etf_erosion_risk(self, tickers: List[str]) -> NavErosionAnalysis:
"""Analyze erosion risk for a list of ETFs."""
results = [] results = []
current_date = pd.Timestamp.now(tz='UTC') errors = []
for ticker in tickers: for ticker in tickers:
try: try:
# Get ETF data # Get ETF type
etf_data = self._fetch_etf_data(ticker) etf_type = self.get_etf_type(ticker)
if not etf_data:
self.logger.log_warning(ticker, "No data available")
continue
# Calculate risk components # Get ETF data using our existing pipeline
nav_risk, nav_components = self._calculate_nav_risk(etf_data) etf_data = self._fetch_etf_data(ticker)
yield_risk, yield_components = self._calculate_yield_risk(etf_data)
if etf_data.get('is_estimated', False):
logger.warning(f"Using estimated data for {ticker}")
# Calculate all risk components with ETF-type specific adjustments
nav_risk, nav_components = self._calculate_nav_risk(etf_data, etf_type)
yield_risk, yield_components = self._calculate_yield_risk(etf_data, etf_type)
structural_risk, structural_components = self._calculate_structural_risk(etf_data) structural_risk, structural_components = self._calculate_structural_risk(etf_data)
# Calculate final risk scores # Calculate final risk scores
final_nav_risk = self._calculate_final_risk(nav_risk, yield_risk, structural_risk) final_nav_risk = round(
nav_risk * self.NAV_RISK_WEIGHT +
structural_risk * self.STRUCTURAL_RISK_WEIGHT
)
# Create result final_yield_risk = round(
yield_risk * self.YIELD_RISK_WEIGHT +
structural_risk * self.STRUCTURAL_RISK_WEIGHT
)
# Create result object
result = NavErosionResult( result = NavErosionResult(
ticker=ticker, ticker=ticker,
nav_erosion_risk=int(final_nav_risk), nav_erosion_risk=final_nav_risk,
yield_erosion_risk=int(yield_risk), yield_erosion_risk=final_yield_risk,
estimated_nav_erosion=final_nav_risk / 9 * 0.9, # Convert to percentage estimated_nav_erosion=round(final_nav_risk / 9 * self.max_erosion[etf_type], 3),
estimated_yield_erosion=yield_risk / 9 * 0.9, # Convert to percentage estimated_yield_erosion=round(final_yield_risk / 9 * self.max_erosion[etf_type], 3),
nav_risk_explanation=self._generate_nav_explanation(nav_components), nav_risk_explanation=self._generate_nav_explanation(nav_components),
yield_risk_explanation=self._generate_yield_explanation(yield_components), yield_risk_explanation=(
etf_age_years=etf_data.get('age_years'), f"Dividend stability: {yield_components['stability']:.1%}, "
is_new_etf=etf_data.get('is_new', False), f"Growth: {yield_components['growth']:.1%}, "
max_drawdown=etf_data.get('max_drawdown'), f"Payout ratio: {yield_components['payout']:.1%}"
volatility=etf_data.get('volatility'), ),
sharpe_ratio=etf_data.get('sharpe_ratio'), etf_age_years=etf_data.get('age_years', 3),
sortino_ratio=etf_data.get('sortino_ratio'), max_drawdown=round(etf_data.get('max_drawdown', 0.0), 3),
dividend_trend=etf_data.get('dividend_trend'), volatility=round(etf_data.get('volatility', 0.0), 3),
sharpe_ratio=round(etf_data.get('sharpe_ratio', 0.0), 2),
sortino_ratio=round(etf_data.get('sortino_ratio', 0.0), 2),
dividend_trend=round(etf_data.get('dividend_trend', 0.0), 3),
component_risks={ component_risks={
'nav': nav_components, 'nav': nav_components,
'yield': yield_components, 'yield': yield_components,
@ -88,117 +202,72 @@ class NavErosionService:
) )
results.append(result) results.append(result)
self.logger.log_risk_calculation(ticker, result.component_risks, final_nav_risk)
except Exception as e: except Exception as e:
self.logger.log_error(ticker, e) logger.error(f"Error analyzing {ticker}: {str(e)}")
if debug: errors.append((ticker, str(e)))
raise # Add a result with error info
continue results.append(NavErosionResult(
ticker=ticker,
nav_erosion_risk=0,
yield_erosion_risk=0,
estimated_nav_erosion=0.0,
estimated_yield_erosion=0.0,
nav_risk_explanation=f"Error: {str(e)}",
yield_risk_explanation=f"Error: {str(e)}",
component_risks={}
))
if not results: if not results:
raise CalculationError("No valid results generated") raise CalculationError(f"No valid results generated. Errors: {errors}")
# Calculate portfolio-level metrics # Calculate portfolio averages
portfolio_nav_risk = np.mean([r.nav_erosion_risk for r in results]) portfolio_nav_risk = round(sum(r.nav_erosion_risk for r in results) / len(results))
portfolio_yield_risk = np.mean([r.yield_erosion_risk for r in results]) portfolio_yield_risk = round(sum(r.yield_erosion_risk for r in results) / len(results))
return NavErosionAnalysis( return NavErosionAnalysis(
results=results, results=results,
portfolio_nav_risk=portfolio_nav_risk, portfolio_nav_risk=portfolio_nav_risk,
portfolio_yield_risk=portfolio_yield_risk, portfolio_yield_risk=portfolio_yield_risk,
risk_summary=self._generate_portfolio_summary(results), risk_summary=f"Portfolio average NAV risk: {portfolio_nav_risk}/9, Yield risk: {portfolio_yield_risk}/9",
timestamp=datetime.now(), timestamp=datetime.now(),
component_weights={ component_weights=self.risk_weights
'nav': self.NAV_RISK_WEIGHT,
'yield': self.YIELD_RISK_WEIGHT,
'structural': self.STRUCTURAL_RISK_WEIGHT
}
) )
def _fetch_etf_data(self, ticker: str) -> Dict: def _fetch_etf_data(self, ticker: str) -> Dict:
"""Fetch ETF data with fallback logic""" """Fetch ETF data using our existing data pipeline."""
try: try:
yf_ticker = yf.Ticker(ticker) # Use our existing data service
data = self.data_service.get_etf_data(ticker)
if not data:
raise DataFetchError(f"No data available for {ticker}")
# Get basic info # Calculate actual ETF age
info = yf_ticker.info if 'info' in data and 'firstTradeDateEpochUtc' in data['info']:
if not info: inception_date = datetime.fromtimestamp(data['info']['firstTradeDateEpochUtc'])
return None age_years = (datetime.now() - inception_date).days / 365.25
data['age_years'] = round(age_years)
# Get historical data
hist = yf_ticker.history(period="5y")
if hist.empty:
return None
# Get dividends
dividends = yf_ticker.dividends
if dividends is None or dividends.empty:
dividends = pd.Series()
# Calculate metrics
returns = hist['Close'].pct_change().dropna()
volatility = returns.std() * np.sqrt(252) # Annualized
# Calculate max drawdown
rolling_max = hist['Close'].rolling(window=252, min_periods=1).max()
daily_drawdown = hist['Close'] / rolling_max - 1.0
max_drawdown = abs(daily_drawdown.min())
# Calculate Sharpe and Sortino ratios
risk_free_rate = 0.02 # Assuming 2% risk-free rate
excess_returns = returns - risk_free_rate/252
sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std()
# Sortino ratio (using negative returns only)
negative_returns = returns[returns < 0]
sortino_ratio = np.sqrt(252) * excess_returns.mean() / negative_returns.std() if len(negative_returns) > 0 else 0
# Calculate dividend trend
if not dividends.empty:
monthly_div = dividends.resample('M').sum()
if len(monthly_div) > 12:
earliest_ttm = monthly_div[-12:].sum()
latest_ttm = monthly_div[-1:].sum()
dividend_trend = (latest_ttm / earliest_ttm - 1) if earliest_ttm > 0 else 0
else:
dividend_trend = 0
else: else:
dividend_trend = 0 # Known ETF inception dates as fallback
known_ages = {
# Calculate ETF age 'VTI': 23, # Inception: 2001
inception_date = info.get('fundInceptionDate') 'VOO': 13, # Inception: 2010
if inception_date: 'JEPI': 4, # Inception: 2020
try: 'FEPI': 3, # Inception: 2021
inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) 'MSTY': 1 # Inception: 2022
age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25 }
except: data['age_years'] = known_ages.get(ticker, 3)
age_years = None
else:
age_years = None
return {
'info': info,
'hist': hist,
'dividends': dividends,
'volatility': volatility,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'dividend_trend': dividend_trend,
'age_years': age_years,
'is_new': age_years is not None and age_years < 2
}
return data
except Exception as e: except Exception as e:
self.logger.log_error(ticker, e) logger.error(f"Error fetching data for {ticker}: {str(e)}")
return None raise DataFetchError(f"Failed to fetch data for {ticker}: {str(e)}")
def _calculate_nav_risk(self, etf_data: Dict) -> Tuple[float, Dict]: def _calculate_nav_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]:
"""Calculate NAV risk components""" """Calculate NAV risk components with ETF-type specific adjustments"""
components = {} components = {}
# Drawdown risk # Base risk calculation
if etf_data.get('max_drawdown') is not None: if etf_data.get('max_drawdown') is not None:
if etf_data['max_drawdown'] > 0.40: if etf_data['max_drawdown'] > 0.40:
components['drawdown'] = 7 components['drawdown'] = 7
@ -209,9 +278,8 @@ class NavErosionService:
else: else:
components['drawdown'] = 2 components['drawdown'] = 2
else: else:
components['drawdown'] = 4 # Default medium-low components['drawdown'] = 4
# Volatility risk
if etf_data.get('volatility') is not None: if etf_data.get('volatility') is not None:
if etf_data['volatility'] > 0.40: if etf_data['volatility'] > 0.40:
components['volatility'] = 7 components['volatility'] = 7
@ -222,9 +290,8 @@ class NavErosionService:
else: else:
components['volatility'] = 2 components['volatility'] = 2
else: else:
components['volatility'] = 4 # Default medium-low components['volatility'] = 4
# Sharpe ratio risk
if etf_data.get('sharpe_ratio') is not None: if etf_data.get('sharpe_ratio') is not None:
if etf_data['sharpe_ratio'] >= 2.0: if etf_data['sharpe_ratio'] >= 2.0:
components['sharpe'] = 1 components['sharpe'] = 1
@ -237,9 +304,8 @@ class NavErosionService:
else: else:
components['sharpe'] = 5 components['sharpe'] = 5
else: else:
components['sharpe'] = 4 # Default medium components['sharpe'] = 4
# Sortino ratio risk
if etf_data.get('sortino_ratio') is not None: if etf_data.get('sortino_ratio') is not None:
if etf_data['sortino_ratio'] >= 2.0: if etf_data['sortino_ratio'] >= 2.0:
components['sortino'] = 1 components['sortino'] = 1
@ -252,69 +318,63 @@ class NavErosionService:
else: else:
components['sortino'] = 5 components['sortino'] = 5
else: else:
components['sortino'] = 4 # Default medium components['sortino'] = 4
# ETF-type specific adjustments for NAV risk
if etf_type == ETFType.INDEX:
# Index ETFs are generally more stable
components = {k: max(1, v - 2) for k, v in components.items()}
elif etf_type == ETFType.SECTOR:
# Sector ETFs are more volatile
components = {k: min(9, v + 1) for k, v in components.items()}
elif etf_type == ETFType.COVERED_CALL:
# Covered call ETFs have higher NAV risk due to strategy complexity
components = {k: min(9, v + 3) for k, v in components.items()}
elif etf_type == ETFType.HIGH_YIELD:
# High yield ETFs have highest NAV risk
components = {k: min(9, v + 3) for k, v in components.items()}
# Calculate weighted NAV risk # Calculate weighted NAV risk
nav_risk = sum( nav_risk = sum(
components[component] * weight components[component] * weight
for component, weight in self.NAV_COMPONENT_WEIGHTS.items() for component, weight in self.NAV_COMPONENT_WEIGHTS.items()
) * self.NAV_RISK_WEIGHT )
return nav_risk, components return nav_risk, components
def _calculate_yield_risk(self, etf_data: Dict) -> Tuple[float, Dict]: def _calculate_yield_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]:
"""Calculate yield risk components""" """Calculate yield risk components with ETF-type specific adjustments"""
components = {} components = {}
# Dividend stability risk # Calculate base components
if etf_data.get('dividend_trend') is not None: stability = self.calculate_stability(etf_data)
if etf_data['dividend_trend'] < -0.30: growth = self.calculate_growth(etf_data)
components['stability'] = 8 payout = self.calculate_payout_ratio(etf_data)
elif etf_data['dividend_trend'] < -0.15:
components['stability'] = 6
elif etf_data['dividend_trend'] < -0.05:
components['stability'] = 4
elif etf_data['dividend_trend'] > 0.10:
components['stability'] = 2
else:
components['stability'] = 3
else:
components['stability'] = 4 # Default medium
# Dividend growth risk # Convert to risk scores (1-9)
if etf_data.get('dividend_trend') is not None: components['stability'] = int((1 - stability) * 8) + 1
if etf_data['dividend_trend'] > 0.10: components['growth'] = int((1 - growth) * 8) + 1
components['growth'] = 2 components['payout'] = int((1 - payout) * 8) + 1
elif etf_data['dividend_trend'] > 0.05:
components['growth'] = 3
elif etf_data['dividend_trend'] < -0.10:
components['growth'] = 6
elif etf_data['dividend_trend'] < -0.05:
components['growth'] = 4
else:
components['growth'] = 3
else:
components['growth'] = 4 # Default medium
# Payout ratio risk (using dividend yield as proxy) # ETF-type specific adjustments
if etf_data.get('info', {}).get('dividendYield') is not None: if etf_type == ETFType.INDEX:
yield_value = etf_data['info']['dividendYield'] # Index ETFs have lower yield risk
if yield_value > 0.08: components = {k: max(1, v - 2) for k, v in components.items()}
components['payout'] = 7 elif etf_type == ETFType.SECTOR:
elif yield_value > 0.05: # Sector ETFs have moderate yield risk
components['payout'] = 5 components = {k: min(9, v + 1) for k, v in components.items()}
elif yield_value > 0.03: elif etf_type == ETFType.COVERED_CALL:
components['payout'] = 3 # Covered call ETFs have higher yield risk
else: components = {k: min(9, v + 2) for k, v in components.items()}
components['payout'] = 2 elif etf_type == ETFType.HIGH_YIELD:
else: # High yield ETFs have highest yield risk
components['payout'] = 4 # Default medium components = {k: min(9, v + 3) for k, v in components.items()}
# Calculate weighted yield risk # Calculate weighted yield risk
yield_risk = sum( yield_risk = sum(
components[component] * weight components[component] * weight
for component, weight in self.YIELD_COMPONENT_WEIGHTS.items() for component, weight in self.YIELD_COMPONENT_WEIGHTS.items()
) * self.YIELD_RISK_WEIGHT )
return yield_risk, components return yield_risk, components
@ -322,18 +382,18 @@ class NavErosionService:
"""Calculate structural risk components""" """Calculate structural risk components"""
components = {} components = {}
# Age risk # Age risk - adjusted for actual ETF ages
if etf_data.get('is_new'): age = etf_data.get('age_years', 3.0)
components['age'] = 7 if age < 1:
elif etf_data.get('age_years') is not None: components['age'] = 7 # Very new ETF
if etf_data['age_years'] < 3: elif age < 3:
components['age'] = 6 components['age'] = 6 # New ETF
elif etf_data['age_years'] < 5: elif age < 5:
components['age'] = 4 components['age'] = 4 # Moderately established
else: elif age < 10:
components['age'] = 2 components['age'] = 3 # Well established
else: else:
components['age'] = 4 # Default medium components['age'] = 2 # Long established
# AUM risk # AUM risk
if etf_data.get('info', {}).get('totalAssets') is not None: if etf_data.get('info', {}).get('totalAssets') is not None:
@ -381,14 +441,10 @@ class NavErosionService:
structural_risk = sum( structural_risk = sum(
components[component] * weight components[component] * weight
for component, weight in self.STRUCTURAL_COMPONENT_WEIGHTS.items() for component, weight in self.STRUCTURAL_COMPONENT_WEIGHTS.items()
) * self.STRUCTURAL_RISK_WEIGHT )
return structural_risk, components return structural_risk, components
def _calculate_final_risk(self, nav_risk: float, yield_risk: float, structural_risk: float) -> float:
"""Calculate final risk score"""
return nav_risk + yield_risk + structural_risk
def _generate_nav_explanation(self, components: Dict) -> str: def _generate_nav_explanation(self, components: Dict) -> str:
"""Generate explanation for NAV risk""" """Generate explanation for NAV risk"""
explanations = [] explanations = []
@ -404,19 +460,6 @@ class NavErosionService:
return " | ".join(explanations) return " | ".join(explanations)
def _generate_yield_explanation(self, components: Dict) -> str:
"""Generate explanation for yield risk"""
explanations = []
if components.get('stability') is not None:
explanations.append(f"Dividend stability risk: {components['stability']}/9")
if components.get('growth') is not None:
explanations.append(f"Dividend growth risk: {components['growth']}/9")
if components.get('payout') is not None:
explanations.append(f"Payout ratio risk: {components['payout']}/9")
return " | ".join(explanations)
def _generate_portfolio_summary(self, results: List[NavErosionResult]) -> str: def _generate_portfolio_summary(self, results: List[NavErosionResult]) -> str:
"""Generate portfolio-level risk summary""" """Generate portfolio-level risk summary"""
nav_risks = [r.nav_erosion_risk for r in results] nav_risks = [r.nav_erosion_risk for r in results]