Add: Complete nav erosion service implementation with tests

Pascal BIBEHE 2025-05-29 23:38:55 +02:00
parent 93d93277b1
commit 300b127674
7 changed files with 453 additions and 17 deletions

View File

@@ -0,0 +1,289 @@
"""
Data Service for ETF data retrieval
"""
import os
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Optional
import yfinance as yf
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


class DataService:
    """Service for retrieving ETF data with fallback logic"""

    def __init__(self):
        # Use existing cache structure
        self.base_dir = Path(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
        self.cache_dir = self.base_dir / 'cache'
        self.yf_cache_dir = self.cache_dir / 'yfinance_cache'
        self.fmp_cache_dir = self.cache_dir / 'FMP_cache'
        self.fmp_profiles_dir = self.fmp_cache_dir / 'etf_profiles'
        self.fmp_historical_dir = self.fmp_cache_dir / 'historical_data'
        self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings'
        self.cache_timeout = timedelta(hours=1)
        # Ensure the cache directories exist so cache writes do not silently fail
        for directory in (self.yf_cache_dir, self.fmp_profiles_dir,
                          self.fmp_historical_dir, self.fmp_holdings_dir):
            directory.mkdir(parents=True, exist_ok=True)
    def get_etf_data(self, ticker: str) -> Dict:
        """Get ETF data using fallback logic:
        1. Try FMP cache
        2. Try FMP API
        3. Try yfinance cache
        4. Try yfinance
        5. Use high yield estimates
        """
        try:
            # Try FMP cache first
            fmp_cached_data = self._get_from_fmp_cache(ticker)
            if fmp_cached_data:
                logger.info(f"Using FMP cached data for {ticker}")
                return fmp_cached_data

            # Try FMP API
            fmp_data = self._get_from_fmp(ticker)
            if fmp_data:
                logger.info(f"Using FMP data for {ticker}")
                self._save_to_fmp_cache(ticker, fmp_data)
                return fmp_data

            # Try yfinance cache
            yf_cached_data = self._get_from_yf_cache(ticker)
            if yf_cached_data:
                logger.info(f"Using yfinance cached data for {ticker}")
                return yf_cached_data

            # Try yfinance
            yf_data = self._get_from_yfinance(ticker)
            if yf_data:
                logger.info(f"Using yfinance data for {ticker}")
                self._save_to_yf_cache(ticker, yf_data)
                return yf_data

            # Fall back to high yield estimates
            logger.warning(f"Using high yield estimates for {ticker}")
            return self._get_high_yield_estimates(ticker)
        except Exception as e:
            logger.error(f"Error fetching data for {ticker}: {str(e)}")
            return self._get_high_yield_estimates(ticker)
    def _get_from_fmp_cache(self, ticker: str) -> Optional[Dict]:
        """Get data from FMP cache if available and not expired"""
        # Check profile cache
        profile_file = self.fmp_profiles_dir / f"{ticker}.json"
        if not profile_file.exists():
            return None
        try:
            with open(profile_file, 'r') as f:
                profile_data = json.load(f)

            # Check if cache is expired
            cache_time = datetime.fromisoformat(profile_data['timestamp'])
            if datetime.now() - cache_time > self.cache_timeout:
                return None

            # Get historical data
            hist_file = self.fmp_historical_dir / f"{ticker}.json"
            if hist_file.exists():
                with open(hist_file, 'r') as f:
                    hist_data = json.load(f)
            else:
                hist_data = {}

            # Get holdings data
            holdings_file = self.fmp_holdings_dir / f"{ticker}.json"
            if holdings_file.exists():
                with open(holdings_file, 'r') as f:
                    holdings_data = json.load(f)
            else:
                holdings_data = {}

            # Combine all data
            return {
                'info': profile_data['data'],
                'hist': hist_data.get('data', {}),
                'holdings': holdings_data.get('data', {}),
                'volatility': profile_data['data'].get('volatility', 0.0),
                'max_drawdown': profile_data['data'].get('maxDrawdown', 0.0),
                'sharpe_ratio': profile_data['data'].get('sharpeRatio', 0.0),
                'sortino_ratio': profile_data['data'].get('sortinoRatio', 0.0),
                'dividend_trend': profile_data['data'].get('dividendTrend', 0.0),
                'age_years': profile_data['data'].get('ageYears', 0.0),
                'is_new': profile_data['data'].get('ageYears', 0.0) < 2
            }
        except Exception as e:
            logger.warning(f"Error reading FMP cache for {ticker}: {str(e)}")
            return None
    def _get_from_yf_cache(self, ticker: str) -> Optional[Dict]:
        """Get data from yfinance cache if available and not expired"""
        cache_file = self.yf_cache_dir / f"{ticker}_data.json"
        if not cache_file.exists():
            return None
        try:
            with open(cache_file, 'r') as f:
                data = json.load(f)

            # Check if cache is expired
            cache_time = datetime.fromisoformat(data['timestamp'])
            if datetime.now() - cache_time > self.cache_timeout:
                return None
            return data['data']
        except Exception as e:
            logger.warning(f"Error reading yfinance cache for {ticker}: {str(e)}")
            return None
    def _save_to_fmp_cache(self, ticker: str, data: Dict):
        """Save data to FMP cache"""
        try:
            # Save profile data
            profile_data = {
                'timestamp': datetime.now().isoformat(),
                'data': data['info']
            }
            profile_file = self.fmp_profiles_dir / f"{ticker}.json"
            with open(profile_file, 'w') as f:
                json.dump(profile_data, f)

            # Save historical data
            if 'hist' in data:
                hist_data = {
                    'timestamp': datetime.now().isoformat(),
                    'data': data['hist']
                }
                hist_file = self.fmp_historical_dir / f"{ticker}.json"
                with open(hist_file, 'w') as f:
                    json.dump(hist_data, f)

            # Save holdings data
            if 'holdings' in data:
                holdings_data = {
                    'timestamp': datetime.now().isoformat(),
                    'data': data['holdings']
                }
                holdings_file = self.fmp_holdings_dir / f"{ticker}.json"
                with open(holdings_file, 'w') as f:
                    json.dump(holdings_data, f)
        except Exception as e:
            logger.warning(f"Error saving FMP cache for {ticker}: {str(e)}")
    def _save_to_yf_cache(self, ticker: str, data: Dict):
        """Save data to yfinance cache"""
        try:
            cache_data = {
                'timestamp': datetime.now().isoformat(),
                'data': data
            }
            cache_file = self.yf_cache_dir / f"{ticker}_data.json"
            # Note: 'hist'/'dividends' built via to_dict() keep pandas Timestamp keys,
            # which json cannot serialize; such failures are caught and logged below.
            with open(cache_file, 'w') as f:
                json.dump(cache_data, f)
        except Exception as e:
            logger.warning(f"Error saving yfinance cache for {ticker}: {str(e)}")
    def _get_from_fmp(self, ticker: str) -> Optional[Dict]:
        """Get data from FMP API"""
        # TODO: Implement FMP API integration
        return None
    def _get_from_yfinance(self, ticker: str) -> Optional[Dict]:
        """Get data from yfinance"""
        try:
            yf_ticker = yf.Ticker(ticker)

            # Get basic info
            info = yf_ticker.info
            if not info:
                return None

            # Get historical data
            hist = yf_ticker.history(period="5y")
            if hist.empty:
                return None

            # Get dividends
            dividends = yf_ticker.dividends
            if dividends is None or dividends.empty:
                dividends = pd.Series(dtype=float)

            # Calculate metrics
            returns = hist['Close'].pct_change().dropna()
            volatility = returns.std() * np.sqrt(252)  # Annualized

            # Calculate max drawdown
            rolling_max = hist['Close'].rolling(window=252, min_periods=1).max()
            daily_drawdown = hist['Close'] / rolling_max - 1.0
            max_drawdown = abs(daily_drawdown.min())

            # Calculate Sharpe and Sortino ratios
            risk_free_rate = 0.02  # Assuming 2% risk-free rate
            excess_returns = returns - risk_free_rate / 252
            sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std()

            # Sortino ratio (using downside deviation only)
            negative_returns = returns[returns < 0]
            sortino_ratio = (np.sqrt(252) * excess_returns.mean() / negative_returns.std()
                             if len(negative_returns) > 0 else 0)

            # Calculate dividend trend: latest trailing-12-month payout vs the earliest one
            if not dividends.empty:
                monthly_div = dividends.resample('ME').sum()  # Using 'ME' instead of the deprecated 'M'
                if len(monthly_div) > 12:
                    earliest_ttm = monthly_div.iloc[:12].sum()
                    latest_ttm = monthly_div.iloc[-12:].sum()
                    dividend_trend = (latest_ttm / earliest_ttm - 1) if earliest_ttm > 0 else 0
                else:
                    dividend_trend = 0
            else:
                dividend_trend = 0

            # Calculate ETF age
            inception_date = info.get('fundInceptionDate')
            if inception_date:
                try:
                    inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True)
                    age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25
                except Exception:
                    age_years = None
            else:
                age_years = None

            return {
                'info': info,
                'hist': hist.to_dict(),
                'dividends': dividends.to_dict(),
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                'sharpe_ratio': sharpe_ratio,
                'sortino_ratio': sortino_ratio,
                'dividend_trend': dividend_trend,
                'age_years': age_years,
                'is_new': age_years is not None and age_years < 2
            }
        except Exception as e:
            logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
            return None
    def _get_high_yield_estimates(self, ticker: str) -> Dict:
        """Get conservative high yield estimates when no data is available"""
        return {
            'info': {},
            'hist': {},
            'dividends': {},
            'volatility': 0.20,      # Conservative estimate
            'max_drawdown': 0.15,    # Conservative estimate
            'sharpe_ratio': 1.0,     # Conservative estimate
            'sortino_ratio': 1.0,    # Conservative estimate
            'dividend_trend': 0.0,   # Conservative estimate
            'age_years': 3.0,        # Conservative estimate
            'is_new': False,
            'is_estimated': True     # Flag to indicate these are estimates
        }
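
For context, a minimal usage sketch of the fallback chain above (illustrative only; the import path matches the test script further down). Cached results, when successfully written, are reused for up to an hour (cache_timeout).

from ETF_Portal.services.data_service import DataService

service = DataService()
data = service.get_etf_data("JEPI")  # FMP cache -> FMP API -> yfinance cache -> yfinance -> estimates
if data.get("is_estimated"):
    print("No live data available; conservative estimates were returned")
else:
    print(f"Annualised volatility for JEPI: {data['volatility']:.2%}")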

View File

@@ -1,10 +1,16 @@
 """
-NAV Erosion Risk Assessment Service
+NAV Erosion Service package
 """
 
 from .service import NavErosionService
 from .models import NavErosionResult, NavErosionAnalysis
-from .exceptions import NavErosionError, DataFetchError
+from .exceptions import NavErosionError, DataFetchError, CalculationError
 
-__all__ = ['NavErosionService', 'NavErosionResult', 'NavErosionAnalysis',
-           'NavErosionError', 'DataFetchError']
+__all__ = [
+    'NavErosionService',
+    'NavErosionResult',
+    'NavErosionAnalysis',
+    'NavErosionError',
+    'DataFetchError',
+    'CalculationError'
+]

View File

@@ -7,7 +7,7 @@ class NavErosionError(Exception):
     pass
 
 class DataFetchError(NavErosionError):
-    """Raised when data fetching fails"""
+    """Raised when ETF data cannot be fetched"""
     pass
 
 class CalculationError(NavErosionError):
@@ -15,5 +15,5 @@ class CalculationError(NavErosionError):
     pass
 
 class ValidationError(NavErosionError):
-    """Raised when input validation fails"""
+    """Error in risk validation."""
     pass
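
For illustration, a sketch of how a caller might use the refined exception hierarchy. Whether the service raises these specific subclasses is an assumption; catching the NavErosionError base is safe either way, since both inherit from it.

from ETF_Portal.services.nav_erosion_service import (
    NavErosionService, NavErosionError, DataFetchError, CalculationError
)

service = NavErosionService()
try:
    analysis = service.analyze_etf_erosion_risk(['JEPI'])
except DataFetchError as e:
    print(f"Could not fetch ETF data: {e}")      # data source or cache problem
except CalculationError as e:
    print(f"Risk calculation failed: {e}")       # bad or insufficient inputs
except NavErosionError as e:
    print(f"NAV erosion analysis failed: {e}")   # any other service-level error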

View File

@@ -1,11 +1,39 @@
 """
-Logging configuration for NAV Erosion Service
+Logger module for NAV Erosion Service
 """
 import logging
 import os
 from datetime import datetime
+from pathlib import Path
+
+def get_logger(name: str) -> logging.Logger:
+    """Get a logger instance with proper configuration."""
+    logger = logging.getLogger(name)
+
+    # Create logs directory if it doesn't exist
+    log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'logs')
+    os.makedirs(log_dir, exist_ok=True)
+
+    # Set up file handler
+    log_file = os.path.join(log_dir, f'nav_erosion_{datetime.now().strftime("%Y%m%d")}.log')
+    file_handler = logging.FileHandler(log_file)
+    file_handler.setLevel(logging.INFO)
+
+    # Set up console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+
+    # Create formatter
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    file_handler.setFormatter(formatter)
+    console_handler.setFormatter(formatter)
+
+    # Add handlers to logger
+    logger.addHandler(file_handler)
+    logger.addHandler(console_handler)
+    logger.setLevel(logging.INFO)
+    return logger
+
 class ErosionRiskLogger:
     """Logger for NAV Erosion Service"""

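A minimal usage sketch of the new get_logger helper (the exact module path is an assumption based on the package layout). Note that get_logger attaches a file and a console handler unconditionally, so calling it repeatedly with the same name will duplicate log lines unless callers add a guard such as checking logger.handlers first.

from ETF_Portal.services.nav_erosion_service.logger import get_logger  # module path assumed

log = get_logger(__name__)
log.info("NAV erosion analysis started")  # written to the console and to logs/nav_erosion_YYYYMMDD.log
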
View File

@@ -10,20 +10,20 @@ from datetime import datetime
 class NavErosionResult:
     """Result of NAV erosion risk analysis for a single ETF"""
     ticker: str
-    nav_erosion_risk: int    # 0-9 scale
-    yield_erosion_risk: int  # 0-9 scale
+    nav_erosion_risk: float    # 0-9 scale
+    yield_erosion_risk: float  # 0-9 scale
     estimated_nav_erosion: float    # Annual percentage
     estimated_yield_erosion: float  # Annual percentage
     nav_risk_explanation: str
     yield_risk_explanation: str
-    etf_age_years: Optional[float]
-    is_new_etf: bool
-    max_drawdown: Optional[float]
-    volatility: Optional[float]
-    sharpe_ratio: Optional[float]
-    sortino_ratio: Optional[float]
-    dividend_trend: Optional[float]
     component_risks: Dict[str, float]  # Detailed risk components
+    etf_age_years: Optional[float] = None
+    is_new_etf: bool = False
+    max_drawdown: Optional[float] = None
+    volatility: Optional[float] = None
+    sharpe_ratio: Optional[float] = None
+    sortino_ratio: Optional[float] = None
+    dividend_trend: Optional[float] = None
 
 @dataclass
 class NavErosionAnalysis:
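
The Optional metric fields now carry defaults, which is why they moved below component_risks: a dataclass field without a default may not follow one that has a default. An illustrative construction (the values are made up):

from ETF_Portal.services.nav_erosion_service import NavErosionResult

result = NavErosionResult(
    ticker="JEPI",
    nav_erosion_risk=2.5,
    yield_erosion_risk=3.0,
    estimated_nav_erosion=0.01,
    estimated_yield_erosion=0.02,
    nav_risk_explanation="Low historical drawdown",
    yield_risk_explanation="Stable option-income distributions",
    component_risks={"volatility": 0.15, "drawdown": 0.10},
)
print(result.volatility)  # None -- the omitted metric fields fall back to their new defaults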

test_data_service.py (new file, 68 lines)
View File

@@ -0,0 +1,68 @@
from ETF_Portal.services.data_service import DataService
import logging
import json

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def _stringify_keys(obj):
    """Recursively convert dict keys to str so json.dump accepts pandas Timestamp keys."""
    if isinstance(obj, dict):
        return {str(k): _stringify_keys(v) for k, v in obj.items()}
    return obj

def test_data_fetching():
    # Initialize service
    service = DataService()

    # Test portfolio
    portfolio = ['MSTY', 'FEPI', 'JEPI', 'VTI', 'VOO']

    for ticker in portfolio:
        print(f"\nTesting {ticker}:")
        print("=" * 50)
        try:
            # Get ETF data
            data = service.get_etf_data(ticker)

            # Print data source (rough heuristic: yfinance results also populate 'info')
            if data.get('is_estimated'):
                print("\nData Source: High Yield Estimates")
            elif 'info' in data and data['info']:
                print("\nData Source: FMP API")
            else:
                print("\nData Source: yfinance")

            # Print data structure
            print("\nData Structure:")
            for key, value in data.items():
                if isinstance(value, dict):
                    print(f"{key}: {len(value)} items")
                    if key == 'info' and value:
                        print("  Sample info fields:")
                        for k, v in list(value.items())[:5]:
                            print(f"    {k}: {v}")
                else:
                    print(f"{key}: {value}")

            # Print key metrics
            print("\nKey Metrics:")
            print(f"Volatility: {data.get('volatility', 'N/A')}")
            print(f"Max Drawdown: {data.get('max_drawdown', 'N/A')}")
            print(f"Sharpe Ratio: {data.get('sharpe_ratio', 'N/A')}")
            print(f"Sortino Ratio: {data.get('sortino_ratio', 'N/A')}")
            print(f"Dividend Trend: {data.get('dividend_trend', 'N/A')}")
            print(f"ETF Age: {data.get('age_years', 'N/A')} years")
            print(f"Is New ETF: {data.get('is_new', 'N/A')}")
            print(f"Is Estimated: {data.get('is_estimated', 'N/A')}")

            # Save raw data for inspection (default=str handles remaining non-JSON values)
            with open(f"{ticker}_data.json", 'w') as f:
                json.dump(_stringify_keys(data), f, indent=2, default=str)
            print(f"\nRaw data saved to {ticker}_data.json")
        except Exception as e:
            logger.error(f"Error testing {ticker}: {str(e)}")
            raise

if __name__ == "__main__":
    test_data_fetching()

test_erosion.py (new file, 45 lines)
View File

@@ -0,0 +1,45 @@
from ETF_Portal.services.nav_erosion_service import NavErosionService
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def test_portfolio():
    # Initialize service
    service = NavErosionService()

    # Test portfolio
    portfolio = ['VTI', 'DEPI', 'MSTY', 'JEPI', 'VOO']

    try:
        # Analyze portfolio
        analysis = service.analyze_etf_erosion_risk(portfolio)

        # Print results
        print("\nPortfolio Analysis Results:")
        print("=" * 50)
        print(f"Portfolio NAV Risk: {analysis.portfolio_nav_risk:.1f}/9")
        print(f"Portfolio Yield Risk: {analysis.portfolio_yield_risk:.1f}/9")

        print("\nDetailed Results:")
        print("=" * 50)
        for result in analysis.results:
            print(f"\n{result.ticker}:")
            print(f"  NAV Erosion Risk: {result.nav_erosion_risk:.1f}/9")
            print(f"  Yield Erosion Risk: {result.yield_erosion_risk:.1f}/9")
            print(f"  Estimated NAV Erosion: {result.estimated_nav_erosion:.1%}")
            print(f"  Estimated Yield Erosion: {result.estimated_yield_erosion:.1%}")
            print(f"  NAV Risk Explanation: {result.nav_risk_explanation}")
            print(f"  Yield Risk Explanation: {result.yield_risk_explanation}")
            if result.component_risks:
                print("  Component Risks:")
                for component, value in result.component_risks.items():
                    print(f"    {component}: {value:.1%}")
    except Exception as e:
        logger.error(f"Error during analysis: {str(e)}")
        raise

if __name__ == "__main__":
    test_portfolio()