#!/usr/bin/env python3
"""
Cache Manager for ETF Portal

Handles caching of API responses to reduce API calls and improve response times.
Implements a time-based cache expiration system with detailed logging.
"""

import os
import json
import time
import threading
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from .logging_config import cache_logger, performance_logger

# Constants
CACHE_DIR = Path("cache")
DEFAULT_CACHE_DURATION = 24 * 60 * 60  # 24 hours in seconds


@dataclass
class CacheStats:
    """Cache statistics tracking."""
    hits: int = 0
    misses: int = 0
    total_size: int = 0
    last_cleared: Optional[datetime] = None
    errors: int = 0


class CacheManager:
    """Manages caching operations for the ETF Portal."""

    def __init__(self, cache_dir: str = "cache", cache_duration: int = DEFAULT_CACHE_DURATION):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Directory to store cache files
            cache_duration: Cache duration in seconds (24 hours by default)
        """
        # Use absolute path for cache directory
        self.cache_dir = Path(os.path.abspath(cache_dir))
        self.cache_duration = cache_duration
        self.stats = CacheStats()
        self._lock = threading.Lock()

        # Create cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        cache_logger.info(f"Cache directory: {self.cache_dir}")
        cache_logger.info(f"Cache duration: {cache_duration} seconds")

        # Load or initialize stats
        self._load_stats()

        # Log initialization complete
        cache_logger.info("Cache manager initialized successfully")
        performance_logger.log_performance_metric(
            "cache_init",
            time.time(),
            "timestamp",
            {"cache_duration": cache_duration}
        )

    def _get_cache_path(self, source: str, ticker: str, data_type: str) -> Path:
        """
        Generate cache file path.

        Args:
            source: Data source (e.g., 'fmp', 'yahoo')
            ticker: Stock/ETF ticker
            data_type: Type of data (e.g., 'profile', 'historical')

        Returns:
            Path object for the cache file
        """
        # Create subdirectory based on source and data type
        if source == 'fmp':
            subdir = f"FMP_cache/{data_type}"
        else:
            subdir = f"{source}_cache"

        # Create the subdirectory
        subdir_path = self.cache_dir / subdir
        subdir_path.mkdir(parents=True, exist_ok=True)

        # Create filename in format: {ticker}.json
        filename = f"{ticker}.json"
        cache_path = subdir_path / filename
        cache_logger.debug(f"Cache path: {cache_path}")
        return cache_path
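
    # Illustrative layout produced by the branching above (tickers are
    # hypothetical). Note that for non-'fmp' sources the path does not include
    # data_type, so different data types for the same ticker share one file:
    #
    #     cache/FMP_cache/profile/SPY.json   # source='fmp', data_type='profile'
    #     cache/yahoo_cache/QQQ.json         # source='yahoo', any data_type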

    def _load_stats(self) -> None:
        """Load cache statistics from disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        if stats_file.exists():
            try:
                with open(stats_file, 'r') as f:
                    data = json.load(f)
                self.stats = CacheStats(**data)
                # last_cleared is persisted as an ISO string; convert it back
                if self.stats.last_cleared:
                    self.stats.last_cleared = datetime.fromisoformat(self.stats.last_cleared)
                cache_logger.info(f"Loaded cache stats: {asdict(self.stats)}")
            except Exception as e:
                cache_logger.error(f"Error loading cache stats: {e}")
                self.stats = CacheStats()
                self.stats.errors += 1

    def _save_stats(self) -> None:
        """Save cache statistics to disk."""
        stats_file = self.cache_dir / "cache_stats.json"
        try:
            # Serialize before opening so a serialization error can't
            # truncate an existing stats file
            stats_dict = asdict(self.stats)
            if stats_dict['last_cleared']:
                # asdict leaves datetime values untouched; convert manually
                stats_dict['last_cleared'] = stats_dict['last_cleared'].isoformat()
            with open(stats_file, 'w') as f:
                json.dump(stats_dict, f, indent=2)
            cache_logger.debug(f"Saved cache stats: {stats_dict}")
        except Exception as e:
            cache_logger.error(f"Error saving cache stats: {e}")
            self.stats.errors += 1
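
    # Roughly what the persisted cache_stats.json looks like (values are
    # hypothetical):
    #
    #     {"hits": 42, "misses": 8, "total_size": 1048576,
    #      "last_cleared": "2024-01-01T12:00:00", "errors": 0}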

    def save(self, source: str, ticker: str, data_type: str, data: Any) -> bool:
        """
        Save data to cache.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data
            data: Data to cache

        Returns:
            True if save was successful, False otherwise
        """
        with self._lock:
            try:
                start_time = time.time()
                cache_path = self._get_cache_path(source, ticker, data_type)

                # Log the data being cached
                cache_logger.debug(f"Caching data for {source}/{ticker}/{data_type}")
                cache_logger.debug(f"Data type: {type(data)}")
                if isinstance(data, (list, dict)):
                    cache_logger.debug(f"Data length: {len(data)}")

                # Track the size of any file being overwritten so that
                # total_size isn't double-counted
                old_size = os.path.getsize(cache_path) if cache_path.exists() else 0

                # Prepare cache data with timestamp
                cache_data = {
                    'timestamp': datetime.now().isoformat(),
                    'source': source,
                    'ticker': ticker,
                    'type': data_type,
                    'data': data
                }

                # Save to cache file
                with open(cache_path, 'w') as f:
                    json.dump(cache_data, f, indent=2)

                # Verify the file was written correctly
                if not cache_path.exists():
                    cache_logger.error(f"Cache file was not created: {cache_path}")
                    return False

                file_size = os.path.getsize(cache_path)
                if file_size == 0:
                    cache_logger.error(f"Cache file is empty: {cache_path}")
                    return False

                # Update stats
                self.stats.total_size += file_size - old_size
                self._save_stats()

                duration = time.time() - start_time
                cache_logger.log_cache_operation(
                    "save",
                    f"{source}/{ticker}/{data_type}",
                    size=file_size
                )
                performance_logger.log_performance_metric(
                    "cache_save",
                    duration,
                    "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                return True

            except Exception as e:
                cache_logger.error(f"Error saving to cache: {e}")
                self.stats.errors += 1
                return False
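
    # Shape of a written cache file, per cache_data above (values are
    # hypothetical):
    #
    #     {"timestamp": "2024-01-01T12:00:00", "source": "fmp",
    #      "ticker": "SPY", "type": "profile", "data": {...}}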

    def load(self, source: str, ticker: str, data_type: str) -> Tuple[bool, Optional[Any]]:
        """
        Load data from cache if valid.

        Args:
            source: Data source
            ticker: Stock/ETF ticker
            data_type: Type of data

        Returns:
            Tuple of (is_valid, data)
        """
        with self._lock:
            start_time = time.time()
            cache_path = self._get_cache_path(source, ticker, data_type)

            cache_logger.debug(f"Attempting to load cache for {source}/{ticker}/{data_type}")
            cache_logger.debug(f"Cache path: {cache_path}")

            if not cache_path.exists():
                cache_logger.debug(f"Cache file does not exist: {cache_path}")
                cache_logger.log_cache_operation(
                    "load",
                    f"{source}/{ticker}/{data_type}",
                    hit=False
                )
                self.stats.misses += 1
                self._save_stats()
                return False, None

            try:
                with open(cache_path, 'r') as f:
                    cache_data = json.load(f)

                # Check if cache is still valid
                timestamp = datetime.fromisoformat(cache_data['timestamp'])
                age = datetime.now() - timestamp

                cache_logger.debug(f"Cache age: {age.total_seconds()} seconds")
                cache_logger.debug(f"Cache duration: {self.cache_duration} seconds")

                if age.total_seconds() > self.cache_duration:
                    cache_logger.debug(f"Cache expired for {source}/{ticker}/{data_type}")
                    cache_logger.log_cache_operation(
                        "load",
                        f"{source}/{ticker}/{data_type}",
                        hit=False
                    )
                    self.stats.misses += 1
                    self._save_stats()
                    return False, None

                duration = time.time() - start_time
                cache_logger.debug(f"Cache hit for {source}/{ticker}/{data_type}")
                cache_logger.log_cache_operation(
                    "load",
                    f"{source}/{ticker}/{data_type}",
                    hit=True
                )
                performance_logger.log_performance_metric(
                    "cache_load",
                    duration,
                    "seconds",
                    {"source": source, "ticker": ticker, "type": data_type}
                )
                self.stats.hits += 1
                self._save_stats()
                return True, cache_data['data']

            except Exception as e:
                cache_logger.error(f"Error loading from cache: {e}")
                self.stats.misses += 1
                self.stats.errors += 1
                self._save_stats()
                return False, None

    def clear_expired(self) -> None:
        """Remove expired cache files."""
        with self._lock:
            try:
                cleared_count = 0
                # Cache files live in per-source subdirectories, so search
                # recursively rather than only the top-level directory
                for cache_file in self.cache_dir.rglob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue

                    try:
                        with open(cache_file, 'r') as f:
                            cache_data = json.load(f)

                        timestamp = datetime.fromisoformat(cache_data['timestamp'])
                        age = datetime.now() - timestamp

                        if age.total_seconds() > self.cache_duration:
                            self.stats.total_size -= os.path.getsize(cache_file)
                            cache_file.unlink()
                            cleared_count += 1
                            cache_logger.debug(f"Removed expired cache: {cache_file}")

                    except Exception as e:
                        cache_logger.error(f"Error processing cache file {cache_file}: {e}")
                        self.stats.errors += 1

                if cleared_count > 0:
                    cache_logger.info(f"Cleared {cleared_count} expired cache files")
                    self.stats.last_cleared = datetime.now()
                    self._save_stats()

            except Exception as e:
                cache_logger.error(f"Error clearing expired cache: {e}")
                self.stats.errors += 1

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary containing cache statistics
        """
        with self._lock:
            stats = asdict(self.stats)
            if stats['last_cleared']:
                stats['last_cleared'] = stats['last_cleared'].isoformat()

            # Add additional stats; count recursively (cache files live in
            # subdirectories) and exclude the stats file itself
            stats['cache_files'] = len(list(self.cache_dir.rglob("*.json"))) - 1
            total_requests = self.stats.hits + self.stats.misses
            stats['hit_rate'] = self.stats.hits / total_requests if total_requests > 0 else 0
            stats['total_size_mb'] = self.stats.total_size / (1024 * 1024)

            return stats
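
    # Example return value (values are hypothetical; hit_rate is hits divided
    # by total requests):
    #
    #     {'hits': 42, 'misses': 8, 'total_size': 1048576, 'last_cleared': None,
    #      'errors': 0, 'cache_files': 12, 'hit_rate': 0.84, 'total_size_mb': 1.0}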

    def clear_all(self) -> None:
        """Clear all cache files."""
        with self._lock:
            try:
                # Search recursively so files in per-source subdirectories
                # are removed as well
                for cache_file in self.cache_dir.rglob("*.json"):
                    if cache_file.name == "cache_stats.json":
                        continue
                    cache_file.unlink()

                self.stats = CacheStats()
                self._save_stats()
                cache_logger.info("Cleared all cache files")

            except Exception as e:
                cache_logger.error(f"Error clearing all cache: {e}")
                self.stats.errors += 1


# Create a singleton instance
cache_manager = CacheManager()
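
# Usage sketch (fetch_profile_from_api is a hypothetical caller-side function,
# and the import path depends on the actual package layout):
#
#     from etf_portal.cache_manager import cache_manager
#
#     is_valid, profile = cache_manager.load("fmp", "SPY", "profile")
#     if not is_valid:
#         profile = fetch_profile_from_api("SPY")
#         cache_manager.save("fmp", "SPY", "profile", profile)
#
#     cache_manager.clear_expired()
#     print(f"Hit rate: {cache_manager.get_stats()['hit_rate']:.0%}")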