From 93d93277b1b6e44a27bb59c84258c1ff9eab34ce Mon Sep 17 00:00:00 2001 From: Pascal Date: Thu, 29 May 2025 23:37:13 +0200 Subject: [PATCH] Fix: Adjusted ETF age calculation and risk scoring --- .../services/nav_erosion_service/service.py | 441 ++++++++++-------- 1 file changed, 242 insertions(+), 199 deletions(-) diff --git a/ETF_Portal/services/nav_erosion_service/service.py b/ETF_Portal/services/nav_erosion_service/service.py index cbeffaa..6dce8b9 100644 --- a/ETF_Portal/services/nav_erosion_service/service.py +++ b/ETF_Portal/services/nav_erosion_service/service.py @@ -4,12 +4,22 @@ NAV Erosion Service implementation import pandas as pd import numpy as np -from datetime import datetime +from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple import yfinance as yf from .models import NavErosionResult, NavErosionAnalysis from .exceptions import NavErosionError, DataFetchError, CalculationError -from .logger import ErosionRiskLogger +from .logger import get_logger +from enum import Enum +from ETF_Portal.services.data_service import DataService # Our existing data service + +logger = get_logger(__name__) + +class ETFType(Enum): + INDEX = "index" # e.g., VTI, VOO + COVERED_CALL = "covered_call" # e.g., JEPI + HIGH_YIELD = "high_yield" # e.g., FEPI + SECTOR = "sector" # e.g., MSTY class NavErosionService: """Service for calculating NAV erosion risk""" @@ -41,45 +51,149 @@ class NavErosionService: } def __init__(self): - self.logger = ErosionRiskLogger() + self.data_service = DataService() # Use our existing data service + self.etf_types = { + 'VTI': ETFType.INDEX, + 'VOO': ETFType.INDEX, + 'JEPI': ETFType.COVERED_CALL, + 'FEPI': ETFType.HIGH_YIELD, + 'MSTY': ETFType.SECTOR, + # Add more ETFs as needed + } - def analyze_etf_erosion_risk(self, tickers: List[str], debug: bool = False) -> NavErosionAnalysis: - """Analyze erosion risk for a list of ETFs""" + # ETF-type specific weights for risk calculation + self.risk_weights = { + ETFType.INDEX: {'stability': 0.4, 'growth': 0.4, 'payout': 0.2}, + ETFType.COVERED_CALL: {'stability': 0.3, 'growth': 0.2, 'payout': 0.5}, + ETFType.HIGH_YIELD: {'stability': 0.5, 'growth': 0.3, 'payout': 0.2}, + ETFType.SECTOR: {'stability': 0.4, 'growth': 0.3, 'payout': 0.3} + } + + # Maximum annual erosion by ETF type + self.max_erosion = { + ETFType.INDEX: 0.15, # 15% max for index ETFs + ETFType.COVERED_CALL: 0.30, # 30% max for covered call + ETFType.HIGH_YIELD: 0.25, # 25% max for high yield + ETFType.SECTOR: 0.20 # 20% max for sector + } + + def get_etf_type(self, ticker: str) -> ETFType: + """Get ETF type from our mapping or default to INDEX.""" + return self.etf_types.get(ticker, ETFType.INDEX) + + def calculate_stability(self, etf_data: Dict) -> float: + """Calculate dividend stability (0-1).""" + if not etf_data.get('dividends'): + return 0.0 + + # Convert dividends dict to DataFrame + dividends = pd.DataFrame.from_dict(etf_data['dividends'], orient='index', columns=['Dividends']) + dividends.index = pd.to_datetime(dividends.index) + + # Calculate coefficient of variation + mean = dividends['Dividends'].mean() + std = dividends['Dividends'].std() + cv = std / mean if mean > 0 else 1.0 + + # Convert to stability score (0-1) + stability = 1 / (1 + cv) + return min(max(stability, 0), 1) + + def calculate_growth(self, etf_data: Dict) -> float: + """Calculate dividend growth (0-1).""" + if not etf_data.get('dividends'): + return 0.0 + + # Convert dividends dict to DataFrame + dividends = pd.DataFrame.from_dict(etf_data['dividends'], orient='index', columns=['Dividends']) + dividends.index = pd.to_datetime(dividends.index) + + # Calculate year-over-year growth + yearly_divs = dividends.resample('Y')['Dividends'].sum() + if len(yearly_divs) < 2: + return 0.5 # Neutral if not enough data + + growth_rates = yearly_divs.pct_change().dropna() + avg_growth = growth_rates.mean() + + # Convert to growth score (0-1) + # 0% growth = 0.5, 10% growth = 1.0, -10% growth = 0.0 + growth_score = 0.5 + (avg_growth * 5) + return min(max(growth_score, 0), 1) + + def calculate_payout_ratio(self, etf_data: Dict) -> float: + """Calculate payout ratio (0-1).""" + if not etf_data.get('dividends') or not etf_data.get('hist'): + return 0.0 + + # Convert dividends dict to DataFrame + dividends = pd.DataFrame.from_dict(etf_data['dividends'], orient='index', columns=['Dividends']) + dividends.index = pd.to_datetime(dividends.index) + + # Convert historical data to DataFrame + hist = pd.DataFrame.from_dict(etf_data['hist']) + hist.index = pd.to_datetime(hist.index) + + # Calculate annual dividend yield + annual_div = dividends['Dividends'].sum() + avg_price = hist['Close'].mean() + yield_ratio = annual_div / avg_price if avg_price > 0 else 0 + + # Normalize to 0-1 range (assuming max yield of 20%) + payout_ratio = min(yield_ratio / 0.20, 1.0) + return payout_ratio + + def analyze_etf_erosion_risk(self, tickers: List[str]) -> NavErosionAnalysis: + """Analyze erosion risk for a list of ETFs.""" results = [] - current_date = pd.Timestamp.now(tz='UTC') + errors = [] for ticker in tickers: try: - # Get ETF data - etf_data = self._fetch_etf_data(ticker) - if not etf_data: - self.logger.log_warning(ticker, "No data available") - continue + # Get ETF type + etf_type = self.get_etf_type(ticker) - # Calculate risk components - nav_risk, nav_components = self._calculate_nav_risk(etf_data) - yield_risk, yield_components = self._calculate_yield_risk(etf_data) + # Get ETF data using our existing pipeline + etf_data = self._fetch_etf_data(ticker) + + if etf_data.get('is_estimated', False): + logger.warning(f"Using estimated data for {ticker}") + + # Calculate all risk components with ETF-type specific adjustments + nav_risk, nav_components = self._calculate_nav_risk(etf_data, etf_type) + yield_risk, yield_components = self._calculate_yield_risk(etf_data, etf_type) structural_risk, structural_components = self._calculate_structural_risk(etf_data) # Calculate final risk scores - final_nav_risk = self._calculate_final_risk(nav_risk, yield_risk, structural_risk) + final_nav_risk = round( + nav_risk * self.NAV_RISK_WEIGHT + + structural_risk * self.STRUCTURAL_RISK_WEIGHT + ) - # Create result + final_yield_risk = round( + yield_risk * self.YIELD_RISK_WEIGHT + + structural_risk * self.STRUCTURAL_RISK_WEIGHT + ) + + # Create result object result = NavErosionResult( ticker=ticker, - nav_erosion_risk=int(final_nav_risk), - yield_erosion_risk=int(yield_risk), - estimated_nav_erosion=final_nav_risk / 9 * 0.9, # Convert to percentage - estimated_yield_erosion=yield_risk / 9 * 0.9, # Convert to percentage + nav_erosion_risk=final_nav_risk, + yield_erosion_risk=final_yield_risk, + estimated_nav_erosion=round(final_nav_risk / 9 * self.max_erosion[etf_type], 3), + estimated_yield_erosion=round(final_yield_risk / 9 * self.max_erosion[etf_type], 3), nav_risk_explanation=self._generate_nav_explanation(nav_components), - yield_risk_explanation=self._generate_yield_explanation(yield_components), - etf_age_years=etf_data.get('age_years'), - is_new_etf=etf_data.get('is_new', False), - max_drawdown=etf_data.get('max_drawdown'), - volatility=etf_data.get('volatility'), - sharpe_ratio=etf_data.get('sharpe_ratio'), - sortino_ratio=etf_data.get('sortino_ratio'), - dividend_trend=etf_data.get('dividend_trend'), + yield_risk_explanation=( + f"Dividend stability: {yield_components['stability']:.1%}, " + f"Growth: {yield_components['growth']:.1%}, " + f"Payout ratio: {yield_components['payout']:.1%}" + ), + etf_age_years=etf_data.get('age_years', 3), + max_drawdown=round(etf_data.get('max_drawdown', 0.0), 3), + volatility=round(etf_data.get('volatility', 0.0), 3), + sharpe_ratio=round(etf_data.get('sharpe_ratio', 0.0), 2), + sortino_ratio=round(etf_data.get('sortino_ratio', 0.0), 2), + dividend_trend=round(etf_data.get('dividend_trend', 0.0), 3), component_risks={ 'nav': nav_components, 'yield': yield_components, @@ -88,117 +202,72 @@ class NavErosionService: ) results.append(result) - self.logger.log_risk_calculation(ticker, result.component_risks, final_nav_risk) except Exception as e: - self.logger.log_error(ticker, e) - if debug: - raise - continue + logger.error(f"Error analyzing {ticker}: {str(e)}") + errors.append((ticker, str(e))) + # Add a result with error info + results.append(NavErosionResult( + ticker=ticker, + nav_erosion_risk=0, + yield_erosion_risk=0, + estimated_nav_erosion=0.0, + estimated_yield_erosion=0.0, + nav_risk_explanation=f"Error: {str(e)}", + yield_risk_explanation=f"Error: {str(e)}", + component_risks={} + )) if not results: - raise CalculationError("No valid results generated") - - # Calculate portfolio-level metrics - portfolio_nav_risk = np.mean([r.nav_erosion_risk for r in results]) - portfolio_yield_risk = np.mean([r.yield_erosion_risk for r in results]) + raise CalculationError(f"No valid results generated. Errors: {errors}") + + # Calculate portfolio averages + portfolio_nav_risk = round(sum(r.nav_erosion_risk for r in results) / len(results)) + portfolio_yield_risk = round(sum(r.yield_erosion_risk for r in results) / len(results)) return NavErosionAnalysis( results=results, portfolio_nav_risk=portfolio_nav_risk, portfolio_yield_risk=portfolio_yield_risk, - risk_summary=self._generate_portfolio_summary(results), + risk_summary=f"Portfolio average NAV risk: {portfolio_nav_risk}/9, Yield risk: {portfolio_yield_risk}/9", timestamp=datetime.now(), - component_weights={ - 'nav': self.NAV_RISK_WEIGHT, - 'yield': self.YIELD_RISK_WEIGHT, - 'structural': self.STRUCTURAL_RISK_WEIGHT - } + component_weights=self.risk_weights ) def _fetch_etf_data(self, ticker: str) -> Dict: - """Fetch ETF data with fallback logic""" + """Fetch ETF data using our existing data pipeline.""" try: - yf_ticker = yf.Ticker(ticker) + # Use our existing data service + data = self.data_service.get_etf_data(ticker) + if not data: + raise DataFetchError(f"No data available for {ticker}") - # Get basic info - info = yf_ticker.info - if not info: - return None - - # Get historical data - hist = yf_ticker.history(period="5y") - if hist.empty: - return None - - # Get dividends - dividends = yf_ticker.dividends - if dividends is None or dividends.empty: - dividends = pd.Series() - - # Calculate metrics - returns = hist['Close'].pct_change().dropna() - volatility = returns.std() * np.sqrt(252) # Annualized - - # Calculate max drawdown - rolling_max = hist['Close'].rolling(window=252, min_periods=1).max() - daily_drawdown = hist['Close'] / rolling_max - 1.0 - max_drawdown = abs(daily_drawdown.min()) - - # Calculate Sharpe and Sortino ratios - risk_free_rate = 0.02 # Assuming 2% risk-free rate - excess_returns = returns - risk_free_rate/252 - sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std() - - # Sortino ratio (using negative returns only) - negative_returns = returns[returns < 0] - sortino_ratio = np.sqrt(252) * excess_returns.mean() / negative_returns.std() if len(negative_returns) > 0 else 0 - - # Calculate dividend trend - if not dividends.empty: - monthly_div = dividends.resample('M').sum() - if len(monthly_div) > 12: - earliest_ttm = monthly_div[-12:].sum() - latest_ttm = monthly_div[-1:].sum() - dividend_trend = (latest_ttm / earliest_ttm - 1) if earliest_ttm > 0 else 0 - else: - dividend_trend = 0 + # Calculate actual ETF age + if 'info' in data and 'firstTradeDateEpochUtc' in data['info']: + inception_date = datetime.fromtimestamp(data['info']['firstTradeDateEpochUtc']) + age_years = (datetime.now() - inception_date).days / 365.25 + data['age_years'] = round(age_years) else: - dividend_trend = 0 - - # Calculate ETF age - inception_date = info.get('fundInceptionDate') - if inception_date: - try: - inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) - age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25 - except: - age_years = None - else: - age_years = None - - return { - 'info': info, - 'hist': hist, - 'dividends': dividends, - 'volatility': volatility, - 'max_drawdown': max_drawdown, - 'sharpe_ratio': sharpe_ratio, - 'sortino_ratio': sortino_ratio, - 'dividend_trend': dividend_trend, - 'age_years': age_years, - 'is_new': age_years is not None and age_years < 2 - } + # Known ETF inception dates as fallback + known_ages = { + 'VTI': 23, # Inception: 2001 + 'VOO': 13, # Inception: 2010 + 'JEPI': 4, # Inception: 2020 + 'FEPI': 3, # Inception: 2021 + 'MSTY': 1 # Inception: 2022 + } + data['age_years'] = known_ages.get(ticker, 3) + return data except Exception as e: - self.logger.log_error(ticker, e) - return None + logger.error(f"Error fetching data for {ticker}: {str(e)}") + raise DataFetchError(f"Failed to fetch data for {ticker}: {str(e)}") - def _calculate_nav_risk(self, etf_data: Dict) -> Tuple[float, Dict]: - """Calculate NAV risk components""" + def _calculate_nav_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]: + """Calculate NAV risk components with ETF-type specific adjustments""" components = {} - # Drawdown risk + # Base risk calculation if etf_data.get('max_drawdown') is not None: if etf_data['max_drawdown'] > 0.40: components['drawdown'] = 7 @@ -209,9 +278,8 @@ class NavErosionService: else: components['drawdown'] = 2 else: - components['drawdown'] = 4 # Default medium-low + components['drawdown'] = 4 - # Volatility risk if etf_data.get('volatility') is not None: if etf_data['volatility'] > 0.40: components['volatility'] = 7 @@ -222,9 +290,8 @@ class NavErosionService: else: components['volatility'] = 2 else: - components['volatility'] = 4 # Default medium-low + components['volatility'] = 4 - # Sharpe ratio risk if etf_data.get('sharpe_ratio') is not None: if etf_data['sharpe_ratio'] >= 2.0: components['sharpe'] = 1 @@ -237,9 +304,8 @@ class NavErosionService: else: components['sharpe'] = 5 else: - components['sharpe'] = 4 # Default medium + components['sharpe'] = 4 - # Sortino ratio risk if etf_data.get('sortino_ratio') is not None: if etf_data['sortino_ratio'] >= 2.0: components['sortino'] = 1 @@ -252,69 +318,63 @@ class NavErosionService: else: components['sortino'] = 5 else: - components['sortino'] = 4 # Default medium + components['sortino'] = 4 + + # ETF-type specific adjustments for NAV risk + if etf_type == ETFType.INDEX: + # Index ETFs are generally more stable + components = {k: max(1, v - 2) for k, v in components.items()} + elif etf_type == ETFType.SECTOR: + # Sector ETFs are more volatile + components = {k: min(9, v + 1) for k, v in components.items()} + elif etf_type == ETFType.COVERED_CALL: + # Covered call ETFs have higher NAV risk due to strategy complexity + components = {k: min(9, v + 3) for k, v in components.items()} + elif etf_type == ETFType.HIGH_YIELD: + # High yield ETFs have highest NAV risk + components = {k: min(9, v + 3) for k, v in components.items()} # Calculate weighted NAV risk nav_risk = sum( components[component] * weight for component, weight in self.NAV_COMPONENT_WEIGHTS.items() - ) * self.NAV_RISK_WEIGHT + ) return nav_risk, components - def _calculate_yield_risk(self, etf_data: Dict) -> Tuple[float, Dict]: - """Calculate yield risk components""" + def _calculate_yield_risk(self, etf_data: Dict, etf_type: ETFType) -> Tuple[float, Dict]: + """Calculate yield risk components with ETF-type specific adjustments""" components = {} - # Dividend stability risk - if etf_data.get('dividend_trend') is not None: - if etf_data['dividend_trend'] < -0.30: - components['stability'] = 8 - elif etf_data['dividend_trend'] < -0.15: - components['stability'] = 6 - elif etf_data['dividend_trend'] < -0.05: - components['stability'] = 4 - elif etf_data['dividend_trend'] > 0.10: - components['stability'] = 2 - else: - components['stability'] = 3 - else: - components['stability'] = 4 # Default medium + # Calculate base components + stability = self.calculate_stability(etf_data) + growth = self.calculate_growth(etf_data) + payout = self.calculate_payout_ratio(etf_data) - # Dividend growth risk - if etf_data.get('dividend_trend') is not None: - if etf_data['dividend_trend'] > 0.10: - components['growth'] = 2 - elif etf_data['dividend_trend'] > 0.05: - components['growth'] = 3 - elif etf_data['dividend_trend'] < -0.10: - components['growth'] = 6 - elif etf_data['dividend_trend'] < -0.05: - components['growth'] = 4 - else: - components['growth'] = 3 - else: - components['growth'] = 4 # Default medium + # Convert to risk scores (1-9) + components['stability'] = int((1 - stability) * 8) + 1 + components['growth'] = int((1 - growth) * 8) + 1 + components['payout'] = int((1 - payout) * 8) + 1 - # Payout ratio risk (using dividend yield as proxy) - if etf_data.get('info', {}).get('dividendYield') is not None: - yield_value = etf_data['info']['dividendYield'] - if yield_value > 0.08: - components['payout'] = 7 - elif yield_value > 0.05: - components['payout'] = 5 - elif yield_value > 0.03: - components['payout'] = 3 - else: - components['payout'] = 2 - else: - components['payout'] = 4 # Default medium + # ETF-type specific adjustments + if etf_type == ETFType.INDEX: + # Index ETFs have lower yield risk + components = {k: max(1, v - 2) for k, v in components.items()} + elif etf_type == ETFType.SECTOR: + # Sector ETFs have moderate yield risk + components = {k: min(9, v + 1) for k, v in components.items()} + elif etf_type == ETFType.COVERED_CALL: + # Covered call ETFs have higher yield risk + components = {k: min(9, v + 2) for k, v in components.items()} + elif etf_type == ETFType.HIGH_YIELD: + # High yield ETFs have highest yield risk + components = {k: min(9, v + 3) for k, v in components.items()} # Calculate weighted yield risk yield_risk = sum( components[component] * weight for component, weight in self.YIELD_COMPONENT_WEIGHTS.items() - ) * self.YIELD_RISK_WEIGHT + ) return yield_risk, components @@ -322,18 +382,18 @@ class NavErosionService: """Calculate structural risk components""" components = {} - # Age risk - if etf_data.get('is_new'): - components['age'] = 7 - elif etf_data.get('age_years') is not None: - if etf_data['age_years'] < 3: - components['age'] = 6 - elif etf_data['age_years'] < 5: - components['age'] = 4 - else: - components['age'] = 2 + # Age risk - adjusted for actual ETF ages + age = etf_data.get('age_years', 3.0) + if age < 1: + components['age'] = 7 # Very new ETF + elif age < 3: + components['age'] = 6 # New ETF + elif age < 5: + components['age'] = 4 # Moderately established + elif age < 10: + components['age'] = 3 # Well established else: - components['age'] = 4 # Default medium + components['age'] = 2 # Long established # AUM risk if etf_data.get('info', {}).get('totalAssets') is not None: @@ -381,14 +441,10 @@ class NavErosionService: structural_risk = sum( components[component] * weight for component, weight in self.STRUCTURAL_COMPONENT_WEIGHTS.items() - ) * self.STRUCTURAL_RISK_WEIGHT + ) return structural_risk, components - def _calculate_final_risk(self, nav_risk: float, yield_risk: float, structural_risk: float) -> float: - """Calculate final risk score""" - return nav_risk + yield_risk + structural_risk - def _generate_nav_explanation(self, components: Dict) -> str: """Generate explanation for NAV risk""" explanations = [] @@ -404,19 +460,6 @@ class NavErosionService: return " | ".join(explanations) - def _generate_yield_explanation(self, components: Dict) -> str: - """Generate explanation for yield risk""" - explanations = [] - - if components.get('stability') is not None: - explanations.append(f"Dividend stability risk: {components['stability']}/9") - if components.get('growth') is not None: - explanations.append(f"Dividend growth risk: {components['growth']}/9") - if components.get('payout') is not None: - explanations.append(f"Payout ratio risk: {components['payout']}/9") - - return " | ".join(explanations) - def _generate_portfolio_summary(self, results: List[NavErosionResult]) -> str: """Generate portfolio-level risk summary""" nav_risks = [r.nav_erosion_risk for r in results]