#!/usr/bin/env python3
"""
Cache Manager for ETF Portal

Handles caching of API responses to reduce API calls and improve response
times. Implements a time-based cache expiration system with detailed logging.
"""

import os
import json
import time
import threading
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from dataclasses import dataclass, asdict

from .logging_config import cache_logger, performance_logger

# Constants
CACHE_DIR = Path("cache")
DEFAULT_CACHE_DURATION = 24 * 60 * 60  # 24 hours in seconds


@dataclass
class CacheStats:
    """Cache statistics tracking."""
    hits: int = 0
    misses: int = 0
    total_size: int = 0
    last_cleared: Optional[datetime] = None
    errors: int = 0


class CacheManager:
    """Manages caching operations for the ETF Portal."""

    def __init__(self, cache_dir: str = "cache",
                 cache_duration: int = DEFAULT_CACHE_DURATION):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Directory to store cache files
            cache_duration: Cache duration in seconds (24 hours by default)
        """
        # Use an absolute path for the cache directory
        self.cache_dir = Path(os.path.abspath(cache_dir))
        self.cache_duration = cache_duration
        self.stats = CacheStats()
        self._lock = threading.Lock()

        # Create the cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        cache_logger.info(f"Cache directory: {self.cache_dir}")
        cache_logger.info(f"Cache duration: {cache_duration} seconds")

        # Load or initialize stats
        self._load_stats()

        # Log initialization complete
        cache_logger.info("Cache manager initialized successfully")
        performance_logger.log_performance_metric(
            "cache_init", time.time(), "timestamp",
            {"cache_duration": cache_duration}
        )

    def _get_cache_path(self, source: str, ticker: str,
                        data_type: str) -> Path:
        """
        Generate the cache file path.

        Args:
            source: Data source (e.g., 'fmp', 'yahoo')
            ticker: Stock/ETF ticker
            data_type: Type of data (e.g., 'profile', 'historical')

        Returns:
            Path object for the cache file
        """
        # Filename format: {source}_{ticker}_{data_type}.json
        filename = f"{source}_{ticker}_{data_type}.json"
        cache_path = self.cache_dir / filename
        cache_logger.debug(f"Cache path: {cache_path}")
        return cache_path

    def _load_stats(self) -> None:
        """Load cache statistics from disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        if stats_file.exists():
            try:
                with open(stats_file, 'r') as f:
                    data = json.load(f)
                self.stats = CacheStats(**data)
                # last_cleared is persisted as an ISO-format string
                if self.stats.last_cleared:
                    self.stats.last_cleared = datetime.fromisoformat(
                        self.stats.last_cleared)
                cache_logger.info(f"Loaded cache stats: {asdict(self.stats)}")
            except Exception as e:
                cache_logger.error(f"Error loading cache stats: {e}")
                self.stats = CacheStats()
                self.stats.errors += 1

    def _save_stats(self) -> None:
        """Save cache statistics to disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        try:
            with open(stats_file, 'w') as f:
                stats_dict = asdict(self.stats)
                if stats_dict['last_cleared']:
                    stats_dict['last_cleared'] = \
                        stats_dict['last_cleared'].isoformat()
                json.dump(stats_dict, f, indent=2)
            cache_logger.debug(f"Saved cache stats: {stats_dict}")
        except Exception as e:
            cache_logger.error(f"Error saving cache stats: {e}")
            self.stats.errors += 1

    def save(self, source: str, ticker: str, data_type: str,
             data: Any) -> bool:
        """
        Save data to cache.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data
            data: Data to cache

        Returns:
            True if the save was successful, False otherwise
        """
        with self._lock:
            try:
                start_time = time.time()
                cache_path = self._get_cache_path(source, ticker, data_type)

                # Avoid double-counting total_size when overwriting an
                # existing cache entry
                if cache_path.exists():
                    self.stats.total_size -= os.path.getsize(cache_path)

                # Prepare cache data with a timestamp
                cache_data = {
                    'timestamp': datetime.now().isoformat(),
                    'source': source,
                    'ticker': ticker,
                    'type': data_type,
                    'data': data
                }

                # Save to the cache file
                with open(cache_path, 'w') as f:
                    json.dump(cache_data, f, indent=2)

                # Update stats
                file_size = os.path.getsize(cache_path)
                self.stats.total_size += file_size
                self._save_stats()

                duration = time.time() - start_time
                cache_logger.log_cache_operation(
                    "save", f"{source}/{ticker}/{data_type}", size=file_size
                )
                performance_logger.log_performance_metric(
                    "cache_save", duration, "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                return True

            except Exception as e:
                cache_logger.error(f"Error saving to cache: {e}")
                self.stats.errors += 1
                return False

    def load(self, source: str, ticker: str,
             data_type: str) -> Tuple[bool, Optional[Any]]:
        """
        Load data from cache if valid.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data

        Returns:
            Tuple of (is_valid, data)
        """
        with self._lock:
            start_time = time.time()
            cache_path = self._get_cache_path(source, ticker, data_type)

            if not cache_path.exists():
                cache_logger.log_cache_operation(
                    "load", f"{source}/{ticker}/{data_type}", hit=False
                )
                self.stats.misses += 1
                self._save_stats()
                return False, None

            try:
                with open(cache_path, 'r') as f:
                    cache_data = json.load(f)

                # Check whether the cache entry is still valid
                timestamp = datetime.fromisoformat(cache_data['timestamp'])
                age = datetime.now() - timestamp
                if age.total_seconds() > self.cache_duration:
                    cache_logger.log_cache_operation(
                        "load", f"{source}/{ticker}/{data_type}", hit=False
                    )
                    self.stats.misses += 1
                    self._save_stats()
                    return False, None

                duration = time.time() - start_time
                cache_logger.log_cache_operation(
                    "load", f"{source}/{ticker}/{data_type}", hit=True
                )
                performance_logger.log_performance_metric(
                    "cache_load", duration, "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                self.stats.hits += 1
                self._save_stats()
                return True, cache_data['data']

            except Exception as e:
                cache_logger.error(f"Error loading from cache: {e}")
                self.stats.misses += 1
                self.stats.errors += 1
                self._save_stats()
                return False, None

    def clear_expired(self) -> None:
        """Remove expired cache files."""
        with self._lock:
            try:
                cleared_count = 0
                for cache_file in self.cache_dir.glob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue
                    try:
                        with open(cache_file, 'r') as f:
                            cache_data = json.load(f)
                        timestamp = datetime.fromisoformat(
                            cache_data['timestamp'])
                        age = datetime.now() - timestamp
                        if age.total_seconds() > self.cache_duration:
                            self.stats.total_size -= os.path.getsize(
                                cache_file)
                            cache_file.unlink()
                            cleared_count += 1
                            cache_logger.debug(
                                f"Removed expired cache: {cache_file}")
                    except Exception as e:
                        cache_logger.error(
                            f"Error processing cache file {cache_file}: {e}")
                        self.stats.errors += 1

                if cleared_count > 0:
                    cache_logger.info(
                        f"Cleared {cleared_count} expired cache files")
                self.stats.last_cleared = datetime.now()
                self._save_stats()

            except Exception as e:
                cache_logger.error(f"Error clearing expired cache: {e}")
                self.stats.errors += 1

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary containing cache statistics
        """
        with self._lock:
            stats = asdict(self.stats)
            if stats['last_cleared']:
                stats['last_cleared'] = stats['last_cleared'].isoformat()

            # Add derived stats; count cache files explicitly rather than
            # assuming the stats file is present
            stats['cache_files'] = sum(
                1 for f in self.cache_dir.glob("*.json")
                if f.name != "cache_stats.json"
            )
            total_lookups = self.stats.hits + self.stats.misses
            stats['hit_rate'] = (
                self.stats.hits / total_lookups if total_lookups > 0 else 0
            )
            stats['total_size_mb'] = self.stats.total_size / (1024 * 1024)
            return stats

    def clear_all(self) -> None:
        """Clear all cache files."""
        with self._lock:
            try:
                for cache_file in self.cache_dir.glob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue
                    cache_file.unlink()
                # Reset stats and record when the cache was cleared
                self.stats = CacheStats(last_cleared=datetime.now())
                self._save_stats()
                cache_logger.info("Cleared all cache files")
            except Exception as e:
                cache_logger.error(f"Error clearing all cache: {e}")
                self.stats.errors += 1


# Create a singleton instance
cache_manager = CacheManager()
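

# Minimal usage sketch (not part of the original module): exercises the
# singleton's save/load/stats round-trip. The source, ticker, and payload
# values below are illustrative, not from the original code. Because of the
# relative import at the top, run this inside its package (e.g.
# `python -m <package>.cache_manager`), not as a bare script.
if __name__ == "__main__":
    # Any JSON-serializable object can be cached
    sample = {"price": 101.25, "currency": "USD"}

    cache_manager.save("fmp", "SPY", "profile", sample)

    # load() returns (is_valid, data); is_valid is False on a miss or
    # when the entry has outlived cache_duration
    is_valid, cached = cache_manager.load("fmp", "SPY", "profile")
    print(f"hit={is_valid}, data={cached}")

    print(cache_manager.get_stats())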