from typing import Dict, List, Optional, Tuple, Any import pandas as pd import logging import traceback from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult # Configure logging logger = logging.getLogger(__name__) class DripService: def __init__(self) -> None: self.MAX_EROSION_LEVEL = 9 self.max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion self.dividend_frequency = { "Monthly": 12, "Quarterly": 4, "Semi-Annually": 2, "Annually": 1, "Unknown": 12 # Default to monthly if unknown } def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult: """ Calculate DRIP growth for a portfolio over a specified period. Args: portfolio_df: DataFrame containing portfolio allocation config: DripConfig object with simulation parameters Returns: DripResult object containing the simulation results """ try: # Initialize monthly data list monthly_data: List[MonthlyData] = [] # Get initial values initial_shares = self._calculate_initial_shares(portfolio_df) initial_prices = dict(zip(portfolio_df["Ticker"], portfolio_df["Price"])) initial_yields = dict(zip(portfolio_df["Ticker"], portfolio_df["Yield (%)"] / 100)) # Initialize tracking variables current_shares = initial_shares.copy() current_prices = initial_prices.copy() current_yields = initial_yields.copy() cumulative_income = 0.0 # Run simulation for each month for month in range(1, config.months + 1): # Calculate monthly income monthly_income = sum( (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker]) for ticker in current_shares.keys() ) # Update cumulative income cumulative_income += monthly_income # Calculate total portfolio value total_value = sum( current_shares[ticker] * current_prices[ticker] for ticker in current_shares.keys() ) # Apply erosion if enabled if config.erosion_type != "None": current_prices, current_yields = self._apply_erosion( current_prices, current_yields, config.erosion_type, config.erosion_level ) # Reinvest dividends for ticker in current_shares.keys(): dividend_income = (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker]) new_shares = dividend_income / current_prices[ticker] current_shares[ticker] += new_shares # Store monthly data monthly_data.append(MonthlyData( month=month, total_value=total_value, monthly_income=monthly_income, cumulative_income=cumulative_income, shares=current_shares.copy(), prices=current_prices.copy(), yields=current_yields.copy() )) # Calculate final values final_portfolio_value = monthly_data[-1].total_value total_income = monthly_data[-1].cumulative_income total_shares = current_shares.copy() return DripResult( monthly_data=monthly_data, final_portfolio_value=final_portfolio_value, total_income=total_income, total_shares=total_shares ) except Exception as e: logger.error(f"Error calculating DRIP growth: {str(e)}") logger.error(traceback.format_exc()) raise def _calculate_initial_shares(self, portfolio_df: pd.DataFrame) -> Dict[str, float]: """Calculate initial shares for each ETF.""" return dict(zip(portfolio_df["Ticker"], portfolio_df["Shares"])) def _apply_erosion( self, prices: Dict[str, float], yields: Dict[str, float], erosion_type: str, erosion_level: Dict[str, Any] ) -> Tuple[Dict[str, float], Dict[str, float]]: """ Apply erosion to prices and yields based on configuration. Args: prices: Dictionary of current prices yields: Dictionary of current yields erosion_type: Type of erosion to apply erosion_level: Dictionary containing erosion levels Returns: Tuple of updated prices and yields """ try: updated_prices = prices.copy() updated_yields = yields.copy() if erosion_type == "None": return updated_prices, updated_yields if erosion_level.get("use_per_ticker", False): # Apply per-ticker erosion rates for ticker in prices.keys(): if ticker in erosion_level: ticker_erosion = erosion_level[ticker] # Apply monthly erosion nav_erosion = (ticker_erosion["nav"] / 9) * 0.1754 yield_erosion = (ticker_erosion["yield"] / 9) * 0.1754 updated_prices[ticker] *= (1 - nav_erosion) updated_yields[ticker] *= (1 - yield_erosion) else: # Apply global erosion rates nav_erosion = (erosion_level["global"]["nav"] / 9) * 0.1754 yield_erosion = (erosion_level["global"]["yield"] / 9) * 0.1754 for ticker in prices.keys(): updated_prices[ticker] *= (1 - nav_erosion) updated_yields[ticker] *= (1 - yield_erosion) return updated_prices, updated_yields except Exception as e: logger.error(f"Error applying erosion: {str(e)}") logger.error(traceback.format_exc()) return prices, yields def _initialize_erosion_rates( self, tickers: List[str], erosion_type: str, erosion_level: Dict[str, Any] ) -> Tuple[Dict[str, float], Dict[str, float]]: """Initialize erosion rates for each ticker based on configuration.""" ticker_nav_rates: Dict[str, float] = {} ticker_yield_rates: Dict[str, float] = {} if erosion_type != "None" and isinstance(erosion_level, dict): if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion for ticker in tickers: ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) ticker_nav_rates[ticker] = ticker_settings["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion ticker_yield_rates[ticker] = ticker_settings["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion else: global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion for ticker in tickers: ticker_nav_rates[ticker] = global_nav ticker_yield_rates[ticker] = global_yield else: for ticker in tickers: ticker_nav_rates[ticker] = 0 ticker_yield_rates[ticker] = 0 return ticker_nav_rates, ticker_yield_rates def _create_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: """Create a dictionary of ticker-specific data.""" ticker_data: Dict[str, Dict[str, Any]] = {} for _, row in portfolio_df.iterrows(): ticker = row["Ticker"] ticker_data[ticker] = { "price": row["Price"], "yield_annual": row["Yield (%)"] / 100, "initial_shares": row["Capital Allocated ($)"] / row["Price"], "initial_allocation": row["Allocation (%)"] / 100, "distribution": row.get("Distribution Period", "Monthly") } return ticker_data def _calculate_monthly_income( self, current_shares: Dict[str, float], current_prices: Dict[str, float], current_yields: Dict[str, float], tickers: List[str] ) -> float: """Calculate expected monthly income based on current portfolio and yields.""" return sum( (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker]) for ticker in tickers ) def _create_month_data( self, month: int, current_total_value: float, monthly_income: float, cumulative_income: float, current_shares: Dict[str, float], current_prices: Dict[str, float], current_yields: Dict[str, float], tickers: List[str] ) -> MonthlyData: """Create monthly data object.""" return MonthlyData( month=month, total_value=current_total_value, monthly_income=monthly_income, cumulative_income=cumulative_income, shares=current_shares.copy(), prices=current_prices.copy(), yields=current_yields.copy() ) def _calculate_and_reinvest_dividends( self, month: int, ticker_data: Dict[str, Dict[str, Any]], current_shares: Dict[str, float], current_prices: Dict[str, float], current_yields: Dict[str, float], ticker_yield_rates: Dict[str, float], dividend_frequency: Dict[str, int] ) -> float: """Calculate dividends and reinvest them proportionally.""" month_dividends: Dict[str, float] = {} for ticker, data in ticker_data.items(): freq = dividend_frequency[data["distribution"]] if month % (12 / freq) == 0: if ticker_yield_rates[ticker] > 0: dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker] else: dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker] else: dividend = 0 month_dividends[ticker] = dividend total_month_dividend = sum(month_dividends.values()) # Reinvest dividends proportionally for ticker, data in ticker_data.items(): if current_prices[ticker] > 0: new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker] current_shares[ticker] += new_shares return total_month_dividend