from typing import Dict, List, Optional, Tuple, Any import pandas as pd import numpy as np import traceback from dataclasses import dataclass, field from enum import Enum from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult from ..nav_erosion_service import NavErosionService from .logger import logger class DistributionFrequency(Enum): """Enum for distribution frequencies""" MONTHLY = ("Monthly", 12) QUARTERLY = ("Quarterly", 4) SEMI_ANNUALLY = ("Semi-Annually", 2) ANNUALLY = ("Annually", 1) UNKNOWN = ("Unknown", 12) def __init__(self, name: str, payments_per_year: int): self.display_name = name self.payments_per_year = payments_per_year @dataclass class TickerData: """Data structure for individual ticker information""" ticker: str price: float annual_yield: float shares: float allocation_pct: float distribution_freq: DistributionFrequency @property def market_value(self) -> float: return self.price * self.shares @property def monthly_yield(self) -> float: return self.annual_yield / 12 @property def distribution_yield(self) -> float: return self.annual_yield / self.distribution_freq.payments_per_year class DripService: """Enhanced DRIP calculation service with improved performance and accuracy""" def __init__(self) -> None: self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency} self.nav_erosion_service = NavErosionService() def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult: """ Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy. Args: portfolio_df: DataFrame containing portfolio allocation config: DripConfig object with simulation parameters Returns: DripResult object containing the simulation results """ try: # Validate inputs self._validate_inputs(portfolio_df, config) # Get erosion data from nav_erosion_service erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) logger.info(f"Erosion data results: {erosion_data.results}") # Initialize erosion rates dictionary erosion_rates = {} # Use erosion rates from nav_erosion_service for ticker in portfolio_df["Ticker"]: # Find the result for this ticker in erosion_data.results result = next((r for r in erosion_data.results if r.ticker == ticker), None) if result: erosion_rates[ticker] = { "nav": result.monthly_nav_erosion_rate, "yield": result.monthly_yield_erosion_rate } logger.info(f"=== EROSION RATE DEBUG ===") logger.info(f"Ticker: {ticker}") logger.info(f"Erosion rates from nav_erosion_service:") logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}") logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}") logger.info(f"=== END EROSION RATE DEBUG ===\n") else: # Use default erosion rates if not found erosion_rates[ticker] = { "nav": 0.05, # 5% per month (very high, for test) "yield": 0.07 # 7% per month (very high, for test) } logger.info(f"=== EROSION RATE DEBUG ===") logger.info(f"Ticker: {ticker}") logger.info(f"Using default erosion rates:") logger.info(f" NAV: {erosion_rates[ticker]['nav']:.4%}") logger.info(f" Yield: {erosion_rates[ticker]['yield']:.4%}") logger.info(f"=== END EROSION RATE DEBUG ===\n") # Log the final erosion rates dictionary logger.info(f"Final erosion rates dictionary: {erosion_rates}") # Initialize portfolio data ticker_data = self._initialize_ticker_data(portfolio_df) # Pre-calculate distribution schedule for performance distribution_schedule = self._create_distribution_schedule(ticker_data, config.months) # Initialize simulation state simulation_state = self._initialize_simulation_state(ticker_data) monthly_data: List[MonthlyData] = [] # Run monthly simulation for month in range(1, config.months + 1): logger.info(f"\n=== Starting Month {month} ===") logger.info(f"Initial state for month {month}:") for ticker in ticker_data.keys(): logger.info(f" {ticker}:") logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}") logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}") logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}") month_result = self._simulate_month( month, simulation_state, ticker_data, erosion_rates, distribution_schedule ) monthly_data.append(month_result) logger.info(f"Final state for month {month}:") for ticker in ticker_data.keys(): logger.info(f" {ticker}:") logger.info(f" Price: ${simulation_state['current_prices'][ticker]:.2f}") logger.info(f" Yield: {simulation_state['current_yields'][ticker]:.2%}") logger.info(f" Shares: {simulation_state['current_shares'][ticker]:.4f}") logger.info(f"=== End Month {month} ===\n") # Calculate final results return self._create_drip_result(monthly_data, simulation_state) except Exception as e: logger.error(f"Error calculating DRIP growth: {str(e)}") logger.error(traceback.format_exc()) raise def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None: """Validate input parameters""" required_columns = ["Ticker", "Price", "Yield (%)", "Shares"] missing_columns = [col for col in required_columns if col not in portfolio_df.columns] if missing_columns: raise ValueError(f"Missing required columns: {missing_columns}") if config.months <= 0: raise ValueError("Months must be positive") if portfolio_df.empty: raise ValueError("Portfolio DataFrame is empty") def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]: """Initialize ticker data with validation""" ticker_data = {} for _, row in portfolio_df.iterrows(): ticker = row["Ticker"] # Handle distribution frequency dist_period = row.get("Distribution Period", "Monthly") dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY) ticker_data[ticker] = TickerData( ticker=ticker, price=max(0.01, float(row["Price"])), # Prevent zero/negative prices annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal shares=max(0.0, float(row["Shares"])), allocation_pct=float(row.get("Allocation (%)", 0) / 100), distribution_freq=dist_freq ) return ticker_data def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]: """Pre-calculate which months each ticker pays distributions""" schedule = {} for ticker, data in ticker_data.items(): distribution_months = [] freq = data.distribution_freq for month in range(1, total_months + 1): if self._is_distribution_month(month, freq): distribution_months.append(month) schedule[ticker] = distribution_months return schedule def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]: """Initialize simulation state variables""" return { 'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()}, 'current_prices': {ticker: data.price for ticker, data in ticker_data.items()}, 'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()}, 'cumulative_income': 0.0 } def _simulate_month( self, month: int, state: Dict[str, Any], ticker_data: Dict[str, TickerData], erosion_rates: Dict[str, Dict[str, float]], distribution_schedule: Dict[str, List[int]] ) -> MonthlyData: """Simulate a single month with improved accuracy""" # Debug logging for erosion rates logger.info(f"\n=== EROSION RATES DEBUG ===") logger.info(f"Erosion rates dictionary: {erosion_rates}") for ticker, rates in erosion_rates.items(): logger.info(f" {ticker}:") logger.info(f" nav: {rates['nav']:.4%}") logger.info(f" yield: {rates['yield']:.4%}") logger.info(f"=== END EROSION RATES DEBUG ===\n") # Apply erosion first for ticker, rates in erosion_rates.items(): if ticker in state['current_prices']: # Get monthly erosion rates (already in decimal form) monthly_nav_erosion = rates['nav'] monthly_yield_erosion = rates['yield'] # Get current values old_price = state['current_prices'][ticker] old_yield = state['current_yields'][ticker] # Debug logging logger.info(f"\n=== EROSION CALCULATION DEBUG ===") logger.info(f"Ticker: {ticker}") logger.info(f"Raw erosion rates from nav_erosion_service:") logger.info(f" monthly_nav_erosion: {monthly_nav_erosion:.4%}") logger.info(f" monthly_yield_erosion: {monthly_yield_erosion:.4%}") logger.info(f"Current values:") logger.info(f" old_price: ${old_price:.4f}") logger.info(f" old_yield: {old_yield:.4%}") # Calculate new values new_price = old_price * (1 - monthly_nav_erosion) new_yield = old_yield * (1 - monthly_yield_erosion) logger.info(f"Calculated new values:") logger.info(f" new_price = ${old_price:.4f} * (1 - {monthly_nav_erosion:.4%})") logger.info(f" new_price = ${old_price:.4f} * {1 - monthly_nav_erosion:.4f}") logger.info(f" new_price = ${new_price:.4f}") logger.info(f" new_yield = {old_yield:.4%} * (1 - {monthly_yield_erosion:.4%})") logger.info(f" new_yield = {old_yield:.4%} * {1 - monthly_yield_erosion:.4f}") logger.info(f" new_yield = {new_yield:.4%}") # Apply the new values with bounds checking state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields logger.info(f"Final values after bounds checking:") logger.info(f" final_price: ${state['current_prices'][ticker]:.4f}") logger.info(f" final_yield: {state['current_yields'][ticker]:.4%}") logger.info(f"=== END EROSION CALCULATION DEBUG ===\n") # Log the actual erosion being applied logger.info(f"Applied erosion to {ticker}:") logger.info(f" NAV: {monthly_nav_erosion:.4%} -> New price: ${state['current_prices'][ticker]:.2f}") logger.info(f" Yield: {monthly_yield_erosion:.4%} -> New yield: {state['current_yields'][ticker]:.2%}") # Calculate monthly income from distributions using eroded values monthly_income = self._calculate_monthly_distributions( month, state, ticker_data, distribution_schedule ) # Update cumulative income state['cumulative_income'] += monthly_income # Reinvest dividends (DRIP) self._reinvest_dividends(month, state, distribution_schedule) # Calculate total portfolio value with bounds checking total_value = 0.0 for ticker in ticker_data.keys(): shares = state['current_shares'][ticker] price = state['current_prices'][ticker] if shares > 0 and price > 0: total_value += shares * price return MonthlyData( month=month, total_value=total_value, monthly_income=monthly_income, cumulative_income=state['cumulative_income'], shares=state['current_shares'].copy(), prices=state['current_prices'].copy(), yields=state['current_yields'].copy() ) def _calculate_monthly_distributions( self, month: int, state: Dict[str, Any], ticker_data: Dict[str, TickerData], distribution_schedule: Dict[str, List[int]] ) -> float: """Calculate distributions for the current month""" monthly_income = 0.0 for ticker, data in ticker_data.items(): if month in distribution_schedule[ticker]: shares = state['current_shares'][ticker] price = state['current_prices'][ticker] yield_rate = state['current_yields'][ticker] # Calculate distribution amount using annual yield divided by payments per year distribution_yield = yield_rate / data.distribution_freq.payments_per_year distribution_amount = shares * price * distribution_yield monthly_income += distribution_amount return monthly_income def _reinvest_dividends( self, month: int, state: Dict[str, Any], distribution_schedule: Dict[str, List[int]] ) -> None: """Reinvest dividends for tickers that distributed in this month""" for ticker, distribution_months in distribution_schedule.items(): if month in distribution_months: shares = state['current_shares'][ticker] price = state['current_prices'][ticker] yield_rate = state['current_yields'][ticker] # Calculate dividend income using the correct distribution frequency freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY) dividend_income = shares * price * (yield_rate / freq.payments_per_year) # Purchase additional shares if price > 0: new_shares = dividend_income / price state['current_shares'][ticker] += new_shares def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool: """Check if current month is a distribution month""" if frequency == DistributionFrequency.MONTHLY: return True elif frequency == DistributionFrequency.QUARTERLY: return month % 3 == 0 elif frequency == DistributionFrequency.SEMI_ANNUALLY: return month % 6 == 0 elif frequency == DistributionFrequency.ANNUALLY: return month % 12 == 0 else: return True # Default to monthly for unknown def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult: """Create final DRIP result object""" if not monthly_data: raise ValueError("No monthly data generated") final_data = monthly_data[-1] return DripResult( monthly_data=monthly_data, final_portfolio_value=final_data.total_value, total_income=final_data.cumulative_income, total_shares=state['current_shares'].copy() ) # Utility methods for analysis and comparison def calculate_drip_vs_no_drip_comparison( self, portfolio_df: pd.DataFrame, config: DripConfig ) -> Dict[str, Any]: """Calculate comparison between DRIP and no-DRIP scenarios""" # Calculate DRIP scenario drip_result = self.calculate_drip_growth(portfolio_df, config) # Calculate no-DRIP scenario (dividends not reinvested) no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config) # Calculate comparison metrics drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value'] percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100 return { 'drip_final_value': drip_result.final_portfolio_value, 'no_drip_final_value': no_drip_result['final_value'], 'drip_advantage': drip_advantage, 'percentage_advantage': percentage_advantage, 'total_dividends_reinvested': drip_result.total_income, 'cash_dividends_no_drip': no_drip_result['total_dividends'] } def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]: """Calculate scenario where dividends are not reinvested""" ticker_data = self._initialize_ticker_data(portfolio_df) erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) erosion_rates = { result.ticker: { "nav": result.monthly_nav_erosion_rate, "yield": result.monthly_yield_erosion_rate } for result in erosion_data.results } state = self._initialize_simulation_state(ticker_data) total_dividends = 0.0 for month in range(1, config.months + 1): # Calculate dividends but don't reinvest monthly_dividends = self._calculate_monthly_distributions( month, state, ticker_data, self._create_distribution_schedule(ticker_data, config.months) ) total_dividends += monthly_dividends # Apply erosion for ticker, rates in erosion_rates.items(): if ticker in state['current_prices']: # Get monthly erosion rates (already in decimal form) monthly_nav_erosion = rates['nav'] monthly_yield_erosion = rates['yield'] # Apply NAV erosion (decrease price) old_price = state['current_prices'][ticker] new_price = old_price * (1 - monthly_nav_erosion) state['current_prices'][ticker] = max(0.01, new_price) # Prevent zero/negative prices # Apply yield erosion (decrease yield) old_yield = state['current_yields'][ticker] new_yield = old_yield * (1 - monthly_yield_erosion) state['current_yields'][ticker] = max(0.0, new_yield) # Prevent negative yields final_value = sum( state['current_shares'][ticker] * state['current_prices'][ticker] for ticker in ticker_data.keys() ) return { 'final_value': final_value, 'total_dividends': total_dividends }