ETF_Suite_Portal/ETF_Portal/api_client.py
Pascal c462342d44 feat: Add API management system with caching support
- Add base API client and cache manager classes

- Implement FMP and YFinance specific clients and cache managers

- Add API factory for managing multiple data providers

- Add test suite for API configuration and caching

- Add logging configuration for API operations
2025-05-27 14:07:32 +02:00

368 lines
12 KiB
Python

#!/usr/bin/env python3
"""
API Client for ETF Portal
Handles API calls with caching, logging, and performance monitoring.
"""
import time
import psutil
import requests
import os
from typing import Any, Dict, Optional, Tuple, List
from datetime import datetime
from .cache_manager import cache_manager
from .logging_config import api_logger, portfolio_logger, performance_logger
class APIClient:
    """Manage HTTP API calls with caching, logging and performance metrics.

    Every request goes through :meth:`make_request`, which consults the
    shared ``cache_manager`` first, falls back to an HTTP call on a miss,
    stores the decoded JSON response in the cache and records timing
    information through the project loggers.

    The API key, when configured, is sent as the ``apikey`` query parameter.
    It is masked in parameter dumps and error messages before they are
    handed to the loggers.
    """

    # Default per-request timeout in seconds. Without a timeout ``requests``
    # waits indefinitely on a stalled server.
    DEFAULT_TIMEOUT = 30.0
    # Query-parameter name under which the API key is transmitted.
    _API_KEY_PARAM = 'apikey'
    # Placeholder written to logs in place of the secret value.
    _REDACTED = '***'

    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """
        Initialize API client.

        Args:
            base_url: Base URL for API endpoints (a trailing '/' is removed)
            api_key: Optional API key for authentication
            timeout: Seconds to wait for each HTTP request; ``None``
                disables the timeout
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()

        # Initialize performance metrics
        self._init_performance_metrics()

        api_logger.info(f"Initialized API client for {base_url}")

    def _init_performance_metrics(self):
        """Reset all performance counters and record the start time."""
        self.metrics = {
            'api_calls': 0,
            'cache_hits': 0,
            'cache_misses': 0,
            'errors': 0,
            'total_response_time': 0,
            'start_time': time.time()
        }

    @classmethod
    def _redact_params(cls, params: Optional[Dict]) -> Optional[Dict]:
        """Return a copy of ``params`` that is safe to log.

        The value stored under the API-key parameter is replaced by a
        placeholder; ``None`` is returned unchanged. The input dict is never
        modified.
        """
        if params is None:
            return None
        safe = dict(params)
        if cls._API_KEY_PARAM in safe:
            safe[cls._API_KEY_PARAM] = cls._REDACTED
        return safe

    def _build_params(self, params: Optional[Dict]) -> Optional[Dict]:
        """Return the query parameters to send, including the API key.

        Works on a copy so the caller's dict is never mutated. When no API
        key is configured ``params`` is returned as-is.
        """
        if not self.api_key:
            return params
        merged = dict(params) if params else {}
        merged[self._API_KEY_PARAM] = self.api_key
        return merged

    def _scrub_secret(self, text: str) -> str:
        """Mask literal occurrences of the API key inside ``text``."""
        if not self.api_key:
            return text
        return text.replace(self.api_key, self._REDACTED)

    def _log_performance_metrics(self):
        """Compute, log and return a snapshot of the performance metrics.

        Returns:
            Dict with uptime, call/cache/error counters, cache hit rate (in
            percent), average response time and resident memory in MB.
        """
        current_time = time.time()
        uptime = current_time - self.metrics['start_time']

        # Calculate averages (guard against division by zero)
        api_calls = self.metrics['api_calls']
        avg_response_time = (self.metrics['total_response_time'] / api_calls
                             if api_calls > 0 else 0)

        # Calculate cache hit rate
        total_cache_ops = self.metrics['cache_hits'] + self.metrics['cache_misses']
        cache_hit_rate = (self.metrics['cache_hits'] / total_cache_ops * 100
                          if total_cache_ops > 0 else 0)

        # Get memory usage of the current process
        memory_info = psutil.Process().memory_info()

        metrics = {
            'uptime_seconds': uptime,
            'api_calls': api_calls,
            'cache_hits': self.metrics['cache_hits'],
            'cache_misses': self.metrics['cache_misses'],
            'cache_hit_rate': cache_hit_rate,
            'avg_response_time': avg_response_time,
            'errors': self.metrics['errors'],
            'memory_usage_mb': memory_info.rss / (1024 * 1024)
        }

        performance_logger.log_performance_metric(
            'api_performance',
            current_time,
            'timestamp',
            metrics
        )
        return metrics

    def _handle_error(self, error: Exception, context: Dict[str, Any]):
        """Count an error, log it and return a description of it.

        Args:
            error: The exception that was raised
            context: Details about the failed call, stored under 'context'

        Returns:
            Dict with 'error_type', 'error_message', 'context' and
            'timestamp' keys.
        """
        self.metrics['errors'] += 1
        error_info = {
            'error_type': type(error).__name__,
            # Exception text from the HTTP layer may embed the request URL,
            # whose query string carries the API key -- mask it.
            'error_message': self._scrub_secret(str(error)),
            'context': context,
            'timestamp': datetime.now().isoformat()
        }
        api_logger.error(f"API Error: {error_info}")
        return error_info

    def make_request(self, endpoint: str, method: str = 'GET',
                     params: Optional[Dict] = None, data: Optional[Dict] = None,
                     source: str = 'api', data_type: str = 'response',
                     force_refresh: bool = False) -> Tuple[bool, Any]:
        """
        Make API request with caching and logging.

        Args:
            endpoint: API endpoint, joined to the base URL
            method: HTTP method
            params: Query parameters (never modified by this method)
            data: Request body, sent as JSON
            source: Data source identifier (part of the cache key)
            data_type: Type of data being requested (part of the cache key)
            force_refresh: Skip the cache lookup and always call the API;
                the fresh response is still written to the cache

        Returns:
            Tuple of (success, data). On success ``data`` is the cached or
            freshly decoded JSON payload; on failure it is the error-info
            dict produced by ``_handle_error``. Exceptions are not
            propagated to the caller.
        """
        start_time = time.time()
        request_id = f"{source}_{endpoint}_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Log request start
        api_logger.log_api_call(
            endpoint=endpoint,
            method=method,
            params=self._redact_params(params)
        )

        try:
            # NOTE(review): the cache is keyed on source/endpoint/data_type
            # only -- params, request body and HTTP method are not part of
            # the key, so calls that differ only in those share one entry.
            # Confirm against cache_manager whether that is intended.
            cache_key = f"{source}_{endpoint}_{data_type}"

            if not force_refresh:
                # Check cache first
                cache_hit, cached_data = cache_manager.load(source, endpoint, data_type)
                if cache_hit:
                    self.metrics['cache_hits'] += 1
                    duration = time.time() - start_time
                    api_logger.info(f"Cache hit for {cache_key}")
                    performance_logger.log_performance_metric(
                        'cache_hit',
                        duration,
                        'seconds',
                        {'request_id': request_id, 'cache_key': cache_key}
                    )
                    return True, cached_data
                self.metrics['cache_misses'] += 1

            # Make API call
            url = f"{self.base_url}/{endpoint.lstrip('/')}"

            # Add API key to a private copy of the params if it exists
            request_params = self._build_params(params)
            if self.api_key:
                api_logger.info(f"Added API key to request: {self.api_key[:4]}...")
            else:
                api_logger.warning("No API key available for request")

            # Only the redacted view of the parameters is ever logged.
            safe_params = self._redact_params(request_params)
            api_logger.info(f"Making request to {url} with params: {safe_params}")

            response = self.session.request(
                method=method,
                url=url,
                params=request_params,
                json=data,
                timeout=self.timeout
            )
            response.raise_for_status()

            # Process response
            response_data = response.json()
            duration = time.time() - start_time

            # Update metrics
            self.metrics['api_calls'] += 1
            self.metrics['total_response_time'] += duration

            # Save to cache
            cache_manager.save(source, endpoint, data_type, response_data)

            # Log success
            api_logger.log_api_call(
                endpoint=endpoint,
                method=method,
                params=safe_params,
                response_time=duration,
                status_code=response.status_code
            )
            performance_logger.log_performance_metric(
                'api_response',
                duration,
                'seconds',
                {
                    'request_id': request_id,
                    'endpoint': endpoint,
                    'status_code': response.status_code
                }
            )
            return True, response_data

        except Exception as e:
            # Boundary handler: transport errors, HTTP error statuses, JSON
            # decoding failures and cache errors are all reported the same
            # way, as (False, error_info).
            error_info = self._handle_error(e, {
                'endpoint': endpoint,
                'method': method,
                'params': self._redact_params(params),
                'request_id': request_id
            })
            return False, error_info

    def portfolio_operation(self, operation_type: str, input_data: Dict[str, Any]) -> Tuple[bool, Any]:
        """
        Execute portfolio operation with logging and monitoring.

        The individual steps are currently simulated placeholders; only the
        surrounding timing, memory tracking and logging are real.

        Args:
            operation_type: Type of portfolio operation
            input_data: Input parameters for the operation

        Returns:
            Tuple of (success, result). On success ``result`` holds the
            operation id, the per-step timings and the input data; on
            failure it is the error-info dict from ``_handle_error``.
        """
        start_time = time.time()
        operation_id = f"{operation_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Log operation start
        portfolio_logger.log_portfolio_calculation(
            calculation_type=operation_type,
            input_data=input_data
        )

        try:
            # Track memory usage before operation
            process = psutil.Process()
            memory_before = process.memory_info().rss

            # Example operation steps (replace with actual implementation)
            steps = []
            step_names = ['validation', 'calculation', 'optimization']
            for step_number, step_name in enumerate(step_names, start=1):
                step_start = time.time()

                # Log step start
                portfolio_logger.info(f"Step {step_number}: {step_name}")

                # Execute step (replace with actual step implementation)
                time.sleep(0.1)  # Simulated step execution

                steps.append({
                    'step': step_number,
                    'name': step_name,
                    'duration': time.time() - step_start
                })

            # Calculate final result
            result = {
                'operation_id': operation_id,
                'steps': steps,
                'input_data': input_data
            }

            # Track memory usage after operation
            memory_after = process.memory_info().rss
            memory_used = (memory_after - memory_before) / (1024 * 1024)  # MB

            # Log operation completion
            duration = time.time() - start_time
            portfolio_logger.log_portfolio_calculation(
                calculation_type=operation_type,
                input_data=input_data,
                output_data=result,
                duration=duration
            )

            # Log performance metrics
            performance_logger.log_performance_metric(
                'portfolio_operation',
                duration,
                'seconds',
                {
                    'operation_id': operation_id,
                    'operation_type': operation_type,
                    'memory_used_mb': memory_used,
                    'steps': len(steps)
                }
            )
            return True, result

        except Exception as e:
            error_info = self._handle_error(e, {
                'operation_type': operation_type,
                'input_data': input_data,
                'operation_id': operation_id
            })
            return False, error_info

    def get_performance_metrics(self) -> Dict[str, Any]:
        """Get current performance metrics (also writes them to the log)."""
        return self._log_performance_metrics()

    def get_profile(self, ticker: str) -> Optional[List[Dict]]:
        """Get ETF profile data, or None if the request failed."""
        success, data = self.make_request(
            endpoint=f"profile/{ticker}",
            source="fmp",
            data_type="profile"
        )
        return data if success else None

    def get_historical_data(self, ticker: str, timeframe: str = "1d") -> Optional[Dict]:
        """Get historical price data, or None if the request failed.

        ``timeframe`` is sent as the ``timeseries`` query parameter.
        """
        success, data = self.make_request(
            endpoint=f"historical-price-full/{ticker}",
            params={"timeseries": timeframe},
            source="fmp",
            data_type="historical"
        )
        return data if success else None

    def get_dividend_history(self, ticker: str) -> Optional[Dict]:
        """Get dividend history data, or None if the request failed."""
        success, data = self.make_request(
            endpoint=f"historical-price-full/stock_dividend/{ticker}",
            source="fmp",
            data_type="dividend_history"
        )
        return data if success else None

    def get_holdings(self, ticker: str) -> Optional[Dict]:
        """Get ETF holdings data, or None if the request failed."""
        success, data = self.make_request(
            endpoint=f"etf-holdings/{ticker}",
            source="fmp",
            data_type="holdings"
        )
        return data if success else None

    def get_data(self, source: str, ticker: str, data_type: str, endpoint: str,
                 params: Optional[Dict] = None, force_refresh: bool = False) -> Any:
        """Generic method to get data from any source.

        Args:
            source: Data source identifier (part of the cache key)
            ticker: Accepted for interface compatibility; not used here --
                the caller is expected to embed it in ``endpoint``
            data_type: Type of data being requested (part of the cache key)
            endpoint: API endpoint to call
            params: Optional query parameters
            force_refresh: Bypass the cache lookup and fetch fresh data

        Returns:
            The response payload, or None if the request failed.
        """
        if params is None:
            params = {}
        success, data = self.make_request(
            endpoint=endpoint,
            params=params,
            source=source,
            data_type=data_type,
            force_refresh=force_refresh
        )
        return data if success else None
# Module-level shared client, created once at import time. The API key is
# read from the FMP_API_KEY environment variable (empty string when unset).
api_client = APIClient(
    base_url="https://financialmodelingprep.com/api/v3",
    api_key=os.environ.get('FMP_API_KEY', ''),
)