feat: implement NAV erosion risk assessment service and UI integration

This commit is contained in:
Pascal BIBEHE 2025-05-29 21:49:24 +02:00
parent c4a7f91867
commit 65209331f5
8 changed files with 722 additions and 10 deletions

View File

@ -0,0 +1,10 @@
"""
NAV Erosion Risk Assessment Service
"""
from .service import NavErosionService
from .models import NavErosionResult, NavErosionAnalysis
from .exceptions import NavErosionError, DataFetchError
__all__ = ['NavErosionService', 'NavErosionResult', 'NavErosionAnalysis',
'NavErosionError', 'DataFetchError']

View File

@ -0,0 +1,19 @@
"""
Custom exceptions for NAV Erosion Service
"""
class NavErosionError(Exception):
"""Base exception for NAV Erosion Service"""
pass
class DataFetchError(NavErosionError):
"""Raised when data fetching fails"""
pass
class CalculationError(NavErosionError):
"""Raised when risk calculation fails"""
pass
class ValidationError(NavErosionError):
"""Raised when input validation fails"""
pass

View File

@ -0,0 +1,93 @@
"""
Logging configuration for NAV Erosion Service
"""
import logging
import os
from datetime import datetime
from pathlib import Path
class ErosionRiskLogger:
"""Logger for NAV Erosion Service"""
def __init__(self):
self.logger = logging.getLogger('erosion_risk')
self.setup_logger()
def setup_logger(self):
"""Configure logger with file and console handlers"""
# Create logs directory if it doesn't exist
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
# Set base logging level
self.logger.setLevel(logging.INFO)
# File handler for errors
error_handler = logging.FileHandler(
log_dir / f'erosion_risk_errors_{datetime.now().strftime("%Y%m%d")}.log'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
# File handler for flow tracking
flow_handler = logging.FileHandler(
log_dir / f'erosion_risk_flow_{datetime.now().strftime("%Y%m%d")}.log'
)
flow_handler.setLevel(logging.INFO)
flow_handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
# Console handler for immediate feedback
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(
logging.Formatter('%(levelname)s: %(message)s')
)
# Add handlers to logger
self.logger.addHandler(error_handler)
self.logger.addHandler(flow_handler)
self.logger.addHandler(console_handler)
def log_risk_calculation(self, ticker: str, component_risks: dict, final_risk: float):
"""Log risk calculation details"""
self.logger.info(f"Risk calculation for {ticker}:")
# Log NAV Risk Components
self.logger.info("NAV Risk Components:")
for component, risk in component_risks.get('nav', {}).items():
self.logger.info(f" {component}: {risk}")
# Log Yield Risk Components
self.logger.info("Yield Risk Components:")
for component, risk in component_risks.get('yield', {}).items():
self.logger.info(f" {component}: {risk}")
# Log Structural Risk Components
self.logger.info("Structural Risk Components:")
for component, risk in component_risks.get('structural', {}).items():
self.logger.info(f" {component}: {risk}")
self.logger.info(f"Final Risk Score: {final_risk}")
def log_error(self, ticker: str, error: Exception, context: dict = None):
"""Log error with context"""
self.logger.error(f"Error processing {ticker}: {str(error)}")
if context:
self.logger.error(f"Context: {context}")
def log_warning(self, ticker: str, message: str, context: dict = None):
"""Log warning with context"""
self.logger.warning(f"Warning for {ticker}: {message}")
if context:
self.logger.warning(f"Context: {context}")
def log_info(self, message: str, context: dict = None):
"""Log info message with context"""
self.logger.info(message)
if context:
self.logger.info(f"Context: {context}")

View File

@ -0,0 +1,36 @@
"""
Data models for NAV Erosion Service
"""
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime
@dataclass
class NavErosionResult:
"""Result of NAV erosion risk analysis for a single ETF"""
ticker: str
nav_erosion_risk: int # 0-9 scale
yield_erosion_risk: int # 0-9 scale
estimated_nav_erosion: float # Annual percentage
estimated_yield_erosion: float # Annual percentage
nav_risk_explanation: str
yield_risk_explanation: str
etf_age_years: Optional[float]
is_new_etf: bool
max_drawdown: Optional[float]
volatility: Optional[float]
sharpe_ratio: Optional[float]
sortino_ratio: Optional[float]
dividend_trend: Optional[float]
component_risks: Dict[str, float] # Detailed risk components
@dataclass
class NavErosionAnalysis:
"""Complete NAV erosion analysis for a portfolio"""
results: List[NavErosionResult]
portfolio_nav_risk: float # Weighted average
portfolio_yield_risk: float # Weighted average
risk_summary: str
timestamp: datetime
component_weights: Dict[str, float] # Weights used in calculation

View File

@ -0,0 +1,431 @@
"""
NAV Erosion Service implementation
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import List, Dict, Optional, Tuple
import yfinance as yf
from .models import NavErosionResult, NavErosionAnalysis
from .exceptions import NavErosionError, DataFetchError, CalculationError
from .logger import ErosionRiskLogger
class NavErosionService:
"""Service for calculating NAV erosion risk"""
# Risk weights
NAV_RISK_WEIGHT = 0.45
YIELD_RISK_WEIGHT = 0.35
STRUCTURAL_RISK_WEIGHT = 0.20
# Component weights within each risk category
NAV_COMPONENT_WEIGHTS = {
'drawdown': 0.333, # 33.3% of NAV risk
'volatility': 0.222, # 22.2% of NAV risk
'sharpe': 0.222, # 22.2% of NAV risk
'sortino': 0.222 # 22.2% of NAV risk
}
YIELD_COMPONENT_WEIGHTS = {
'stability': 0.429, # 42.9% of yield risk
'growth': 0.286, # 28.6% of yield risk
'payout': 0.285 # 28.5% of yield risk
}
STRUCTURAL_COMPONENT_WEIGHTS = {
'age': 0.25, # 25% of structural risk
'aum': 0.25, # 25% of structural risk
'liquidity': 0.25, # 25% of structural risk
'expense': 0.25 # 25% of structural risk
}
def __init__(self):
self.logger = ErosionRiskLogger()
def analyze_etf_erosion_risk(self, tickers: List[str], debug: bool = False) -> NavErosionAnalysis:
"""Analyze erosion risk for a list of ETFs"""
results = []
current_date = pd.Timestamp.now(tz='UTC')
for ticker in tickers:
try:
# Get ETF data
etf_data = self._fetch_etf_data(ticker)
if not etf_data:
self.logger.log_warning(ticker, "No data available")
continue
# Calculate risk components
nav_risk, nav_components = self._calculate_nav_risk(etf_data)
yield_risk, yield_components = self._calculate_yield_risk(etf_data)
structural_risk, structural_components = self._calculate_structural_risk(etf_data)
# Calculate final risk scores
final_nav_risk = self._calculate_final_risk(nav_risk, yield_risk, structural_risk)
# Create result
result = NavErosionResult(
ticker=ticker,
nav_erosion_risk=int(final_nav_risk),
yield_erosion_risk=int(yield_risk),
estimated_nav_erosion=final_nav_risk / 9 * 0.9, # Convert to percentage
estimated_yield_erosion=yield_risk / 9 * 0.9, # Convert to percentage
nav_risk_explanation=self._generate_nav_explanation(nav_components),
yield_risk_explanation=self._generate_yield_explanation(yield_components),
etf_age_years=etf_data.get('age_years'),
is_new_etf=etf_data.get('is_new', False),
max_drawdown=etf_data.get('max_drawdown'),
volatility=etf_data.get('volatility'),
sharpe_ratio=etf_data.get('sharpe_ratio'),
sortino_ratio=etf_data.get('sortino_ratio'),
dividend_trend=etf_data.get('dividend_trend'),
component_risks={
'nav': nav_components,
'yield': yield_components,
'structural': structural_components
}
)
results.append(result)
self.logger.log_risk_calculation(ticker, result.component_risks, final_nav_risk)
except Exception as e:
self.logger.log_error(ticker, e)
if debug:
raise
continue
if not results:
raise CalculationError("No valid results generated")
# Calculate portfolio-level metrics
portfolio_nav_risk = np.mean([r.nav_erosion_risk for r in results])
portfolio_yield_risk = np.mean([r.yield_erosion_risk for r in results])
return NavErosionAnalysis(
results=results,
portfolio_nav_risk=portfolio_nav_risk,
portfolio_yield_risk=portfolio_yield_risk,
risk_summary=self._generate_portfolio_summary(results),
timestamp=datetime.now(),
component_weights={
'nav': self.NAV_RISK_WEIGHT,
'yield': self.YIELD_RISK_WEIGHT,
'structural': self.STRUCTURAL_RISK_WEIGHT
}
)
def _fetch_etf_data(self, ticker: str) -> Dict:
"""Fetch ETF data with fallback logic"""
try:
yf_ticker = yf.Ticker(ticker)
# Get basic info
info = yf_ticker.info
if not info:
return None
# Get historical data
hist = yf_ticker.history(period="5y")
if hist.empty:
return None
# Get dividends
dividends = yf_ticker.dividends
if dividends is None or dividends.empty:
dividends = pd.Series()
# Calculate metrics
returns = hist['Close'].pct_change().dropna()
volatility = returns.std() * np.sqrt(252) # Annualized
# Calculate max drawdown
rolling_max = hist['Close'].rolling(window=252, min_periods=1).max()
daily_drawdown = hist['Close'] / rolling_max - 1.0
max_drawdown = abs(daily_drawdown.min())
# Calculate Sharpe and Sortino ratios
risk_free_rate = 0.02 # Assuming 2% risk-free rate
excess_returns = returns - risk_free_rate/252
sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std()
# Sortino ratio (using negative returns only)
negative_returns = returns[returns < 0]
sortino_ratio = np.sqrt(252) * excess_returns.mean() / negative_returns.std() if len(negative_returns) > 0 else 0
# Calculate dividend trend
if not dividends.empty:
monthly_div = dividends.resample('M').sum()
if len(monthly_div) > 12:
earliest_ttm = monthly_div[-12:].sum()
latest_ttm = monthly_div[-1:].sum()
dividend_trend = (latest_ttm / earliest_ttm - 1) if earliest_ttm > 0 else 0
else:
dividend_trend = 0
else:
dividend_trend = 0
# Calculate ETF age
inception_date = info.get('fundInceptionDate')
if inception_date:
try:
inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True)
age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25
except:
age_years = None
else:
age_years = None
return {
'info': info,
'hist': hist,
'dividends': dividends,
'volatility': volatility,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'dividend_trend': dividend_trend,
'age_years': age_years,
'is_new': age_years is not None and age_years < 2
}
except Exception as e:
self.logger.log_error(ticker, e)
return None
def _calculate_nav_risk(self, etf_data: Dict) -> Tuple[float, Dict]:
"""Calculate NAV risk components"""
components = {}
# Drawdown risk
if etf_data.get('max_drawdown') is not None:
if etf_data['max_drawdown'] > 0.40:
components['drawdown'] = 7
elif etf_data['max_drawdown'] > 0.25:
components['drawdown'] = 5
elif etf_data['max_drawdown'] > 0.15:
components['drawdown'] = 3
else:
components['drawdown'] = 2
else:
components['drawdown'] = 4 # Default medium-low
# Volatility risk
if etf_data.get('volatility') is not None:
if etf_data['volatility'] > 0.40:
components['volatility'] = 7
elif etf_data['volatility'] > 0.25:
components['volatility'] = 5
elif etf_data['volatility'] > 0.15:
components['volatility'] = 3
else:
components['volatility'] = 2
else:
components['volatility'] = 4 # Default medium-low
# Sharpe ratio risk
if etf_data.get('sharpe_ratio') is not None:
if etf_data['sharpe_ratio'] >= 2.0:
components['sharpe'] = 1
elif etf_data['sharpe_ratio'] >= 1.5:
components['sharpe'] = 2
elif etf_data['sharpe_ratio'] >= 1.0:
components['sharpe'] = 3
elif etf_data['sharpe_ratio'] >= 0.5:
components['sharpe'] = 4
else:
components['sharpe'] = 5
else:
components['sharpe'] = 4 # Default medium
# Sortino ratio risk
if etf_data.get('sortino_ratio') is not None:
if etf_data['sortino_ratio'] >= 2.0:
components['sortino'] = 1
elif etf_data['sortino_ratio'] >= 1.5:
components['sortino'] = 2
elif etf_data['sortino_ratio'] >= 1.0:
components['sortino'] = 3
elif etf_data['sortino_ratio'] >= 0.5:
components['sortino'] = 4
else:
components['sortino'] = 5
else:
components['sortino'] = 4 # Default medium
# Calculate weighted NAV risk
nav_risk = sum(
components[component] * weight
for component, weight in self.NAV_COMPONENT_WEIGHTS.items()
) * self.NAV_RISK_WEIGHT
return nav_risk, components
def _calculate_yield_risk(self, etf_data: Dict) -> Tuple[float, Dict]:
"""Calculate yield risk components"""
components = {}
# Dividend stability risk
if etf_data.get('dividend_trend') is not None:
if etf_data['dividend_trend'] < -0.30:
components['stability'] = 8
elif etf_data['dividend_trend'] < -0.15:
components['stability'] = 6
elif etf_data['dividend_trend'] < -0.05:
components['stability'] = 4
elif etf_data['dividend_trend'] > 0.10:
components['stability'] = 2
else:
components['stability'] = 3
else:
components['stability'] = 4 # Default medium
# Dividend growth risk
if etf_data.get('dividend_trend') is not None:
if etf_data['dividend_trend'] > 0.10:
components['growth'] = 2
elif etf_data['dividend_trend'] > 0.05:
components['growth'] = 3
elif etf_data['dividend_trend'] < -0.10:
components['growth'] = 6
elif etf_data['dividend_trend'] < -0.05:
components['growth'] = 4
else:
components['growth'] = 3
else:
components['growth'] = 4 # Default medium
# Payout ratio risk (using dividend yield as proxy)
if etf_data.get('info', {}).get('dividendYield') is not None:
yield_value = etf_data['info']['dividendYield']
if yield_value > 0.08:
components['payout'] = 7
elif yield_value > 0.05:
components['payout'] = 5
elif yield_value > 0.03:
components['payout'] = 3
else:
components['payout'] = 2
else:
components['payout'] = 4 # Default medium
# Calculate weighted yield risk
yield_risk = sum(
components[component] * weight
for component, weight in self.YIELD_COMPONENT_WEIGHTS.items()
) * self.YIELD_RISK_WEIGHT
return yield_risk, components
def _calculate_structural_risk(self, etf_data: Dict) -> Tuple[float, Dict]:
"""Calculate structural risk components"""
components = {}
# Age risk
if etf_data.get('is_new'):
components['age'] = 7
elif etf_data.get('age_years') is not None:
if etf_data['age_years'] < 3:
components['age'] = 6
elif etf_data['age_years'] < 5:
components['age'] = 4
else:
components['age'] = 2
else:
components['age'] = 4 # Default medium
# AUM risk
if etf_data.get('info', {}).get('totalAssets') is not None:
aum = etf_data['info']['totalAssets']
if aum < 100_000_000: # Less than $100M
components['aum'] = 7
elif aum < 500_000_000: # Less than $500M
components['aum'] = 5
elif aum < 1_000_000_000: # Less than $1B
components['aum'] = 3
else:
components['aum'] = 2
else:
components['aum'] = 4 # Default medium
# Liquidity risk (using average volume as proxy)
if etf_data.get('info', {}).get('averageVolume') is not None:
volume = etf_data['info']['averageVolume']
if volume < 100_000:
components['liquidity'] = 7
elif volume < 500_000:
components['liquidity'] = 5
elif volume < 1_000_000:
components['liquidity'] = 3
else:
components['liquidity'] = 2
else:
components['liquidity'] = 4 # Default medium
# Expense ratio risk
if etf_data.get('info', {}).get('annualReportExpenseRatio') is not None:
expense_ratio = etf_data['info']['annualReportExpenseRatio']
if expense_ratio > 0.0075: # > 0.75%
components['expense'] = 7
elif expense_ratio > 0.005: # > 0.50%
components['expense'] = 5
elif expense_ratio > 0.0025: # > 0.25%
components['expense'] = 3
else:
components['expense'] = 2
else:
components['expense'] = 4 # Default medium
# Calculate weighted structural risk
structural_risk = sum(
components[component] * weight
for component, weight in self.STRUCTURAL_COMPONENT_WEIGHTS.items()
) * self.STRUCTURAL_RISK_WEIGHT
return structural_risk, components
def _calculate_final_risk(self, nav_risk: float, yield_risk: float, structural_risk: float) -> float:
"""Calculate final risk score"""
return nav_risk + yield_risk + structural_risk
def _generate_nav_explanation(self, components: Dict) -> str:
"""Generate explanation for NAV risk"""
explanations = []
if components.get('drawdown') is not None:
explanations.append(f"Drawdown risk level: {components['drawdown']}/9")
if components.get('volatility') is not None:
explanations.append(f"Volatility risk level: {components['volatility']}/9")
if components.get('sharpe') is not None:
explanations.append(f"Sharpe ratio risk level: {components['sharpe']}/9")
if components.get('sortino') is not None:
explanations.append(f"Sortino ratio risk level: {components['sortino']}/9")
return " | ".join(explanations)
def _generate_yield_explanation(self, components: Dict) -> str:
"""Generate explanation for yield risk"""
explanations = []
if components.get('stability') is not None:
explanations.append(f"Dividend stability risk: {components['stability']}/9")
if components.get('growth') is not None:
explanations.append(f"Dividend growth risk: {components['growth']}/9")
if components.get('payout') is not None:
explanations.append(f"Payout ratio risk: {components['payout']}/9")
return " | ".join(explanations)
def _generate_portfolio_summary(self, results: List[NavErosionResult]) -> str:
"""Generate portfolio-level risk summary"""
nav_risks = [r.nav_erosion_risk for r in results]
yield_risks = [r.yield_erosion_risk for r in results]
avg_nav_risk = np.mean(nav_risks)
avg_yield_risk = np.mean(yield_risks)
return (
f"Portfolio NAV Risk: {avg_nav_risk:.1f}/9 | "
f"Portfolio Yield Risk: {avg_yield_risk:.1f}/9"
)

0
api/__init__.py Normal file
View File

View File

@ -2603,4 +2603,129 @@ if st.session_state.simulation_run and st.session_state.df_data is not None:
- **Monthly Income**: DRIP strategy generates {income_diff_pct:.1f}% higher monthly income
- **Risk Mitigation**: DRIP helps mitigate erosion effects by continuously acquiring more shares
- **Compounding Effect**: Reinvested dividends generate additional income through compounding
""")
""")
with tab3:
st.subheader("📉 Erosion Risk Assessment")
# Add explanatory text
st.write("""
This analysis uses historical ETF data to estimate reasonable erosion settings
based on past performance, volatility, and dividend history.
""")
# Initialize the NAV erosion service
try:
from ETF_Portal.services.nav_erosion_service import NavErosionService
# Run the analysis in a spinner
with st.spinner("Analyzing historical ETF data..."):
erosion_service = NavErosionService()
risk_analysis = erosion_service.analyze_etf_erosion_risk(final_alloc["Ticker"].tolist())
except ImportError as e:
st.error(f"Error importing NavErosionService: {str(e)}")
st.error("Please ensure the nav_erosion_service module is properly installed.")
logger.error(f"Import error: {str(e)}")
logger.error(traceback.format_exc())
risk_analysis = None
if risk_analysis and risk_analysis.results:
# Create a summary table with key insights
risk_data = []
for result in risk_analysis.results:
risk_data.append({
"Ticker": result.ticker,
"NAV Erosion Risk (0-9)": result.nav_erosion_risk,
"Yield Erosion Risk (0-9)": result.yield_erosion_risk,
"Estimated Annual NAV Erosion": f"{result.estimated_nav_erosion:.1%}",
"Estimated Annual Yield Erosion": f"{result.estimated_yield_erosion:.1%}",
"NAV Risk Explanation": result.nav_risk_explanation,
"Yield Risk Explanation": result.yield_risk_explanation,
"ETF Age (Years)": f"{result.etf_age_years:.1f}" if result.etf_age_years else "Unknown",
"Max Drawdown": f"{result.max_drawdown:.1%}" if result.max_drawdown else "Unknown",
"Volatility": f"{result.volatility:.1%}" if result.volatility else "Unknown",
"Sharpe Ratio": f"{result.sharpe_ratio:.2f}" if result.sharpe_ratio else "Unknown",
"Sortino Ratio": f"{result.sortino_ratio:.2f}" if result.sortino_ratio else "Unknown",
"Dividend Trend": f"{result.dividend_trend:.1%}" if result.dividend_trend else "Unknown"
})
# Display main assessment table
st.subheader("Recommended Erosion Settings")
main_columns = [
"Ticker",
"NAV Erosion Risk (0-9)",
"Yield Erosion Risk (0-9)",
"Estimated Annual NAV Erosion",
"Estimated Annual Yield Erosion",
"NAV Risk Explanation",
"Yield Risk Explanation"
]
st.dataframe(
pd.DataFrame(risk_data)[main_columns],
use_container_width=True,
hide_index=True
)
# Display detailed metrics
st.subheader("Detailed Risk Metrics")
detail_columns = [
"Ticker",
"ETF Age (Years)",
"Max Drawdown",
"Volatility",
"Sharpe Ratio",
"Sortino Ratio",
"Dividend Trend"
]
st.dataframe(
pd.DataFrame(risk_data)[detail_columns],
use_container_width=True,
hide_index=True
)
# Allow applying these settings to the simulation
if st.button("Apply Recommended Erosion Settings", type="primary"):
# Initialize or update per-ticker erosion settings
if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict):
st.session_state.per_ticker_erosion = {}
# Update the session state with recommended settings
for result in risk_analysis.results:
st.session_state.per_ticker_erosion[result.ticker] = {
"nav": result.nav_erosion_risk,
"yield": result.yield_erosion_risk
}
# Enable erosion and per-ticker settings
st.session_state.erosion_type = "NAV & Yield Erosion"
st.session_state.use_per_ticker_erosion = True
# Update the erosion_level variable to match the new settings
erosion_level = {
"global": {
"nav": 5, # Default medium level for global fallback
"yield": 5
},
"per_ticker": st.session_state.per_ticker_erosion,
"use_per_ticker": True
}
# Update session state erosion level for DRIP forecast
st.session_state.erosion_level = erosion_level
st.success("Applied recommended erosion settings. They will be used in the DRIP forecast.")
st.info("Go to the DRIP Forecast tab to see the impact of these settings.")
else:
st.error("Unable to analyze ETF erosion risk. Please try again.")
with tab4:
st.subheader("🤖 AI Suggestions")
# Add AI suggestions content
st.write("This tab will contain AI suggestions for portfolio optimization.")
with tab5:
st.subheader("📊 ETF Details")
# Add ETF details content
st.write("This tab will contain detailed information about the selected ETFs.")

View File

@ -7,15 +7,13 @@ setup(
packages=find_packages(),
include_package_data=True,
install_requires=[
"streamlit>=1.28.0",
"pandas>=1.5.3",
"numpy>=1.24.3",
"matplotlib>=3.7.1",
"seaborn>=0.12.2",
"fmp-python>=0.1.5",
"plotly>=5.14.1",
"requests>=2.31.0",
"yfinance>=0.2.36",
"streamlit",
"pandas",
"numpy",
"plotly",
"yfinance",
"requests",
"python-dotenv"
],
entry_points={
"console_scripts": [