Compare commits
No commits in common. "edf2ce5e9cef6a609f62881d5affb3ddbb173070" and "78896085446d8ae0eff37e867a68e04179f6bb23" have entirely different histories.
edf2ce5e9c
...
7889608544
@ -209,12 +209,6 @@ class DataService:
|
|||||||
if hist.empty:
|
if hist.empty:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Get current price
|
|
||||||
current_price = info.get('regularMarketPrice', hist['Close'].iloc[-1])
|
|
||||||
|
|
||||||
# Get dividend yield
|
|
||||||
dividend_yield = info.get('dividendYield', 0) * 100 # Convert to percentage
|
|
||||||
|
|
||||||
# Get dividends with proper handling
|
# Get dividends with proper handling
|
||||||
try:
|
try:
|
||||||
dividends = yf_ticker.dividends
|
dividends = yf_ticker.dividends
|
||||||
@ -223,6 +217,7 @@ class DataService:
|
|||||||
dividend_rate = info.get('dividendRate', 0)
|
dividend_rate = info.get('dividendRate', 0)
|
||||||
if dividend_rate > 0:
|
if dividend_rate > 0:
|
||||||
# Create a synthetic dividend series
|
# Create a synthetic dividend series
|
||||||
|
last_price = hist['Close'].iloc[-1]
|
||||||
annual_dividend = dividend_rate
|
annual_dividend = dividend_rate
|
||||||
monthly_dividend = annual_dividend / 12
|
monthly_dividend = annual_dividend / 12
|
||||||
dividends = pd.Series(monthly_dividend, index=hist.index)
|
dividends = pd.Series(monthly_dividend, index=hist.index)
|
||||||
@ -253,49 +248,93 @@ class DataService:
|
|||||||
|
|
||||||
# Sharpe Ratio
|
# Sharpe Ratio
|
||||||
if volatility > 0:
|
if volatility > 0:
|
||||||
sharpe_ratio = (annual_return - risk_free_rate) / volatility
|
sharpe_ratio = np.sqrt(252) * excess_returns.mean() / volatility
|
||||||
else:
|
else:
|
||||||
sharpe_ratio = 0
|
sharpe_ratio = 0
|
||||||
|
|
||||||
# Sortino Ratio
|
# Sortino Ratio
|
||||||
downside_returns = returns[returns < 0]
|
negative_returns = returns[returns < 0]
|
||||||
if len(downside_returns) > 0:
|
if len(negative_returns) > 0:
|
||||||
downside_volatility = downside_returns.std() * np.sqrt(252)
|
downside_volatility = negative_returns.std() * np.sqrt(252)
|
||||||
if downside_volatility > 0:
|
if downside_volatility > 0:
|
||||||
sortino_ratio = (annual_return - risk_free_rate) / downside_volatility
|
sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_volatility
|
||||||
else:
|
else:
|
||||||
sortino_ratio = 0
|
sortino_ratio = 0
|
||||||
else:
|
else:
|
||||||
sortino_ratio = 0
|
sortino_ratio = 0
|
||||||
|
|
||||||
# Calculate dividend trend
|
# Calculate dividend trend with better handling
|
||||||
if not dividends.empty:
|
try:
|
||||||
dividend_trend = (dividends.iloc[-1] / dividends.iloc[0]) - 1 if dividends.iloc[0] > 0 else 0
|
if not dividends.empty:
|
||||||
else:
|
# Resample to monthly and handle missing values
|
||||||
dividend_trend = 0
|
monthly_div = dividends.resample('ME').sum().fillna(0)
|
||||||
|
if len(monthly_div) > 12:
|
||||||
|
# Calculate trailing 12-month dividends
|
||||||
|
earliest_ttm = monthly_div[-12:].sum()
|
||||||
|
latest_ttm = monthly_div[-1:].sum()
|
||||||
|
if earliest_ttm > 0:
|
||||||
|
dividend_trend = float((latest_ttm / earliest_ttm - 1))
|
||||||
|
else:
|
||||||
|
dividend_trend = 0.0
|
||||||
|
else:
|
||||||
|
# If less than 12 months of data, use the average
|
||||||
|
dividend_trend = float(monthly_div.mean()) if not monthly_div.empty else 0.0
|
||||||
|
else:
|
||||||
|
# Try to get dividend trend from info
|
||||||
|
dividend_rate = float(info.get('dividendRate', 0))
|
||||||
|
five_year_avg = float(info.get('fiveYearAvgDividendYield', 0))
|
||||||
|
if dividend_rate > 0 and five_year_avg > 0:
|
||||||
|
dividend_trend = float((dividend_rate / five_year_avg - 1))
|
||||||
|
else:
|
||||||
|
dividend_trend = 0.0
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error calculating dividend trend for {ticker}: {str(e)}")
|
||||||
|
dividend_trend = 0.0
|
||||||
|
|
||||||
|
# Ensure dividend_trend is a valid float
|
||||||
|
dividend_trend = float(dividend_trend) if dividend_trend is not None else 0.0
|
||||||
|
if not isinstance(dividend_trend, (int, float)) or pd.isna(dividend_trend):
|
||||||
|
dividend_trend = 0.0
|
||||||
|
|
||||||
# Calculate ETF age
|
# Calculate ETF age
|
||||||
if 'firstTradeDateEpochUtc' in info:
|
inception_date = info.get('fundInceptionDate')
|
||||||
age_years = (datetime.now() - datetime.fromtimestamp(info['firstTradeDateEpochUtc'])).days / 365.25
|
if inception_date:
|
||||||
|
try:
|
||||||
|
inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True)
|
||||||
|
age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25
|
||||||
|
except:
|
||||||
|
age_years = None
|
||||||
else:
|
else:
|
||||||
age_years = 0
|
age_years = None
|
||||||
|
|
||||||
# Return formatted data
|
# Ensure all values are valid numbers and properly formatted
|
||||||
return {
|
volatility = float(volatility) if volatility is not None else 0.0
|
||||||
'price': current_price,
|
max_drawdown = float(max_drawdown) if max_drawdown is not None else 0.0
|
||||||
'dividend_yield': dividend_yield,
|
sharpe_ratio = float(sharpe_ratio) if sharpe_ratio is not None else 0.0
|
||||||
|
sortino_ratio = float(sortino_ratio) if sortino_ratio is not None else 0.0
|
||||||
|
age_years = float(age_years) if age_years is not None else 0.0
|
||||||
|
|
||||||
|
# Format the response with proper types
|
||||||
|
response = {
|
||||||
|
'info': info,
|
||||||
|
'hist': hist.to_dict(),
|
||||||
|
'dividends': dividends.to_dict(),
|
||||||
'volatility': volatility,
|
'volatility': volatility,
|
||||||
'max_drawdown': max_drawdown,
|
'max_drawdown': max_drawdown,
|
||||||
'sharpe_ratio': sharpe_ratio,
|
'sharpe_ratio': sharpe_ratio,
|
||||||
'sortino_ratio': sortino_ratio,
|
'sortino_ratio': sortino_ratio,
|
||||||
'dividend_trend': dividend_trend,
|
'dividend_trend': dividend_trend,
|
||||||
'age_years': age_years,
|
'age_years': age_years,
|
||||||
'is_new': age_years < 2,
|
'is_new': age_years < 2
|
||||||
'info': info,
|
|
||||||
'hist': hist.to_dict('records'),
|
|
||||||
'dividends': dividends.to_dict()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Ensure all numeric values are properly formatted
|
||||||
|
for key in ['volatility', 'max_drawdown', 'sharpe_ratio', 'sortino_ratio', 'dividend_trend', 'age_years']:
|
||||||
|
if key in response:
|
||||||
|
response[key] = float(response[key])
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
|
logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|||||||
@ -1,16 +0,0 @@
|
|||||||
from .service import DRIPService
|
|
||||||
from .models import DRIPMetrics, DRIPForecastResult, DRIPPortfolioResult, DripConfig
|
|
||||||
from .exceptions import DRIPError, DataFetchError, CalculationError, ValidationError, CacheError
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
'DRIPService',
|
|
||||||
'DRIPMetrics',
|
|
||||||
'DRIPForecastResult',
|
|
||||||
'DRIPPortfolioResult',
|
|
||||||
'DRIPError',
|
|
||||||
'DataFetchError',
|
|
||||||
'CalculationError',
|
|
||||||
'ValidationError',
|
|
||||||
'CacheError',
|
|
||||||
'DripConfig'
|
|
||||||
]
|
|
||||||
@ -1,19 +0,0 @@
|
|||||||
class DRIPError(Exception):
|
|
||||||
"""Base exception for DRIP service errors"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class DataFetchError(DRIPError):
|
|
||||||
"""Raised when ETF data cannot be fetched"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class CalculationError(DRIPError):
|
|
||||||
"""Raised when DRIP calculations fail"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class ValidationError(DRIPError):
|
|
||||||
"""Raised when input validation fails"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
class CacheError(DRIPError):
|
|
||||||
"""Raised when cache operations fail"""
|
|
||||||
pass
|
|
||||||
@ -1,35 +0,0 @@
|
|||||||
import logging
|
|
||||||
import os
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
def get_logger(name: str) -> logging.Logger:
|
|
||||||
"""Configure and return a logger for the DRIP service"""
|
|
||||||
logger = logging.getLogger(name)
|
|
||||||
|
|
||||||
if not logger.handlers:
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
# Create logs directory if it doesn't exist
|
|
||||||
log_dir = Path("logs")
|
|
||||||
log_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
# File handler
|
|
||||||
file_handler = logging.FileHandler(log_dir / "drip_service.log")
|
|
||||||
file_handler.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
# Console handler
|
|
||||||
console_handler = logging.StreamHandler()
|
|
||||||
console_handler.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
# Formatter
|
|
||||||
formatter = logging.Formatter(
|
|
||||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
||||||
)
|
|
||||||
file_handler.setFormatter(formatter)
|
|
||||||
console_handler.setFormatter(formatter)
|
|
||||||
|
|
||||||
# Add handlers
|
|
||||||
logger.addHandler(file_handler)
|
|
||||||
logger.addHandler(console_handler)
|
|
||||||
|
|
||||||
return logger
|
|
||||||
@ -1,116 +0,0 @@
|
|||||||
from dataclasses import dataclass, asdict
|
|
||||||
from typing import Dict, List, Optional
|
|
||||||
from datetime import datetime
|
|
||||||
import json
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class MonthlyData:
|
|
||||||
"""Data for a single month in the DRIP simulation"""
|
|
||||||
month: int
|
|
||||||
total_value: float
|
|
||||||
monthly_income: float
|
|
||||||
cumulative_income: float
|
|
||||||
shares: Dict[str, float]
|
|
||||||
prices: Dict[str, float]
|
|
||||||
yields: Dict[str, float]
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DripConfig:
|
|
||||||
"""Configuration for DRIP calculations"""
|
|
||||||
months: int
|
|
||||||
erosion_type: str
|
|
||||||
erosion_level: Dict
|
|
||||||
dividend_frequency: Dict[str, int] = None
|
|
||||||
|
|
||||||
def __post_init__(self):
|
|
||||||
if self.dividend_frequency is None:
|
|
||||||
self.dividend_frequency = {
|
|
||||||
"Monthly": 12,
|
|
||||||
"Quarterly": 4,
|
|
||||||
"Semi-Annually": 2,
|
|
||||||
"Annually": 1,
|
|
||||||
"Unknown": 12 # Default to monthly if unknown
|
|
||||||
}
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DripResult:
|
|
||||||
"""Results of a DRIP calculation"""
|
|
||||||
monthly_data: List[MonthlyData]
|
|
||||||
final_portfolio_value: float
|
|
||||||
total_income: float
|
|
||||||
total_shares: Dict[str, float]
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DRIPMetrics:
|
|
||||||
"""Metrics for a single ETF's DRIP calculation"""
|
|
||||||
ticker: str
|
|
||||||
date: datetime
|
|
||||||
shares: float
|
|
||||||
price: float
|
|
||||||
dividend_yield: float
|
|
||||||
monthly_dividend: float
|
|
||||||
new_shares: float
|
|
||||||
portfolio_value: float
|
|
||||||
monthly_income: float
|
|
||||||
yield_on_cost: float
|
|
||||||
|
|
||||||
def to_dict(self) -> Dict:
|
|
||||||
"""Convert the metrics to a dictionary for JSON serialization"""
|
|
||||||
data = asdict(self)
|
|
||||||
data['date'] = self.date.isoformat()
|
|
||||||
return data
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, data: Dict) -> 'DRIPMetrics':
|
|
||||||
"""Create a DRIPMetrics instance from a dictionary"""
|
|
||||||
data = data.copy()
|
|
||||||
data['date'] = datetime.fromisoformat(data['date'])
|
|
||||||
return cls(**data)
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DRIPForecastResult:
|
|
||||||
"""Results of a DRIP forecast for a single ETF"""
|
|
||||||
ticker: str
|
|
||||||
initial_shares: float
|
|
||||||
final_shares: float
|
|
||||||
initial_value: float
|
|
||||||
final_value: float
|
|
||||||
total_income: float
|
|
||||||
average_yield: float
|
|
||||||
monthly_metrics: List[DRIPMetrics]
|
|
||||||
accumulated_cash: float = 0.0 # Added for No-DRIP scenarios
|
|
||||||
|
|
||||||
def to_dict(self) -> Dict:
|
|
||||||
"""Convert the forecast result to a dictionary for JSON serialization"""
|
|
||||||
data = asdict(self)
|
|
||||||
data['monthly_metrics'] = [m.to_dict() for m in self.monthly_metrics]
|
|
||||||
return data
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, data: Dict) -> 'DRIPForecastResult':
|
|
||||||
"""Create a DRIPForecastResult instance from a dictionary"""
|
|
||||||
data = data.copy()
|
|
||||||
data['monthly_metrics'] = [DRIPMetrics.from_dict(m) for m in data['monthly_metrics']]
|
|
||||||
return cls(**data)
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DRIPPortfolioResult:
|
|
||||||
"""Results of a DRIP forecast for an entire portfolio"""
|
|
||||||
total_value: float
|
|
||||||
monthly_income: float
|
|
||||||
total_income: float
|
|
||||||
etf_results: Dict[str, DRIPForecastResult]
|
|
||||||
accumulated_cash: float = 0.0 # Added for No-DRIP scenarios
|
|
||||||
|
|
||||||
def to_dict(self) -> Dict:
|
|
||||||
"""Convert the portfolio result to a dictionary for JSON serialization"""
|
|
||||||
data = asdict(self)
|
|
||||||
data['etf_results'] = {k: v.to_dict() for k, v in self.etf_results.items()}
|
|
||||||
return data
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, data: Dict) -> 'DRIPPortfolioResult':
|
|
||||||
"""Create a DRIPPortfolioResult instance from a dictionary"""
|
|
||||||
data = data.copy()
|
|
||||||
data['etf_results'] = {k: DRIPForecastResult.from_dict(v) for k, v in data['etf_results'].items()}
|
|
||||||
return cls(**data)
|
|
||||||
@ -1,470 +0,0 @@
|
|||||||
from typing import Dict, List, Optional, Tuple, Any
|
|
||||||
import pandas as pd
|
|
||||||
import numpy as np
|
|
||||||
import logging
|
|
||||||
import traceback
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from enum import Enum
|
|
||||||
from .models import (
|
|
||||||
MonthlyData,
|
|
||||||
DripConfig,
|
|
||||||
DripResult,
|
|
||||||
DRIPMetrics,
|
|
||||||
DRIPForecastResult,
|
|
||||||
DRIPPortfolioResult
|
|
||||||
)
|
|
||||||
from ..nav_erosion_service import NavErosionService
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
__all__ = ['DRIPService', 'DistributionFrequency', 'TickerData', 'ErosionConfig']
|
|
||||||
|
|
||||||
class DistributionFrequency(Enum):
|
|
||||||
"""Enum for distribution frequencies"""
|
|
||||||
MONTHLY = ("Monthly", 12)
|
|
||||||
QUARTERLY = ("Quarterly", 4)
|
|
||||||
SEMI_ANNUALLY = ("Semi-Annually", 2)
|
|
||||||
ANNUALLY = ("Annually", 1)
|
|
||||||
UNKNOWN = ("Unknown", 12)
|
|
||||||
|
|
||||||
def __init__(self, name: str, payments_per_year: int):
|
|
||||||
self.display_name = name
|
|
||||||
self.payments_per_year = payments_per_year
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class TickerData:
|
|
||||||
"""Data structure for individual ticker information"""
|
|
||||||
ticker: str
|
|
||||||
price: float
|
|
||||||
annual_yield: float
|
|
||||||
shares: float
|
|
||||||
allocation_pct: float
|
|
||||||
distribution_freq: DistributionFrequency
|
|
||||||
|
|
||||||
@property
|
|
||||||
def market_value(self) -> float:
|
|
||||||
return self.price * self.shares
|
|
||||||
|
|
||||||
@property
|
|
||||||
def monthly_yield(self) -> float:
|
|
||||||
return self.annual_yield / 12
|
|
||||||
|
|
||||||
@property
|
|
||||||
def distribution_yield(self) -> float:
|
|
||||||
return self.annual_yield / self.distribution_freq.payments_per_year
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ErosionConfig:
|
|
||||||
"""Configuration for erosion calculations"""
|
|
||||||
erosion_type: str
|
|
||||||
use_per_ticker: bool = False
|
|
||||||
global_nav_rate: float = 0.0
|
|
||||||
global_yield_rate: float = 0.0
|
|
||||||
per_ticker_rates: Dict[str, Dict[str, float]] = field(default_factory=dict)
|
|
||||||
|
|
||||||
class DRIPService:
|
|
||||||
"""Enhanced DRIP calculation service with improved performance and accuracy"""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.MAX_EROSION_LEVEL = 9
|
|
||||||
self.MAX_MONTHLY_EROSION = 0.05 # 5% monthly max erosion
|
|
||||||
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
|
||||||
|
|
||||||
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
|
||||||
"""
|
|
||||||
Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
portfolio_df: DataFrame containing portfolio allocation
|
|
||||||
config: DripConfig object with simulation parameters
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DripResult object containing the simulation results
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Validate inputs
|
|
||||||
self._validate_inputs(portfolio_df, config)
|
|
||||||
|
|
||||||
# Initialize portfolio data
|
|
||||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
|
||||||
erosion_config = self._parse_erosion_config(config)
|
|
||||||
|
|
||||||
# Pre-calculate distribution schedule for performance
|
|
||||||
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
|
|
||||||
|
|
||||||
# Initialize simulation state
|
|
||||||
simulation_state = self._initialize_simulation_state(ticker_data)
|
|
||||||
monthly_data: List[MonthlyData] = []
|
|
||||||
|
|
||||||
# Run monthly simulation
|
|
||||||
for month in range(1, config.months + 1):
|
|
||||||
# Calculate monthly income from distributions
|
|
||||||
monthly_income = self._calculate_monthly_distributions(
|
|
||||||
month, simulation_state, ticker_data, distribution_schedule
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update cumulative income
|
|
||||||
simulation_state['cumulative_income'] += monthly_income
|
|
||||||
|
|
||||||
# Apply erosion to prices and yields
|
|
||||||
if erosion_config.erosion_type != "None":
|
|
||||||
self._apply_monthly_erosion(simulation_state, erosion_config, ticker_data.keys())
|
|
||||||
|
|
||||||
# Reinvest dividends (DRIP)
|
|
||||||
self._reinvest_dividends(month, simulation_state, distribution_schedule)
|
|
||||||
|
|
||||||
# Calculate total portfolio value
|
|
||||||
total_value = sum(
|
|
||||||
simulation_state['current_shares'][ticker] * simulation_state['current_prices'][ticker]
|
|
||||||
for ticker in ticker_data.keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create monthly data
|
|
||||||
monthly_data.append(MonthlyData(
|
|
||||||
month=month,
|
|
||||||
total_value=total_value,
|
|
||||||
monthly_income=monthly_income,
|
|
||||||
cumulative_income=simulation_state['cumulative_income'],
|
|
||||||
shares=simulation_state['current_shares'].copy(),
|
|
||||||
prices=simulation_state['current_prices'].copy(),
|
|
||||||
yields=simulation_state['current_yields'].copy()
|
|
||||||
))
|
|
||||||
|
|
||||||
# Calculate final results
|
|
||||||
return self._create_drip_result(monthly_data, simulation_state)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error calculating DRIP growth: {str(e)}")
|
|
||||||
logger.error(traceback.format_exc())
|
|
||||||
raise
|
|
||||||
|
|
||||||
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
|
|
||||||
"""Validate input parameters"""
|
|
||||||
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
|
|
||||||
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
|
|
||||||
|
|
||||||
if missing_columns:
|
|
||||||
raise ValueError(f"Missing required columns: {missing_columns}")
|
|
||||||
|
|
||||||
if config.months <= 0:
|
|
||||||
raise ValueError("Months must be positive")
|
|
||||||
|
|
||||||
if portfolio_df.empty:
|
|
||||||
raise ValueError("Portfolio DataFrame is empty")
|
|
||||||
|
|
||||||
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
|
|
||||||
"""Initialize ticker data with validation"""
|
|
||||||
ticker_data = {}
|
|
||||||
|
|
||||||
for _, row in portfolio_df.iterrows():
|
|
||||||
ticker = row["Ticker"]
|
|
||||||
|
|
||||||
# Handle distribution frequency
|
|
||||||
dist_period = row.get("Distribution Period", "Monthly")
|
|
||||||
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
|
|
||||||
|
|
||||||
ticker_data[ticker] = TickerData(
|
|
||||||
ticker=ticker,
|
|
||||||
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
|
|
||||||
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
|
|
||||||
shares=max(0.0, float(row["Shares"])),
|
|
||||||
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
|
|
||||||
distribution_freq=dist_freq
|
|
||||||
)
|
|
||||||
|
|
||||||
return ticker_data
|
|
||||||
|
|
||||||
def _parse_erosion_config(self, config: DripConfig) -> ErosionConfig:
|
|
||||||
"""Parse and validate erosion configuration"""
|
|
||||||
if not hasattr(config, 'erosion_level') or config.erosion_type == "None":
|
|
||||||
return ErosionConfig(erosion_type="None")
|
|
||||||
|
|
||||||
erosion_level = config.erosion_level
|
|
||||||
|
|
||||||
if isinstance(erosion_level, dict):
|
|
||||||
return ErosionConfig(
|
|
||||||
erosion_type=config.erosion_type,
|
|
||||||
use_per_ticker=erosion_level.get("use_per_ticker", False),
|
|
||||||
global_nav_rate=self._normalize_erosion_rate(erosion_level.get("global", {}).get("nav", 0)),
|
|
||||||
global_yield_rate=self._normalize_erosion_rate(erosion_level.get("global", {}).get("yield", 0)),
|
|
||||||
per_ticker_rates=erosion_level.get("per_ticker", {})
|
|
||||||
)
|
|
||||||
|
|
||||||
return ErosionConfig(erosion_type="None")
|
|
||||||
|
|
||||||
def _normalize_erosion_rate(self, erosion_level: float) -> float:
|
|
||||||
"""Convert erosion level (0-9) to monthly rate with validation"""
|
|
||||||
rate = (erosion_level / self.MAX_EROSION_LEVEL) * self.MAX_MONTHLY_EROSION
|
|
||||||
return min(max(0.0, rate), self.MAX_MONTHLY_EROSION)
|
|
||||||
|
|
||||||
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
|
|
||||||
"""Pre-calculate which months each ticker pays distributions"""
|
|
||||||
schedule = {}
|
|
||||||
|
|
||||||
for ticker, data in ticker_data.items():
|
|
||||||
distribution_months = []
|
|
||||||
freq = data.distribution_freq
|
|
||||||
|
|
||||||
for month in range(1, total_months + 1):
|
|
||||||
if self._is_distribution_month(month, freq):
|
|
||||||
distribution_months.append(month)
|
|
||||||
|
|
||||||
schedule[ticker] = distribution_months
|
|
||||||
|
|
||||||
return schedule
|
|
||||||
|
|
||||||
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
|
|
||||||
"""Initialize simulation state variables"""
|
|
||||||
return {
|
|
||||||
'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()},
|
|
||||||
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
|
|
||||||
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
|
|
||||||
'cumulative_income': 0.0
|
|
||||||
}
|
|
||||||
|
|
||||||
def _calculate_monthly_distributions(
|
|
||||||
self,
|
|
||||||
month: int,
|
|
||||||
state: Dict[str, Any],
|
|
||||||
ticker_data: Dict[str, TickerData],
|
|
||||||
distribution_schedule: Dict[str, List[int]]
|
|
||||||
) -> float:
|
|
||||||
"""Calculate distributions for the current month"""
|
|
||||||
monthly_income = 0.0
|
|
||||||
|
|
||||||
for ticker, data in ticker_data.items():
|
|
||||||
if month in distribution_schedule[ticker]:
|
|
||||||
shares = state['current_shares'][ticker]
|
|
||||||
price = state['current_prices'][ticker]
|
|
||||||
yield_rate = state['current_yields'][ticker]
|
|
||||||
|
|
||||||
# Calculate distribution amount
|
|
||||||
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
|
|
||||||
distribution_amount = shares * price * distribution_yield
|
|
||||||
monthly_income += distribution_amount
|
|
||||||
|
|
||||||
return monthly_income
|
|
||||||
|
|
||||||
def _apply_monthly_erosion(
|
|
||||||
self,
|
|
||||||
state: Dict[str, Any],
|
|
||||||
erosion_config: ErosionConfig,
|
|
||||||
tickers: List[str]
|
|
||||||
) -> None:
|
|
||||||
"""Apply erosion to current prices and yields"""
|
|
||||||
|
|
||||||
for ticker in tickers:
|
|
||||||
if erosion_config.use_per_ticker and ticker in erosion_config.per_ticker_rates:
|
|
||||||
# Use per-ticker erosion rates
|
|
||||||
ticker_rates = erosion_config.per_ticker_rates[ticker]
|
|
||||||
nav_erosion = self._normalize_erosion_rate(ticker_rates.get("nav", 0))
|
|
||||||
yield_erosion = self._normalize_erosion_rate(ticker_rates.get("yield", 0))
|
|
||||||
else:
|
|
||||||
# Use global erosion rates
|
|
||||||
nav_erosion = erosion_config.global_nav_rate
|
|
||||||
yield_erosion = erosion_config.global_yield_rate
|
|
||||||
|
|
||||||
# Apply erosion with bounds checking
|
|
||||||
state['current_prices'][ticker] *= max(0.01, 1 - nav_erosion)
|
|
||||||
state['current_yields'][ticker] *= max(0.0, 1 - yield_erosion)
|
|
||||||
|
|
||||||
def _reinvest_dividends(
|
|
||||||
self,
|
|
||||||
month: int,
|
|
||||||
state: Dict[str, Any],
|
|
||||||
distribution_schedule: Dict[str, List[int]]
|
|
||||||
) -> None:
|
|
||||||
"""Reinvest dividends for tickers that distributed in this month"""
|
|
||||||
|
|
||||||
for ticker, distribution_months in distribution_schedule.items():
|
|
||||||
if month in distribution_months:
|
|
||||||
shares = state['current_shares'][ticker]
|
|
||||||
price = state['current_prices'][ticker]
|
|
||||||
yield_rate = state['current_yields'][ticker]
|
|
||||||
|
|
||||||
# Calculate dividend income
|
|
||||||
# Note: This uses the distribution frequency from the original ticker data
|
|
||||||
dividend_income = shares * price * yield_rate / 12 # Simplified monthly calculation
|
|
||||||
|
|
||||||
# Purchase additional shares
|
|
||||||
if price > 0:
|
|
||||||
new_shares = dividend_income / price
|
|
||||||
state['current_shares'][ticker] += new_shares
|
|
||||||
|
|
||||||
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
|
|
||||||
"""Check if current month is a distribution month"""
|
|
||||||
if frequency == DistributionFrequency.MONTHLY:
|
|
||||||
return True
|
|
||||||
elif frequency == DistributionFrequency.QUARTERLY:
|
|
||||||
return month % 3 == 0
|
|
||||||
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
|
|
||||||
return month % 6 == 0
|
|
||||||
elif frequency == DistributionFrequency.ANNUALLY:
|
|
||||||
return month % 12 == 0
|
|
||||||
else:
|
|
||||||
return True # Default to monthly for unknown
|
|
||||||
|
|
||||||
def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult:
|
|
||||||
"""Create final DRIP result object"""
|
|
||||||
if not monthly_data:
|
|
||||||
raise ValueError("No monthly data generated")
|
|
||||||
|
|
||||||
final_data = monthly_data[-1]
|
|
||||||
|
|
||||||
return DripResult(
|
|
||||||
monthly_data=monthly_data,
|
|
||||||
final_portfolio_value=final_data.total_value,
|
|
||||||
total_income=state['cumulative_income'],
|
|
||||||
total_shares=state['current_shares'].copy()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Utility methods for analysis and comparison
|
|
||||||
def calculate_drip_vs_no_drip_comparison(
|
|
||||||
self,
|
|
||||||
portfolio_df: pd.DataFrame,
|
|
||||||
config: DripConfig
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""Calculate comparison between DRIP and no-DRIP scenarios"""
|
|
||||||
|
|
||||||
# Calculate DRIP scenario
|
|
||||||
drip_result = self.calculate_drip_growth(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate no-DRIP scenario (dividends not reinvested)
|
|
||||||
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate comparison metrics
|
|
||||||
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
|
|
||||||
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
|
|
||||||
|
|
||||||
return {
|
|
||||||
'drip_final_value': drip_result.final_portfolio_value,
|
|
||||||
'no_drip_final_value': no_drip_result['final_value'],
|
|
||||||
'drip_advantage': drip_advantage,
|
|
||||||
'percentage_advantage': percentage_advantage,
|
|
||||||
'total_dividends_reinvested': drip_result.total_income,
|
|
||||||
'cash_dividends_no_drip': no_drip_result['total_dividends']
|
|
||||||
}
|
|
||||||
|
|
||||||
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
|
|
||||||
"""Calculate scenario where dividends are not reinvested"""
|
|
||||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
|
||||||
erosion_config = self._parse_erosion_config(config)
|
|
||||||
state = self._initialize_simulation_state(ticker_data)
|
|
||||||
|
|
||||||
total_dividends = 0.0
|
|
||||||
|
|
||||||
for month in range(1, config.months + 1):
|
|
||||||
# Calculate dividends but don't reinvest
|
|
||||||
monthly_dividends = self._calculate_monthly_distributions(
|
|
||||||
month, state, ticker_data,
|
|
||||||
self._create_distribution_schedule(ticker_data, config.months)
|
|
||||||
)
|
|
||||||
total_dividends += monthly_dividends
|
|
||||||
|
|
||||||
# Apply erosion
|
|
||||||
if erosion_config.erosion_type != "None":
|
|
||||||
self._apply_monthly_erosion(state, erosion_config, ticker_data.keys())
|
|
||||||
|
|
||||||
final_value = sum(
|
|
||||||
state['current_shares'][ticker] * state['current_prices'][ticker]
|
|
||||||
for ticker in ticker_data.keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
'final_value': final_value,
|
|
||||||
'total_dividends': total_dividends
|
|
||||||
}
|
|
||||||
|
|
||||||
def forecast_portfolio(
|
|
||||||
self,
|
|
||||||
portfolio_df: pd.DataFrame,
|
|
||||||
config: DripConfig,
|
|
||||||
tickers: Optional[List[str]] = None
|
|
||||||
) -> DRIPPortfolioResult:
|
|
||||||
"""
|
|
||||||
Forecast DRIP growth for an entire portfolio.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
portfolio_df: DataFrame containing portfolio allocation with columns:
|
|
||||||
- Ticker: ETF ticker symbol
|
|
||||||
- Price: Current price
|
|
||||||
- Yield (%): Annual dividend yield
|
|
||||||
- Shares: Number of shares
|
|
||||||
- Allocation (%): Portfolio allocation percentage
|
|
||||||
config: DripConfig object with simulation parameters
|
|
||||||
tickers: Optional list of tickers to include in the forecast. If None, all tickers are included.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DRIPPortfolioResult object containing the forecast results
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Filter portfolio_df if tickers are specified
|
|
||||||
if tickers is not None:
|
|
||||||
portfolio_df = portfolio_df[portfolio_df['Ticker'].isin(tickers)].copy()
|
|
||||||
if portfolio_df.empty:
|
|
||||||
raise ValueError(f"No matching tickers found in portfolio: {tickers}")
|
|
||||||
|
|
||||||
# Calculate DRIP growth for the portfolio
|
|
||||||
result = self.calculate_drip_growth(portfolio_df, config)
|
|
||||||
|
|
||||||
# Convert the result to DRIPPortfolioResult format
|
|
||||||
etf_results = {}
|
|
||||||
for ticker in portfolio_df['Ticker'].unique():
|
|
||||||
ticker_data = portfolio_df[portfolio_df['Ticker'] == ticker].iloc[0]
|
|
||||||
initial_shares = float(ticker_data['Shares'])
|
|
||||||
initial_value = initial_shares * float(ticker_data['Price'])
|
|
||||||
|
|
||||||
# Get final values from the result
|
|
||||||
final_shares = result.total_shares.get(ticker, initial_shares)
|
|
||||||
final_value = final_shares * float(ticker_data['Price'])
|
|
||||||
|
|
||||||
# Calculate metrics
|
|
||||||
total_income = sum(
|
|
||||||
month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares']))
|
|
||||||
for month_data in result.monthly_data
|
|
||||||
)
|
|
||||||
|
|
||||||
average_yield = float(ticker_data['Yield (%)']) / 100
|
|
||||||
|
|
||||||
# Create monthly metrics
|
|
||||||
monthly_metrics = []
|
|
||||||
for month_data in result.monthly_data:
|
|
||||||
metrics = DRIPMetrics(
|
|
||||||
ticker=ticker,
|
|
||||||
date=pd.Timestamp.now() + pd.DateOffset(months=month_data.month),
|
|
||||||
shares=month_data.shares.get(ticker, initial_shares),
|
|
||||||
price=month_data.prices.get(ticker, float(ticker_data['Price'])),
|
|
||||||
dividend_yield=month_data.yields.get(ticker, average_yield),
|
|
||||||
monthly_dividend=month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares'])),
|
|
||||||
new_shares=month_data.shares.get(ticker, initial_shares) - initial_shares,
|
|
||||||
portfolio_value=month_data.total_value * (initial_shares / sum(portfolio_df['Shares'])),
|
|
||||||
monthly_income=month_data.monthly_income * (initial_shares / sum(portfolio_df['Shares'])),
|
|
||||||
yield_on_cost=average_yield
|
|
||||||
)
|
|
||||||
monthly_metrics.append(metrics)
|
|
||||||
|
|
||||||
# Create forecast result for this ETF
|
|
||||||
etf_results[ticker] = DRIPForecastResult(
|
|
||||||
ticker=ticker,
|
|
||||||
initial_shares=initial_shares,
|
|
||||||
final_shares=final_shares,
|
|
||||||
initial_value=initial_value,
|
|
||||||
final_value=final_value,
|
|
||||||
total_income=total_income,
|
|
||||||
average_yield=average_yield,
|
|
||||||
monthly_metrics=monthly_metrics
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create and return the portfolio result
|
|
||||||
return DRIPPortfolioResult(
|
|
||||||
total_value=result.final_portfolio_value,
|
|
||||||
monthly_income=result.monthly_data[-1].monthly_income,
|
|
||||||
total_income=result.total_income,
|
|
||||||
etf_results=etf_results
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error forecasting portfolio: {str(e)}")
|
|
||||||
logger.error(traceback.format_exc())
|
|
||||||
raise
|
|
||||||
File diff suppressed because it is too large
Load Diff
@ -1,60 +1,27 @@
|
|||||||
from typing import Dict, List, Optional, Tuple, Any
|
from typing import Dict, List, Optional, Tuple, Any
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import numpy as np
|
|
||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from enum import Enum
|
|
||||||
from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult
|
from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult
|
||||||
from ..nav_erosion_service import NavErosionService
|
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DistributionFrequency(Enum):
|
|
||||||
"""Enum for distribution frequencies"""
|
|
||||||
MONTHLY = ("Monthly", 12)
|
|
||||||
QUARTERLY = ("Quarterly", 4)
|
|
||||||
SEMI_ANNUALLY = ("Semi-Annually", 2)
|
|
||||||
ANNUALLY = ("Annually", 1)
|
|
||||||
UNKNOWN = ("Unknown", 12)
|
|
||||||
|
|
||||||
def __init__(self, name: str, payments_per_year: int):
|
|
||||||
self.display_name = name
|
|
||||||
self.payments_per_year = payments_per_year
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class TickerData:
|
|
||||||
"""Data structure for individual ticker information"""
|
|
||||||
ticker: str
|
|
||||||
price: float
|
|
||||||
annual_yield: float
|
|
||||||
shares: float
|
|
||||||
allocation_pct: float
|
|
||||||
distribution_freq: DistributionFrequency
|
|
||||||
|
|
||||||
@property
|
|
||||||
def market_value(self) -> float:
|
|
||||||
return self.price * self.shares
|
|
||||||
|
|
||||||
@property
|
|
||||||
def monthly_yield(self) -> float:
|
|
||||||
return self.annual_yield / 12
|
|
||||||
|
|
||||||
@property
|
|
||||||
def distribution_yield(self) -> float:
|
|
||||||
return self.annual_yield / self.distribution_freq.payments_per_year
|
|
||||||
|
|
||||||
class DripService:
|
class DripService:
|
||||||
"""Enhanced DRIP calculation service with improved performance and accuracy"""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency}
|
self.MAX_EROSION_LEVEL = 9
|
||||||
self.nav_erosion_service = NavErosionService()
|
self.max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion
|
||||||
|
self.dividend_frequency = {
|
||||||
|
"Monthly": 12,
|
||||||
|
"Quarterly": 4,
|
||||||
|
"Semi-Annually": 2,
|
||||||
|
"Annually": 1,
|
||||||
|
"Unknown": 12 # Default to monthly if unknown
|
||||||
|
}
|
||||||
|
|
||||||
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult:
|
||||||
"""
|
"""
|
||||||
Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy.
|
Calculate DRIP growth for a portfolio over a specified period.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
portfolio_df: DataFrame containing portfolio allocation
|
portfolio_df: DataFrame containing portfolio allocation
|
||||||
@ -64,299 +31,250 @@ class DripService:
|
|||||||
DripResult object containing the simulation results
|
DripResult object containing the simulation results
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Validate inputs
|
# Initialize monthly data list
|
||||||
self._validate_inputs(portfolio_df, config)
|
|
||||||
|
|
||||||
# Get erosion data from nav_erosion_service
|
|
||||||
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
|
|
||||||
erosion_rates = {
|
|
||||||
result.ticker: {
|
|
||||||
"nav": result.estimated_nav_erosion / 100, # Convert to decimal
|
|
||||||
"yield": result.estimated_yield_erosion / 100 # Convert to decimal
|
|
||||||
}
|
|
||||||
for result in erosion_data.results
|
|
||||||
}
|
|
||||||
|
|
||||||
# Initialize portfolio data
|
|
||||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
|
||||||
|
|
||||||
# Pre-calculate distribution schedule for performance
|
|
||||||
distribution_schedule = self._create_distribution_schedule(ticker_data, config.months)
|
|
||||||
|
|
||||||
# Initialize simulation state
|
|
||||||
simulation_state = self._initialize_simulation_state(ticker_data)
|
|
||||||
monthly_data: List[MonthlyData] = []
|
monthly_data: List[MonthlyData] = []
|
||||||
|
|
||||||
# Run monthly simulation
|
# Get initial values
|
||||||
for month in range(1, config.months + 1):
|
initial_shares = self._calculate_initial_shares(portfolio_df)
|
||||||
month_result = self._simulate_month(
|
initial_prices = dict(zip(portfolio_df["Ticker"], portfolio_df["Price"]))
|
||||||
month,
|
initial_yields = dict(zip(portfolio_df["Ticker"], portfolio_df["Yield (%)"] / 100))
|
||||||
simulation_state,
|
|
||||||
ticker_data,
|
|
||||||
erosion_rates,
|
|
||||||
distribution_schedule
|
|
||||||
)
|
|
||||||
monthly_data.append(month_result)
|
|
||||||
|
|
||||||
# Calculate final results
|
# Initialize tracking variables
|
||||||
return self._create_drip_result(monthly_data, simulation_state)
|
current_shares = initial_shares.copy()
|
||||||
|
current_prices = initial_prices.copy()
|
||||||
|
current_yields = initial_yields.copy()
|
||||||
|
cumulative_income = 0.0
|
||||||
|
|
||||||
|
# Run simulation for each month
|
||||||
|
for month in range(1, config.months + 1):
|
||||||
|
# Calculate monthly income
|
||||||
|
monthly_income = sum(
|
||||||
|
(current_yields[ticker] / 12) *
|
||||||
|
(current_shares[ticker] * current_prices[ticker])
|
||||||
|
for ticker in current_shares.keys()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update cumulative income
|
||||||
|
cumulative_income += monthly_income
|
||||||
|
|
||||||
|
# Calculate total portfolio value
|
||||||
|
total_value = sum(
|
||||||
|
current_shares[ticker] * current_prices[ticker]
|
||||||
|
for ticker in current_shares.keys()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Apply erosion if enabled
|
||||||
|
if config.erosion_type != "None":
|
||||||
|
current_prices, current_yields = self._apply_erosion(
|
||||||
|
current_prices,
|
||||||
|
current_yields,
|
||||||
|
config.erosion_type,
|
||||||
|
config.erosion_level
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reinvest dividends
|
||||||
|
for ticker in current_shares.keys():
|
||||||
|
dividend_income = (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker])
|
||||||
|
new_shares = dividend_income / current_prices[ticker]
|
||||||
|
current_shares[ticker] += new_shares
|
||||||
|
|
||||||
|
# Store monthly data
|
||||||
|
monthly_data.append(MonthlyData(
|
||||||
|
month=month,
|
||||||
|
total_value=total_value,
|
||||||
|
monthly_income=monthly_income,
|
||||||
|
cumulative_income=cumulative_income,
|
||||||
|
shares=current_shares.copy(),
|
||||||
|
prices=current_prices.copy(),
|
||||||
|
yields=current_yields.copy()
|
||||||
|
))
|
||||||
|
|
||||||
|
# Calculate final values
|
||||||
|
final_portfolio_value = monthly_data[-1].total_value
|
||||||
|
total_income = monthly_data[-1].cumulative_income
|
||||||
|
total_shares = current_shares.copy()
|
||||||
|
|
||||||
|
return DripResult(
|
||||||
|
monthly_data=monthly_data,
|
||||||
|
final_portfolio_value=final_portfolio_value,
|
||||||
|
total_income=total_income,
|
||||||
|
total_shares=total_shares
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error calculating DRIP growth: {str(e)}")
|
logger.error(f"Error calculating DRIP growth: {str(e)}")
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None:
|
def _calculate_initial_shares(self, portfolio_df: pd.DataFrame) -> Dict[str, float]:
|
||||||
"""Validate input parameters"""
|
"""Calculate initial shares for each ETF."""
|
||||||
required_columns = ["Ticker", "Price", "Yield (%)", "Shares"]
|
return dict(zip(portfolio_df["Ticker"], portfolio_df["Shares"]))
|
||||||
missing_columns = [col for col in required_columns if col not in portfolio_df.columns]
|
|
||||||
|
|
||||||
if missing_columns:
|
def _apply_erosion(
|
||||||
raise ValueError(f"Missing required columns: {missing_columns}")
|
self,
|
||||||
|
prices: Dict[str, float],
|
||||||
|
yields: Dict[str, float],
|
||||||
|
erosion_type: str,
|
||||||
|
erosion_level: Dict[str, Any]
|
||||||
|
) -> Tuple[Dict[str, float], Dict[str, float]]:
|
||||||
|
"""
|
||||||
|
Apply erosion to prices and yields based on configuration.
|
||||||
|
|
||||||
if config.months <= 0:
|
Args:
|
||||||
raise ValueError("Months must be positive")
|
prices: Dictionary of current prices
|
||||||
|
yields: Dictionary of current yields
|
||||||
|
erosion_type: Type of erosion to apply
|
||||||
|
erosion_level: Dictionary containing erosion levels
|
||||||
|
|
||||||
if portfolio_df.empty:
|
Returns:
|
||||||
raise ValueError("Portfolio DataFrame is empty")
|
Tuple of updated prices and yields
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
updated_prices = prices.copy()
|
||||||
|
updated_yields = yields.copy()
|
||||||
|
|
||||||
def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]:
|
if erosion_type == "None":
|
||||||
"""Initialize ticker data with validation"""
|
return updated_prices, updated_yields
|
||||||
ticker_data = {}
|
|
||||||
|
|
||||||
|
if erosion_level.get("use_per_ticker", False):
|
||||||
|
# Apply per-ticker erosion rates
|
||||||
|
for ticker in prices.keys():
|
||||||
|
if ticker in erosion_level:
|
||||||
|
ticker_erosion = erosion_level[ticker]
|
||||||
|
# Apply monthly erosion
|
||||||
|
nav_erosion = (ticker_erosion["nav"] / 9) * 0.1754
|
||||||
|
yield_erosion = (ticker_erosion["yield"] / 9) * 0.1754
|
||||||
|
|
||||||
|
updated_prices[ticker] *= (1 - nav_erosion)
|
||||||
|
updated_yields[ticker] *= (1 - yield_erosion)
|
||||||
|
else:
|
||||||
|
# Apply global erosion rates
|
||||||
|
nav_erosion = (erosion_level["global"]["nav"] / 9) * 0.1754
|
||||||
|
yield_erosion = (erosion_level["global"]["yield"] / 9) * 0.1754
|
||||||
|
|
||||||
|
for ticker in prices.keys():
|
||||||
|
updated_prices[ticker] *= (1 - nav_erosion)
|
||||||
|
updated_yields[ticker] *= (1 - yield_erosion)
|
||||||
|
|
||||||
|
return updated_prices, updated_yields
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error applying erosion: {str(e)}")
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
return prices, yields
|
||||||
|
|
||||||
|
def _initialize_erosion_rates(
|
||||||
|
self,
|
||||||
|
tickers: List[str],
|
||||||
|
erosion_type: str,
|
||||||
|
erosion_level: Dict[str, Any]
|
||||||
|
) -> Tuple[Dict[str, float], Dict[str, float]]:
|
||||||
|
"""Initialize erosion rates for each ticker based on configuration."""
|
||||||
|
ticker_nav_rates: Dict[str, float] = {}
|
||||||
|
ticker_yield_rates: Dict[str, float] = {}
|
||||||
|
|
||||||
|
if erosion_type != "None" and isinstance(erosion_level, dict):
|
||||||
|
if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level:
|
||||||
|
global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
|
||||||
|
for ticker in tickers:
|
||||||
|
ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0})
|
||||||
|
ticker_nav_rates[ticker] = ticker_settings["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
ticker_yield_rates[ticker] = ticker_settings["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
else:
|
||||||
|
global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion
|
||||||
|
|
||||||
|
for ticker in tickers:
|
||||||
|
ticker_nav_rates[ticker] = global_nav
|
||||||
|
ticker_yield_rates[ticker] = global_yield
|
||||||
|
else:
|
||||||
|
for ticker in tickers:
|
||||||
|
ticker_nav_rates[ticker] = 0
|
||||||
|
ticker_yield_rates[ticker] = 0
|
||||||
|
|
||||||
|
return ticker_nav_rates, ticker_yield_rates
|
||||||
|
|
||||||
|
def _create_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
|
||||||
|
"""Create a dictionary of ticker-specific data."""
|
||||||
|
ticker_data: Dict[str, Dict[str, Any]] = {}
|
||||||
for _, row in portfolio_df.iterrows():
|
for _, row in portfolio_df.iterrows():
|
||||||
ticker = row["Ticker"]
|
ticker = row["Ticker"]
|
||||||
|
ticker_data[ticker] = {
|
||||||
# Handle distribution frequency
|
"price": row["Price"],
|
||||||
dist_period = row.get("Distribution Period", "Monthly")
|
"yield_annual": row["Yield (%)"] / 100,
|
||||||
dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY)
|
"initial_shares": row["Capital Allocated ($)"] / row["Price"],
|
||||||
|
"initial_allocation": row["Allocation (%)"] / 100,
|
||||||
ticker_data[ticker] = TickerData(
|
"distribution": row.get("Distribution Period", "Monthly")
|
||||||
ticker=ticker,
|
}
|
||||||
price=max(0.01, float(row["Price"])), # Prevent zero/negative prices
|
|
||||||
annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal
|
|
||||||
shares=max(0.0, float(row["Shares"])),
|
|
||||||
allocation_pct=float(row.get("Allocation (%)", 0) / 100),
|
|
||||||
distribution_freq=dist_freq
|
|
||||||
)
|
|
||||||
|
|
||||||
return ticker_data
|
return ticker_data
|
||||||
|
|
||||||
def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]:
|
def _calculate_monthly_income(
|
||||||
"""Pre-calculate which months each ticker pays distributions"""
|
|
||||||
schedule = {}
|
|
||||||
|
|
||||||
for ticker, data in ticker_data.items():
|
|
||||||
distribution_months = []
|
|
||||||
freq = data.distribution_freq
|
|
||||||
|
|
||||||
for month in range(1, total_months + 1):
|
|
||||||
if self._is_distribution_month(month, freq):
|
|
||||||
distribution_months.append(month)
|
|
||||||
|
|
||||||
schedule[ticker] = distribution_months
|
|
||||||
|
|
||||||
return schedule
|
|
||||||
|
|
||||||
def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]:
|
|
||||||
"""Initialize simulation state variables"""
|
|
||||||
return {
|
|
||||||
'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()},
|
|
||||||
'current_prices': {ticker: data.price for ticker, data in ticker_data.items()},
|
|
||||||
'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()},
|
|
||||||
'cumulative_income': 0.0
|
|
||||||
}
|
|
||||||
|
|
||||||
def _simulate_month(
|
|
||||||
self,
|
self,
|
||||||
month: int,
|
current_shares: Dict[str, float],
|
||||||
state: Dict[str, Any],
|
current_prices: Dict[str, float],
|
||||||
ticker_data: Dict[str, TickerData],
|
current_yields: Dict[str, float],
|
||||||
erosion_rates: Dict[str, Dict[str, float]],
|
tickers: List[str]
|
||||||
distribution_schedule: Dict[str, List[int]]
|
) -> float:
|
||||||
) -> MonthlyData:
|
"""Calculate expected monthly income based on current portfolio and yields."""
|
||||||
"""Simulate a single month with improved accuracy"""
|
return sum(
|
||||||
|
(current_yields[ticker] / 12) *
|
||||||
# Calculate monthly income from distributions
|
(current_shares[ticker] * current_prices[ticker])
|
||||||
monthly_income = self._calculate_monthly_distributions(
|
for ticker in tickers
|
||||||
month, state, ticker_data, distribution_schedule
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update cumulative income
|
def _create_month_data(
|
||||||
state['cumulative_income'] += monthly_income
|
self,
|
||||||
|
month: int,
|
||||||
# Apply erosion to prices and yields using nav_erosion_service data
|
current_total_value: float,
|
||||||
self._apply_monthly_erosion(state, erosion_rates)
|
monthly_income: float,
|
||||||
|
cumulative_income: float,
|
||||||
# Reinvest dividends (DRIP)
|
current_shares: Dict[str, float],
|
||||||
self._reinvest_dividends(month, state, distribution_schedule)
|
current_prices: Dict[str, float],
|
||||||
|
current_yields: Dict[str, float],
|
||||||
# Calculate total portfolio value with bounds checking
|
tickers: List[str]
|
||||||
total_value = 0.0
|
) -> MonthlyData:
|
||||||
for ticker in ticker_data.keys():
|
"""Create monthly data object."""
|
||||||
shares = state['current_shares'][ticker]
|
|
||||||
price = state['current_prices'][ticker]
|
|
||||||
if shares > 0 and price > 0:
|
|
||||||
total_value += shares * price
|
|
||||||
|
|
||||||
return MonthlyData(
|
return MonthlyData(
|
||||||
month=month,
|
month=month,
|
||||||
total_value=total_value,
|
total_value=current_total_value,
|
||||||
monthly_income=monthly_income,
|
monthly_income=monthly_income,
|
||||||
cumulative_income=state['cumulative_income'],
|
cumulative_income=cumulative_income,
|
||||||
shares=state['current_shares'].copy(),
|
shares=current_shares.copy(),
|
||||||
prices=state['current_prices'].copy(),
|
prices=current_prices.copy(),
|
||||||
yields=state['current_yields'].copy()
|
yields=current_yields.copy()
|
||||||
)
|
)
|
||||||
|
|
||||||
def _calculate_monthly_distributions(
|
def _calculate_and_reinvest_dividends(
|
||||||
self,
|
self,
|
||||||
month: int,
|
month: int,
|
||||||
state: Dict[str, Any],
|
ticker_data: Dict[str, Dict[str, Any]],
|
||||||
ticker_data: Dict[str, TickerData],
|
current_shares: Dict[str, float],
|
||||||
distribution_schedule: Dict[str, List[int]]
|
current_prices: Dict[str, float],
|
||||||
|
current_yields: Dict[str, float],
|
||||||
|
ticker_yield_rates: Dict[str, float],
|
||||||
|
dividend_frequency: Dict[str, int]
|
||||||
) -> float:
|
) -> float:
|
||||||
"""Calculate distributions for the current month"""
|
"""Calculate dividends and reinvest them proportionally."""
|
||||||
monthly_income = 0.0
|
month_dividends: Dict[str, float] = {}
|
||||||
|
|
||||||
for ticker, data in ticker_data.items():
|
for ticker, data in ticker_data.items():
|
||||||
if month in distribution_schedule[ticker]:
|
freq = dividend_frequency[data["distribution"]]
|
||||||
shares = state['current_shares'][ticker]
|
if month % (12 / freq) == 0:
|
||||||
price = state['current_prices'][ticker]
|
if ticker_yield_rates[ticker] > 0:
|
||||||
yield_rate = state['current_yields'][ticker]
|
dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker]
|
||||||
|
else:
|
||||||
|
dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker]
|
||||||
|
else:
|
||||||
|
dividend = 0
|
||||||
|
month_dividends[ticker] = dividend
|
||||||
|
|
||||||
# Calculate distribution amount using annual yield divided by payments per year
|
total_month_dividend = sum(month_dividends.values())
|
||||||
distribution_yield = yield_rate / data.distribution_freq.payments_per_year
|
|
||||||
distribution_amount = shares * price * distribution_yield
|
|
||||||
monthly_income += distribution_amount
|
|
||||||
|
|
||||||
return monthly_income
|
# Reinvest dividends proportionally
|
||||||
|
for ticker, data in ticker_data.items():
|
||||||
|
if current_prices[ticker] > 0:
|
||||||
|
new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker]
|
||||||
|
current_shares[ticker] += new_shares
|
||||||
|
|
||||||
def _apply_monthly_erosion(
|
return total_month_dividend
|
||||||
self,
|
|
||||||
state: Dict[str, Any],
|
|
||||||
erosion_rates: Dict[str, Dict[str, float]]
|
|
||||||
) -> None:
|
|
||||||
"""Apply erosion to current prices and yields using nav_erosion_service data"""
|
|
||||||
for ticker, rates in erosion_rates.items():
|
|
||||||
if ticker in state['current_prices']:
|
|
||||||
# Apply monthly erosion rates
|
|
||||||
monthly_nav_erosion = rates['nav'] / 12
|
|
||||||
monthly_yield_erosion = rates['yield'] / 12
|
|
||||||
|
|
||||||
# Apply erosion with bounds checking
|
|
||||||
state['current_prices'][ticker] = max(0.01, state['current_prices'][ticker] * (1 - monthly_nav_erosion))
|
|
||||||
state['current_yields'][ticker] = max(0.0, state['current_yields'][ticker] * (1 - monthly_yield_erosion))
|
|
||||||
|
|
||||||
def _reinvest_dividends(
|
|
||||||
self,
|
|
||||||
month: int,
|
|
||||||
state: Dict[str, Any],
|
|
||||||
distribution_schedule: Dict[str, List[int]]
|
|
||||||
) -> None:
|
|
||||||
"""Reinvest dividends for tickers that distributed in this month"""
|
|
||||||
|
|
||||||
for ticker, distribution_months in distribution_schedule.items():
|
|
||||||
if month in distribution_months:
|
|
||||||
shares = state['current_shares'][ticker]
|
|
||||||
price = state['current_prices'][ticker]
|
|
||||||
yield_rate = state['current_yields'][ticker]
|
|
||||||
|
|
||||||
# Calculate dividend income using the correct distribution frequency
|
|
||||||
freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY)
|
|
||||||
dividend_income = shares * price * (yield_rate / freq.payments_per_year)
|
|
||||||
|
|
||||||
# Purchase additional shares
|
|
||||||
if price > 0:
|
|
||||||
new_shares = dividend_income / price
|
|
||||||
state['current_shares'][ticker] += new_shares
|
|
||||||
|
|
||||||
def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool:
|
|
||||||
"""Check if current month is a distribution month"""
|
|
||||||
if frequency == DistributionFrequency.MONTHLY:
|
|
||||||
return True
|
|
||||||
elif frequency == DistributionFrequency.QUARTERLY:
|
|
||||||
return month % 3 == 0
|
|
||||||
elif frequency == DistributionFrequency.SEMI_ANNUALLY:
|
|
||||||
return month % 6 == 0
|
|
||||||
elif frequency == DistributionFrequency.ANNUALLY:
|
|
||||||
return month % 12 == 0
|
|
||||||
else:
|
|
||||||
return True # Default to monthly for unknown
|
|
||||||
|
|
||||||
def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult:
|
|
||||||
"""Create final DRIP result object"""
|
|
||||||
if not monthly_data:
|
|
||||||
raise ValueError("No monthly data generated")
|
|
||||||
|
|
||||||
final_data = monthly_data[-1]
|
|
||||||
|
|
||||||
return DripResult(
|
|
||||||
monthly_data=monthly_data,
|
|
||||||
final_portfolio_value=final_data.total_value,
|
|
||||||
total_income=final_data.cumulative_income,
|
|
||||||
total_shares=state['current_shares'].copy()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Utility methods for analysis and comparison
|
|
||||||
def calculate_drip_vs_no_drip_comparison(
|
|
||||||
self,
|
|
||||||
portfolio_df: pd.DataFrame,
|
|
||||||
config: DripConfig
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""Calculate comparison between DRIP and no-DRIP scenarios"""
|
|
||||||
|
|
||||||
# Calculate DRIP scenario
|
|
||||||
drip_result = self.calculate_drip_growth(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate no-DRIP scenario (dividends not reinvested)
|
|
||||||
no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config)
|
|
||||||
|
|
||||||
# Calculate comparison metrics
|
|
||||||
drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value']
|
|
||||||
percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100
|
|
||||||
|
|
||||||
return {
|
|
||||||
'drip_final_value': drip_result.final_portfolio_value,
|
|
||||||
'no_drip_final_value': no_drip_result['final_value'],
|
|
||||||
'drip_advantage': drip_advantage,
|
|
||||||
'percentage_advantage': percentage_advantage,
|
|
||||||
'total_dividends_reinvested': drip_result.total_income,
|
|
||||||
'cash_dividends_no_drip': no_drip_result['total_dividends']
|
|
||||||
}
|
|
||||||
|
|
||||||
def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]:
|
|
||||||
"""Calculate scenario where dividends are not reinvested"""
|
|
||||||
ticker_data = self._initialize_ticker_data(portfolio_df)
|
|
||||||
erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist())
|
|
||||||
erosion_rates = {
|
|
||||||
result.ticker: {
|
|
||||||
"nav": result.estimated_nav_erosion / 100,
|
|
||||||
"yield": result.estimated_yield_erosion / 100
|
|
||||||
}
|
|
||||||
for result in erosion_data.results
|
|
||||||
}
|
|
||||||
state = self._initialize_simulation_state(ticker_data)
|
|
||||||
|
|
||||||
total_dividends = 0.0
|
|
||||||
|
|
||||||
for month in range(1, config.months + 1):
|
|
||||||
# Calculate dividends but don't reinvest
|
|
||||||
monthly_dividends = self._calculate_monthly_distributions(
|
|
||||||
month, state, ticker_data,
|
|
||||||
self._create_distribution_schedule(ticker_data, config.months)
|
|
||||||
)
|
|
||||||
total_dividends += monthly_dividends
|
|
||||||
|
|
||||||
# Apply erosion
|
|
||||||
self._apply_monthly_erosion(state, erosion_rates)
|
|
||||||
|
|
||||||
final_value = sum(
|
|
||||||
state['current_shares'][ticker] * state['current_prices'][ticker]
|
|
||||||
for ticker in ticker_data.keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
'final_value': final_value,
|
|
||||||
'total_dividends': total_dividends
|
|
||||||
}
|
|
||||||
@ -1,8 +0,0 @@
|
|||||||
"""
|
|
||||||
Nav Erosion Service package
|
|
||||||
"""
|
|
||||||
|
|
||||||
from .service import NavErosionService
|
|
||||||
from .models import NavErosionResult
|
|
||||||
|
|
||||||
__all__ = ['NavErosionService', 'NavErosionResult']
|
|
||||||
Loading…
Reference in New Issue
Block a user