ETF_Suite_Portal/services/drip_service/service.py

280 lines
12 KiB
Python

from typing import Dict, List, Optional, Tuple, Any
import pandas as pd
import logging
import traceback
from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult
# Module-level logger named after this module (via __name__); handlers and
# levels are not configured here.
logger = logging.getLogger(__name__)
class DripService:
    """Simulate dividend reinvestment (DRIP) growth for a portfolio.

    The simulation advances month by month.  Each month it computes the
    income generated by the current holdings, optionally erodes prices and
    yields, reinvests dividends into additional shares and records a
    snapshot of the resulting state.
    """

    def __init__(self) -> None:
        # Erosion levels are expressed on a 0..MAX_EROSION_LEVEL scale.
        self.MAX_EROSION_LEVEL = 9
        # Monthly rate that compounds to 90% annual erosion (~17.46% per month).
        self.max_monthly_erosion = 1 - (0.1) ** (1 / 12)
        # Number of distributions per year for each distribution label.
        self.dividend_frequency = {
            "Monthly": 12,
            "Quarterly": 4,
            "Semi-Annually": 2,
            "Annually": 1,
            "Unknown": 12,  # Default to monthly if unknown
        }

    def calculate_drip_growth(
        self, portfolio_df: pd.DataFrame, config: "DripConfig"
    ) -> "DripResult":
        """Calculate DRIP growth for a portfolio over a specified period.

        Args:
            portfolio_df: DataFrame with the columns "Ticker", "Shares",
                "Price" and "Yield (%)".
            config: Simulation parameters; ``months``, ``erosion_type`` and
                ``erosion_level`` are read.

        Returns:
            DripResult holding one MonthlyData record per simulated month,
            the final portfolio value, the total income and the final share
            counts.  With ``config.months == 0`` the monthly list is empty,
            the final value is the initial portfolio value and the total
            income is 0.

        Raises:
            Exception: Any error raised during the simulation is logged and
                re-raised unchanged.
        """
        try:
            monthly_data: List["MonthlyData"] = []

            current_shares = self._calculate_initial_shares(portfolio_df)
            current_prices = dict(zip(portfolio_df["Ticker"], portfolio_df["Price"]))
            current_yields = dict(
                zip(portfolio_df["Ticker"], portfolio_df["Yield (%)"] / 100)
            )
            tickers = list(current_shares)
            cumulative_income = 0.0

            for month in range(1, config.months + 1):
                # Income is earned on the holdings as they stand at the start
                # of the month, before any erosion is applied.
                monthly_income = self._calculate_monthly_income(
                    current_shares, current_prices, current_yields, tickers
                )
                cumulative_income += monthly_income

                if config.erosion_type != "None":
                    current_prices, current_yields = self._apply_erosion(
                        current_prices,
                        current_yields,
                        config.erosion_type,
                        config.erosion_level,
                    )

                # Reinvest each ticker's dividend into that same ticker.
                # NOTE(review): the reinvested amount is derived from the
                # post-erosion price and yield, so it can differ from the
                # reported monthly_income when erosion is enabled.
                for ticker in tickers:
                    price = current_prices[ticker]
                    if price > 0:  # a worthless position cannot buy shares
                        dividend_income = (current_yields[ticker] / 12) * (
                            current_shares[ticker] * price
                        )
                        current_shares[ticker] += dividend_income / price

                # Value the portfolio after erosion and reinvestment so the
                # figure agrees with the shares and prices stored alongside it.
                total_value = self._calculate_total_value(
                    current_shares, current_prices
                )
                monthly_data.append(
                    self._create_month_data(
                        month,
                        total_value,
                        monthly_income,
                        cumulative_income,
                        current_shares,
                        current_prices,
                        current_yields,
                        tickers,
                    )
                )

            # Derive the final figures from the running state rather than
            # from monthly_data[-1], which does not exist for zero months.
            return DripResult(
                monthly_data=monthly_data,
                final_portfolio_value=self._calculate_total_value(
                    current_shares, current_prices
                ),
                total_income=cumulative_income,
                total_shares=current_shares.copy(),
            )
        except Exception as e:
            logger.error(f"Error calculating DRIP growth: {str(e)}")
            logger.error(traceback.format_exc())
            raise

    def _calculate_initial_shares(self, portfolio_df: pd.DataFrame) -> Dict[str, float]:
        """Map each value of the "Ticker" column to its "Shares" value."""
        return dict(zip(portfolio_df["Ticker"], portfolio_df["Shares"]))

    def _calculate_total_value(
        self,
        current_shares: Dict[str, float],
        current_prices: Dict[str, float],
    ) -> float:
        """Return the sum of shares * price over all held tickers."""
        return sum(
            current_shares[ticker] * current_prices[ticker]
            for ticker in current_shares
        )

    def _monthly_erosion_rate(self, level: float) -> float:
        """Convert a 0..MAX_EROSION_LEVEL erosion level into a monthly rate."""
        return level / self.MAX_EROSION_LEVEL * self.max_monthly_erosion

    def _apply_erosion(
        self,
        prices: Dict[str, float],
        yields: Dict[str, float],
        erosion_type: str,
        erosion_level: Dict[str, Any]
    ) -> Tuple[Dict[str, float], Dict[str, float]]:
        """Apply one month of erosion to prices and yields.

        Args:
            prices: Current price per ticker (not modified).
            yields: Current annual yield per ticker (not modified).
            erosion_type: "None" disables erosion; any other value enables it.
            erosion_level: Erosion settings.  When "use_per_ticker" is truthy,
                each ticker's {"nav", "yield"} levels are read from the
                "per_ticker" mapping, falling back to a top-level entry keyed
                by the ticker; tickers without settings are left unchanged.
                Otherwise the levels under "global" apply to every ticker.

        Returns:
            Tuple of (updated prices, updated yields) as new dictionaries.
            If applying the settings raises, the error is logged and the
            input dictionaries are returned unchanged.
        """
        try:
            updated_prices = prices.copy()
            updated_yields = yields.copy()
            if erosion_type == "None":
                return updated_prices, updated_yields

            if erosion_level.get("use_per_ticker", False):
                per_ticker = erosion_level.get("per_ticker") or {}
                for ticker in prices:
                    # Prefer the nested layout used by
                    # _initialize_erosion_rates; keep accepting settings
                    # stored directly under the ticker name.
                    settings = per_ticker.get(ticker, erosion_level.get(ticker))
                    if settings is None:
                        continue
                    updated_prices[ticker] *= (
                        1 - self._monthly_erosion_rate(settings["nav"])
                    )
                    updated_yields[ticker] *= (
                        1 - self._monthly_erosion_rate(settings["yield"])
                    )
            else:
                nav_erosion = self._monthly_erosion_rate(erosion_level["global"]["nav"])
                yield_erosion = self._monthly_erosion_rate(
                    erosion_level["global"]["yield"]
                )
                for ticker in prices:
                    updated_prices[ticker] *= (1 - nav_erosion)
                    updated_yields[ticker] *= (1 - yield_erosion)
            return updated_prices, updated_yields
        except Exception as e:
            # Best effort: a malformed erosion configuration must not abort
            # the simulation, so fall back to the un-eroded inputs.
            logger.error(f"Error applying erosion: {str(e)}")
            logger.error(traceback.format_exc())
            return prices, yields

    def _initialize_erosion_rates(
        self,
        tickers: List[str],
        erosion_type: str,
        erosion_level: Dict[str, Any]
    ) -> Tuple[Dict[str, float], Dict[str, float]]:
        """Build per-ticker monthly NAV and yield erosion rates.

        Args:
            tickers: Tickers to produce rates for.
            erosion_type: "None" disables erosion; any other value enables it.
            erosion_level: Erosion settings.  Per-ticker levels are used when
                "use_per_ticker" is truthy and a "per_ticker" mapping is
                present (tickers missing from it get a rate of 0); otherwise
                the "global" levels are used for every ticker.

        Returns:
            Tuple of (NAV rates, yield rates), each keyed by ticker.  All
            rates are 0 when erosion is disabled or ``erosion_level`` is not
            a dictionary.
        """
        if erosion_type == "None" or not isinstance(erosion_level, dict):
            return {ticker: 0 for ticker in tickers}, {ticker: 0 for ticker in tickers}

        ticker_nav_rates: Dict[str, float] = {}
        ticker_yield_rates: Dict[str, float] = {}
        if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level:
            per_ticker = erosion_level["per_ticker"]
            for ticker in tickers:
                settings = per_ticker.get(ticker, {"nav": 0, "yield": 0})
                ticker_nav_rates[ticker] = self._monthly_erosion_rate(settings["nav"])
                ticker_yield_rates[ticker] = self._monthly_erosion_rate(settings["yield"])
        else:
            global_nav = self._monthly_erosion_rate(erosion_level["global"]["nav"])
            global_yield = self._monthly_erosion_rate(erosion_level["global"]["yield"])
            for ticker in tickers:
                ticker_nav_rates[ticker] = global_nav
                ticker_yield_rates[ticker] = global_yield
        return ticker_nav_rates, ticker_yield_rates

    def _create_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
        """Create a dictionary of ticker-specific data.

        Reads the columns "Ticker", "Price", "Yield (%)",
        "Capital Allocated ($)" and "Allocation (%)", plus the optional
        "Distribution Period" column (defaulting to "Monthly" when absent).
        Percentages are converted to fractions.  A ticker whose price is not
        positive gets 0 initial shares instead of a division error.
        """
        ticker_data: Dict[str, Dict[str, Any]] = {}
        for _, row in portfolio_df.iterrows():
            price = row["Price"]
            ticker_data[row["Ticker"]] = {
                "price": price,
                "yield_annual": row["Yield (%)"] / 100,
                "initial_shares": (
                    row["Capital Allocated ($)"] / price if price > 0 else 0.0
                ),
                "initial_allocation": row["Allocation (%)"] / 100,
                "distribution": row.get("Distribution Period", "Monthly"),
            }
        return ticker_data

    def _calculate_monthly_income(
        self,
        current_shares: Dict[str, float],
        current_prices: Dict[str, float],
        current_yields: Dict[str, float],
        tickers: List[str]
    ) -> float:
        """Return one twelfth of the annual yield on each position, summed."""
        return sum(
            (current_yields[ticker] / 12) *
            (current_shares[ticker] * current_prices[ticker])
            for ticker in tickers
        )

    def _create_month_data(
        self,
        month: int,
        current_total_value: float,
        monthly_income: float,
        cumulative_income: float,
        current_shares: Dict[str, float],
        current_prices: Dict[str, float],
        current_yields: Dict[str, float],
        tickers: List[str]
    ) -> "MonthlyData":
        """Create a MonthlyData record from copies of the current state.

        ``tickers`` is accepted for signature compatibility and is not used.
        """
        return MonthlyData(
            month=month,
            total_value=current_total_value,
            monthly_income=monthly_income,
            cumulative_income=cumulative_income,
            shares=current_shares.copy(),
            prices=current_prices.copy(),
            yields=current_yields.copy()
        )

    def _calculate_and_reinvest_dividends(
        self,
        month: int,
        ticker_data: Dict[str, Dict[str, Any]],
        current_shares: Dict[str, float],
        current_prices: Dict[str, float],
        current_yields: Dict[str, float],
        ticker_yield_rates: Dict[str, float],
        dividend_frequency: Dict[str, int]
    ) -> float:
        """Calculate this month's dividends and reinvest them proportionally.

        Args:
            month: 1-based month number of the simulation.
            ticker_data: Per-ticker data with the keys "distribution",
                "yield_annual" and "initial_allocation".
            current_shares: Shares per ticker; updated in place.
            current_prices: Current price per ticker.
            current_yields: Current annual yield per ticker; used for tickers
                whose yield erosion rate is positive.
            ticker_yield_rates: Monthly yield erosion rate per ticker.
            dividend_frequency: Distributions per year for each distribution
                label.  Labels missing from the mapping use its "Unknown"
                entry (12 if that is missing too).

        Returns:
            Total dividends paid across all tickers this month.
        """
        default_freq = dividend_frequency.get("Unknown", 12)

        # First pass: total dividends, based on holdings before reinvestment.
        total_month_dividend = 0.0
        for ticker, data in ticker_data.items():
            freq = dividend_frequency.get(data["distribution"], default_freq)
            # A ticker pays only in months that fall on its distribution cycle.
            if freq <= 0 or month % (12 / freq) != 0:
                continue
            if ticker_yield_rates[ticker] > 0:
                annual_yield = current_yields[ticker]
            else:
                annual_yield = data["yield_annual"]
            total_month_dividend += (
                (annual_yield / freq) * current_shares[ticker] * current_prices[ticker]
            )

        # Second pass: spread the pooled dividends by initial allocation.
        for ticker, data in ticker_data.items():
            if current_prices[ticker] > 0:
                new_shares = (
                    total_month_dividend * data["initial_allocation"]
                ) / current_prices[ticker]
                current_shares[ticker] += new_shares
        return total_month_dividend