Compare commits
2 Commits
30e1bbcbd9
...
30dc087ce3
| Author | SHA1 | Date | |
|---|---|---|---|
| 30dc087ce3 | |||
| 19f713673e |
@ -1,9 +1,13 @@
|
|||||||
from .service import DRIPService
|
from .service import DRIPService
|
||||||
|
from .no_drip_service import NoDRIPService, NoDRIPMonthlyData, NoDRIPResult
|
||||||
from .models import DRIPMetrics, DRIPForecastResult, DRIPPortfolioResult, DripConfig
|
from .models import DRIPMetrics, DRIPForecastResult, DRIPPortfolioResult, DripConfig
|
||||||
from .exceptions import DRIPError, DataFetchError, CalculationError, ValidationError, CacheError
|
from .exceptions import DRIPError, DataFetchError, CalculationError, ValidationError, CacheError
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'DRIPService',
|
'DRIPService',
|
||||||
|
'NoDRIPService',
|
||||||
|
'NoDRIPMonthlyData',
|
||||||
|
'NoDRIPResult',
|
||||||
'DRIPMetrics',
|
'DRIPMetrics',
|
||||||
'DRIPForecastResult',
|
'DRIPForecastResult',
|
||||||
'DRIPPortfolioResult',
|
'DRIPPortfolioResult',
|
||||||
|
|||||||
427
ETF_Portal/services/drip_service/no_drip_service.py
Normal file
427
ETF_Portal/services/drip_service/no_drip_service.py
Normal file
@ -0,0 +1,427 @@
|
|||||||
|
from typing import Dict, List, Optional, Tuple, Any
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
import logging
|
||||||
|
import traceback
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
from .models import (
|
||||||
|
MonthlyData,
|
||||||
|
DripConfig,
|
||||||
|
DripResult,
|
||||||
|
DRIPMetrics,
|
||||||
|
DRIPForecastResult,
|
||||||
|
DRIPPortfolioResult
|
||||||
|
)
|
||||||
|
from ..nav_erosion_service import NavErosionService
|
||||||
|
|
||||||
|
# Duplicate necessary classes to avoid circular import
|
||||||
|
class DistributionFrequency(Enum):
|
||||||
|
"""Enum for distribution frequencies"""
|
||||||
|
MONTHLY = ("Monthly", 12)
|
||||||
|
QUARTERLY = ("Quarterly", 4)
|
||||||
|
SEMI_ANNUALLY = ("Semi-Annually", 2)
|
||||||
|
ANNUALLY = ("Annually", 1)
|
||||||
|
UNKNOWN = ("Unknown", 12)
|
||||||
|
|
||||||
|
def __init__(self, name: str, payments_per_year: int):
|
||||||
|
self.display_name = name
|
||||||
|
self.payments_per_year = payments_per_year
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TickerData:
|
||||||
|
"""Data structure for individual ticker information"""
|
||||||
|
ticker: str
|
||||||
|
price: float
|
||||||
|
annual_yield: float
|
||||||
|
shares: float
|
||||||
|
allocation_pct: float
|
||||||
|
distribution_freq: DistributionFrequency
|
||||||
|
|
||||||
|
@property
|
||||||
|
def market_value(self) -> float:
|
||||||
|
return self.price * self.shares
|
||||||
|
|
||||||
|
@property
|
||||||
|
def monthly_yield(self) -> float:
|
||||||
|
return self.annual_yield / 12
|
||||||
|
|
||||||
|
@property
|
||||||
|
def distribution_yield(self) -> float:
|
||||||
|
return self.annual_yield / self.distribution_freq.payments_per_year
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ErosionConfig:
|
||||||
|
"""Configuration for erosion calculations"""
|
||||||
|
erosion_type: str
|
||||||
|
erosion_level: Dict[str, Dict[str, float]] # Changed to match NavErosionService output
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
__all__ = ['NoDRIPService', 'NoDRIPMonthlyData', 'NoDRIPResult']
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NoDRIPMonthlyData:
|
||||||
|
"""Data for a single month in the No-DRIP simulation"""
|
||||||
|
month: int
|
||||||
|
portfolio_value: float # Original shares * current prices
|
||||||
|
monthly_income: float # Dividends received as cash
|
||||||
|
cumulative_income: float # Total cash accumulated
|
||||||
|
prices: Dict[str, float] # Current (eroded) prices
|
||||||
|
yields: Dict[str, float] # Current (eroded) yields
|
||||||
|
original_shares: Dict[str, float] # Original shares (constant)
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NoDRIPResult:
|
||||||
|
"""Results of a No-DRIP calculation"""
|
||||||
|
monthly_data: List[NoDRIPMonthlyData]
|
||||||
|
final_portfolio_value: float # Original shares * final prices
|
||||||
|
total_cash_income: float # All dividends as cash
|
||||||
|
total_value: float # Portfolio value + cash
|
||||||
|
original_shares: Dict[str, float] # Original share counts
|
||||||
|
|
||||||
|
class NoDRIPService:
|
||||||
|
"""No-DRIP calculation service - dividends are kept as cash, not reinvested"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.MAX_EROSION_LEVEL = 9
|
||||||
|
self.MAX_MONTHLY_EROSION = 0.05 # 5% monthly max erosion
|
||||||
|
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
||||||
|
self.nav_erosion_service = NavErosionService()
|
||||||
|
|
||||||
|
def calculate_no_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> NoDRIPResult:
|
||||||
|
"""
|
||||||
|
Calculate No-DRIP growth for a portfolio over a specified period.
|
||||||
|
In No-DRIP strategy, dividends are kept as cash and not reinvested.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
portfolio_df: DataFrame containing portfolio allocation
|
||||||
|
config: DripConfig object with simulation parameters
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
NoDRIPResult object containing the simulation results
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Validate inputs (reuse from DRIP service)
|
||||||
|
self._validate_inputs(portfolio_df, config)
|
||||||
|
|
||||||
|
# Initialize portfolio data
|
||||||
|
ticker_data = self._initialize_ticker_data(portfolio_df)
|
||||||
|
|
||||||
|
# Handle erosion configuration
|
||||||
|
erosion_config = self._parse_erosion_config(config)
|
||||||
|
|
||||||
|
# If erosion is requested but no proper erosion_level is provided, calculate it
|
||||||
|
if (config.erosion_type != "None" and
|
||||||
|
(not hasattr(config, 'erosion_level') or
|
||||||
|
not isinstance(config.erosion_level, dict) or
|
||||||
|
"per_ticker" not in config.erosion_level)):
|
||||||
|
|
||||||
|
logger.info(f"Calculating erosion rates for No-DRIP with erosion type: {config.erosion_type}")
|
||||||
|
tickers = list(ticker_data.keys())
|
||||||
|
calculated_erosion = self._calculate_erosion_from_analysis(tickers)
|
||||||
|
erosion_config = ErosionConfig(
|
||||||
|
erosion_type=config.erosion_type,
|
||||||
|
erosion_level=calculated_erosion
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pre-calculate distribution schedule for performance
|
||||||
|
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
|
||||||
|
|
||||||
|
# Initialize simulation state (shares remain constant in No-DRIP)
|
||||||
|
simulation_state = self._initialize_simulation_state(ticker_data)
|
||||||
|
monthly_data: List[NoDRIPMonthlyData] = []
|
||||||
|
|
||||||
|
# Create monthly tracking table
|
||||||
|
monthly_tracking = []
|
||||||
|
|
||||||
|
# Run monthly simulation
|
||||||
|
for month in range(1, config.months + 1):
|
||||||
|
# Calculate monthly income from distributions (keep as cash)
|
||||||
|
monthly_income = self._calculate_monthly_distributions(
|
||||||
|
month, simulation_state, ticker_data, distribution_schedule
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update cumulative cash income
|
||||||
|
simulation_state['cumulative_cash'] += monthly_income
|
||||||
|
|
||||||
|
# Apply erosion to prices and yields (but NOT to shares)
|
||||||
|
if erosion_config.erosion_type != "None":
|
||||||
|
self._apply_monthly_erosion(simulation_state, erosion_config, ticker_data.keys())
|
||||||
|
|
||||||
|
# Calculate portfolio value (original shares * current eroded prices)
|
||||||
|
portfolio_value = sum(
|
||||||
|
simulation_state['original_shares'][ticker] * simulation_state['current_prices'][ticker]
|
||||||
|
for ticker in ticker_data.keys()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Total value = portfolio + cash
|
||||||
|
total_value = portfolio_value + simulation_state['cumulative_cash']
|
||||||
|
|
||||||
|
# Add to monthly tracking
|
||||||
|
monthly_tracking.append({
|
||||||
|
'Month': month,
|
||||||
|
'Portfolio Value': portfolio_value,
|
||||||
|
'Monthly Income': monthly_income,
|
||||||
|
'Cumulative Income': simulation_state['cumulative_cash'],
|
||||||
|
'Total Value': total_value,
|
||||||
|
'Prices': {ticker: simulation_state['current_prices'][ticker] for ticker in ticker_data.keys()},
|
||||||
|
'Yields': {ticker: simulation_state['current_yields'][ticker] for ticker in ticker_data.keys()}
|
||||||
|
})
|
||||||
|
|
||||||
|
# Create monthly data
|
||||||
|
monthly_data.append(NoDRIPMonthlyData(
|
||||||
|
month=month,
|
||||||
|
portfolio_value=portfolio_value,
|
||||||
|
monthly_income=monthly_income,
|
||||||
|
cumulative_income=simulation_state['cumulative_cash'],
|
||||||
|
prices=simulation_state['current_prices'].copy(),
|
||||||
|
yields=simulation_state['current_yields'].copy(),
|
||||||
|
original_shares=simulation_state['original_shares'].copy()
|
||||||
|
))
|
||||||
|
|
||||||
|
# Print monthly tracking table
|
||||||
|
print("\nMonthly No-DRIP Simulation Results:")
|
||||||
|
print("=" * 100)
|
||||||
|
print(f"{'Month':<6} {'Portfolio Value':<15} {'Monthly Income':<15} {'Cumulative Income':<18} {'Total Value':<15}")
|
||||||
|
print("-" * 100)
|
||||||
|
|
||||||
|
for month_data in monthly_tracking:
|
||||||
|
print(f"{month_data['Month']:<6} ${month_data['Portfolio Value']:<14.2f} ${month_data['Monthly Income']:<14.2f} ${month_data['Cumulative Income']:<17.2f} ${month_data['Total Value']:<14.2f}")
|
||||||
|
|
||||||
|
print("=" * 100)
|
||||||
|
|
||||||
|
# Calculate final results
|
||||||
|
return self._create_no_drip_result(monthly_data, simulation_state)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calculating No-DRIP growth: {str(e)}")
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
|
||||||
|
"""Validate input parameters (reuse from DRIP service)"""
|
||||||
|
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
|
||||||
|
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
|
||||||
|
|
||||||
|
if missing_columns:
|
||||||
|
raise ValueError(f"Missing required columns: {missing_columns}")
|
||||||
|
|
||||||
|
if config.months <= 0:
|
||||||
|
raise ValueError("Months must be positive")
|
||||||
|
|
||||||
|
if portfolio_df.empty:
|
||||||
|
raise ValueError("Portfolio DataFrame is empty")
|
||||||
|
|
||||||
|
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
|
||||||
|
"""Initialize ticker data with validation (reuse from DRIP service)"""
|
||||||
|
ticker_data = {}
|
||||||
|
|
||||||
|
for _, row in portfolio_df.iterrows():
|
||||||
|
ticker = row["Ticker"]
|
||||||
|
|
||||||
|
# Handle distribution frequency
|
||||||
|
dist_period = row.get("Distribution Period", "Monthly")
|
||||||
|
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
|
||||||
|
|
||||||
|
ticker_data[ticker] = TickerData(
|
||||||
|
ticker=ticker,
|
||||||
|
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
|
||||||
|
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
|
||||||
|
shares=max(0.0, float(row["Shares"])),
|
||||||
|
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
|
||||||
|
distribution_freq=dist_freq
|
||||||
|
)
|
||||||
|
|
||||||
|
return ticker_data
|
||||||
|
|
||||||
|
def _parse_erosion_config(self, config: DripConfig) -> ErosionConfig:
|
||||||
|
"""Parse and validate erosion configuration (reuse from DRIP service)"""
|
||||||
|
if not hasattr(config, 'erosion_level') or config.erosion_type == "None":
|
||||||
|
return ErosionConfig(erosion_type="None", erosion_level={})
|
||||||
|
|
||||||
|
# Check if erosion_level is already in the correct format
|
||||||
|
if isinstance(config.erosion_level, dict) and "per_ticker" in config.erosion_level:
|
||||||
|
return ErosionConfig(
|
||||||
|
erosion_type=config.erosion_type,
|
||||||
|
erosion_level=config.erosion_level
|
||||||
|
)
|
||||||
|
|
||||||
|
return ErosionConfig(
|
||||||
|
erosion_type=config.erosion_type,
|
||||||
|
erosion_level=config.erosion_level
|
||||||
|
)
|
||||||
|
|
||||||
|
def _calculate_erosion_from_analysis(self, tickers: List[str]) -> Dict:
|
||||||
|
"""Calculate erosion rates using NavErosionService (reuse from DRIP service)"""
|
||||||
|
try:
|
||||||
|
# Use NavErosionService to analyze the tickers
|
||||||
|
analysis = self.nav_erosion_service.analyze_etf_erosion_risk(tickers)
|
||||||
|
|
||||||
|
# Convert to format expected by No-DRIP service
|
||||||
|
erosion_config = self.nav_erosion_service.convert_to_drip_erosion_config(analysis)
|
||||||
|
|
||||||
|
logger.info(f"Calculated erosion rates for No-DRIP tickers: {tickers}")
|
||||||
|
logger.info(f"Erosion configuration: {erosion_config}")
|
||||||
|
|
||||||
|
return erosion_config
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calculating erosion rates for No-DRIP: {str(e)}")
|
||||||
|
logger.warning("Falling back to no erosion")
|
||||||
|
return {"per_ticker": {ticker: {"nav": 0.0, "yield": 0.0} for ticker in tickers}}
|
||||||
|
|
||||||
|
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
|
||||||
|
"""Pre-calculate which months each ticker pays distributions (reuse from DRIP service)"""
|
||||||
|
schedule = {}
|
||||||
|
|
||||||
|
for ticker, data in ticker_data.items():
|
||||||
|
distribution_months = []
|
||||||
|
freq = data.distribution_freq
|
||||||
|
|
||||||
|
for month in range(1, total_months + 1):
|
||||||
|
if self._is_distribution_month(month, freq):
|
||||||
|
distribution_months.append(month)
|
||||||
|
|
||||||
|
schedule[ticker] = distribution_months
|
||||||
|
|
||||||
|
return schedule
|
||||||
|
|
||||||
|
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
|
||||||
|
"""Initialize simulation state variables"""
|
||||||
|
return {
|
||||||
|
'original_shares': {ticker: data.shares for ticker, data in ticker_data.items()}, # Constant
|
||||||
|
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
|
||||||
|
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
|
||||||
|
'cumulative_cash': 0.0 # Cash accumulated from dividends
|
||||||
|
}
|
||||||
|
|
||||||
|
def _calculate_monthly_distributions(
|
||||||
|
self,
|
||||||
|
month: int,
|
||||||
|
state: Dict[str, Any],
|
||||||
|
ticker_data: Dict[str, TickerData],
|
||||||
|
distribution_schedule: Dict[str, List[int]]
|
||||||
|
) -> float:
|
||||||
|
"""Calculate distributions for the current month (reuse logic from DRIP service)"""
|
||||||
|
monthly_income = 0.0
|
||||||
|
|
||||||
|
for ticker, data in ticker_data.items():
|
||||||
|
if month in distribution_schedule[ticker]:
|
||||||
|
shares = state['original_shares'][ticker] # Original shares (constant)
|
||||||
|
price = state['current_prices'][ticker]
|
||||||
|
yield_rate = state['current_yields'][ticker]
|
||||||
|
|
||||||
|
# Calculate distribution amount using current (eroded) values
|
||||||
|
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
|
||||||
|
distribution_amount = shares * price * distribution_yield
|
||||||
|
monthly_income += distribution_amount
|
||||||
|
|
||||||
|
# Log distribution calculation
|
||||||
|
logger.info(f"Month {month} No-DRIP distribution for {ticker}:")
|
||||||
|
logger.info(f" Shares: {shares:.4f} (constant)")
|
||||||
|
logger.info(f" Price: ${price:.2f}")
|
||||||
|
logger.info(f" Yield: {yield_rate:.2%}")
|
||||||
|
logger.info(f" Distribution: ${distribution_amount:.2f}")
|
||||||
|
|
||||||
|
return monthly_income
|
||||||
|
|
||||||
|
def _apply_monthly_erosion(
|
||||||
|
self,
|
||||||
|
state: Dict[str, Any],
|
||||||
|
erosion_config: ErosionConfig,
|
||||||
|
tickers: List[str]
|
||||||
|
) -> None:
|
||||||
|
"""Apply monthly erosion to prices and yields"""
|
||||||
|
if erosion_config.erosion_type == "None":
|
||||||
|
return
|
||||||
|
|
||||||
|
# Validate erosion configuration structure
|
||||||
|
if not isinstance(erosion_config.erosion_level, dict):
|
||||||
|
logger.warning(f"Invalid erosion_level format: {type(erosion_config.erosion_level)}")
|
||||||
|
return
|
||||||
|
|
||||||
|
per_ticker_data = erosion_config.erosion_level.get("per_ticker", {})
|
||||||
|
if not per_ticker_data:
|
||||||
|
logger.warning("No per_ticker erosion data found in erosion_level")
|
||||||
|
return
|
||||||
|
|
||||||
|
for ticker in tickers:
|
||||||
|
# Get per-ticker erosion rates with fallback
|
||||||
|
ticker_rates = per_ticker_data.get(ticker, {})
|
||||||
|
|
||||||
|
if not ticker_rates:
|
||||||
|
logger.warning(f"No erosion rates found for ticker {ticker}, skipping erosion")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get annual erosion rates (assumed to be in percentage form, e.g., 6.7 for 6.7%)
|
||||||
|
nav_annual_pct = ticker_rates.get("nav", 0.0)
|
||||||
|
yield_annual_pct = ticker_rates.get("yield", 0.0)
|
||||||
|
|
||||||
|
# Convert annual percentage to monthly rate using compound formula
|
||||||
|
# Monthly rate = (1 + annual_rate)^(1/12) - 1
|
||||||
|
nav_annual_rate = nav_annual_pct / 100.0 # Convert percentage to decimal
|
||||||
|
yield_annual_rate = yield_annual_pct / 100.0 # Convert percentage to decimal
|
||||||
|
|
||||||
|
# Calculate monthly erosion rates using compound formula
|
||||||
|
if nav_annual_rate > 0:
|
||||||
|
nav_monthly_rate = (1 + nav_annual_rate) ** (1/12) - 1
|
||||||
|
else:
|
||||||
|
nav_monthly_rate = 0.0
|
||||||
|
|
||||||
|
if yield_annual_rate > 0:
|
||||||
|
yield_monthly_rate = (1 + yield_annual_rate) ** (1/12) - 1
|
||||||
|
else:
|
||||||
|
yield_monthly_rate = 0.0
|
||||||
|
|
||||||
|
# Validate rates are reasonable (0 to 5% monthly max)
|
||||||
|
nav_monthly_rate = max(0.0, min(nav_monthly_rate, self.MAX_MONTHLY_EROSION))
|
||||||
|
yield_monthly_rate = max(0.0, min(yield_monthly_rate, self.MAX_MONTHLY_EROSION))
|
||||||
|
|
||||||
|
# Store original values for logging
|
||||||
|
original_price = state['current_prices'][ticker]
|
||||||
|
original_yield = state['current_yields'][ticker]
|
||||||
|
|
||||||
|
# Apply monthly erosion
|
||||||
|
state['current_prices'][ticker] *= (1 - nav_monthly_rate)
|
||||||
|
state['current_yields'][ticker] *= (1 - yield_monthly_rate)
|
||||||
|
|
||||||
|
# Ensure prices and yields don't go below reasonable minimums
|
||||||
|
state['current_prices'][ticker] = max(state['current_prices'][ticker], 0.01)
|
||||||
|
state['current_yields'][ticker] = max(state['current_yields'][ticker], 0.0)
|
||||||
|
|
||||||
|
# Log erosion application with both annual and monthly rates
|
||||||
|
logger.info(f"Applied monthly erosion to {ticker} (No-DRIP):")
|
||||||
|
logger.info(f" NAV: {nav_annual_pct:.1f}% annual -> {nav_monthly_rate:.4%} monthly")
|
||||||
|
logger.info(f" Price: ${original_price:.2f} -> ${state['current_prices'][ticker]:.2f}")
|
||||||
|
logger.info(f" Yield: {yield_annual_pct:.1f}% annual -> {yield_monthly_rate:.4%} monthly")
|
||||||
|
logger.info(f" Yield: {original_yield:.2%} -> {state['current_yields'][ticker]:.2%}")
|
||||||
|
|
||||||
|
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
|
||||||
|
"""Check if current month is a distribution month (reuse from DRIP service)"""
|
||||||
|
if frequency == DistributionFrequency.MONTHLY:
|
||||||
|
return True
|
||||||
|
elif frequency == DistributionFrequency.QUARTERLY:
|
||||||
|
return month % 3 == 0
|
||||||
|
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
|
||||||
|
return month % 6 == 0
|
||||||
|
elif frequency == DistributionFrequency.ANNUALLY:
|
||||||
|
return month % 12 == 0
|
||||||
|
else:
|
||||||
|
return True # Default to monthly for unknown
|
||||||
|
|
||||||
|
def _create_no_drip_result(self, monthly_data: List[NoDRIPMonthlyData], state: Dict[str, Any]) -> NoDRIPResult:
|
||||||
|
"""Create final No-DRIP result object"""
|
||||||
|
if not monthly_data:
|
||||||
|
raise ValueError("No monthly data generated")
|
||||||
|
|
||||||
|
final_data = monthly_data[-1]
|
||||||
|
|
||||||
|
return NoDRIPResult(
|
||||||
|
monthly_data=monthly_data,
|
||||||
|
final_portfolio_value=final_data.portfolio_value,
|
||||||
|
total_cash_income=state['cumulative_cash'],
|
||||||
|
total_value=final_data.portfolio_value + state['cumulative_cash'],
|
||||||
|
original_shares=state['original_shares'].copy()
|
||||||
|
)
|
||||||
@ -68,6 +68,7 @@ class DRIPService:
|
|||||||
self.MAX_MONTHLY_EROSION = 0.05 # 5% monthly max erosion
|
self.MAX_MONTHLY_EROSION = 0.05 # 5% monthly max erosion
|
||||||
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
||||||
self.nav_erosion_service = NavErosionService()
|
self.nav_erosion_service = NavErosionService()
|
||||||
|
self.no_drip_service = None # Will be initialized when needed to avoid circular import
|
||||||
|
|
||||||
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
||||||
"""
|
"""
|
||||||
@ -349,29 +350,48 @@ class DRIPService:
|
|||||||
logger.warning(f"No erosion rates found for ticker {ticker}, skipping erosion")
|
logger.warning(f"No erosion rates found for ticker {ticker}, skipping erosion")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
nav_rate = ticker_rates.get("nav", 0.0) # Monthly rate in decimal form
|
# Get annual erosion rates (assumed to be in percentage form, e.g., 6.7 for 6.7%)
|
||||||
yield_rate = ticker_rates.get("yield", 0.0) # Monthly rate in decimal form
|
nav_annual_pct = ticker_rates.get("nav", 0.0)
|
||||||
|
yield_annual_pct = ticker_rates.get("yield", 0.0)
|
||||||
|
|
||||||
|
# Convert annual percentage to monthly rate using compound formula
|
||||||
|
# Monthly rate = (1 + annual_rate)^(1/12) - 1
|
||||||
|
nav_annual_rate = nav_annual_pct / 100.0 # Convert percentage to decimal
|
||||||
|
yield_annual_rate = yield_annual_pct / 100.0 # Convert percentage to decimal
|
||||||
|
|
||||||
|
# Calculate monthly erosion rates using compound formula
|
||||||
|
if nav_annual_rate > 0:
|
||||||
|
nav_monthly_rate = (1 + nav_annual_rate) ** (1/12) - 1
|
||||||
|
else:
|
||||||
|
nav_monthly_rate = 0.0
|
||||||
|
|
||||||
|
if yield_annual_rate > 0:
|
||||||
|
yield_monthly_rate = (1 + yield_annual_rate) ** (1/12) - 1
|
||||||
|
else:
|
||||||
|
yield_monthly_rate = 0.0
|
||||||
|
|
||||||
# Validate rates are reasonable (0 to 5% monthly max)
|
# Validate rates are reasonable (0 to 5% monthly max)
|
||||||
nav_rate = max(0.0, min(nav_rate, self.MAX_MONTHLY_EROSION))
|
nav_monthly_rate = max(0.0, min(nav_monthly_rate, self.MAX_MONTHLY_EROSION))
|
||||||
yield_rate = max(0.0, min(yield_rate, self.MAX_MONTHLY_EROSION))
|
yield_monthly_rate = max(0.0, min(yield_monthly_rate, self.MAX_MONTHLY_EROSION))
|
||||||
|
|
||||||
# Store original values for logging
|
# Store original values for logging
|
||||||
original_price = state['current_prices'][ticker]
|
original_price = state['current_prices'][ticker]
|
||||||
original_yield = state['current_yields'][ticker]
|
original_yield = state['current_yields'][ticker]
|
||||||
|
|
||||||
# Apply erosion directly (rates are already monthly)
|
# Apply monthly erosion
|
||||||
state['current_prices'][ticker] *= (1 - nav_rate)
|
state['current_prices'][ticker] *= (1 - nav_monthly_rate)
|
||||||
state['current_yields'][ticker] *= (1 - yield_rate)
|
state['current_yields'][ticker] *= (1 - yield_monthly_rate)
|
||||||
|
|
||||||
# Ensure prices and yields don't go below reasonable minimums
|
# Ensure prices and yields don't go below reasonable minimums
|
||||||
state['current_prices'][ticker] = max(state['current_prices'][ticker], 0.01)
|
state['current_prices'][ticker] = max(state['current_prices'][ticker], 0.01)
|
||||||
state['current_yields'][ticker] = max(state['current_yields'][ticker], 0.0)
|
state['current_yields'][ticker] = max(state['current_yields'][ticker], 0.0)
|
||||||
|
|
||||||
# Log erosion application
|
# Log erosion application with both annual and monthly rates
|
||||||
logger.info(f"Applied monthly erosion to {ticker}:")
|
logger.info(f"Applied monthly erosion to {ticker}:")
|
||||||
logger.info(f" NAV: {nav_rate:.4%} -> Price: ${original_price:.2f} -> ${state['current_prices'][ticker]:.2f}")
|
logger.info(f" NAV: {nav_annual_pct:.1f}% annual -> {nav_monthly_rate:.4%} monthly")
|
||||||
logger.info(f" Yield: {yield_rate:.4%} -> Yield: {original_yield:.2%} -> {state['current_yields'][ticker]:.2%}")
|
logger.info(f" Price: ${original_price:.2f} -> ${state['current_prices'][ticker]:.2f}")
|
||||||
|
logger.info(f" Yield: {yield_annual_pct:.1f}% annual -> {yield_monthly_rate:.4%} monthly")
|
||||||
|
logger.info(f" Yield: {original_yield:.2%} -> {state['current_yields'][ticker]:.2%}")
|
||||||
|
|
||||||
def _reinvest_dividends(
|
def _reinvest_dividends(
|
||||||
self,
|
self,
|
||||||
@ -428,79 +448,8 @@ class DRIPService:
|
|||||||
total_shares=state['current_shares'].copy()
|
total_shares=state['current_shares'].copy()
|
||||||
)
|
)
|
||||||
|
|
||||||
# Utility methods for analysis and comparison
|
# Utility methods for analysis and comparison - duplicate method removed
|
||||||
def calculate_drip_vs_no_drip_comparison(
|
# The main calculate_drip_vs_no_drip_comparison method is defined below
|
||||||
self,
|
|
||||||
portfolio_df: pd.DataFrame,
|
|
||||||
config: DripConfig
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""Calculate comparison between DRIP and no-DRIP scenarios"""
|
|
||||||
|
|
||||||
# Calculate DRIP scenario
|
|
||||||
drip_result = self.calculate_drip_growth(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate no-DRIP scenario (dividends not reinvested)
|
|
||||||
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate comparison metrics
|
|
||||||
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
|
|
||||||
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
|
|
||||||
|
|
||||||
return {
|
|
||||||
'drip_final_value': drip_result.final_portfolio_value,
|
|
||||||
'no_drip_final_value': no_drip_result['final_value'],
|
|
||||||
'drip_advantage': drip_advantage,
|
|
||||||
'percentage_advantage': percentage_advantage,
|
|
||||||
'total_dividends_reinvested': drip_result.total_income,
|
|
||||||
'cash_dividends_no_drip': no_drip_result['total_dividends']
|
|
||||||
}
|
|
||||||
|
|
||||||
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
|
|
||||||
"""Calculate scenario where dividends are not reinvested"""
|
|
||||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
|
||||||
|
|
||||||
# Handle erosion configuration same way as main calculation
|
|
||||||
erosion_config = self._parse_erosion_config(config)
|
|
||||||
|
|
||||||
# If erosion is requested but no proper erosion_level is provided, calculate it
|
|
||||||
if (config.erosion_type != "None" and
|
|
||||||
(not hasattr(config, 'erosion_level') or
|
|
||||||
not isinstance(config.erosion_level, dict) or
|
|
||||||
"per_ticker" not in config.erosion_level)):
|
|
||||||
|
|
||||||
logger.info(f"Calculating erosion rates for no-DRIP scenario with erosion type: {config.erosion_type}")
|
|
||||||
tickers = list(ticker_data.keys())
|
|
||||||
calculated_erosion = self.calculate_erosion_from_analysis(tickers)
|
|
||||||
erosion_config = ErosionConfig(
|
|
||||||
erosion_type=config.erosion_type,
|
|
||||||
erosion_level=calculated_erosion
|
|
||||||
)
|
|
||||||
|
|
||||||
state = self._initialize_simulation_state(ticker_data)
|
|
||||||
|
|
||||||
total_dividends = 0.0
|
|
||||||
|
|
||||||
for month in range(1, config.months + 1):
|
|
||||||
# Calculate dividends but don't reinvest
|
|
||||||
monthly_dividends = self._calculate_monthly_distributions(
|
|
||||||
month, state, ticker_data,
|
|
||||||
self._create_distribution_schedule(ticker_data, config.months)
|
|
||||||
)
|
|
||||||
total_dividends += monthly_dividends
|
|
||||||
|
|
||||||
# Apply erosion
|
|
||||||
if erosion_config.erosion_type != "None":
|
|
||||||
self._apply_monthly_erosion(state, erosion_config, ticker_data.keys())
|
|
||||||
|
|
||||||
final_value = sum(
|
|
||||||
state['current_shares'][ticker] * state['current_prices'][ticker]
|
|
||||||
for ticker in ticker_data.keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
'final_value': final_value,
|
|
||||||
'total_dividends': total_dividends
|
|
||||||
}
|
|
||||||
|
|
||||||
def forecast_portfolio(
|
def forecast_portfolio(
|
||||||
self,
|
self,
|
||||||
@ -594,3 +543,236 @@ class DRIPService:
|
|||||||
logger.error(f"Error forecasting portfolio: {str(e)}")
|
logger.error(f"Error forecasting portfolio: {str(e)}")
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def calculate_drip_vs_no_drip_comparison(
|
||||||
|
self,
|
||||||
|
portfolio_df: pd.DataFrame,
|
||||||
|
config: DripConfig
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Calculate and compare DRIP vs No-DRIP strategies with detailed analysis.
|
||||||
|
This method runs both simulations and displays comparison tables.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
portfolio_df: DataFrame containing portfolio allocation
|
||||||
|
config: DripConfig object with simulation parameters
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing both results and comparison analysis
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Initialize No-DRIP service if needed
|
||||||
|
if self.no_drip_service is None:
|
||||||
|
from .no_drip_service import NoDRIPService
|
||||||
|
self.no_drip_service = NoDRIPService()
|
||||||
|
|
||||||
|
# Calculate initial investment
|
||||||
|
initial_investment = (portfolio_df['Price'] * portfolio_df['Shares']).sum()
|
||||||
|
|
||||||
|
# Run DRIP simulation (this will print the DRIP table)
|
||||||
|
logger.info("Running DRIP simulation...")
|
||||||
|
drip_result = self.calculate_drip_growth(portfolio_df, config)
|
||||||
|
|
||||||
|
# Run No-DRIP simulation (this will print the No-DRIP table)
|
||||||
|
logger.info("Running No-DRIP simulation...")
|
||||||
|
no_drip_result = self.no_drip_service.calculate_no_drip_growth(portfolio_df, config)
|
||||||
|
|
||||||
|
# Calculate break-even analysis
|
||||||
|
drip_break_even = self._calculate_break_even_analysis(
|
||||||
|
"DRIP", drip_result.monthly_data, initial_investment,
|
||||||
|
lambda md: md.total_value
|
||||||
|
)
|
||||||
|
|
||||||
|
no_drip_break_even = self._calculate_break_even_analysis(
|
||||||
|
"No-DRIP", no_drip_result.monthly_data, initial_investment,
|
||||||
|
lambda md: md.portfolio_value + md.cumulative_income
|
||||||
|
)
|
||||||
|
|
||||||
|
# Determine winner
|
||||||
|
drip_final = drip_result.final_portfolio_value
|
||||||
|
no_drip_final = no_drip_result.total_value
|
||||||
|
|
||||||
|
if drip_final > no_drip_final:
|
||||||
|
winner = "DRIP"
|
||||||
|
advantage_amount = drip_final - no_drip_final
|
||||||
|
advantage_percentage = (advantage_amount / no_drip_final) * 100
|
||||||
|
elif no_drip_final > drip_final:
|
||||||
|
winner = "No-DRIP"
|
||||||
|
advantage_amount = no_drip_final - drip_final
|
||||||
|
advantage_percentage = (advantage_amount / drip_final) * 100
|
||||||
|
else:
|
||||||
|
winner = "Tie"
|
||||||
|
advantage_amount = 0.0
|
||||||
|
advantage_percentage = 0.0
|
||||||
|
|
||||||
|
# Print comparison table
|
||||||
|
self._print_strategy_comparison(
|
||||||
|
drip_result, no_drip_result, initial_investment,
|
||||||
|
winner, advantage_amount, advantage_percentage,
|
||||||
|
drip_break_even, no_drip_break_even
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'drip_result': drip_result,
|
||||||
|
'no_drip_result': no_drip_result,
|
||||||
|
'initial_investment': initial_investment,
|
||||||
|
'drip_final_value': drip_final,
|
||||||
|
'no_drip_final_value': no_drip_final,
|
||||||
|
'winner': winner,
|
||||||
|
'advantage_amount': advantage_amount,
|
||||||
|
'advantage_percentage': advantage_percentage,
|
||||||
|
'drip_break_even': drip_break_even,
|
||||||
|
'no_drip_break_even': no_drip_break_even,
|
||||||
|
'comparison_summary': self._generate_comparison_summary(
|
||||||
|
drip_final, no_drip_final, initial_investment, winner, advantage_percentage
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in DRIP vs No-DRIP comparison: {str(e)}")
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _calculate_break_even_analysis(
|
||||||
|
self,
|
||||||
|
strategy_name: str,
|
||||||
|
monthly_data: List,
|
||||||
|
initial_investment: float,
|
||||||
|
value_extractor: callable
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Calculate break-even analysis for a strategy.
|
||||||
|
Break-even occurs when total value exceeds initial investment,
|
||||||
|
accounting for the erosion effects on the portfolio.
|
||||||
|
"""
|
||||||
|
|
||||||
|
break_even_month = None
|
||||||
|
profit_at_break_even = 0.0
|
||||||
|
max_total_return = -float('inf')
|
||||||
|
|
||||||
|
logger.info(f"=== Break-Even Analysis for {strategy_name} ===")
|
||||||
|
logger.info(f"Initial Investment: ${initial_investment:,.2f}")
|
||||||
|
|
||||||
|
for month_data in monthly_data:
|
||||||
|
total_value = value_extractor(month_data)
|
||||||
|
total_return_pct = ((total_value - initial_investment) / initial_investment) * 100
|
||||||
|
max_total_return = max(max_total_return, total_return_pct)
|
||||||
|
|
||||||
|
logger.info(f"Month {month_data.month}: Total Value = ${total_value:,.2f}, Return = {total_return_pct:+.2f}%")
|
||||||
|
|
||||||
|
# Break-even when total value exceeds initial investment by a meaningful margin (>0.1%)
|
||||||
|
# This accounts for rounding errors and ensures true profitability
|
||||||
|
if total_return_pct > 0.1 and break_even_month is None:
|
||||||
|
break_even_month = month_data.month
|
||||||
|
profit_at_break_even = total_value - initial_investment
|
||||||
|
logger.info(f"Break-even achieved in month {break_even_month} with {total_return_pct:.2f}% return")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Additional validation: if max return is still negative, definitely no break-even
|
||||||
|
if max_total_return < 0:
|
||||||
|
logger.info(f"Maximum return achieved: {max_total_return:.2f}% - Never breaks even")
|
||||||
|
break_even_month = None
|
||||||
|
|
||||||
|
# Format break-even time
|
||||||
|
if break_even_month is None:
|
||||||
|
months_to_break_even = "Never (within simulation period)"
|
||||||
|
logger.info(f"Result: No break-even within {len(monthly_data)} months")
|
||||||
|
else:
|
||||||
|
years = break_even_month // 12
|
||||||
|
months = break_even_month % 12
|
||||||
|
if years > 0:
|
||||||
|
months_to_break_even = f"{years} year(s) and {months} month(s)"
|
||||||
|
else:
|
||||||
|
months_to_break_even = f"{months} month(s)"
|
||||||
|
logger.info(f"Result: Break-even in {months_to_break_even}")
|
||||||
|
|
||||||
|
logger.info(f"=== End Break-Even Analysis for {strategy_name} ===")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'strategy_name': strategy_name,
|
||||||
|
'break_even_month': break_even_month,
|
||||||
|
'profit_at_break_even': profit_at_break_even,
|
||||||
|
'months_to_break_even': months_to_break_even,
|
||||||
|
'initial_investment': initial_investment,
|
||||||
|
'max_return_pct': max_total_return
|
||||||
|
}
|
||||||
|
|
||||||
|
def _print_strategy_comparison(
|
||||||
|
self,
|
||||||
|
drip_result: DripResult,
|
||||||
|
no_drip_result: Any, # NoDRIPResult
|
||||||
|
initial_investment: float,
|
||||||
|
winner: str,
|
||||||
|
advantage_amount: float,
|
||||||
|
advantage_percentage: float,
|
||||||
|
drip_break_even: Dict[str, Any],
|
||||||
|
no_drip_break_even: Dict[str, Any]
|
||||||
|
) -> None:
|
||||||
|
"""Print detailed strategy comparison table"""
|
||||||
|
|
||||||
|
print("\n" + "="*100)
|
||||||
|
print("DRIP vs No-DRIP STRATEGY COMPARISON")
|
||||||
|
print("="*100)
|
||||||
|
|
||||||
|
print(f"{'Metric':<35} {'DRIP Strategy':<25} {'No-DRIP Strategy':<25}")
|
||||||
|
print("-"*100)
|
||||||
|
|
||||||
|
print(f"{'Initial Investment':<35} ${initial_investment:<24,.2f} ${initial_investment:<24,.2f}")
|
||||||
|
print(f"{'Final Portfolio Value':<35} ${drip_result.final_portfolio_value:<24,.2f} ${no_drip_result.final_portfolio_value:<24,.2f}")
|
||||||
|
print(f"{'Total Cash Income':<35} ${drip_result.total_income:<24,.2f} ${no_drip_result.total_cash_income:<24,.2f}")
|
||||||
|
print(f"{'Total Final Value':<35} ${drip_result.final_portfolio_value:<24,.2f} ${no_drip_result.total_value:<24,.2f}")
|
||||||
|
|
||||||
|
drip_return = ((drip_result.final_portfolio_value / initial_investment) - 1) * 100
|
||||||
|
no_drip_return = ((no_drip_result.total_value / initial_investment) - 1) * 100
|
||||||
|
|
||||||
|
print(f"{'Total Return %':<35} {drip_return:<24.1f}% {no_drip_return:<24.1f}%")
|
||||||
|
|
||||||
|
# Break-even analysis
|
||||||
|
print(f"{'Break-even Time':<35} {drip_break_even['months_to_break_even']:<25} {no_drip_break_even['months_to_break_even']:<25}")
|
||||||
|
|
||||||
|
print("-"*100)
|
||||||
|
print(f"WINNER: {winner}")
|
||||||
|
if winner != "Tie":
|
||||||
|
print(f"ADVANTAGE: ${advantage_amount:,.2f} ({advantage_percentage:.1f}%)")
|
||||||
|
|
||||||
|
# Investment recommendation
|
||||||
|
recommendation = self._generate_investment_recommendation(winner, advantage_percentage)
|
||||||
|
print(f"RECOMMENDATION: {recommendation}")
|
||||||
|
print("="*100)
|
||||||
|
|
||||||
|
def _generate_investment_recommendation(self, winner: str, advantage_percentage: float) -> str:
|
||||||
|
"""Generate investment recommendation based on comparison results"""
|
||||||
|
|
||||||
|
if winner == "Tie":
|
||||||
|
return "Both strategies perform equally. Choose based on your liquidity needs."
|
||||||
|
|
||||||
|
if advantage_percentage < 1.0:
|
||||||
|
return f"{winner} wins by a small margin ({advantage_percentage:.1f}%). Choose based on liquidity preferences."
|
||||||
|
elif advantage_percentage < 5.0:
|
||||||
|
return f"{winner} strategy is recommended with a {advantage_percentage:.1f}% advantage."
|
||||||
|
else:
|
||||||
|
return f"{winner} strategy is strongly recommended with a {advantage_percentage:.1f}% advantage."
|
||||||
|
|
||||||
|
def _generate_comparison_summary(
|
||||||
|
self,
|
||||||
|
drip_final: float,
|
||||||
|
no_drip_final: float,
|
||||||
|
initial_investment: float,
|
||||||
|
winner: str,
|
||||||
|
advantage_percentage: float
|
||||||
|
) -> str:
|
||||||
|
"""Generate comparison summary"""
|
||||||
|
|
||||||
|
drip_total_return = ((drip_final / initial_investment) - 1) * 100
|
||||||
|
no_drip_total_return = ((no_drip_final / initial_investment) - 1) * 100
|
||||||
|
|
||||||
|
summary = f"Initial Investment: ${initial_investment:,.2f}\n"
|
||||||
|
summary += f"DRIP Final Value: ${drip_final:,.2f} (Total Return: {drip_total_return:.1f}%)\n"
|
||||||
|
summary += f"No-DRIP Final Value: ${no_drip_final:,.2f} (Total Return: {no_drip_total_return:.1f}%)\n"
|
||||||
|
|
||||||
|
if winner != "Tie":
|
||||||
|
summary += f"Winner: {winner} strategy ({advantage_percentage:.1f}% advantage)"
|
||||||
|
else:
|
||||||
|
summary += "Result: Both strategies perform equally"
|
||||||
|
|
||||||
|
return summary
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user