""" Data Service for ETF data retrieval """ import os import json import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, Optional, List import yfinance as yf import logging from pathlib import Path from ..api.factory import APIFactory logger = logging.getLogger(__name__) class DataService: """Service for retrieving ETF data with fallback logic""" def __init__(self): # Use existing cache structure self.base_dir = Path(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) self.cache_dir = self.base_dir / 'cache' self.yf_cache_dir = self.cache_dir / 'yfinance_cache' self.fmp_cache_dir = self.cache_dir / 'FMP_cache' self.fmp_profiles_dir = self.fmp_cache_dir / 'etf_profiles' self.fmp_historical_dir = self.fmp_cache_dir / 'historical_data' self.fmp_holdings_dir = self.fmp_cache_dir / 'etf_holdings' self.cache_timeout = timedelta(hours=1) # Create cache directories if they don't exist for directory in [self.cache_dir, self.yf_cache_dir, self.fmp_cache_dir, self.fmp_profiles_dir, self.fmp_historical_dir, self.fmp_holdings_dir]: directory.mkdir(parents=True, exist_ok=True) # Initialize API factory self.api_factory = APIFactory() def get_etf_data(self, ticker: str) -> Dict: """Get ETF data using fallback logic: 1. Try FMP cache 2. Try FMP API 3. Try yfinance cache 4. Try yfinance 5. Use high yield estimates """ try: # Try FMP cache first fmp_cached_data = self._get_from_fmp_cache(ticker) if fmp_cached_data: logger.info(f"Using FMP cached data for {ticker}") return fmp_cached_data # Try FMP API fmp_data = self._get_from_fmp(ticker) if fmp_data: logger.info(f"Using FMP data for {ticker}") self._save_to_fmp_cache(ticker, fmp_data) return fmp_data # Try yfinance cache yf_cached_data = self._get_from_yf_cache(ticker) if yf_cached_data: logger.info(f"Using yfinance cached data for {ticker}") return yf_cached_data # Try yfinance yf_data = self._get_from_yfinance(ticker) if yf_data: logger.info(f"Using yfinance data for {ticker}") self._save_to_yf_cache(ticker, yf_data) return yf_data # Use high yield estimates logger.warning(f"Using high yield estimates for {ticker}") return self._get_high_yield_estimates(ticker) except Exception as e: logger.error(f"Error fetching data for {ticker}: {str(e)}") return self._get_high_yield_estimates(ticker) def _get_from_fmp_cache(self, ticker: str) -> Optional[Dict]: """Get data from FMP cache if available and not expired""" # Check profile cache profile_file = self.fmp_profiles_dir / f"{ticker}.json" if not profile_file.exists(): return None try: with open(profile_file, 'r') as f: profile_data = json.load(f) # Check if cache is expired cache_time = datetime.fromisoformat(profile_data['timestamp']) if datetime.now() - cache_time > self.cache_timeout: return None # Get historical data hist_file = self.fmp_historical_dir / f"{ticker}.json" if hist_file.exists(): with open(hist_file, 'r') as f: hist_data = json.load(f) else: hist_data = {} # Get holdings data holdings_file = self.fmp_holdings_dir / f"{ticker}.json" if holdings_file.exists(): with open(holdings_file, 'r') as f: holdings_data = json.load(f) else: holdings_data = {} # Combine all data return { 'info': profile_data['data'], 'hist': hist_data.get('data', {}), 'holdings': holdings_data.get('data', {}), 'volatility': profile_data['data'].get('volatility', 0.0), 'max_drawdown': profile_data['data'].get('maxDrawdown', 0.0), 'sharpe_ratio': profile_data['data'].get('sharpeRatio', 0.0), 'sortino_ratio': profile_data['data'].get('sortinoRatio', 0.0), 'dividend_trend': profile_data['data'].get('dividendTrend', 0.0), 'age_years': profile_data['data'].get('ageYears', 0.0), 'is_new': profile_data['data'].get('ageYears', 0.0) < 2 } except Exception as e: logger.warning(f"Error reading FMP cache for {ticker}: {str(e)}") return None def _get_from_yf_cache(self, ticker: str) -> Optional[Dict]: """Get data from yfinance cache if available and not expired""" cache_file = self.yf_cache_dir / f"{ticker}_data.json" if not cache_file.exists(): return None try: with open(cache_file, 'r') as f: data = json.load(f) # Check if cache is expired cache_time = datetime.fromisoformat(data['timestamp']) if datetime.now() - cache_time > self.cache_timeout: return None return data['data'] except Exception as e: logger.warning(f"Error reading yfinance cache for {ticker}: {str(e)}") return None def _save_to_fmp_cache(self, ticker: str, data: Dict): """Save data to FMP cache""" try: # Save profile data profile_data = { 'timestamp': datetime.now().isoformat(), 'data': data['info'] } profile_file = self.fmp_profiles_dir / f"{ticker}.json" with open(profile_file, 'w') as f: json.dump(profile_data, f) # Save historical data if 'hist' in data: hist_data = { 'timestamp': datetime.now().isoformat(), 'data': data['hist'] } hist_file = self.fmp_historical_dir / f"{ticker}.json" with open(hist_file, 'w') as f: json.dump(hist_data, f) # Save holdings data if 'holdings' in data: holdings_data = { 'timestamp': datetime.now().isoformat(), 'data': data['holdings'] } holdings_file = self.fmp_holdings_dir / f"{ticker}.json" with open(holdings_file, 'w') as f: json.dump(holdings_data, f) except Exception as e: logger.warning(f"Error saving FMP cache for {ticker}: {str(e)}") def _save_to_yf_cache(self, ticker: str, data: Dict): """Save data to yfinance cache""" try: cache_data = { 'timestamp': datetime.now().isoformat(), 'data': data } cache_file = self.yf_cache_dir / f"{ticker}_data.json" with open(cache_file, 'w') as f: json.dump(cache_data, f) except Exception as e: logger.warning(f"Error saving yfinance cache for {ticker}: {str(e)}") def _get_from_fmp(self, ticker: str) -> Optional[Dict]: """Get data from FMP API""" try: # Get FMP client fmp_client = self.api_factory.get_client('fmp') # Get ETF profile profile = fmp_client.get_etf_profile(ticker) if not profile: return None # Get historical data hist_data = fmp_client.get_etf_historical_data(ticker) if hist_data.empty: return None # Get holdings holdings = fmp_client.get_etf_holdings(ticker) # Get dividend history dividend_history = fmp_client.get_dividend_history(ticker) # Get sector weightings sector_weightings = fmp_client.get_sector_weightings(ticker) # Calculate metrics hist_data['log_returns'] = np.log(hist_data['close'] / hist_data['close'].shift(1)) returns = hist_data['log_returns'].dropna() # Calculate annualized volatility using daily log returns volatility = returns.std() * np.sqrt(252) # Calculate max drawdown using rolling window rolling_max = hist_data['close'].rolling(window=252, min_periods=1).max() daily_drawdown = hist_data['close'] / rolling_max - 1.0 max_drawdown = abs(daily_drawdown.min()) # Calculate Sharpe ratio (assuming risk-free rate of 0.02) risk_free_rate = 0.02 excess_returns = returns - risk_free_rate/252 sharpe_ratio = np.sqrt(252) * excess_returns.mean() / returns.std() # Calculate Sortino ratio downside_returns = returns[returns < 0] sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_returns.std() # Calculate dividend trend if not dividend_history.empty: dividend_history['date'] = pd.to_datetime(dividend_history['date']) dividend_history = dividend_history.sort_values('date') dividend_trend = dividend_history['dividend'].pct_change().mean() * 100 else: dividend_trend = 0.0 # Calculate age in years if 'inceptionDate' in profile: inception_date = pd.to_datetime(profile['inceptionDate']) age_years = (pd.Timestamp.now() - inception_date).days / 365.25 else: age_years = 0.0 return { 'info': profile, 'hist': hist_data.to_dict('records'), 'holdings': holdings, 'volatility': volatility, 'max_drawdown': max_drawdown, 'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio, 'dividend_trend': dividend_trend, 'age_years': age_years, 'is_new': age_years < 2 } except Exception as e: logger.error(f"Error fetching FMP data for {ticker}: {str(e)}") return None def _get_from_yfinance(self, ticker: str) -> Optional[Dict]: """Get data from yfinance""" try: yf_ticker = yf.Ticker(ticker) # Get basic info info = yf_ticker.info if not info: return None # Get historical data - use 5 years for better calculations hist = yf_ticker.history(period="5y") if hist.empty: return None # Get current price current_price = info.get('regularMarketPrice', hist['Close'].iloc[-1]) # Get dividend yield dividend_yield = info.get('dividendYield', 0) * 100 # Convert to percentage # Get dividends with proper handling try: dividends = yf_ticker.dividends if dividends is None or dividends.empty: # Try to get dividend info from info dividend_rate = info.get('dividendRate', 0) if dividend_rate > 0: # Create a synthetic dividend series annual_dividend = dividend_rate monthly_dividend = annual_dividend / 12 dividends = pd.Series(monthly_dividend, index=hist.index) else: dividends = pd.Series(0, index=hist.index) except Exception as e: logger.warning(f"Error getting dividends for {ticker}: {str(e)}") dividends = pd.Series(0, index=hist.index) # Calculate metrics with proper annualization hist['log_returns'] = np.log(hist['Close'] / hist['Close'].shift(1)) returns = hist['log_returns'].dropna() # Calculate annualized volatility using daily log returns volatility = returns.std() * np.sqrt(252) # Calculate max drawdown using rolling window rolling_max = hist['Close'].rolling(window=252, min_periods=1).max() daily_drawdown = hist['Close'] / rolling_max - 1.0 max_drawdown = abs(daily_drawdown.min()) # Calculate annualized return annual_return = returns.mean() * 252 # Calculate Sharpe and Sortino ratios with proper risk-free rate risk_free_rate = 0.05 # Current 3-month Treasury yield excess_returns = returns - risk_free_rate/252 # Sharpe Ratio if volatility > 0: sharpe_ratio = (annual_return - risk_free_rate) / volatility else: sharpe_ratio = 0 # Sortino Ratio downside_returns = returns[returns < 0] if len(downside_returns) > 0: downside_volatility = downside_returns.std() * np.sqrt(252) if downside_volatility > 0: sortino_ratio = (annual_return - risk_free_rate) / downside_volatility else: sortino_ratio = 0 else: sortino_ratio = 0 # Calculate dividend trend if not dividends.empty: dividend_trend = (dividends.iloc[-1] / dividends.iloc[0]) - 1 if dividends.iloc[0] > 0 else 0 else: dividend_trend = 0 # Calculate ETF age if 'firstTradeDateEpochUtc' in info: age_years = (datetime.now() - datetime.fromtimestamp(info['firstTradeDateEpochUtc'])).days / 365.25 else: age_years = 0 # Return formatted data return { 'price': current_price, 'dividend_yield': dividend_yield, 'volatility': volatility, 'max_drawdown': max_drawdown, 'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio, 'dividend_trend': dividend_trend, 'age_years': age_years, 'is_new': age_years < 2, 'info': info, 'hist': hist.to_dict('records'), 'dividends': dividends.to_dict() } except Exception as e: logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}") return None def _get_high_yield_estimates(self, ticker: str) -> Dict: """Get conservative high yield estimates when no data is available""" # Determine ETF type based on ticker if ticker in ['JEPI', 'FEPI', 'MSTY']: # Income ETFs max_drawdown = 0.10 # 10% for income ETFs volatility = 0.15 # 15% volatility sharpe_ratio = 0.8 # Lower Sharpe for income ETFs sortino_ratio = 1.2 # Higher Sortino for income ETFs dividend_trend = 0.05 # 5% dividend growth for income ETFs elif ticker in ['VTI', 'VOO']: # Growth ETFs max_drawdown = 0.25 # 25% for growth ETFs volatility = 0.20 # 20% volatility sharpe_ratio = 1.2 # Higher Sharpe for growth ETFs sortino_ratio = 1.5 # Higher Sortino for growth ETFs dividend_trend = 0.10 # 10% dividend growth for growth ETFs else: # Balanced ETFs max_drawdown = 0.20 # 20% for balanced ETFs volatility = 0.18 # 18% volatility sharpe_ratio = 1.0 # Moderate Sharpe for balanced ETFs sortino_ratio = 1.3 # Moderate Sortino for balanced ETFs dividend_trend = 0.07 # 7% dividend growth for balanced ETFs return { 'info': {}, 'hist': {}, 'dividends': {}, 'volatility': volatility, 'max_drawdown': max_drawdown, 'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio, 'dividend_trend': dividend_trend, 'age_years': 3.0, # Conservative estimate 'is_new': False, 'is_estimated': True # Flag to indicate these are estimates } def get_etf_list(self) -> List[str]: """Get list of available ETFs""" try: # Define a list of high-yield ETFs to track etf_list = [ 'JEPI', 'JEPQ', 'FEPI', 'CONY', 'MSTY', 'SDIV', 'DIV', 'VIGI', 'VYM', 'VIG', 'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD', 'DGRO', 'VIGI', 'VIG', 'VYM', 'DVY', 'SCHD', 'DGRO' ] # Remove duplicates while preserving order return list(dict.fromkeys(etf_list)) except Exception as e: logger.error(f"Error getting ETF list: {str(e)}") return []