from typing import Dict, List, Optional, Tuple, Any import pandas as pd import numpy as np import logging import traceback from dataclasses import dataclass, field from enum import Enum from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult from ..nav_erosion_service import NavErosionService # Configure logging logger = logging.getLogger(__name__) class DistributionFrequency(Enum): """Enum for distribution frequencies""" MONTHLY = ("Monthly", 12) QUARTERLY = ("Quarterly", 4) SEMI_ANNUALLY = ("Semi-Annually", 2) ANNUALLY = ("Annually", 1) UNKNOWN = ("Unknown", 12) def __init__(self, name: str, payments_per_year: int): self.display_name = name self.payments_per_year = payments_per_year @dataclass class TickerData: """Data structure for individual ticker information""" ticker: str price: float annual_yield: float shares: float allocation_pct: float distribution_freq: DistributionFrequency @property def market_value(self) -> float: return self.price * self.shares @property def monthly_yield(self) -> float: return self.annual_yield / 12 @property def distribution_yield(self) -> float: return self.annual_yield / self.distribution_freq.payments_per_year class DripService: """Enhanced DRIP calculation service with improved performance and accuracy""" def __init__(self) -> None: self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency} self.nav_erosion_service = NavErosionService() def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult: """ Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy. Args: portfolio_df: DataFrame containing portfolio allocation config: DripConfig object with simulation parameters Returns: DripResult object containing the simulation results """ try: # Validate inputs self._validate_inputs(portfolio_df, config) # Get erosion data from nav_erosion_service erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) erosion_rates = { result.ticker: { "nav": result.estimated_nav_erosion / 100, # Convert to decimal "yield": result.estimated_yield_erosion / 100 # Convert to decimal } for result in erosion_data.results } # Initialize portfolio data ticker_data = self._initialize_ticker_data(portfolio_df) # Pre-calculate distribution schedule for performance distribution_schedule = self._create_distribution_schedule(ticker_data, config.months) # Initialize simulation state simulation_state = self._initialize_simulation_state(ticker_data) monthly_data: List[MonthlyData] = [] # Run monthly simulation for month in range(1, config.months + 1): month_result = self._simulate_month( month, simulation_state, ticker_data, erosion_rates, distribution_schedule ) monthly_data.append(month_result) # Calculate final results return self._create_drip_result(monthly_data, simulation_state) except Exception as e: logger.error(f"Error calculating DRIP growth: {str(e)}") logger.error(traceback.format_exc()) raise def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None: """Validate input parameters""" required_columns = ["Ticker", "Price", "Yield (%)", "Shares"] missing_columns = [col for col in required_columns if col not in portfolio_df.columns] if missing_columns: raise ValueError(f"Missing required columns: {missing_columns}") if config.months <= 0: raise ValueError("Months must be positive") if portfolio_df.empty: raise ValueError("Portfolio DataFrame is empty") def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]: """Initialize ticker data with validation""" ticker_data = {} for _, row in portfolio_df.iterrows(): ticker = row["Ticker"] # Handle distribution frequency dist_period = row.get("Distribution Period", "Monthly") dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY) ticker_data[ticker] = TickerData( ticker=ticker, price=max(0.01, float(row["Price"])), # Prevent zero/negative prices annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal shares=max(0.0, float(row["Shares"])), allocation_pct=float(row.get("Allocation (%)", 0) / 100), distribution_freq=dist_freq ) return ticker_data def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]: """Pre-calculate which months each ticker pays distributions""" schedule = {} for ticker, data in ticker_data.items(): distribution_months = [] freq = data.distribution_freq for month in range(1, total_months + 1): if self._is_distribution_month(month, freq): distribution_months.append(month) schedule[ticker] = distribution_months return schedule def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]: """Initialize simulation state variables""" return { 'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()}, 'current_prices': {ticker: data.price for ticker, data in ticker_data.items()}, 'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()}, 'cumulative_income': 0.0 } def _simulate_month( self, month: int, state: Dict[str, Any], ticker_data: Dict[str, TickerData], erosion_rates: Dict[str, Dict[str, float]], distribution_schedule: Dict[str, List[int]] ) -> MonthlyData: """Simulate a single month with improved accuracy""" # Calculate monthly income from distributions monthly_income = self._calculate_monthly_distributions( month, state, ticker_data, distribution_schedule ) # Update cumulative income state['cumulative_income'] += monthly_income # Apply erosion to prices and yields using nav_erosion_service data self._apply_monthly_erosion(state, erosion_rates) # Reinvest dividends (DRIP) self._reinvest_dividends(month, state, distribution_schedule) # Calculate total portfolio value with bounds checking total_value = 0.0 for ticker in ticker_data.keys(): shares = state['current_shares'][ticker] price = state['current_prices'][ticker] if shares > 0 and price > 0: total_value += shares * price return MonthlyData( month=month, total_value=total_value, monthly_income=monthly_income, cumulative_income=state['cumulative_income'], shares=state['current_shares'].copy(), prices=state['current_prices'].copy(), yields=state['current_yields'].copy() ) def _calculate_monthly_distributions( self, month: int, state: Dict[str, Any], ticker_data: Dict[str, TickerData], distribution_schedule: Dict[str, List[int]] ) -> float: """Calculate distributions for the current month""" monthly_income = 0.0 for ticker, data in ticker_data.items(): if month in distribution_schedule[ticker]: shares = state['current_shares'][ticker] price = state['current_prices'][ticker] yield_rate = state['current_yields'][ticker] # Calculate distribution amount using annual yield divided by payments per year distribution_yield = yield_rate / data.distribution_freq.payments_per_year distribution_amount = shares * price * distribution_yield monthly_income += distribution_amount return monthly_income def _apply_monthly_erosion( self, state: Dict[str, Any], erosion_rates: Dict[str, Dict[str, float]] ) -> None: """Apply erosion to current prices and yields using nav_erosion_service data""" for ticker, rates in erosion_rates.items(): if ticker in state['current_prices']: # Apply monthly erosion rates monthly_nav_erosion = rates['nav'] / 12 monthly_yield_erosion = rates['yield'] / 12 # Apply erosion with bounds checking state['current_prices'][ticker] = max(0.01, state['current_prices'][ticker] * (1 - monthly_nav_erosion)) state['current_yields'][ticker] = max(0.0, state['current_yields'][ticker] * (1 - monthly_yield_erosion)) def _reinvest_dividends( self, month: int, state: Dict[str, Any], distribution_schedule: Dict[str, List[int]] ) -> None: """Reinvest dividends for tickers that distributed in this month""" for ticker, distribution_months in distribution_schedule.items(): if month in distribution_months: shares = state['current_shares'][ticker] price = state['current_prices'][ticker] yield_rate = state['current_yields'][ticker] # Calculate dividend income using the correct distribution frequency freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY) dividend_income = shares * price * (yield_rate / freq.payments_per_year) # Purchase additional shares if price > 0: new_shares = dividend_income / price state['current_shares'][ticker] += new_shares def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool: """Check if current month is a distribution month""" if frequency == DistributionFrequency.MONTHLY: return True elif frequency == DistributionFrequency.QUARTERLY: return month % 3 == 0 elif frequency == DistributionFrequency.SEMI_ANNUALLY: return month % 6 == 0 elif frequency == DistributionFrequency.ANNUALLY: return month % 12 == 0 else: return True # Default to monthly for unknown def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult: """Create final DRIP result object""" if not monthly_data: raise ValueError("No monthly data generated") final_data = monthly_data[-1] return DripResult( monthly_data=monthly_data, final_portfolio_value=final_data.total_value, total_income=final_data.cumulative_income, total_shares=state['current_shares'].copy() ) # Utility methods for analysis and comparison def calculate_drip_vs_no_drip_comparison( self, portfolio_df: pd.DataFrame, config: DripConfig ) -> Dict[str, Any]: """Calculate comparison between DRIP and no-DRIP scenarios""" # Calculate DRIP scenario drip_result = self.calculate_drip_growth(portfolio_df, config) # Calculate no-DRIP scenario (dividends not reinvested) no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config) # Calculate comparison metrics drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value'] percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100 return { 'drip_final_value': drip_result.final_portfolio_value, 'no_drip_final_value': no_drip_result['final_value'], 'drip_advantage': drip_advantage, 'percentage_advantage': percentage_advantage, 'total_dividends_reinvested': drip_result.total_income, 'cash_dividends_no_drip': no_drip_result['total_dividends'] } def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]: """Calculate scenario where dividends are not reinvested""" ticker_data = self._initialize_ticker_data(portfolio_df) erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) erosion_rates = { result.ticker: { "nav": result.estimated_nav_erosion / 100, "yield": result.estimated_yield_erosion / 100 } for result in erosion_data.results } state = self._initialize_simulation_state(ticker_data) total_dividends = 0.0 for month in range(1, config.months + 1): # Calculate dividends but don't reinvest monthly_dividends = self._calculate_monthly_distributions( month, state, ticker_data, self._create_distribution_schedule(ticker_data, config.months) ) total_dividends += monthly_dividends # Apply erosion self._apply_monthly_erosion(state, erosion_rates) final_value = sum( state['current_shares'][ticker] * state['current_prices'][ticker] for ticker in ticker_data.keys() ) return { 'final_value': final_value, 'total_dividends': total_dividends }