ETF_Suite_Portal/pages/cache_manager.py

import json
import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
# Configure logging: one file handler and one console handler on the module
# logger. Propagation to the root logger is disabled so records are not
# emitted twice, and CacheManager instances reuse this logger rather than
# attaching handlers of their own.
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"cache_manager_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

file_handler = logging.FileHandler(log_file, mode='a')
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)

console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.propagate = False

class CacheManager:
"""
Manages caching of ETF data to reduce API calls and improve performance.
Implements a time-based cache expiration system.
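
    Example (illustrative sketch; 'fetch_profile' is a hypothetical loader
    you would supply, not part of this module):

        cm = CacheManager(cache_dir="cache", cache_duration_hours=24)
        key = cm._generate_cache_key("fmp", "SPY", "profile")
        data, hit = cm.load_from_cache(key)
        if not hit:
            cm.save_to_cache(key, fetch_profile("SPY"))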
"""

    def __init__(self, cache_dir: str = "cache", cache_duration_hours: int = 24):
        """
        Initialize the cache manager.

        Args:
            cache_dir: Directory to store cache files
            cache_duration_hours: Number of hours before cache expires
        """
        self.cache_dir = Path(cache_dir)
        self.cache_duration = timedelta(hours=cache_duration_hours)

        # Create the cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Reuse the module-level logger. Adding another file handler here
        # would attach one handler per CacheManager instance and duplicate
        # every log record.
        self.logger = logging.getLogger(__name__)

        self.logger.info(f"CacheManager initialized with directory: {self.cache_dir}")
        self.logger.info(f"Cache duration: {cache_duration_hours} hours")

    def _generate_cache_key(self, source: str, ticker: str, data_type: str) -> str:
        """
        Generate a unique cache key for the data.

        Args:
            source: Data source (e.g., 'fmp', 'yf')
            ticker: ETF ticker symbol
            data_type: Type of data (e.g., 'profile', 'history')

        Returns:
            Cache key string, e.g. 'fmp_SPY_profile.json'
        """
        return f"{source}_{ticker}_{data_type}.json"

    def _get_cache_path(self, cache_key: str) -> Path:
        """
        Get the full path for a cache file.

        Args:
            cache_key: Cache key string

        Returns:
            Path object for the cache file
        """
        return self.cache_dir / cache_key

    def _is_cache_valid(self, cache_path: Path) -> bool:
        """
        Check if a cache file is still valid based on its age.

        Args:
            cache_path: Path to the cache file

        Returns:
            True if cache is valid, False otherwise
        """
        if not cache_path.exists():
            return False

        # Age is measured from the file's last-modified time
        file_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
        is_valid = file_age < self.cache_duration
        self.logger.debug(f"Cache file {cache_path} age: {file_age}, valid: {is_valid}")
        return is_valid

    def save_to_cache(self, cache_key: str, data: Any) -> bool:
        """
        Save data to cache.

        Args:
            cache_key: Cache key string
            data: Data to cache (must be JSON-serializable)

        Returns:
            True if save was successful, False otherwise
        """
        try:
            cache_path = self._get_cache_path(cache_key)

            # Create the cache directory if it doesn't exist
            cache_path.parent.mkdir(parents=True, exist_ok=True)

            # Save data to a JSON file; non-serializable objects raise
            # TypeError, which is caught and logged below
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)

            self.logger.info(f"Data saved to cache: {cache_path}")
            return True
        except Exception as e:
            self.logger.error(f"Error saving to cache: {e}")
            return False

    def load_from_cache(self, cache_key: str) -> Tuple[Optional[Any], bool]:
        """
        Load data from cache if it exists and is valid.

        Args:
            cache_key: Cache key string

        Returns:
            Tuple of (cached data, is_valid); (None, False) on a miss,
            an expired entry, or a read error
        """
        try:
            cache_path = self._get_cache_path(cache_key)

            if not cache_path.exists():
                self.logger.debug(f"Cache miss: {cache_path}")
                return None, False

            if not self._is_cache_valid(cache_path):
                self.logger.info(f"Cache expired: {cache_path}")
                return None, False

            # Load data from the JSON file
            with open(cache_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            self.logger.info(f"Data loaded from cache: {cache_path}")
            return data, True
        except Exception as e:
            self.logger.error(f"Error loading from cache: {e}")
            return None, False

    def clear_expired_cache(self) -> int:
        """
        Clear all expired cache files.

        Returns:
            Number of files cleared
        """
        cleared_count = 0
        for cache_file in self.cache_dir.glob("*.json"):
            if not self._is_cache_valid(cache_file):
                try:
                    cache_file.unlink()
                    cleared_count += 1
                    self.logger.info(f"Cleared expired cache: {cache_file}")
                except OSError as e:
                    # A failed delete should not abort the sweep or lose the count
                    self.logger.error(f"Error clearing {cache_file}: {e}")

        self.logger.info(f"Cleared {cleared_count} expired cache files")
        return cleared_count

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the cache.

        Returns:
            Dictionary with cache statistics (empty on error)
        """
        try:
            total_files = 0
            expired_files = 0
            total_size = 0

            for cache_file in self.cache_dir.glob("*.json"):
                total_files += 1
                total_size += cache_file.stat().st_size
                if not self._is_cache_valid(cache_file):
                    expired_files += 1

            stats = {
                "total_files": total_files,
                "expired_files": expired_files,
                "total_size_bytes": total_size,
                "cache_dir": str(self.cache_dir),
                "cache_duration_hours": self.cache_duration.total_seconds() / 3600
            }

            self.logger.info(f"Cache statistics: {stats}")
            return stats
        except Exception as e:
            self.logger.error(f"Error getting cache stats: {e}")
            return {}
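

# Minimal smoke test (illustrative, not part of the original module). The
# payload below is made up; in the portal the cached data would come from an
# API client such as FMP or yfinance.
if __name__ == "__main__":
    cm = CacheManager(cache_dir="cache", cache_duration_hours=24)

    # Round-trip a toy payload through the cache
    key = cm._generate_cache_key("fmp", "SPY", "profile")
    cm.save_to_cache(key, {"symbol": "SPY", "name": "SPDR S&P 500 ETF Trust"})

    data, hit = cm.load_from_cache(key)
    print(f"hit: {hit}, data: {data}")

    # Maintenance operations
    print(f"stats: {cm.get_cache_stats()}")
    print(f"cleared: {cm.clear_expired_cache()} expired files")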