# File: ETF_Suite_Portal/ETF_Portal/services/data_service.py

"""
Data Service for ETF data retrieval
"""
import os
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Optional, List
import yfinance as yf
import logging
from pathlib import Path
from ..api.factory import APIFactory
# Module-level logger used by DataService for cache and API diagnostics.
logger = logging.getLogger(__name__)
class DataService:
    """Retrieve ETF data and risk metrics using a multi-source fallback chain.

    Sources are tried in this order: the FMP on-disk cache, the FMP API, the
    yfinance on-disk cache, yfinance, and finally static per-category
    estimates. Fresh API results are written back to the matching cache.
    """

    # Trading days per year: annualises daily log-return statistics and is
    # the trailing window used for drawdowns.
    TRADING_DAYS = 252
    # Annual risk-free rate for Sharpe/Sortino. One value is shared by the
    # FMP and yfinance paths so ratios are comparable whichever source
    # served the ticker.
    RISK_FREE_RATE = 0.05
    # Metrics persisted in the FMP profile cache, mapped to the profile key
    # consulted as a fallback for cache files that predate persisted metrics.
    _CACHED_METRIC_KEYS = {
        'volatility': 'volatility',
        'max_drawdown': 'maxDrawdown',
        'sharpe_ratio': 'sharpeRatio',
        'sortino_ratio': 'sortinoRatio',
        'dividend_trend': 'dividendTrend',
        'age_years': 'ageYears',
    }
    # Tickers reported by get_etf_list, in display order (no duplicates).
    TRACKED_ETFS = (
        'JEPI', 'JEPQ', 'FEPI', 'CONY', 'MSTY', 'SDIV', 'DIV', 'VIGI',
        'VYM', 'VIG', 'DVY', 'SCHD', 'DGRO',
    )

    def __init__(self):
        """Resolve and create the cache directories and build the API factory."""
        # The cache lives three directory levels above this module.
        self.base_dir = Path(__file__).parent.parent.parent
        self.cache_dir = self.base_dir / 'cache'
        self.yf_cache_dir = self.cache_dir / 'yfinance_cache'
        self.fmp_cache_dir = self.cache_dir / 'FMP_cache'
        self.fmp_profiles_dir = self.fmp_cache_dir / 'etf_profiles'
        self.fmp_historical_dir = self.fmp_cache_dir / 'historical_data'
        self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings'
        self.cache_timeout = timedelta(hours=1)
        for directory in (self.cache_dir, self.yf_cache_dir, self.fmp_cache_dir,
                          self.fmp_profiles_dir, self.fmp_historical_dir,
                          self.fmp_holdings_dir):
            directory.mkdir(parents=True, exist_ok=True)
        self.api_factory = APIFactory()

    def get_etf_data(self, ticker: str) -> Dict:
        """Get ETF data using fallback logic:
        1. Try FMP cache
        2. Try FMP API
        3. Try yfinance cache
        4. Try yfinance
        5. Use high yield estimates

        Every source returns 'info', 'hist', 'volatility', 'max_drawdown',
        'sharpe_ratio', 'sortino_ratio', 'dividend_trend', 'age_years' and
        'is_new'; other keys depend on the source. Estimates carry
        'is_estimated': True. Unexpected errors also fall back to estimates.
        """
        try:
            fmp_cached_data = self._get_from_fmp_cache(ticker)
            if fmp_cached_data:
                logger.info(f"Using FMP cached data for {ticker}")
                return fmp_cached_data

            fmp_data = self._get_from_fmp(ticker)
            if fmp_data:
                logger.info(f"Using FMP data for {ticker}")
                self._save_to_fmp_cache(ticker, fmp_data)
                return fmp_data

            yf_cached_data = self._get_from_yf_cache(ticker)
            if yf_cached_data:
                logger.info(f"Using yfinance cached data for {ticker}")
                return yf_cached_data

            yf_data = self._get_from_yfinance(ticker)
            if yf_data:
                logger.info(f"Using yfinance data for {ticker}")
                self._save_to_yf_cache(ticker, yf_data)
                return yf_data

            logger.warning(f"Using high yield estimates for {ticker}")
            return self._get_high_yield_estimates(ticker)
        except Exception as e:
            logger.error(f"Error fetching data for {ticker}: {str(e)}")
            return self._get_high_yield_estimates(ticker)

    # ------------------------------------------------------------------
    # JSON cache helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_json(path: Path):
        """Load and return the JSON document stored at *path*."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _json_key(key):
        """Return *key* in a form json.dump accepts as an object key."""
        if isinstance(key, datetime):  # includes pandas Timestamp
            return key.isoformat()
        if isinstance(key, (str, int, float, bool)) or key is None:
            return key
        return str(key)

    @classmethod
    def _to_jsonable(cls, value):
        """Recursively convert *value* into plain JSON-serialisable data.

        DataFrames become lists of records, Series become dicts, datetimes
        (including pandas Timestamps) become ISO-8601 strings, numpy scalars
        and arrays become Python values, and unsupported dict keys are
        stringified via _json_key.
        """
        if isinstance(value, pd.DataFrame):
            return cls._to_jsonable(value.to_dict('records'))
        if isinstance(value, pd.Series):
            return cls._to_jsonable(value.to_dict())
        if isinstance(value, dict):
            return {cls._json_key(k): cls._to_jsonable(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [cls._to_jsonable(v) for v in value]
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, np.ndarray):
            return cls._to_jsonable(value.tolist())
        if isinstance(value, np.generic):
            return value.item()
        return value

    @classmethod
    def _write_json_atomic(cls, path: Path, payload) -> None:
        """Write *payload* as JSON to *path* without leaving a partial file.

        The document is written to a sibling temp file and moved into place
        with os.replace, so a failed write never truncates an existing cache
        entry. Errors are re-raised after the temp file is removed.
        """
        serializable = cls._to_jsonable(payload)
        tmp_path = path.with_name(path.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(serializable, f)
            os.replace(tmp_path, path)
        except Exception:
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    # ------------------------------------------------------------------
    # Cache read/write
    # ------------------------------------------------------------------

    def _get_from_fmp_cache(self, ticker: str) -> Optional[Dict]:
        """Return cached FMP data for *ticker*, or None if missing/expired.

        Freshness is decided by the profile file's timestamp. Metrics come
        from the profile's 'metrics' entry; for older cache files without it
        they fall back to the legacy camelCase profile keys, then 0.0.
        Read errors are logged and treated as a cache miss.
        """
        profile_file = self.fmp_profiles_dir / f"{ticker}.json"
        if not profile_file.exists():
            return None

        try:
            profile_data = self._read_json(profile_file)
            cache_time = datetime.fromisoformat(profile_data['timestamp'])
            if datetime.now() - cache_time > self.cache_timeout:
                return None

            info = profile_data['data']
            hist_file = self.fmp_historical_dir / f"{ticker}.json"
            hist_data = self._read_json(hist_file) if hist_file.exists() else {}
            holdings_file = self.fmp_holdings_dir / f"{ticker}.json"
            holdings_data = self._read_json(holdings_file) if holdings_file.exists() else {}

            stored = profile_data.get('metrics') or {}
            legacy_source = info if isinstance(info, dict) else {}
            metrics = {
                key: stored[key] if key in stored else legacy_source.get(legacy_key, 0.0)
                for key, legacy_key in self._CACHED_METRIC_KEYS.items()
            }

            return {
                'info': info,
                'hist': hist_data.get('data', {}),
                'holdings': holdings_data.get('data', {}),
                'volatility': metrics['volatility'],
                'max_drawdown': metrics['max_drawdown'],
                'sharpe_ratio': metrics['sharpe_ratio'],
                'sortino_ratio': metrics['sortino_ratio'],
                'dividend_trend': metrics['dividend_trend'],
                'age_years': metrics['age_years'],
                'is_new': metrics['age_years'] < 2,
            }
        except Exception as e:
            logger.warning(f"Error reading FMP cache for {ticker}: {str(e)}")
            return None

    def _get_from_yf_cache(self, ticker: str) -> Optional[Dict]:
        """Return cached yfinance data for *ticker*, or None if missing/expired.

        Dividend dates are stored as ISO strings and converted back to
        pandas Timestamps, so a cache hit has the same shape as a fresh
        _get_from_yfinance result. Read errors are logged and treated as a miss.
        """
        cache_file = self.yf_cache_dir / f"{ticker}_data.json"
        if not cache_file.exists():
            return None

        try:
            cached = self._read_json(cache_file)
            cache_time = datetime.fromisoformat(cached['timestamp'])
            if datetime.now() - cache_time > self.cache_timeout:
                return None

            payload = cached['data']
            if isinstance(payload, dict) and isinstance(payload.get('dividends'), dict):
                payload['dividends'] = {
                    pd.Timestamp(day): amount
                    for day, amount in payload['dividends'].items()
                }
            return payload
        except Exception as e:
            logger.warning(f"Error reading yfinance cache for {ticker}: {str(e)}")
            return None

    def _save_to_fmp_cache(self, ticker: str, data: Dict):
        """Persist FMP data as separate profile, history and holdings files.

        History and holdings are written before the profile: the profile's
        timestamp is what marks the entry fresh, so it must not appear before
        the files it describes. Computed metrics are stored in the profile
        file so cache hits return the same numbers as the API path.
        Failures are logged, not raised.
        """
        try:
            info = data['info']
            timestamp = datetime.now().isoformat()

            if 'hist' in data:
                self._write_json_atomic(
                    self.fmp_historical_dir / f"{ticker}.json",
                    {'timestamp': timestamp, 'data': data['hist']},
                )

            if 'holdings' in data:
                self._write_json_atomic(
                    self.fmp_holdings_dir / f"{ticker}.json",
                    {'timestamp': timestamp, 'data': data['holdings']},
                )

            profile_payload = {
                'timestamp': timestamp,
                'data': info,
                'metrics': {key: data[key] for key in self._CACHED_METRIC_KEYS if key in data},
            }
            self._write_json_atomic(self.fmp_profiles_dir / f"{ticker}.json", profile_payload)
        except Exception as e:
            logger.warning(f"Error saving FMP cache for {ticker}: {str(e)}")

    def _save_to_yf_cache(self, ticker: str, data: Dict):
        """Persist yfinance data for *ticker*; failures are logged, not raised."""
        try:
            cache_data = {
                'timestamp': datetime.now().isoformat(),
                'data': data,
            }
            self._write_json_atomic(self.yf_cache_dir / f"{ticker}_data.json", cache_data)
        except Exception as e:
            logger.warning(f"Error saving yfinance cache for {ticker}: {str(e)}")

    # ------------------------------------------------------------------
    # Metric calculation
    # ------------------------------------------------------------------

    def _compute_risk_metrics(self, close: pd.Series) -> Dict[str, float]:
        """Compute annualised risk metrics from a series of closing prices.

        Returns 'volatility' (std of daily log returns, annualised),
        'max_drawdown' (largest fall from the trailing TRADING_DAYS high, as a
        positive fraction), 'sharpe_ratio' and 'sortino_ratio'. Sharpe and
        Sortino use RISK_FREE_RATE and are 0.0 when their denominator is zero
        or undefined (flat prices, no down days, too few points).
        """
        days = self.TRADING_DAYS
        returns = np.log(close / close.shift(1)).dropna()

        volatility = returns.std() * np.sqrt(days)

        rolling_max = close.rolling(window=days, min_periods=1).max()
        max_drawdown = abs((close / rolling_max - 1.0).min())

        excess_return = returns.mean() * days - self.RISK_FREE_RATE
        # NaN > 0 is False, so undefined volatilities also yield 0.0.
        sharpe_ratio = excess_return / volatility if volatility > 0 else 0.0

        downside = returns[returns < 0]
        downside_volatility = downside.std() * np.sqrt(days) if len(downside) > 0 else 0.0
        sortino_ratio = excess_return / downside_volatility if downside_volatility > 0 else 0.0

        return {
            'volatility': float(volatility),
            'max_drawdown': float(max_drawdown),
            'sharpe_ratio': float(sharpe_ratio),
            'sortino_ratio': float(sortino_ratio),
        }

    # ------------------------------------------------------------------
    # Live sources
    # ------------------------------------------------------------------

    def _get_from_fmp(self, ticker: str) -> Optional[Dict]:
        """Fetch profile, history, holdings and dividends from the FMP API.

        Returns None when the profile or price history is unavailable or any
        call fails. 'dividend_trend' is the mean period-over-period change in
        the dividend as a fraction (0.05 == 5%), matching the yfinance path
        and the static estimates.
        """
        try:
            fmp_client = self.api_factory.get_client('fmp')

            profile = fmp_client.get_etf_profile(ticker)
            if not profile:
                return None

            hist_data = fmp_client.get_etf_historical_data(ticker)
            if hist_data is None or hist_data.empty:
                return None

            holdings = fmp_client.get_etf_holdings(ticker)
            dividend_history = fmp_client.get_dividend_history(ticker)

            # Kept as a column so it is included in the returned records.
            hist_data['log_returns'] = np.log(hist_data['close'] / hist_data['close'].shift(1))
            metrics = self._compute_risk_metrics(hist_data['close'])

            if dividend_history is not None and not dividend_history.empty:
                ordered = dividend_history.assign(
                    date=pd.to_datetime(dividend_history['date'])
                ).sort_values('date')
                changes = ordered['dividend'].pct_change(fill_method=None)
                # A step up from a zero dividend is inf; exclude such steps.
                dividend_trend = float(changes.replace([np.inf, -np.inf], np.nan).mean())
                if not np.isfinite(dividend_trend):
                    dividend_trend = 0.0
            else:
                dividend_trend = 0.0

            inception = profile.get('inceptionDate') if isinstance(profile, dict) else None
            inception_date = pd.to_datetime(inception, errors='coerce')
            if pd.isna(inception_date):
                age_years = 0.0
            else:
                age_years = (pd.Timestamp.now() - inception_date).days / 365.25

            return {
                'info': profile,
                'hist': hist_data.to_dict('records'),
                'holdings': holdings,
                'volatility': metrics['volatility'],
                'max_drawdown': metrics['max_drawdown'],
                'sharpe_ratio': metrics['sharpe_ratio'],
                'sortino_ratio': metrics['sortino_ratio'],
                'dividend_trend': dividend_trend,
                'age_years': age_years,
                'is_new': age_years < 2,
            }
        except Exception as e:
            logger.error(f"Error fetching FMP data for {ticker}: {str(e)}")
            return None

    def _get_from_yfinance(self, ticker: str) -> Optional[Dict]:
        """Fetch info, five years of prices and dividends from yfinance.

        Returns None when yfinance has no info or history for the ticker or
        any call fails. 'dividends' maps payment dates to amounts; if
        yfinance has no dividend history but reports a positive
        'dividendRate', a synthetic schedule of one rate/12 payment per month
        start is used instead.
        """
        try:
            yf_ticker = yf.Ticker(ticker)

            info = yf_ticker.info
            if not info:
                return None

            # Five years of history for the metric calculations.
            hist = yf_ticker.history(period="5y")
            if hist.empty:
                return None

            # Fields may be present with a None value, not just missing.
            current_price = info.get('regularMarketPrice')
            if current_price is None:
                current_price = hist['Close'].iloc[-1]

            # NOTE(review): scaled by 100 on the assumption that yfinance
            # reports a fraction; some yfinance releases already return a
            # percentage — confirm against the installed version.
            dividend_yield = (info.get('dividendYield') or 0) * 100

            try:
                dividends = yf_ticker.dividends
                if dividends is None or dividends.empty:
                    dividend_rate = info.get('dividendRate') or 0
                    if dividend_rate > 0:
                        # One payment per month start, so a year of entries
                        # sums to dividendRate.
                        payment_dates = pd.date_range(
                            start=hist.index[0], end=hist.index[-1], freq='MS'
                        )
                        dividends = pd.Series(dividend_rate / 12, index=payment_dates)
                    else:
                        dividends = pd.Series(0, index=hist.index)
            except Exception as e:
                logger.warning(f"Error getting dividends for {ticker}: {str(e)}")
                dividends = pd.Series(0, index=hist.index)

            # Kept as a column so it is included in the returned records.
            hist['log_returns'] = np.log(hist['Close'] / hist['Close'].shift(1))
            metrics = self._compute_risk_metrics(hist['Close'])

            # Total change from the first to the latest payment, as a fraction.
            if not dividends.empty and dividends.iloc[0] > 0:
                dividend_trend = (dividends.iloc[-1] / dividends.iloc[0]) - 1
            else:
                dividend_trend = 0

            first_trade = info.get('firstTradeDateEpochUtc')
            if first_trade is not None:
                age_years = (datetime.now() - datetime.fromtimestamp(first_trade)).days / 365.25
            else:
                age_years = 0

            return {
                'price': current_price,
                'dividend_yield': dividend_yield,
                'volatility': metrics['volatility'],
                'max_drawdown': metrics['max_drawdown'],
                'sharpe_ratio': metrics['sharpe_ratio'],
                'sortino_ratio': metrics['sortino_ratio'],
                'dividend_trend': dividend_trend,
                'age_years': age_years,
                'is_new': age_years < 2,
                'info': info,
                'hist': hist.to_dict('records'),
                'dividends': dividends.to_dict(),
            }
        except Exception as e:
            logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Fallbacks and listings
    # ------------------------------------------------------------------

    def _get_high_yield_estimates(self, ticker: str) -> Dict:
        """Return fixed per-category estimates when no data source succeeds.

        Tickers are bucketed as income, growth or (default) balanced; the
        result is flagged with 'is_estimated': True.
        """
        if ticker in ('JEPI', 'FEPI', 'MSTY'):  # Income ETFs
            max_drawdown = 0.10
            volatility = 0.15
            sharpe_ratio = 0.8
            sortino_ratio = 1.2
            dividend_trend = 0.05
        elif ticker in ('VTI', 'VOO'):  # Growth ETFs
            max_drawdown = 0.25
            volatility = 0.20
            sharpe_ratio = 1.2
            sortino_ratio = 1.5
            dividend_trend = 0.10
        else:  # Balanced ETFs
            max_drawdown = 0.20
            volatility = 0.18
            sharpe_ratio = 1.0
            sortino_ratio = 1.3
            dividend_trend = 0.07

        return {
            'info': {},
            'hist': {},
            'dividends': {},
            'volatility': volatility,
            'max_drawdown': max_drawdown,
            'sharpe_ratio': sharpe_ratio,
            'sortino_ratio': sortino_ratio,
            'dividend_trend': dividend_trend,
            'age_years': 3.0,  # Conservative estimate
            'is_new': False,
            'is_estimated': True,  # Flag to indicate these are estimates
        }

    def get_etf_list(self) -> List[str]:
        """Return the tracked high-yield ETF tickers as a new list."""
        return list(self.TRACKED_ETFS)