fix: Update DRIP service to use correct MonthlyData structure and fix month attribute error

This commit is contained in:
Pascal BIBEHE 2025-06-02 16:07:27 +02:00
parent 7889608544
commit 4ea1fe2a73

View File

@ -0,0 +1,470 @@
from typing import Dict, List, Optional, Tuple, Any
import pandas as pd
import numpy as np
import logging
import traceback
from dataclasses import dataclass, field
from enum import Enum
from .models import (
MonthlyData,
DripConfig,
DripResult,
DRIPMetrics,
DRIPForecastResult,
DRIPPortfolioResult
)
from ..nav_erosion_service import NavErosionService
# Configure logging
logger = logging.getLogger(__name__)
__all__ = ['DRIPService', 'DistributionFrequency', 'TickerData', 'ErosionConfig']
class DistributionFrequency(Enum):
"""Enum for distribution frequencies"""
MONTHLY = ("Monthly", 12)
QUARTERLY = ("Quarterly", 4)
SEMI_ANNUALLY = ("Semi-Annually", 2)
ANNUALLY = ("Annually", 1)
UNKNOWN = ("Unknown", 12)
def __init__(self, name: str, payments_per_year: int):
self.display_name = name
self.payments_per_year = payments_per_year
@dataclass
class TickerData:
"""Data structure for individual ticker information"""
ticker: str
price: float
annual_yield: float
shares: float
allocation_pct: float
distribution_freq: DistributionFrequency
@property
def market_value(self) -> float:
return self.price * self.shares
@property
def monthly_yield(self) -> float:
return self.annual_yield / 12
@property
def distribution_yield(self) -> float:
return self.annual_yield / self.distribution_freq.payments_per_year
@dataclass
class ErosionConfig:
"""Configuration for erosion calculations"""
erosion_type: str
use_per_ticker: bool = False
global_nav_rate: float = 0.0
global_yield_rate: float = 0.0
per_ticker_rates: Dict[str, Dict[str, float]] = field(default_factory=dict)
class DRIPService:
"""Enhanced DRIP calculation service with improved performance and accuracy"""
def __init__(self) -> None:
self.MAX_EROSION_LEVEL = 9
self.MAX_MONTHLY_EROSION = 0.05 # 5% monthly max erosion
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
"""
Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy.
Args:
portfolio_df: DataFrame containing portfolio allocation
config: DripConfig object with simulation parameters
Returns:
DripResult object containing the simulation results
"""
try:
# Validate inputs
self._validate_inputs(portfolio_df, config)
# Initialize portfolio data
ticker_data = self._initialize_ticker_data(portfolio_df)
erosion_config = self._parse_erosion_config(config)
# Pre-calculate distribution schedule for performance
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
# Initialize simulation state
simulation_state = self._initialize_simulation_state(ticker_data)
monthly_data: List[MonthlyData] = []
# Run monthly simulation
for month in range(1, config.months + 1):
# Calculate monthly income from distributions
monthly_income = self._calculate_monthly_distributions(
month, simulation_state, ticker_data, distribution_schedule
)
# Update cumulative income
simulation_state['cumulative_income'] += monthly_income
# Apply erosion to prices and yields
if erosion_config.erosion_type != "None":
self._apply_monthly_erosion(simulation_state, erosion_config, ticker_data.keys())
# Reinvest dividends (DRIP)
self._reinvest_dividends(month, simulation_state, distribution_schedule)
# Calculate total portfolio value
total_value = sum(
simulation_state['current_shares'][ticker] * simulation_state['current_prices'][ticker]
for ticker in ticker_data.keys()
)
# Create monthly data
monthly_data.append(MonthlyData(
month=month,
total_value=total_value,
monthly_income=monthly_income,
cumulative_income=simulation_state['cumulative_income'],
shares=simulation_state['current_shares'].copy(),
prices=simulation_state['current_prices'].copy(),
yields=simulation_state['current_yields'].copy()
))
# Calculate final results
return self._create_drip_result(monthly_data, simulation_state)
except Exception as e:
logger.error(f"Error calculating DRIP growth: {str(e)}")
logger.error(traceback.format_exc())
raise
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
"""Validate input parameters"""
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
if config.months <= 0:
raise ValueError("Months must be positive")
if portfolio_df.empty:
raise ValueError("Portfolio DataFrame is empty")
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
"""Initialize ticker data with validation"""
ticker_data = {}
for _, row in portfolio_df.iterrows():
ticker = row["Ticker"]
# Handle distribution frequency
dist_period = row.get("Distribution Period", "Monthly")
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
ticker_data[ticker] = TickerData(
ticker=ticker,
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
shares=max(0.0, float(row["Shares"])),
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
distribution_freq=dist_freq
)
return ticker_data
def _parse_erosion_config(self, config: DripConfig) -> ErosionConfig:
"""Parse and validate erosion configuration"""
if not hasattr(config, 'erosion_level') or config.erosion_type == "None":
return ErosionConfig(erosion_type="None")
erosion_level = config.erosion_level
if isinstance(erosion_level, dict):
return ErosionConfig(
erosion_type=config.erosion_type,
use_per_ticker=erosion_level.get("use_per_ticker", False),
global_nav_rate=self._normalize_erosion_rate(erosion_level.get("global", {}).get("nav", 0)),
global_yield_rate=self._normalize_erosion_rate(erosion_level.get("global", {}).get("yield", 0)),
per_ticker_rates=erosion_level.get("per_ticker", {})
)
return ErosionConfig(erosion_type="None")
def _normalize_erosion_rate(self, erosion_level: float) -> float:
"""Convert erosion level (0-9) to monthly rate with validation"""
rate = (erosion_level / self.MAX_EROSION_LEVEL) * self.MAX_MONTHLY_EROSION
return min(max(0.0, rate), self.MAX_MONTHLY_EROSION)
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
"""Pre-calculate which months each ticker pays distributions"""
schedule = {}
for ticker, data in ticker_data.items():
distribution_months = []
freq = data.distribution_freq
for month in range(1, total_months + 1):
if self._is_distribution_month(month, freq):
distribution_months.append(month)
schedule[ticker] = distribution_months
return schedule
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
"""Initialize simulation state variables"""
return {
'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()},
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
'cumulative_income': 0.0
}
def _calculate_monthly_distributions(
self,
month: int,
state: Dict[str, Any],
ticker_data: Dict[str, TickerData],
distribution_schedule: Dict[str, List[int]]
) -> float:
"""Calculate distributions for the current month"""
monthly_income = 0.0
for ticker, data in ticker_data.items():
if month in distribution_schedule[ticker]:
shares = state['current_shares'][ticker]
price = state['current_prices'][ticker]
yield_rate = state['current_yields'][ticker]
# Calculate distribution amount
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
distribution_amount = shares * price * distribution_yield
monthly_income += distribution_amount
return monthly_income
def _apply_monthly_erosion(
self,
state: Dict[str, Any],
erosion_config: ErosionConfig,
tickers: List[str]
) -> None:
"""Apply erosion to current prices and yields"""
for ticker in tickers:
if erosion_config.use_per_ticker and ticker in erosion_config.per_ticker_rates:
# Use per-ticker erosion rates
ticker_rates = erosion_config.per_ticker_rates[ticker]
nav_erosion = self._normalize_erosion_rate(ticker_rates.get("nav", 0))
yield_erosion = self._normalize_erosion_rate(ticker_rates.get("yield", 0))
else:
# Use global erosion rates
nav_erosion = erosion_config.global_nav_rate
yield_erosion = erosion_config.global_yield_rate
# Apply erosion with bounds checking
state['current_prices'][ticker] *= max(0.01, 1 - nav_erosion)
state['current_yields'][ticker] *= max(0.0, 1 - yield_erosion)
def _reinvest_dividends(
self,
month: int,
state: Dict[str, Any],
distribution_schedule: Dict[str, List[int]]
) -> None:
"""Reinvest dividends for tickers that distributed in this month"""
for ticker, distribution_months in distribution_schedule.items():
if month in distribution_months:
shares = state['current_shares'][ticker]
price = state['current_prices'][ticker]
yield_rate = state['current_yields'][ticker]
# Calculate dividend income
# Note: This uses the distribution frequency from the original ticker data
dividend_income = shares * price * yield_rate / 12 # Simplified monthly calculation
# Purchase additional shares
if price > 0:
new_shares = dividend_income / price
state['current_shares'][ticker] += new_shares
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
"""Check if current month is a distribution month"""
if frequency == DistributionFrequency.MONTHLY:
return True
elif frequency == DistributionFrequency.QUARTERLY:
return month % 3 == 0
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
return month % 6 == 0
elif frequency == DistributionFrequency.ANNUALLY:
return month % 12 == 0
else:
return True # Default to monthly for unknown
def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult:
"""Create final DRIP result object"""
if not monthly_data:
raise ValueError("No monthly data generated")
final_data = monthly_data[-1]
return DripResult(
monthly_data=monthly_data,
final_portfolio_value=final_data.total_value,
total_income=state['cumulative_income'],
total_shares=state['current_shares'].copy()
)
# Utility methods for analysis and comparison
def calculate_drip_vs_no_drip_comparison(
self,
portfolio_df: pd.DataFrame,
config: DripConfig
) -> Dict[str, Any]:
"""Calculate comparison between DRIP and no-DRIP scenarios"""
# Calculate DRIP scenario
drip_result = self.calculate_drip_growth(portfolio_df, config)
# Calculate no-DRIP scenario (dividends not reinvested)
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
# Calculate comparison metrics
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
return {
'drip_final_value': drip_result.final_portfolio_value,
'no_drip_final_value': no_drip_result['final_value'],
'drip_advantage': drip_advantage,
'percentage_advantage': percentage_advantage,
'total_dividends_reinvested': drip_result.total_income,
'cash_dividends_no_drip': no_drip_result['total_dividends']
}
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
"""Calculate scenario where dividends are not reinvested"""
ticker_data = self._initialize_ticker_data(portfolio_df)
erosion_config = self._parse_erosion_config(config)
state = self._initialize_simulation_state(ticker_data)
total_dividends = 0.0
for month in range(1, config.months + 1):
# Calculate dividends but don't reinvest
monthly_dividends = self._calculate_monthly_distributions(
month, state, ticker_data,
self._create_distribution_schedule(ticker_data, config.months)
)
total_dividends += monthly_dividends
# Apply erosion
if erosion_config.erosion_type != "None":
self._apply_monthly_erosion(state, erosion_config, ticker_data.keys())
final_value = sum(
state['current_shares'][ticker] * state['current_prices'][ticker]
for ticker in ticker_data.keys()
)
return {
'final_value': final_value,
'total_dividends': total_dividends
}
def forecast_portfolio(
self,
portfolio_df: pd.DataFrame,
config: DripConfig,
tickers: Optional[List[str]] = None
) -> DRIPPortfolioResult:
"""
Forecast DRIP growth for an entire portfolio.
Args:
portfolio_df: DataFrame containing portfolio allocation with columns:
- Ticker: ETF ticker symbol
- Price: Current price
- Yield (%): Annual dividend yield
- Shares: Number of shares
- Allocation (%): Portfolio allocation percentage
config: DripConfig object with simulation parameters
tickers: Optional list of tickers to include in the forecast. If None, all tickers are included.
Returns:
DRIPPortfolioResult object containing the forecast results
"""
try:
# Filter portfolio_df if tickers are specified
if tickers is not None:
portfolio_df = portfolio_df[portfolio_df['Ticker'].isin(tickers)].copy()
if portfolio_df.empty:
raise ValueError(f"No matching tickers found in portfolio: {tickers}")
# Calculate DRIP growth for the portfolio
result = self.calculate_drip_growth(portfolio_df, config)
# Convert the result to DRIPPortfolioResult format
etf_results = {}
for ticker in portfolio_df['Ticker'].unique():
ticker_data = portfolio_df[portfolio_df['Ticker'] == ticker].iloc[0]
initial_shares = float(ticker_data['Shares'])
initial_value = initial_shares * float(ticker_data['Price'])
# Get final values from the result
final_shares = result.total_shares.get(ticker, initial_shares)
final_value = final_shares * float(ticker_data['Price'])
# Calculate metrics
total_income = sum(
month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares']))
for month_data in result.monthly_data
)
average_yield = float(ticker_data['Yield (%)']) / 100
# Create monthly metrics
monthly_metrics = []
for month_data in result.monthly_data:
metrics = DRIPMetrics(
ticker=ticker,
date=pd.Timestamp.now() + pd.DateOffset(months=month_data.month),
shares=month_data.shares.get(ticker, initial_shares),
price=month_data.prices.get(ticker, float(ticker_data['Price'])),
dividend_yield=month_data.yields.get(ticker, average_yield),
monthly_dividend=month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares'])),
new_shares=month_data.shares.get(ticker, initial_shares) - initial_shares,
portfolio_value=month_data.total_value * (initial_shares / sum(portfolio_df['Shares'])),
monthly_income=month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares'])),
yield_on_cost=average_yield
)
monthly_metrics.append(metrics)
# Create forecast result for this ETF
etf_results[ticker] = DRIPForecastResult(
ticker=ticker,
initial_shares=initial_shares,
final_shares=final_shares,
initial_value=initial_value,
final_value=final_value,
total_income=total_income,
average_yield=average_yield,
monthly_metrics=monthly_metrics
)
# Create and return the portfolio result
return DRIPPortfolioResult(
total_value=result.final_portfolio_value,
monthly_income=result.monthly_data[-1].monthly_income,
total_income=result.total_income,
etf_results=etf_results
)
except Exception as e:
logger.error(f"Error forecasting portfolio: {str(e)}")
logger.error(traceback.format_exc())
raise