diff --git a/ETF_Portal/api/__init__.py b/ETF_Portal/api/__init__.py
new file mode 100644
index 0000000..e8d1a0a
--- /dev/null
+++ b/ETF_Portal/api/__init__.py
@@ -0,0 +1,4 @@
+from .factory import APIFactory
+from .base import BaseAPIClient
+
+__all__ = ['APIFactory', 'BaseAPIClient']
\ No newline at end of file
diff --git a/ETF_Portal/api/base.py b/ETF_Portal/api/base.py
new file mode 100644
index 0000000..732eff7
--- /dev/null
+++ b/ETF_Portal/api/base.py
@@ -0,0 +1,108 @@
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional, Union
+import pandas as pd
+from datetime import datetime
+import time
+
+class BaseAPIClient(ABC):
+    """Base class for all API clients."""
+
+    def __init__(self, api_key: Optional[str] = None):
+        self.api_key = api_key
+        self.last_request_time = None
+        self.rate_limit_delay = 1.0  # Default 1 second between requests
+
+    @abstractmethod
+    def get_etf_profile(self, symbol: str) -> Dict:
+        """Get ETF profile data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary containing ETF profile information
+        """
+        pass
+
+    @abstractmethod
+    def get_etf_holdings(self, symbol: str) -> List[Dict]:
+        """Get ETF holdings data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            List of dictionaries containing holding information
+        """
+        pass
+
+    @abstractmethod
+    def get_historical_data(self, symbol: str, period: str = '1y') -> pd.DataFrame:
+        """Get historical price data.
+
+        Args:
+            symbol: ETF ticker symbol
+            period: Time period (e.g., '1d', '1w', '1m', '1y')
+
+        Returns:
+            DataFrame with historical price data
+        """
+        pass
+
+    @abstractmethod
+    def get_dividend_history(self, symbol: str) -> pd.DataFrame:
+        """Get dividend history.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            DataFrame with dividend history
+        """
+        pass
+
+    @abstractmethod
+    def get_sector_weightings(self, symbol: str) -> Dict:
+        """Get sector weightings.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary with sector weightings
+        """
+        pass
+
+    def _check_rate_limit(self):
+        """Check and enforce rate limiting."""
+        if self.last_request_time:
+            time_since_last = (datetime.now() - self.last_request_time).total_seconds()
+            if time_since_last < self.rate_limit_delay:
+                time.sleep(self.rate_limit_delay - time_since_last)
+        self.last_request_time = datetime.now()
+
+    def _validate_symbol(self, symbol: str) -> bool:
+        """Validate ETF symbol format.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            True if valid, False otherwise
+        """
+        return bool(symbol and isinstance(symbol, str) and symbol.isupper())
+
+    def _handle_error(self, error: Exception) -> Dict:
+        """Handle API errors consistently.
+
+        Args:
+            error: Exception that occurred
+
+        Returns:
+            Dictionary with error information
+        """
+        return {
+            'error': True,
+            'message': str(error),
+            'timestamp': datetime.now().isoformat()
+        }
\ No newline at end of file
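The abstract base above fixes the contract every provider client must satisfy, and the protected helpers (_check_rate_limit, _validate_symbol, _handle_error) are meant to be called from concrete subclasses. A minimal sketch of a conforming subclass (the DummyClient name and canned data are illustrative, not part of this patch):

    import pandas as pd
    from typing import Dict, List
    from ETF_Portal.api.base import BaseAPIClient

    class DummyClient(BaseAPIClient):
        """Illustrative stub: returns canned data but exercises the base helpers."""

        def get_etf_profile(self, symbol: str) -> Dict:
            if not self._validate_symbol(symbol):
                return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
            self._check_rate_limit()  # sleeps if calls arrive faster than rate_limit_delay
            return {'symbol': symbol, 'name': 'Dummy ETF'}

        def get_etf_holdings(self, symbol: str) -> List[Dict]:
            return [{'asset': 'AAPL', 'weight': 0.07}]

        def get_historical_data(self, symbol: str, period: str = '1y') -> pd.DataFrame:
            return pd.DataFrame({'close': [100.0, 101.5]})

        def get_dividend_history(self, symbol: str) -> pd.DataFrame:
            return pd.DataFrame({'dividend': [0.25]})

        def get_sector_weightings(self, symbol: str) -> Dict:
            return {'technology': 0.3}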
diff --git a/ETF_Portal/api/factory.py b/ETF_Portal/api/factory.py
new file mode 100644
index 0000000..a40c9a7
--- /dev/null
+++ b/ETF_Portal/api/factory.py
@@ -0,0 +1,148 @@
+from typing import Optional, Dict, Any
+import logging
+import os
+from .base import BaseAPIClient
+from .fmp.client import FMPClient
+from .yfinance.client import YFinanceClient
+from ..cache.fmp_cache import FMPCacheManager
+from ..cache.yfinance_cache import YFinanceCacheManager
+
+class APIFactory:
+    """Factory for creating and managing API clients."""
+
+    def __init__(self, fmp_api_key: Optional[str] = None):
+        """Initialize API factory.
+
+        Args:
+            fmp_api_key: Optional FMP API key. If not provided, will try to get from environment variable.
+        """
+        # Try to get API key from environment variable if not provided
+        self.fmp_api_key = fmp_api_key or os.environ.get('FMP_API_KEY')
+        if not self.fmp_api_key:
+            logging.warning("No FMP API key found in environment. Some features may be limited.")
+
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self._clients: Dict[str, BaseAPIClient] = {}
+
+    def get_client(self, provider: str = 'fmp') -> BaseAPIClient:
+        """Get an API client instance.
+
+        Args:
+            provider: API provider ('fmp' or 'yfinance')
+
+        Returns:
+            API client instance
+
+        Raises:
+            ValueError: If provider is invalid or FMP API key is missing
+        """
+        provider = provider.lower()
+
+        if provider not in ['fmp', 'yfinance']:
+            raise ValueError(f"Invalid provider: {provider}")
+
+        if provider in self._clients:
+            return self._clients[provider]
+
+        if provider == 'fmp':
+            if not self.fmp_api_key:
+                raise ValueError("FMP API key is required")
+            client = FMPClient(self.fmp_api_key)
+        else:  # yfinance
+            client = YFinanceClient()
+
+        self._clients[provider] = client
+        return client
+
+    def get_data(self, symbol: str, data_type: str, provider: str = 'fmp', fallback: bool = True) -> Any:
+        """Get data from API with fallback support.
+
+        Args:
+            symbol: ETF ticker symbol
+            data_type: Type of data to retrieve
+            provider: Primary API provider
+            fallback: Whether to fall back to yfinance if primary fails
+
+        Returns:
+            Requested data or error information
+        """
+        try:
+            # Try primary provider
+            client = self.get_client(provider)
+            data = getattr(client, f"get_{data_type}")(symbol)
+
+            # Check if data is valid
+            if isinstance(data, dict) and data.get('error'):
+                if fallback and provider == 'fmp':
+                    self.logger.info(f"Falling back to yfinance for {symbol}")
+                    return self.get_data(symbol, data_type, 'yfinance', False)
+                return data
+
+            return data
+
+        except Exception as e:
+            self.logger.error(f"Error getting {data_type} for {symbol}: {str(e)}")
+
+            if fallback and provider == 'fmp':
+                self.logger.info(f"Falling back to yfinance for {symbol}")
+                return self.get_data(symbol, data_type, 'yfinance', False)
+
+            return {
+                'error': True,
+                'message': str(e),
+                'provider': provider,
+                'data_type': data_type,
+                'symbol': symbol
+            }
+
+    def clear_cache(self, provider: Optional[str] = None) -> Dict[str, int]:
+        """Clear cache for specified provider or all providers.
+
+        Args:
+            provider: Optional provider to clear cache for
+
+        Returns:
+            Dictionary with number of files cleared per provider
+        """
+        results = {}
+
+        if provider:
+            providers = [provider]
+        else:
+            providers = ['fmp', 'yfinance']
+
+        for prov in providers:
+            try:
+                client = self.get_client(prov)
+                results[prov] = client.clear_cache()
+            except Exception as e:
+                self.logger.error(f"Error clearing cache for {prov}: {str(e)}")
+                results[prov] = 0
+
+        return results
+
+    def get_cache_stats(self, provider: Optional[str] = None) -> Dict[str, Dict]:
+        """Get cache statistics for specified provider or all providers.
+
+        Args:
+            provider: Optional provider to get stats for
+
+        Returns:
+            Dictionary with cache statistics per provider
+        """
+        results = {}
+
+        if provider:
+            providers = [provider]
+        else:
+            providers = ['fmp', 'yfinance']
+
+        for prov in providers:
+            try:
+                client = self.get_client(prov)
+                results[prov] = client.get_cache_stats()
+            except Exception as e:
+                self.logger.error(f"Error getting cache stats for {prov}: {str(e)}")
+                results[prov] = {}
+
+        return results
\ No newline at end of file
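A short usage sketch of the factory (illustrative; assumes the package is importable as ETF_Portal and FMP_API_KEY is set in the environment):

    from ETF_Portal.api import APIFactory

    factory = APIFactory()  # falls back to the FMP_API_KEY environment variable

    fmp = factory.get_client('fmp')   # created once, then memoized per provider

    # get_data dispatches to get_<data_type>() and falls back to yfinance
    # when the FMP call returns an error dict or raises
    profile = factory.get_data('SPY', 'etf_profile')
    prices = factory.get_data('SPY', 'historical_data', fallback=False)
    print(factory.get_cache_stats())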
diff --git a/ETF_Portal/api/fmp/__init__.py b/ETF_Portal/api/fmp/__init__.py
new file mode 100644
index 0000000..282bc7f
--- /dev/null
+++ b/ETF_Portal/api/fmp/__init__.py
@@ -0,0 +1,3 @@
+from .client import FMPClient
+
+__all__ = ['FMPClient']
\ No newline at end of file
diff --git a/ETF_Portal/api/fmp/client.py b/ETF_Portal/api/fmp/client.py
new file mode 100644
index 0000000..d361086
--- /dev/null
+++ b/ETF_Portal/api/fmp/client.py
@@ -0,0 +1,164 @@
+import requests
+import pandas as pd
+from typing import Dict, List, Optional
+from datetime import datetime
+import logging
+from ..base import BaseAPIClient
+from ...cache.fmp_cache import FMPCacheManager
+
+class FMPClient(BaseAPIClient):
+    """Financial Modeling Prep API client."""
+
+    BASE_URL = "https://financialmodelingprep.com/api/v3"
+
+    def __init__(self, api_key: str, cache_manager: Optional[FMPCacheManager] = None):
+        """Initialize FMP client.
+
+        Args:
+            api_key: FMP API key
+            cache_manager: Optional cache manager instance
+        """
+        super().__init__(api_key)
+        self.cache_manager = cache_manager or FMPCacheManager()
+        self.logger = logging.getLogger(self.__class__.__name__)
+
+    def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
+        """Make API request to FMP.
+
+        Args:
+            endpoint: API endpoint
+            params: Query parameters
+
+        Returns:
+            API response data
+        """
+        # Check cache first
+        if self.cache_manager:
+            cached_data, is_valid = self.cache_manager.get(endpoint, params)
+            if is_valid:
+                return cached_data
+
+        # Prepare request; keep the caller's params separate so the API key
+        # never becomes part of the cache key used above and below
+        url = f"{self.BASE_URL}/{endpoint}"
+        params = params or {}
+        request_params = {**params, 'apikey': self.api_key}
+
+        # Check rate limit
+        self._check_rate_limit()
+
+        try:
+            response = requests.get(url, params=request_params, timeout=30)
+            response.raise_for_status()
+            data = response.json()
+
+            # Cache the response
+            if self.cache_manager:
+                self.cache_manager.set(endpoint, data, params)
+
+            return data
+
+        except requests.exceptions.RequestException as e:
+            self.logger.error(f"FMP API request failed: {str(e)}")
+            return self._handle_error(e)
+
+    def get_etf_profile(self, symbol: str) -> Dict:
+        """Get ETF profile data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary containing ETF profile information
+        """
+        if not self._validate_symbol(symbol):
+            return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
+
+        return self._make_request(f"etf/profile/{symbol}")
+
+    def get_etf_holdings(self, symbol: str) -> List[Dict]:
+        """Get ETF holdings data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            List of dictionaries containing holding information
+        """
+        if not self._validate_symbol(symbol):
+            return [self._handle_error(ValueError(f"Invalid symbol: {symbol}"))]
+
+        return self._make_request(f"etf/holdings/{symbol}")
+
+    def get_historical_data(self, symbol: str, period: str = '1y') -> pd.DataFrame:
+        """Get historical price data.
+
+        Args:
+            symbol: ETF ticker symbol
+            period: Time period (e.g., '1d', '1w', '1m', '1y')
+
+        Returns:
+            DataFrame with historical price data
+        """
+        if not self._validate_symbol(symbol):
+            return pd.DataFrame()
+
+        data = self._make_request(f"etf/historical-price/{symbol}", {'period': period})
+
+        if isinstance(data, dict) and data.get('error'):
+            return pd.DataFrame()
+
+        return pd.DataFrame(data)
+
+    def get_dividend_history(self, symbol: str) -> pd.DataFrame:
+        """Get dividend history.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            DataFrame with dividend history
+        """
+        if not self._validate_symbol(symbol):
+            return pd.DataFrame()
+
+        data = self._make_request(f"etf/dividend/{symbol}")
+
+        if isinstance(data, dict) and data.get('error'):
+            return pd.DataFrame()
+
+        return pd.DataFrame(data)
+
+    def get_sector_weightings(self, symbol: str) -> Dict:
+        """Get sector weightings.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary with sector weightings
+        """
+        if not self._validate_symbol(symbol):
+            return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
+
+        return self._make_request(f"etf/sector-weightings/{symbol}")
+
+    def clear_cache(self) -> int:
+        """Clear expired cache entries.
+
+        Returns:
+            Number of files cleared
+        """
+        if self.cache_manager:
+            return self.cache_manager.clear_expired()
+        return 0
+
+    def get_cache_stats(self) -> Dict:
+        """Get cache statistics.
+
+        Returns:
+            Dictionary with cache statistics
+        """
+        if self.cache_manager:
+            return self.cache_manager.get_stats()
+        return {}
\ No newline at end of file
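The cache-first flow in _make_request means repeated calls within the cache TTL never hit the network. A usage sketch (illustrative; the key is a placeholder, and FMPCacheManager comes from the package's cache module, which is not part of this patch):

    from ETF_Portal.api.fmp.client import FMPClient

    client = FMPClient(api_key="YOUR_FMP_API_KEY")  # placeholder key

    profile = client.get_etf_profile("SPY")        # network call, then cached
    profile_again = client.get_etf_profile("SPY")  # served by FMPCacheManager
    prices = client.get_historical_data("SPY", period="1y")  # pandas DataFrame
    print(client.get_cache_stats())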
diff --git a/ETF_Portal/api/yfinance/__init__.py b/ETF_Portal/api/yfinance/__init__.py
new file mode 100644
index 0000000..c28c18c
--- /dev/null
+++ b/ETF_Portal/api/yfinance/__init__.py
@@ -0,0 +1,3 @@
+from .client import YFinanceClient
+
+__all__ = ['YFinanceClient']
\ No newline at end of file
diff --git a/ETF_Portal/api/yfinance/client.py b/ETF_Portal/api/yfinance/client.py
new file mode 100644
index 0000000..7af1de5
--- /dev/null
+++ b/ETF_Portal/api/yfinance/client.py
@@ -0,0 +1,204 @@
+import yfinance as yf
+import pandas as pd
+from typing import Dict, List, Optional
+from datetime import datetime
+import logging
+from ..base import BaseAPIClient
+from ...cache.yfinance_cache import YFinanceCacheManager
+
+class YFinanceClient(BaseAPIClient):
+    """Yahoo Finance API client."""
+
+    def __init__(self, cache_manager: Optional[YFinanceCacheManager] = None):
+        """Initialize yfinance client.
+
+        Args:
+            cache_manager: Optional cache manager instance
+        """
+        super().__init__(None)  # yfinance doesn't need an API key
+        self.cache_manager = cache_manager or YFinanceCacheManager()
+        self.logger = logging.getLogger(self.__class__.__name__)
+
+    def _get_ticker(self, symbol: str) -> Optional[yf.Ticker]:
+        """Get yfinance Ticker object.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            yfinance Ticker object or None if invalid
+        """
+        if not self._validate_symbol(symbol):
+            return None
+        return yf.Ticker(symbol)
+
+    def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
+        """Make API request to yfinance.
+
+        Args:
+            endpoint: API endpoint
+            params: Query parameters
+
+        Returns:
+            API response data
+        """
+        # Check cache first
+        if self.cache_manager:
+            cached_data, is_valid = self.cache_manager.get(endpoint, params)
+            if is_valid:
+                return cached_data
+
+        try:
+            symbol = params.get('symbol') if params else None
+            if not symbol:
+                raise ValueError("Symbol is required")
+
+            ticker = self._get_ticker(symbol)
+            if not ticker:
+                raise ValueError(f"Invalid symbol: {symbol}")
+
+            # Get data based on endpoint
+            if endpoint == "info":
+                data = ticker.info
+            elif endpoint == "holdings":
+                # Note: Ticker.get_holdings() is not part of yfinance's public
+                # API in every release; this assumes a build that provides it
+                # (if it is missing, the AttributeError is handled below)
+                data = ticker.get_holdings()
+            elif endpoint == "history":
+                period = params.get('period', '1y')
+                data = ticker.history(period=period).to_dict('records')
+            else:
+                raise ValueError(f"Unknown endpoint: {endpoint}")
+
+            # Cache the response
+            if self.cache_manager:
+                self.cache_manager.set(endpoint, data, params)
+
+            return data
+
+        except Exception as e:
+            self.logger.error(f"yfinance API request failed: {str(e)}")
+            return self._handle_error(e)
+
+    def get_etf_profile(self, symbol: str) -> Dict:
+        """Get ETF profile data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary containing ETF profile information
+        """
+        if not self._validate_symbol(symbol):
+            return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
+
+        return self._make_request("info", {'symbol': symbol})
+
+    def get_etf_holdings(self, symbol: str) -> List[Dict]:
+        """Get ETF holdings data.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            List of dictionaries containing holding information
+        """
+        if not self._validate_symbol(symbol):
+            return [self._handle_error(ValueError(f"Invalid symbol: {symbol}"))]
+
+        data = self._make_request("holdings", {'symbol': symbol})
+        return data if isinstance(data, list) else [data]
+
+    def get_historical_data(self, symbol: str, period: str = '1y') -> pd.DataFrame:
+        """Get historical price data.
+
+        Args:
+            symbol: ETF ticker symbol
+            period: Time period (e.g., '1d', '1w', '1m', '1y')
+
+        Returns:
+            DataFrame with historical price data
+        """
+        if not self._validate_symbol(symbol):
+            return pd.DataFrame()
+
+        data = self._make_request("history", {'symbol': symbol, 'period': period})
+
+        if isinstance(data, dict) and data.get('error'):
+            return pd.DataFrame()
+
+        return pd.DataFrame(data)
+
+    def get_dividend_history(self, symbol: str) -> pd.DataFrame:
+        """Get dividend history.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            DataFrame with dividend history
+        """
+        if not self._validate_symbol(symbol):
+            return pd.DataFrame()
+
+        try:
+            ticker = self._get_ticker(symbol)
+            if not ticker:
+                return pd.DataFrame()
+
+            dividends = ticker.dividends
+            return pd.DataFrame(dividends).reset_index()
+
+        except Exception as e:
+            self.logger.error(f"Error getting dividend history: {str(e)}")
+            return pd.DataFrame()
+
+    def get_sector_weightings(self, symbol: str) -> Dict:
+        """Get sector weightings.
+
+        Args:
+            symbol: ETF ticker symbol
+
+        Returns:
+            Dictionary with sector weightings
+        """
+        if not self._validate_symbol(symbol):
+            return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
+
+        try:
+            ticker = self._get_ticker(symbol)
+            if not ticker:
+                return self._handle_error(ValueError(f"Invalid symbol: {symbol}"))
+
+            info = ticker.info
+            # The exact info keys vary across yfinance versions; both are
+            # treated as optional and default to empty dicts
+            return {
+                'sector_weightings': info.get('sectorWeights', {}),
+                'asset_allocation': info.get('assetAllocation', {})
+            }
+
+        except Exception as e:
+            self.logger.error(f"Error getting sector weightings: {str(e)}")
+            return self._handle_error(e)
+
+    def clear_cache(self) -> int:
+        """Clear expired cache entries.
+
+        Returns:
+            Number of files cleared
+        """
+        if self.cache_manager:
+            return self.cache_manager.clear_expired()
+        return 0
+
+    def get_cache_stats(self) -> Dict:
+        """Get cache statistics.
+
+        Returns:
+            Dictionary with cache statistics
+        """
+        if self.cache_manager:
+            return self.cache_manager.get_stats()
+        return {}
\ No newline at end of file
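Where a yfinance build lacks get_holdings(), newer releases expose fund composition through Ticker.funds_data instead. A possible adaptation (hedged: the funds_data attribute names are an assumption about recent yfinance versions and may differ or be absent in yours):

    import yfinance as yf

    ticker = yf.Ticker("SPY")
    try:
        funds = ticker.funds_data           # present in newer yfinance releases
        holdings = funds.top_holdings       # DataFrame of the largest positions
        sectors = funds.sector_weightings   # mapping of sector -> weight
    except AttributeError:
        holdings, sectors = None, None      # older yfinance: no funds-data accessor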
diff --git a/ETF_Portal/api_client.py b/ETF_Portal/api_client.py
new file mode 100644
index 0000000..7c7e37e
--- /dev/null
+++ b/ETF_Portal/api_client.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python3
+"""
+API Client for ETF Portal
+
+Handles API calls with caching, logging, and performance monitoring.
+"""
+
+import time
+import psutil
+import requests
+import os
+from typing import Any, Dict, Optional, Tuple, List
+from datetime import datetime
+from .cache_manager import cache_manager
+from .logging_config import api_logger, portfolio_logger, performance_logger
+
+class APIClient:
+    """Manages API calls with caching and monitoring."""
+
+    def __init__(self, base_url: str, api_key: Optional[str] = None):
+        """
+        Initialize API client.
+
+        Args:
+            base_url: Base URL for API endpoints
+            api_key: Optional API key for authentication
+        """
+        self.base_url = base_url.rstrip('/')
+        self.api_key = api_key
+        self.session = requests.Session()
+
+        # Initialize performance metrics
+        self._init_performance_metrics()
+
+        api_logger.info(f"Initialized API client for {base_url}")
+
+    def _init_performance_metrics(self):
+        """Initialize performance tracking metrics."""
+        self.metrics = {
+            'api_calls': 0,
+            'cache_hits': 0,
+            'cache_misses': 0,
+            'errors': 0,
+            'total_response_time': 0,
+            'start_time': time.time()
+        }
+
+    def _log_performance_metrics(self):
+        """Log current performance metrics."""
+        current_time = time.time()
+        uptime = current_time - self.metrics['start_time']
+
+        # Calculate averages
+        avg_response_time = (self.metrics['total_response_time'] /
+                             self.metrics['api_calls']) if self.metrics['api_calls'] > 0 else 0
+
+        # Calculate cache hit rate
+        total_cache_ops = self.metrics['cache_hits'] + self.metrics['cache_misses']
+        cache_hit_rate = (self.metrics['cache_hits'] / total_cache_ops * 100
+                          if total_cache_ops > 0 else 0)
+
+        # Get memory usage
+        process = psutil.Process()
+        memory_info = process.memory_info()
+
+        metrics = {
+            'uptime_seconds': uptime,
+            'api_calls': self.metrics['api_calls'],
+            'cache_hits': self.metrics['cache_hits'],
+            'cache_misses': self.metrics['cache_misses'],
+            'cache_hit_rate': cache_hit_rate,
+            'avg_response_time': avg_response_time,
+            'errors': self.metrics['errors'],
+            'memory_usage_mb': memory_info.rss / (1024 * 1024)
+        }
+
+        performance_logger.log_performance_metric(
+            'api_performance',
+            time.time(),
+            'timestamp',
+            metrics
+        )
+
+        return metrics
+
+    def _handle_error(self, error: Exception, context: Dict[str, Any]):
+        """Handle and log API errors."""
+        self.metrics['errors'] += 1
+
+        error_info = {
+            'error_type': type(error).__name__,
+            'error_message': str(error),
+            'context': context,
+            'timestamp': datetime.now().isoformat()
+        }
+
+        api_logger.error(f"API Error: {error_info}")
+        return error_info
+
+    def make_request(self, endpoint: str, method: str = 'GET',
+                     params: Optional[Dict] = None, data: Optional[Dict] = None,
+                     source: str = 'api', data_type: str = 'response') -> Tuple[bool, Any]:
+        """
+        Make API request with caching and logging.
+
+        Args:
+            endpoint: API endpoint
+            method: HTTP method
+            params: Query parameters
+            data: Request body
+            source: Data source identifier
+            data_type: Type of data being requested
+
+        Returns:
+            Tuple of (success, data)
+        """
+        start_time = time.time()
+        request_id = f"{source}_{endpoint}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+
+        # Log request start
+        api_logger.log_api_call(
+            endpoint=endpoint,
+            method=method,
+            params=params
+        )
+
+        try:
+            # Check cache first
+            cache_key = f"{source}_{endpoint}_{data_type}"
+            cache_hit, cached_data = cache_manager.load(source, endpoint, data_type)
+
+            if cache_hit:
+                self.metrics['cache_hits'] += 1
+                duration = time.time() - start_time
+
+                api_logger.info(f"Cache hit for {cache_key}")
+                performance_logger.log_performance_metric(
+                    'cache_hit',
+                    duration,
+                    'seconds',
+                    {'request_id': request_id, 'cache_key': cache_key}
+                )
+
+                return True, cached_data
+
+            self.metrics['cache_misses'] += 1
+
+            # Make API call
+            url = f"{self.base_url}/{endpoint.lstrip('/')}"
+
+            # Add API key to params if it exists
+            if self.api_key:
+                if params is None:
+                    params = {}
+                params['apikey'] = self.api_key
+                api_logger.info(f"Added API key to request: {self.api_key[:4]}...")
+            else:
+                api_logger.warning("No API key available for request")
+
+            api_logger.info(f"Making request to {url} with params: {params}")
+
+            response = self.session.request(
+                method=method,
+                url=url,
+                params=params,
+                json=data,
+                timeout=30
+            )
+            response.raise_for_status()
+
+            # Process response
+            response_data = response.json()
+            duration = time.time() - start_time
+
+            # Update metrics
+            self.metrics['api_calls'] += 1
+            self.metrics['total_response_time'] += duration
+
+            # Save to cache
+            cache_manager.save(source, endpoint, data_type, response_data)
+
+            # Log success
+            api_logger.log_api_call(
+                endpoint=endpoint,
+                method=method,
+                params=params,
+                response_time=duration,
+                status_code=response.status_code
+            )
+
+            performance_logger.log_performance_metric(
+                'api_response',
+                duration,
+                'seconds',
+                {
+                    'request_id': request_id,
+                    'endpoint': endpoint,
+                    'status_code': response.status_code
+                }
+            )
+
+            return True, response_data
+
+        except requests.exceptions.RequestException as e:
+            error_info = self._handle_error(e, {
+                'endpoint': endpoint,
+                'method': method,
+                'params': params,
+                'request_id': request_id
+            })
+            return False, error_info
+
+        except Exception as e:
+            error_info = self._handle_error(e, {
+                'endpoint': endpoint,
+                'method': method,
+                'params': params,
+                'request_id': request_id
+            })
+            return False, error_info
+
+    def portfolio_operation(self, operation_type: str, input_data: Dict[str, Any]) -> Tuple[bool, Any]:
+        """
+        Execute portfolio operation with logging and monitoring.
+
+        Args:
+            operation_type: Type of portfolio operation
+            input_data: Input parameters for the operation
+
+        Returns:
+            Tuple of (success, result)
+        """
+        start_time = time.time()
+        operation_id = f"{operation_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+
+        # Log operation start
+        portfolio_logger.log_portfolio_calculation(
+            calculation_type=operation_type,
+            input_data=input_data
+        )
+
+        try:
+            # Track memory usage before operation
+            process = psutil.Process()
+            memory_before = process.memory_info().rss
+
+            # Execute operation steps
+            steps = []
+            current_step = 1
+
+            # Example operation steps (replace with actual implementation)
+            for step_name in ['validation', 'calculation', 'optimization']:
+                step_start = time.time()
+
+                # Log step start
+                portfolio_logger.info(f"Step {current_step}: {step_name}")
+
+                # Execute step (replace with actual step implementation)
+                time.sleep(0.1)  # Simulated step execution
+
+                step_duration = time.time() - step_start
+                steps.append({
+                    'step': current_step,
+                    'name': step_name,
+                    'duration': step_duration
+                })
+
+                current_step += 1
+
+            # Calculate final result
+            result = {
+                'operation_id': operation_id,
+                'steps': steps,
+                'input_data': input_data
+            }
+
+            # Track memory usage after operation
+            memory_after = process.memory_info().rss
+            memory_used = (memory_after - memory_before) / (1024 * 1024)  # MB
+
+            # Log operation completion
+            duration = time.time() - start_time
+            portfolio_logger.log_portfolio_calculation(
+                calculation_type=operation_type,
+                input_data=input_data,
+                output_data=result,
+                duration=duration
+            )
+
+            # Log performance metrics
+            performance_logger.log_performance_metric(
+                'portfolio_operation',
+                duration,
+                'seconds',
+                {
+                    'operation_id': operation_id,
+                    'operation_type': operation_type,
+                    'memory_used_mb': memory_used,
+                    'steps': len(steps)
+                }
+            )
+
+            return True, result
+
+        except Exception as e:
+            error_info = self._handle_error(e, {
+                'operation_type': operation_type,
+                'input_data': input_data,
+                'operation_id': operation_id
+            })
+            return False, error_info
+
+    def get_performance_metrics(self) -> Dict[str, Any]:
+        """Get current performance metrics."""
+        return self._log_performance_metrics()
+
+    def get_profile(self, ticker: str) -> Optional[List[Dict]]:
+        """Get ETF profile data."""
+        success, data = self.make_request(
+            endpoint=f"profile/{ticker}",
+            source="fmp",
+            data_type="profile"
+        )
+        return data if success else None
+
+    def get_historical_data(self, ticker: str, timeframe: str = "1d") -> Optional[Dict]:
+        """Get historical price data."""
+        success, data = self.make_request(
+            endpoint=f"historical-price-full/{ticker}",
+            params={"timeseries": timeframe},
+            source="fmp",
+            data_type="historical"
+        )
+        return data if success else None
+
+    def get_dividend_history(self, ticker: str) -> Optional[Dict]:
+        """Get dividend history data."""
+        success, data = self.make_request(
+            endpoint=f"historical-price-full/stock_dividend/{ticker}",
+            source="fmp",
+            data_type="dividend_history"
+        )
+        return data if success else None
+
+    def get_holdings(self, ticker: str) -> Optional[Dict]:
+        """Get ETF holdings data."""
+        success, data = self.make_request(
+            endpoint=f"etf-holdings/{ticker}",
+            source="fmp",
+            data_type="holdings"
+        )
+        return data if success else None
+
+    def get_data(self, source: str, ticker: str, data_type: str, endpoint: str,
+                 params: Dict = None, force_refresh: bool = False) -> Any:
+        """Generic method to get data from any source."""
+        if params is None:
+            params = {}
+
+        success, data = self.make_request(
+            endpoint=endpoint,
+            params=params,
+            source=source,
+            data_type=data_type
+        )
+        return data if success else None
+
+# Create a singleton instance
+api_client = APIClient(base_url="https://financialmodelingprep.com/api/v3", api_key=os.getenv('FMP_API_KEY', ''))
\ No newline at end of file
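Because the module instantiates a singleton at import time, callers share one client, one session, and one set of metrics. A usage sketch (illustrative; assumes FMP_API_KEY is exported before import, and the placeholder tickers/weights are arbitrary):

    from ETF_Portal.api_client import api_client

    # Convenience getters wrap make_request; repeated calls inside the cache
    # window are served from disk, so the second call below is a cache hit
    profile = api_client.get_profile("SPY")
    profile_again = api_client.get_profile("SPY")
    print(api_client.metrics['cache_hits'], api_client.metrics['cache_misses'])

    # Portfolio operations are timed and memory-profiled per step
    ok, result = api_client.portfolio_operation(
        'rebalance', {'tickers': ['SPY', 'QQQ'], 'target_weights': [0.6, 0.4]}
    )
    print(api_client.get_performance_metrics()['cache_hit_rate'])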
diff --git a/ETF_Portal/cache_manager.py b/ETF_Portal/cache_manager.py
new file mode 100644
index 0000000..7003374
--- /dev/null
+++ b/ETF_Portal/cache_manager.py
@@ -0,0 +1,315 @@
+#!/usr/bin/env python3
+"""
+Cache Manager for ETF Portal
+
+Handles caching of API responses to reduce API calls and improve response times.
+Implements a time-based cache expiration system with detailed logging.
+"""
+
+import os
+import json
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Any, Dict, Optional, Tuple, List
+import hashlib
+import threading
+from concurrent.futures import ThreadPoolExecutor
+from .logging_config import cache_logger, performance_logger
+from dataclasses import dataclass, asdict
+
+# Constants
+CACHE_DIR = Path("cache")
+DEFAULT_CACHE_DURATION = 24 * 60 * 60  # 24 hours in seconds
+
+@dataclass
+class CacheStats:
+    """Cache statistics tracking."""
+    hits: int = 0
+    misses: int = 0
+    total_size: int = 0
+    last_cleared: Optional[datetime] = None
+    errors: int = 0
+
+class CacheManager:
+    """Manages caching operations for the ETF Portal."""
+
+    def __init__(self, cache_dir: str = "cache", cache_duration: int = DEFAULT_CACHE_DURATION):
+        """
+        Initialize the cache manager.
+
+        Args:
+            cache_dir: Directory to store cache files
+            cache_duration: Cache duration in seconds (24 hours by default)
+        """
+        # Use absolute path for cache directory
+        self.cache_dir = Path(os.path.abspath(cache_dir))
+        self.cache_duration = cache_duration
+        self.stats = CacheStats()
+        self._lock = threading.Lock()
+
+        # Create cache directory if it doesn't exist
+        self.cache_dir.mkdir(parents=True, exist_ok=True)
+        cache_logger.info(f"Cache directory: {self.cache_dir}")
+        cache_logger.info(f"Cache duration: {cache_duration} seconds")
+
+        # Load or initialize stats
+        self._load_stats()
+
+        # Log initialization complete
+        cache_logger.info("Cache manager initialized successfully")
+        performance_logger.log_performance_metric(
+            "cache_init",
+            time.time(),
+            "timestamp",
+            {"cache_duration": cache_duration}
+        )
+
+    def _get_cache_path(self, source: str, ticker: str, data_type: str) -> Path:
+        """
+        Generate cache file path.
+
+        Args:
+            source: Data source (e.g., 'fmp', 'yahoo')
+            ticker: Stock/ETF ticker
+            data_type: Type of data (e.g., 'profile', 'historical')
+
+        Returns:
+            Path object for the cache file
+        """
+        # Create filename in format: {source}_{ticker}_{data_type}.json
+        # (slashes are sanitized so endpoint-style keys stay inside cache_dir)
+        filename = f"{source}_{ticker}_{data_type}.json".replace('/', '_')
+        cache_path = self.cache_dir / filename
+        cache_logger.debug(f"Cache path: {cache_path}")
+        return cache_path
+
+    def _load_stats(self) -> None:
+        """Load cache statistics from disk."""
+        stats_file = self.cache_dir / "cache_stats.json"
+        if stats_file.exists():
+            try:
+                with open(stats_file, 'r') as f:
+                    data = json.load(f)
+                    self.stats = CacheStats(**data)
+                    if self.stats.last_cleared:
+                        self.stats.last_cleared = datetime.fromisoformat(self.stats.last_cleared)
+                cache_logger.info(f"Loaded cache stats: {asdict(self.stats)}")
+            except Exception as e:
+                cache_logger.error(f"Error loading cache stats: {e}")
+                self.stats = CacheStats()
+                self.stats.errors += 1
+
+    def _save_stats(self) -> None:
+        """Save cache statistics to disk."""
+        stats_file = self.cache_dir / "cache_stats.json"
+        try:
+            with open(stats_file, 'w') as f:
+                stats_dict = asdict(self.stats)
+                if stats_dict['last_cleared']:
+                    stats_dict['last_cleared'] = stats_dict['last_cleared'].isoformat()
+                json.dump(stats_dict, f, indent=2)
+            cache_logger.debug(f"Saved cache stats: {stats_dict}")
+        except Exception as e:
+            cache_logger.error(f"Error saving cache stats: {e}")
+            self.stats.errors += 1
+
+    def save(self, source: str, ticker: str, data_type: str, data: Any) -> bool:
+        """
+        Save data to cache.
+
+        Args:
+            source: Data source
+            ticker: Stock/ETF ticker
+            data_type: Type of data
+            data: Data to cache
+
+        Returns:
+            True if save was successful, False otherwise
+        """
+        with self._lock:
+            try:
+                start_time = time.time()
+                cache_path = self._get_cache_path(source, ticker, data_type)
+
+                # Prepare cache data with timestamp
+                cache_data = {
+                    'timestamp': datetime.now().isoformat(),
+                    'source': source,
+                    'ticker': ticker,
+                    'type': data_type,
+                    'data': data
+                }
+
+                # Subtract any previous size for this key so totals stay accurate
+                if cache_path.exists():
+                    self.stats.total_size -= os.path.getsize(cache_path)
+
+                # Save to cache file
+                with open(cache_path, 'w') as f:
+                    json.dump(cache_data, f, indent=2)
+
+                # Update stats
+                file_size = os.path.getsize(cache_path)
+                self.stats.total_size += file_size
+                self._save_stats()
+
+                duration = time.time() - start_time
+                cache_logger.log_cache_operation(
+                    "save",
+                    f"{source}/{ticker}/{data_type}",
+                    size=file_size
+                )
+                performance_logger.log_performance_metric(
+                    "cache_save",
+                    duration,
+                    "seconds",
+                    {"source": source, "ticker": ticker, "type": data_type}
+                )
+                return True
+
+            except Exception as e:
+                cache_logger.error(f"Error saving to cache: {e}")
+                self.stats.errors += 1
+                return False
+
+    def load(self, source: str, ticker: str, data_type: str) -> Tuple[bool, Optional[Any]]:
+        """
+        Load data from cache if valid.
+
+        Args:
+            source: Data source
+            ticker: Stock/ETF ticker
+            data_type: Type of data
+
+        Returns:
+            Tuple of (is_valid, data)
+        """
+        with self._lock:
+            start_time = time.time()
+            cache_path = self._get_cache_path(source, ticker, data_type)
+
+            if not cache_path.exists():
+                cache_logger.log_cache_operation(
+                    "load",
+                    f"{source}/{ticker}/{data_type}",
+                    hit=False
+                )
+                self.stats.misses += 1
+                self._save_stats()
+                return False, None
+
+            try:
+                with open(cache_path, 'r') as f:
+                    cache_data = json.load(f)
+
+                # Check if cache is still valid
+                timestamp = datetime.fromisoformat(cache_data['timestamp'])
+                age = datetime.now() - timestamp
+
+                if age.total_seconds() > self.cache_duration:
+                    cache_logger.log_cache_operation(
+                        "load",
+                        f"{source}/{ticker}/{data_type}",
+                        hit=False
+                    )
+                    self.stats.misses += 1
+                    self._save_stats()
+                    return False, None
+
+                duration = time.time() - start_time
+                cache_logger.log_cache_operation(
+                    "load",
+                    f"{source}/{ticker}/{data_type}",
+                    hit=True
+                )
+                performance_logger.log_performance_metric(
+                    "cache_load",
+                    duration,
+                    "seconds",
+                    {"source": source, "ticker": ticker, "type": data_type}
+                )
+                self.stats.hits += 1
+                self._save_stats()
+                return True, cache_data['data']
+
+            except Exception as e:
+                cache_logger.error(f"Error loading from cache: {e}")
+                self.stats.misses += 1
+                self.stats.errors += 1
+                self._save_stats()
+                return False, None
+
+    def clear_expired(self) -> None:
+        """Remove expired cache files."""
+        with self._lock:
+            try:
+                cleared_count = 0
+                for cache_file in self.cache_dir.glob("*.json"):
+                    if cache_file.name == "cache_stats.json":
+                        continue
+
+                    try:
+                        with open(cache_file, 'r') as f:
+                            cache_data = json.load(f)
+
+                        timestamp = datetime.fromisoformat(cache_data['timestamp'])
+                        age = datetime.now() - timestamp
+
+                        if age.total_seconds() > self.cache_duration:
+                            self.stats.total_size -= os.path.getsize(cache_file)
+                            cache_file.unlink()
+                            cleared_count += 1
+                            cache_logger.debug(f"Removed expired cache: {cache_file}")
+
+                    except Exception as e:
+                        cache_logger.error(f"Error processing cache file {cache_file}: {e}")
+                        self.stats.errors += 1
+
+                if cleared_count > 0:
+                    cache_logger.info(f"Cleared {cleared_count} expired cache files")
+                    self.stats.last_cleared = datetime.now()
+                    self._save_stats()
+
+            except Exception as e:
+                cache_logger.error(f"Error clearing expired cache: {e}")
+                self.stats.errors += 1
+
+    def get_stats(self) -> Dict[str, Any]:
+        """
+        Get cache statistics.
+
+        Returns:
+            Dictionary containing cache statistics
+        """
+        with self._lock:
+            stats = asdict(self.stats)
+            if stats['last_cleared']:
+                stats['last_cleared'] = stats['last_cleared'].isoformat()
+
+            # Add additional stats
+            stats['cache_files'] = len(list(self.cache_dir.glob("*.json"))) - 1  # Exclude stats file
+            stats['hit_rate'] = (self.stats.hits / (self.stats.hits + self.stats.misses)) if (self.stats.hits + self.stats.misses) > 0 else 0
+            stats['total_size_mb'] = self.stats.total_size / (1024 * 1024)
+
+            return stats
+
+    def clear_all(self) -> None:
+        """Clear all cache files."""
+        with self._lock:
+            try:
+                for cache_file in self.cache_dir.glob("*.json"):
+                    if cache_file.name == "cache_stats.json":
+                        continue
+                    cache_file.unlink()
+
+                self.stats = CacheStats()
+                self._save_stats()
+                cache_logger.info("Cleared all cache files")
+
+            except Exception as e:
+                cache_logger.error(f"Error clearing all cache: {e}")
+                self.stats.errors += 1
+
+# Create a singleton instance
+cache_manager = CacheManager()
\ No newline at end of file
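End to end, the manager can be exercised on its own (illustrative; the payload is arbitrary JSON-serializable data):

    from ETF_Portal.cache_manager import cache_manager

    # Keys map to flat files, e.g. cache/fmp_SPY_profile.json
    cache_manager.save("fmp", "SPY", "profile", {"name": "SPDR S&P 500"})
    is_valid, data = cache_manager.load("fmp", "SPY", "profile")   # hit
    print(is_valid, cache_manager.get_stats()["hit_rate"])
    cache_manager.clear_expired()   # drops entries older than cache_duration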
diff --git a/ETF_Portal/cache_simulation.py b/ETF_Portal/cache_simulation.py
new file mode 100644
index 0000000..4340b91
--- /dev/null
+++ b/ETF_Portal/cache_simulation.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+"""
+Cache System Simulation
+
+Demonstrates how the cache system works with API calls and cached data.
+"""
+
+import time
+from ETF_Portal.cache_manager import cache_manager
+import logging
+
+# Setup logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def simulate_api_call(source: str, ticker: str, data_type: str) -> dict:
+    """
+    Simulate an API call with a delay.
+
+    Args:
+        source: Data source (e.g., 'fmp', 'yahoo')
+        ticker: Stock/ETF ticker
+        data_type: Type of data (e.g., 'profile', 'historical')
+
+    Returns:
+        Simulated API response data
+    """
+    logger.info(f"Making API call to {source} for {ticker} {data_type}")
+    time.sleep(1)  # Simulate API delay
+    return {
+        "ticker": ticker,
+        "source": source,
+        "type": data_type,
+        "data": f"Simulated data for {ticker} from {source}",
+        "timestamp": time.time()
+    }
+
+def get_data(source: str, ticker: str, data_type: str) -> dict:
+    """
+    Get data either from cache or API.
+
+    Args:
+        source: Data source
+        ticker: Stock/ETF ticker
+        data_type: Type of data
+
+    Returns:
+        Data from either cache or API
+    """
+    # Try to load from cache first
+    is_valid, cached_data = cache_manager.load(source, ticker, data_type)
+
+    if is_valid:
+        logger.info(f"Cache HIT: Found valid data for {ticker} in cache")
+        return cached_data
+
+    # If not in cache or expired, fetch from API
+    logger.info(f"Cache MISS: Fetching data for {ticker} from API")
+    data = simulate_api_call(source, ticker, data_type)
+
+    # Save to cache
+    cache_manager.save(source, ticker, data_type, data)
+    return data
+
+def run_simulation():
+    """Run a simulation of the cache system."""
+    # First request - should be a cache miss
+    logger.info("\n=== First Request ===")
+    data1 = get_data('fmp', 'SPY', 'profile')
+    print(f"Data received: {data1}")
+
+    # Second request - should be a cache hit
+    logger.info("\n=== Second Request ===")
+    data2 = get_data('fmp', 'SPY', 'profile')
+    print(f"Data received: {data2}")
+
+    # Request different data - should be a cache miss
+    logger.info("\n=== Different Data Request ===")
+    data3 = get_data('fmp', 'QQQ', 'profile')
+    print(f"Data received: {data3}")
+
+    # Show cache statistics
+    logger.info("\n=== Cache Statistics ===")
+    stats = cache_manager.get_stats()
+    print(f"Cache hits: {stats['hits']}")
+    print(f"Cache misses: {stats['misses']}")
+    print(f"Hit rate: {stats['hit_rate']:.2%}")
+    print(f"Total cache size: {stats['total_size']} bytes")
+    print(f"Number of cache files: {stats['cache_files']}")
+
+if __name__ == "__main__":
+    run_simulation()
\ No newline at end of file
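Run from the project root (assuming ETF_Portal is importable as a package):

    python -m ETF_Portal.cache_simulation

The first and third requests should pay the simulated one-second API delay, while the second should return immediately from cache.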
diff --git a/ETF_Portal/logging_config.py b/ETF_Portal/logging_config.py
new file mode 100644
index 0000000..3b5faa2
--- /dev/null
+++ b/ETF_Portal/logging_config.py
@@ -0,0 +1,210 @@
+#!/usr/bin/env python3
+"""
+Enhanced Logging Configuration for ETF Portal
+
+Provides centralized logging configuration with component-specific logging,
+log rotation, and structured logging formats.
+"""
+
+import logging
+import logging.handlers
+from pathlib import Path
+from datetime import datetime
+from typing import Optional, Dict, Any
+import json
+import sys
+import os
+
+# Constants
+LOGS_DIR = Path("logs")
+MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
+BACKUP_COUNT = 5
+
+# Log formats
+DETAILED_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
+SIMPLE_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
+
+# Component configurations
+COMPONENTS = {
+    'api': {
+        'name': 'API',
+        'log_level': logging.INFO,
+        'file_prefix': 'api',
+        'categories': ['requests', 'responses', 'errors']
+    },
+    'cache': {
+        'name': 'Cache',
+        'log_level': logging.INFO,
+        'file_prefix': 'cache',
+        'categories': ['hits', 'misses', 'cleanup']
+    },
+    'portfolio': {
+        'name': 'Portfolio',
+        'log_level': logging.INFO,
+        'file_prefix': 'portfolio',
+        'categories': ['calculations', 'allocations', 'optimizations']
+    },
+    'performance': {
+        'name': 'Performance',
+        'log_level': logging.INFO,
+        'file_prefix': 'performance',
+        'categories': ['response_times', 'resource_usage']
+    }
+}
+
+class ComponentLogger:
+    """Manages logging for a specific component."""
+
+    def __init__(self, component: str):
+        """
+        Initialize component logger.
+
+        Args:
+            component: Component name (must be in COMPONENTS)
+        """
+        if component not in COMPONENTS:
+            raise ValueError(f"Unknown component: {component}")
+
+        self.component = component
+        self.config = COMPONENTS[component]
+        self.logger = logging.getLogger(f"etf_portal.{component}")
+        self.logger.setLevel(self.config['log_level'])
+
+        # Create component-specific log directory
+        self.log_dir = LOGS_DIR / component
+        self.log_dir.mkdir(parents=True, exist_ok=True)
+
+        # Setup handlers
+        self._setup_handlers()
+
+        # Log initialization
+        self.logger.info(f"Initialized {self.config['name']} logger")
+
+    def __getattr__(self, name):
+        # Delegate standard logging calls (info, error, debug, ...) to the
+        # underlying logging.Logger so ComponentLogger can be used anywhere
+        # a plain logger is expected.
+        if name == 'logger':
+            raise AttributeError(name)
+        return getattr(self.logger, name)
+
+    def _setup_handlers(self):
+        """Setup logging handlers for the component."""
+        # Remove any existing handlers
+        self.logger.handlers = []
+
+        # Create formatters
+        detailed_formatter = logging.Formatter(DETAILED_FORMAT)
+        simple_formatter = logging.Formatter(SIMPLE_FORMAT)
+
+        # Main log file with rotation
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        log_file = self.log_dir / f"{self.config['file_prefix']}_{timestamp}.log"
+        file_handler = logging.handlers.RotatingFileHandler(
+            log_file,
+            maxBytes=MAX_LOG_SIZE,
+            backupCount=BACKUP_COUNT
+        )
+        file_handler.setFormatter(detailed_formatter)
+        file_handler.setLevel(logging.DEBUG)
+        self.logger.addHandler(file_handler)
+
+        # Error log file
+        error_file = self.log_dir / f"{self.config['file_prefix']}_error_{timestamp}.log"
+        error_handler = logging.handlers.RotatingFileHandler(
+            error_file,
+            maxBytes=MAX_LOG_SIZE,
+            backupCount=BACKUP_COUNT
+        )
+        error_handler.setFormatter(detailed_formatter)
+        error_handler.setLevel(logging.ERROR)
+        self.logger.addHandler(error_handler)
+
+        # Console handler
+        console_handler = logging.StreamHandler(sys.stdout)
+        console_handler.setFormatter(simple_formatter)
+        console_handler.setLevel(logging.INFO)
+        self.logger.addHandler(console_handler)
+
+    def log_api_call(self, endpoint: str, method: str, params: Dict = None,
+                     response_time: float = None, status_code: int = None):
+        """Log API call details."""
+        log_data = {
+            'endpoint': endpoint,
+            'method': method,
+            'params': params,
+            'response_time': response_time,
+            'status_code': status_code
+        }
+        self.logger.info(f"API Call: {json.dumps(log_data)}")
+
+    def log_cache_operation(self, operation: str, key: str, hit: bool = None,
+                            size: int = None, ttl: int = None):
+        """Log cache operation details."""
+        log_data = {
+            'operation': operation,
+            'key': key,
+            'hit': hit,
+            'size': size,
+            'ttl': ttl
+        }
+        self.logger.info(f"Cache Operation: {json.dumps(log_data)}")
+
+    def log_portfolio_calculation(self, calculation_type: str,
+                                  input_data: Dict = None,
+                                  output_data: Dict = None,
+                                  duration: float = None):
+        """Log portfolio calculation details."""
+        log_data = {
+            'type': calculation_type,
+            'input': input_data,
+            'output': output_data,
+            'duration': duration
+        }
+        self.logger.info(f"Portfolio Calculation: {json.dumps(log_data)}")
+
+    def log_performance_metric(self, metric_name: str, value: float,
+                               unit: str = None, context: Dict = None):
+        """Log performance metric details."""
+        log_data = {
+            'metric': metric_name,
+            'value': value,
+            'unit': unit,
+            'context': context,
+            'timestamp': datetime.now().isoformat()
+        }
+        self.logger.info(f"Performance Metric: {json.dumps(log_data)}")
+
+def setup_logging():
+    """Initialize the logging system."""
+    # Create logs directory
+    LOGS_DIR.mkdir(parents=True, exist_ok=True)
+
+    # Create component loggers
+    loggers = {}
+    for component in COMPONENTS:
+        loggers[component] = ComponentLogger(component)
+
+    return loggers
+
+# Create loggers for all components
+loggers = setup_logging()
+
+# Export component loggers. The ComponentLogger wrappers are exported (not
+# the raw logging.Logger) so structured helpers such as log_api_call and
+# log_cache_operation stay available to callers.
+api_logger = loggers['api']
+cache_logger = loggers['cache']
+portfolio_logger = loggers['portfolio']
+performance_logger = loggers['performance']
+
+# Add performance metric logging methods to loggers
+def add_performance_logging(logger):
+    """Add performance metric logging methods to a logger."""
+    logger.log_performance_metric = lambda metric_name, value, unit=None, context=None: \
+        loggers['performance'].log_performance_metric(metric_name, value, unit, context)
+
+# Route performance metrics from the other loggers through the dedicated
+# performance logger (which already has log_performance_metric natively)
+for logger in [api_logger, cache_logger, portfolio_logger]:
+    add_performance_logging(logger)
\ No newline at end of file
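The exported wrappers can be used like plain loggers or through their structured helpers (a minimal sketch; the argument values are illustrative):

    from ETF_Portal.logging_config import api_logger, cache_logger

    api_logger.info("plain message")   # delegated to the underlying logging.Logger
    api_logger.log_api_call("profile/SPY", "GET", params={"limit": 1})
    cache_logger.log_cache_operation("load", "fmp/SPY/profile", hit=True)
    api_logger.log_performance_metric("warmup", 0.42, "seconds")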
diff --git a/ETF_Portal/test_cache.py b/ETF_Portal/test_cache.py
new file mode 100644
index 0000000..b75cd77
--- /dev/null
+++ b/ETF_Portal/test_cache.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+"""
+Cache System Test
+
+Tests the caching behavior with consecutive API calls.
+"""
+
+import logging
+import os
+import time
+from ETF_Portal.api_client import APIClient
+
+# Setup logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+def run_test(api_key: str):
+    """Run cache test with consecutive API calls."""
+    # Initialize API client (APIClient requires the FMP base URL)
+    api_client = APIClient(
+        base_url="https://financialmodelingprep.com/api/v3",
+        api_key=api_key
+    )
+
+    # Test data
+    test_tickers = ["SPY", "QQQ", "VTI"]
+
+    logger.info("\n=== First Run (Should make API calls) ===")
+    for ticker in test_tickers:
+        logger.info(f"\nTesting {ticker}:")
+
+        # Get profile
+        start_time = time.time()
+        profile = api_client.get_profile(ticker)
+        duration = time.time() - start_time
+        logger.info(f"Profile data received in {duration:.2f}s")
+
+        # Get historical data
+        start_time = time.time()
+        historical = api_client.get_historical_data(ticker, timeframe="1d")
+        duration = time.time() - start_time
+        logger.info(f"Historical data received in {duration:.2f}s")
+
+        # Get holdings
+        start_time = time.time()
+        holdings = api_client.get_holdings(ticker)
+        duration = time.time() - start_time
+        logger.info(f"Holdings data received in {duration:.2f}s")
+
+    # Wait a moment between runs
+    time.sleep(2)
+
+    logger.info("\n=== Second Run (Should use cache) ===")
+    for ticker in test_tickers:
+        logger.info(f"\nTesting {ticker}:")
+
+        # Get profile
+        start_time = time.time()
+        profile = api_client.get_profile(ticker)
+        duration = time.time() - start_time
+        logger.info(f"Profile data received in {duration:.2f}s")
+
+        # Get historical data
+        start_time = time.time()
+        historical = api_client.get_historical_data(ticker, timeframe="1d")
+        duration = time.time() - start_time
+        logger.info(f"Historical data received in {duration:.2f}s")
+
+        # Get holdings
+        start_time = time.time()
+        holdings = api_client.get_holdings(ticker)
+        duration = time.time() - start_time
+        logger.info(f"Holdings data received in {duration:.2f}s")
+
+    # Get cache statistics
+    from ETF_Portal.cache_manager import cache_manager
+    stats = cache_manager.get_stats()
+    logger.info("\n=== Cache Statistics ===")
+    logger.info(f"Cache hits: {stats['hits']}")
+    logger.info(f"Cache misses: {stats['misses']}")
+    logger.info(f"Hit rate: {stats['hit_rate']:.2%}")
+    logger.info(f"Total cache size: {stats['total_size']} bytes")
+    logger.info(f"Number of cache files: {stats['cache_files']}")
+
+if __name__ == "__main__":
+    # Read the FMP API key from the environment rather than hard-coding it
+    API_KEY = os.getenv("FMP_API_KEY", "YOUR_FMP_API_KEY")
+    run_test(API_KEY)
\ No newline at end of file
diff --git a/ETF_Portal/tests/test_api_config.py b/ETF_Portal/tests/test_api_config.py
new file mode 100644
index 0000000..3e9b44c
--- /dev/null
+++ b/ETF_Portal/tests/test_api_config.py
@@ -0,0 +1,72 @@
+import os
+import logging
+from ..api import APIFactory
+import pandas as pd
+
+def test_api_configuration():
+    """Test the API configuration and secrets."""
+    logging.basicConfig(level=logging.INFO)
+    logger = logging.getLogger(__name__)
+
+    try:
+        # Initialize API factory
+        api_factory = APIFactory()
+
+        # Test FMP client
+        logger.info("Testing FMP client...")
+        fmp_client = api_factory.get_client('fmp')
+
+        # Test with a known ETF
+        test_symbol = "SPY"
+
+        # Test profile data
+        logger.info(f"Getting profile data for {test_symbol}...")
+        profile = fmp_client.get_etf_profile(test_symbol)
+        profile_ok = (
+            (isinstance(profile, dict) and not profile.get('error'))
+            or (isinstance(profile, list) and len(profile) > 0)
+        )
+        if profile_ok:
+            logger.info("✅ Profile data retrieved successfully")
+        else:
+            logger.error("❌ Failed to get profile data")
+            if isinstance(profile, dict):
+                logger.error(f"Error: {profile.get('message', 'Unknown error')}")
+            else:
+                logger.error(f"Error: Unexpected response type: {type(profile)}")
+
+        # Test historical data
+        logger.info(f"Getting historical data for {test_symbol}...")
+        historical = fmp_client.get_historical_data(test_symbol, period='1mo')
+        if isinstance(historical, pd.DataFrame) and not historical.empty:
+            logger.info("✅ Historical data retrieved successfully")
+            logger.info(f"Data points: {len(historical)}")
+        else:
+            logger.error("❌ Failed to get historical data")
+
+        # Test cache
+        logger.info("Testing cache...")
+        cache_stats = api_factory.get_cache_stats()
+        logger.info(f"Cache stats: {cache_stats}")
+
+        # Test fallback to yfinance
+        logger.info("Testing fallback to yfinance...")
+        yfinance_data = api_factory.get_data(test_symbol, 'etf_profile', provider='yfinance')
+        if isinstance(yfinance_data, dict) and not yfinance_data.get('error'):
+            logger.info("✅ YFinance fallback working")
+        else:
+            logger.error("❌ YFinance fallback failed")
+            logger.error(f"Error: {yfinance_data.get('message', 'Unknown error')}")
+
+        return True
+
+    except Exception as e:
+        logger.error(f"❌ Test failed: {str(e)}")
+        return False
+
+if __name__ == "__main__":
+    success = test_api_configuration()
+    if success:
+        print("\n✅ All tests passed!")
+    else:
+        print("\n❌ Some tests failed. Check the logs for details.")
\ No newline at end of file
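To exercise both test entry points from the project root (illustrative commands; they assume ETF_Portal and its tests directory are importable packages and that a real key is exported):

    export FMP_API_KEY=your_key_here
    python -m ETF_Portal.test_cache
    python -m ETF_Portal.tests.test_api_config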