#!/usr/bin/env python3 """ API Client for ETF Portal Handles API calls with caching, logging, and performance monitoring. """ import time import psutil import requests import os from typing import Any, Dict, Optional, Tuple, List from datetime import datetime from .cache_manager import cache_manager from .logging_config import api_logger, portfolio_logger, performance_logger class APIClient: """Manages API calls with caching and monitoring.""" def __init__(self, base_url: str, api_key: Optional[str] = None): """ Initialize API client. Args: base_url: Base URL for API endpoints api_key: Optional API key for authentication """ self.base_url = base_url.rstrip('/') self.api_key = api_key self.session = requests.Session() # Initialize performance metrics self._init_performance_metrics() api_logger.info(f"Initialized API client for {base_url}") def _init_performance_metrics(self): """Initialize performance tracking metrics.""" self.metrics = { 'api_calls': 0, 'cache_hits': 0, 'cache_misses': 0, 'errors': 0, 'total_response_time': 0, 'start_time': time.time() } def _log_performance_metrics(self): """Log current performance metrics.""" current_time = time.time() uptime = current_time - self.metrics['start_time'] # Calculate averages avg_response_time = (self.metrics['total_response_time'] / self.metrics['api_calls']) if self.metrics['api_calls'] > 0 else 0 # Calculate cache hit rate total_cache_ops = self.metrics['cache_hits'] + self.metrics['cache_misses'] cache_hit_rate = (self.metrics['cache_hits'] / total_cache_ops * 100 if total_cache_ops > 0 else 0) # Get memory usage process = psutil.Process() memory_info = process.memory_info() metrics = { 'uptime_seconds': uptime, 'api_calls': self.metrics['api_calls'], 'cache_hits': self.metrics['cache_hits'], 'cache_misses': self.metrics['cache_misses'], 'cache_hit_rate': cache_hit_rate, 'avg_response_time': avg_response_time, 'errors': self.metrics['errors'], 'memory_usage_mb': memory_info.rss / (1024 * 1024) } performance_logger.log_performance_metric( 'api_performance', time.time(), 'timestamp', metrics ) return metrics def _handle_error(self, error: Exception, context: Dict[str, Any]): """Handle and log API errors.""" self.metrics['errors'] += 1 error_info = { 'error_type': type(error).__name__, 'error_message': str(error), 'context': context, 'timestamp': datetime.now().isoformat() } api_logger.error(f"API Error: {error_info}") return error_info def make_request(self, endpoint: str, method: str = 'GET', params: Optional[Dict] = None, data: Optional[Dict] = None, source: str = 'api', data_type: str = 'response') -> Tuple[bool, Any]: """ Make API request with caching and logging. Args: endpoint: API endpoint method: HTTP method params: Query parameters data: Request body source: Data source identifier data_type: Type of data being requested Returns: Tuple of (success, data) """ start_time = time.time() request_id = f"{source}_{endpoint}_{datetime.now().strftime('%Y%m%d%H%M%S')}" # Log request start api_logger.log_api_call( endpoint=endpoint, method=method, params=params ) try: # Check cache first cache_key = f"{source}_{endpoint}_{data_type}" cache_hit, cached_data = cache_manager.load(source, endpoint, data_type) if cache_hit: self.metrics['cache_hits'] += 1 duration = time.time() - start_time api_logger.info(f"Cache hit for {cache_key}") performance_logger.log_performance_metric( 'cache_hit', duration, 'seconds', {'request_id': request_id, 'cache_key': cache_key} ) return True, cached_data self.metrics['cache_misses'] += 1 # Make API call url = f"{self.base_url}/{endpoint.lstrip('/')}" # Add API key to params if it exists if self.api_key: if params is None: params = {} params['apikey'] = self.api_key api_logger.info(f"Added API key to request: {self.api_key[:4]}...") else: api_logger.warning("No API key available for request") api_logger.info(f"Making request to {url} with params: {params}") response = self.session.request( method=method, url=url, params=params, json=data ) response.raise_for_status() # Process response response_data = response.json() duration = time.time() - start_time # Update metrics self.metrics['api_calls'] += 1 self.metrics['total_response_time'] += duration # Save to cache cache_manager.save(source, endpoint, data_type, response_data) # Log success api_logger.log_api_call( endpoint=endpoint, method=method, params=params, response_time=duration, status_code=response.status_code ) performance_logger.log_performance_metric( 'api_response', duration, 'seconds', { 'request_id': request_id, 'endpoint': endpoint, 'status_code': response.status_code } ) return True, response_data except requests.exceptions.RequestException as e: error_info = self._handle_error(e, { 'endpoint': endpoint, 'method': method, 'params': params, 'request_id': request_id }) return False, error_info except Exception as e: error_info = self._handle_error(e, { 'endpoint': endpoint, 'method': method, 'params': params, 'request_id': request_id }) return False, error_info def portfolio_operation(self, operation_type: str, input_data: Dict[str, Any]) -> Tuple[bool, Any]: """ Execute portfolio operation with logging and monitoring. Args: operation_type: Type of portfolio operation input_data: Input parameters for the operation Returns: Tuple of (success, result) """ start_time = time.time() operation_id = f"{operation_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}" # Log operation start portfolio_logger.log_portfolio_calculation( calculation_type=operation_type, input_data=input_data ) try: # Track memory usage before operation process = psutil.Process() memory_before = process.memory_info().rss # Execute operation steps steps = [] current_step = 1 # Example operation steps (replace with actual implementation) for step_name in ['validation', 'calculation', 'optimization']: step_start = time.time() # Log step start portfolio_logger.info(f"Step {current_step}: {step_name}") # Execute step (replace with actual step implementation) time.sleep(0.1) # Simulated step execution step_duration = time.time() - step_start steps.append({ 'step': current_step, 'name': step_name, 'duration': step_duration }) current_step += 1 # Calculate final result result = { 'operation_id': operation_id, 'steps': steps, 'input_data': input_data } # Track memory usage after operation memory_after = process.memory_info().rss memory_used = (memory_after - memory_before) / (1024 * 1024) # MB # Log operation completion duration = time.time() - start_time portfolio_logger.log_portfolio_calculation( calculation_type=operation_type, input_data=input_data, output_data=result, duration=duration ) # Log performance metrics performance_logger.log_performance_metric( 'portfolio_operation', duration, 'seconds', { 'operation_id': operation_id, 'operation_type': operation_type, 'memory_used_mb': memory_used, 'steps': len(steps) } ) return True, result except Exception as e: error_info = self._handle_error(e, { 'operation_type': operation_type, 'input_data': input_data, 'operation_id': operation_id }) return False, error_info def get_performance_metrics(self) -> Dict[str, Any]: """Get current performance metrics.""" return self._log_performance_metrics() def get_profile(self, ticker: str) -> Optional[List[Dict]]: """Get ETF profile data.""" success, data = self.make_request( endpoint=f"profile/{ticker}", source="fmp", data_type="profile" ) return data if success else None def get_historical_data(self, ticker: str, timeframe: str = "1d") -> Optional[Dict]: """Get historical price data.""" success, data = self.make_request( endpoint=f"historical-price-full/{ticker}", params={"timeseries": timeframe}, source="fmp", data_type="historical" ) return data if success else None def get_dividend_history(self, ticker: str) -> Optional[Dict]: """Get dividend history data.""" success, data = self.make_request( endpoint=f"historical-price-full/stock_dividend/{ticker}", source="fmp", data_type="dividend_history" ) return data if success else None def get_holdings(self, ticker: str) -> Optional[Dict]: """Get ETF holdings data.""" success, data = self.make_request( endpoint=f"etf-holdings/{ticker}", source="fmp", data_type="holdings" ) return data if success else None def get_data(self, source: str, ticker: str, data_type: str, endpoint: str, params: Dict = None, force_refresh: bool = False) -> Any: """Generic method to get data from any source.""" if params is None: params = {} success, data = self.make_request( endpoint=endpoint, params=params, source=source, data_type=data_type ) return data if success else None # Create a singleton instance api_client = APIClient(base_url="https://financialmodelingprep.com/api/v3", api_key=os.getenv('FMP_API_KEY', ''))