#!/usr/bin/env python3
"""
Cache Manager for ETF Portal
Handles caching of API responses to reduce API calls and improve response times.
Implements a time-based cache expiration system with detailed logging.
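
Typical usage, as a minimal sketch (the 'fmp'/'SPY'/'profile' values are
illustrative):

    from ETF_Portal.cache_manager import cache_manager

    cache_manager.save("fmp", "SPY", "profile", {"price": 450.0})
    hit, data = cache_manager.load("fmp", "SPY", "profile")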
"""

import os
import json
import time
import threading
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from dataclasses import dataclass, asdict

from .logging_config import cache_logger, performance_logger

# Constants
CACHE_DIR = Path("cache")
DEFAULT_CACHE_DURATION = 24 * 60 * 60  # 24 hours in seconds


@dataclass
class CacheStats:
    """Cache statistics tracking."""
    hits: int = 0
    misses: int = 0
    total_size: int = 0
    last_cleared: Optional[datetime] = None
    errors: int = 0


class CacheManager:
    """Manages caching operations for the ETF Portal."""

    def __init__(self, cache_dir: str = "cache", cache_duration: int = DEFAULT_CACHE_DURATION):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Directory to store cache files
            cache_duration: Cache duration in seconds (24 hours by default)
        """
        # Use an absolute path for the cache directory
        self.cache_dir = Path(os.path.abspath(cache_dir))
        self.cache_duration = cache_duration
        self.stats = CacheStats()
        self._lock = threading.Lock()

        # Create the cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        cache_logger.info(f"Cache directory: {self.cache_dir}")
        cache_logger.info(f"Cache duration: {cache_duration} seconds")

        # Load or initialize stats
        self._load_stats()

        # Log initialization complete
        cache_logger.info("Cache manager initialized successfully")
        performance_logger.log_performance_metric(
            "cache_init",
            time.time(),
            "timestamp",
            {"cache_duration": cache_duration}
        )
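
    # Construction with non-default settings, as an illustrative sketch (the
    # directory name and 15-minute duration are example values):
    #   intraday_cache = CacheManager(cache_dir="cache_intraday", cache_duration=15 * 60)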

    def _get_cache_path(self, source: str, ticker: str, data_type: str) -> Path:
        """
        Generate cache file path.

        Args:
            source: Data source (e.g., 'fmp', 'yahoo')
            ticker: Stock/ETF ticker
            data_type: Type of data (e.g., 'profile', 'historical')

        Returns:
            Path object for the cache file
        """
        # Create filename in format: {source}_{ticker}_{data_type}.json
        filename = f"{source}_{ticker}_{data_type}.json"
        cache_path = self.cache_dir / filename
        cache_logger.debug(f"Cache path: {cache_path}")
        return cache_path
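
    # For example, _get_cache_path('fmp', 'SPY', 'profile') resolves to
    # <cache_dir>/fmp_SPY_profile.json. Note that tickers containing path
    # separators or other filesystem-unsafe characters are not sanitized here.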

    def _load_stats(self) -> None:
        """Load cache statistics from disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        if stats_file.exists():
            try:
                with open(stats_file, 'r') as f:
                    data = json.load(f)
                self.stats = CacheStats(**data)
                # last_cleared is persisted as an ISO-8601 string; convert it
                # back to a datetime
                if self.stats.last_cleared:
                    self.stats.last_cleared = datetime.fromisoformat(self.stats.last_cleared)
                cache_logger.info(f"Loaded cache stats: {asdict(self.stats)}")
            except Exception as e:
                cache_logger.error(f"Error loading cache stats: {e}")
                self.stats = CacheStats()
                self.stats.errors += 1

    def _save_stats(self) -> None:
        """Save cache statistics to disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        try:
            stats_dict = asdict(self.stats)
            # datetime is not JSON-serializable; store it as an ISO-8601 string
            if stats_dict['last_cleared']:
                stats_dict['last_cleared'] = stats_dict['last_cleared'].isoformat()
            with open(stats_file, 'w') as f:
                json.dump(stats_dict, f, indent=2)
            cache_logger.debug(f"Saved cache stats: {stats_dict}")
        except Exception as e:
            cache_logger.error(f"Error saving cache stats: {e}")
            self.stats.errors += 1

    def save(self, source: str, ticker: str, data_type: str, data: Any) -> bool:
        """
        Save data to cache.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data
            data: Data to cache

        Returns:
            True if save was successful, False otherwise
        """
        with self._lock:
            try:
                start_time = time.time()
                cache_path = self._get_cache_path(source, ticker, data_type)

                # Prepare cache data with timestamp
                cache_data = {
                    'timestamp': datetime.now().isoformat(),
                    'source': source,
                    'ticker': ticker,
                    'type': data_type,
                    'data': data
                }

                # If an entry is being overwritten, subtract its old size so
                # total_size does not drift upward across repeated saves
                if cache_path.exists():
                    self.stats.total_size -= os.path.getsize(cache_path)

                # Save to cache file
                with open(cache_path, 'w') as f:
                    json.dump(cache_data, f, indent=2)

                # Update stats
                file_size = os.path.getsize(cache_path)
                self.stats.total_size += file_size
                self._save_stats()

                duration = time.time() - start_time
                cache_logger.log_cache_operation(
                    "save",
                    f"{source}/{ticker}/{data_type}",
                    size=file_size
                )
                performance_logger.log_performance_metric(
                    "cache_save",
                    duration,
                    "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                return True
            except Exception as e:
                cache_logger.error(f"Error saving to cache: {e}")
                self.stats.errors += 1
                return False
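
    # Each cache file wraps the payload with metadata. For example,
    # fmp_SPY_profile.json would look like (illustrative values):
    #   {"timestamp": "2025-05-27T14:07:32", "source": "fmp",
    #    "ticker": "SPY", "type": "profile", "data": {...}}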

    def load(self, source: str, ticker: str, data_type: str) -> Tuple[bool, Optional[Any]]:
        """
        Load data from cache if valid.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data

        Returns:
            Tuple of (is_valid, data)
        """
        with self._lock:
            start_time = time.time()
            cache_path = self._get_cache_path(source, ticker, data_type)

            if not cache_path.exists():
                cache_logger.log_cache_operation(
                    "load",
                    f"{source}/{ticker}/{data_type}",
                    hit=False
                )
                self.stats.misses += 1
                self._save_stats()
                return False, None

            try:
                with open(cache_path, 'r') as f:
                    cache_data = json.load(f)

                # Check if the cache entry is still valid
                timestamp = datetime.fromisoformat(cache_data['timestamp'])
                age = datetime.now() - timestamp
                if age.total_seconds() > self.cache_duration:
                    cache_logger.log_cache_operation(
                        "load",
                        f"{source}/{ticker}/{data_type}",
                        hit=False
                    )
                    self.stats.misses += 1
                    self._save_stats()
                    return False, None

                duration = time.time() - start_time
                cache_logger.log_cache_operation(
                    "load",
                    f"{source}/{ticker}/{data_type}",
                    hit=True
                )
                performance_logger.log_performance_metric(
                    "cache_load",
                    duration,
                    "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                self.stats.hits += 1
                self._save_stats()
                return True, cache_data['data']
            except Exception as e:
                cache_logger.error(f"Error loading from cache: {e}")
                self.stats.misses += 1
                self.stats.errors += 1
                self._save_stats()
                return False, None
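
    # Typical caller pattern, as a sketch (fetch_from_api is a hypothetical
    # helper, not part of this module):
    #   hit, data = cache_manager.load('fmp', 'SPY', 'profile')
    #   if not hit:
    #       data = fetch_from_api('SPY')
    #       cache_manager.save('fmp', 'SPY', 'profile', data)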

    def clear_expired(self) -> None:
        """Remove expired cache files."""
        with self._lock:
            try:
                cleared_count = 0
                for cache_file in self.cache_dir.glob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue
                    try:
                        with open(cache_file, 'r') as f:
                            cache_data = json.load(f)
                        timestamp = datetime.fromisoformat(cache_data['timestamp'])
                        age = datetime.now() - timestamp
                        if age.total_seconds() > self.cache_duration:
                            self.stats.total_size -= os.path.getsize(cache_file)
                            cache_file.unlink()
                            cleared_count += 1
                            cache_logger.debug(f"Removed expired cache: {cache_file}")
                    except Exception as e:
                        cache_logger.error(f"Error processing cache file {cache_file}: {e}")
                        self.stats.errors += 1
                if cleared_count > 0:
                    cache_logger.info(f"Cleared {cleared_count} expired cache files")
                self.stats.last_cleared = datetime.now()
                self._save_stats()
            except Exception as e:
                cache_logger.error(f"Error clearing expired cache: {e}")
                self.stats.errors += 1
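
    # clear_expired is not invoked automatically anywhere in this module. One
    # hedged background-cleanup sketch (threading.Timer fires once, so the
    # callback reschedules itself; the 3600-second interval is illustrative):
    #   def _periodic_cleanup():
    #       cache_manager.clear_expired()
    #       threading.Timer(3600, _periodic_cleanup).start()
    #   _periodic_cleanup()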

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary containing cache statistics
        """
        with self._lock:
            stats = asdict(self.stats)
            if stats['last_cleared']:
                stats['last_cleared'] = stats['last_cleared'].isoformat()

            # Add derived stats; count cache files explicitly rather than
            # subtracting one for cache_stats.json, which may not exist yet
            cache_files = [p for p in self.cache_dir.glob("*.json") if p.name != "cache_stats.json"]
            stats['cache_files'] = len(cache_files)
            lookups = self.stats.hits + self.stats.misses
            stats['hit_rate'] = self.stats.hits / lookups if lookups > 0 else 0
            stats['total_size_mb'] = self.stats.total_size / (1024 * 1024)
            return stats
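
    # get_stats() returns a flat dict, e.g. (illustrative values):
    #   {'hits': 12, 'misses': 3, 'total_size': 48211,
    #    'last_cleared': '2025-05-27T14:07:32', 'errors': 0,
    #    'cache_files': 5, 'hit_rate': 0.8, 'total_size_mb': 0.046}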

    def clear_all(self) -> None:
        """Clear all cache files."""
        with self._lock:
            try:
                for cache_file in self.cache_dir.glob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue
                    cache_file.unlink()
                self.stats = CacheStats()
                self._save_stats()
                cache_logger.info("Cleared all cache files")
            except Exception as e:
                cache_logger.error(f"Error clearing all cache: {e}")
                self.stats.errors += 1


# Create a singleton instance
cache_manager = CacheManager()
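

if __name__ == "__main__":
    # Hedged smoke test, not part of the portal's runtime. Run as a module
    # (python -m ETF_Portal.cache_manager) so the relative import of
    # logging_config resolves; 'fmp'/'SPY'/'profile' and the payload below
    # are example values.
    sample = {"symbol": "SPY", "price": 450.0}
    print("save ok:", cache_manager.save("fmp", "SPY", "profile", sample))
    hit, data = cache_manager.load("fmp", "SPY", "profile")
    print(f"load hit={hit}, data={data}")
    cache_manager.clear_expired()
    print("stats:", cache_manager.get_stats())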