From edf2ce5e9cef6a609f62881d5affb3ddbb173070 Mon Sep 17 00:00:00 2001 From: Pascal Date: Mon, 2 Jun 2025 16:16:57 +0200 Subject: [PATCH] feat: Add DRIP service support files and update related services - Add exceptions and logger - Update service models and implementations - Update portfolio builder integration --- ETF_Portal/services/data_service.py | 93 +- ETF_Portal/services/drip_service/__init__.py | 16 + .../services/drip_service/exceptions.py | 19 + ETF_Portal/services/drip_service/logger.py | 35 + ETF_Portal/services/drip_service/models.py | 116 ++ ETF_Portal/tests/__init__.py | 0 pages/ETF_Portfolio_Builder.py | 1189 +++++++---------- services/drip_service/service.py | 534 ++++---- services/nav_erosion_service/__init__.py | 8 + 9 files changed, 978 insertions(+), 1032 deletions(-) create mode 100644 ETF_Portal/services/drip_service/exceptions.py create mode 100644 ETF_Portal/services/drip_service/logger.py create mode 100644 ETF_Portal/tests/__init__.py create mode 100644 services/nav_erosion_service/__init__.py diff --git a/ETF_Portal/services/data_service.py b/ETF_Portal/services/data_service.py index d96a8f6..574775b 100644 --- a/ETF_Portal/services/data_service.py +++ b/ETF_Portal/services/data_service.py @@ -209,6 +209,12 @@ class DataService: if hist.empty: return None + # Get current price + current_price = info.get('regularMarketPrice', hist['Close'].iloc[-1]) + + # Get dividend yield + dividend_yield = info.get('dividendYield', 0) * 100 # Convert to percentage + # Get dividends with proper handling try: dividends = yf_ticker.dividends @@ -217,7 +223,6 @@ class DataService: dividend_rate = info.get('dividendRate', 0) if dividend_rate > 0: # Create a synthetic dividend series - last_price = hist['Close'].iloc[-1] annual_dividend = dividend_rate monthly_dividend = annual_dividend / 12 dividends = pd.Series(monthly_dividend, index=hist.index) @@ -248,93 +253,49 @@ class DataService: # Sharpe Ratio if volatility > 0: - sharpe_ratio = np.sqrt(252) * excess_returns.mean() / volatility + sharpe_ratio = (annual_return - risk_free_rate) / volatility else: sharpe_ratio = 0 # Sortino Ratio - negative_returns = returns[returns < 0] - if len(negative_returns) > 0: - downside_volatility = negative_returns.std() * np.sqrt(252) + downside_returns = returns[returns < 0] + if len(downside_returns) > 0: + downside_volatility = downside_returns.std() * np.sqrt(252) if downside_volatility > 0: - sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_volatility + sortino_ratio = (annual_return - risk_free_rate) / downside_volatility else: sortino_ratio = 0 else: sortino_ratio = 0 - # Calculate dividend trend with better handling - try: - if not dividends.empty: - # Resample to monthly and handle missing values - monthly_div = dividends.resample('ME').sum().fillna(0) - if len(monthly_div) > 12: - # Calculate trailing 12-month dividends - earliest_ttm = monthly_div[-12:].sum() - latest_ttm = monthly_div[-1:].sum() - if earliest_ttm > 0: - dividend_trend = float((latest_ttm / earliest_ttm - 1)) - else: - dividend_trend = 0.0 - else: - # If less than 12 months of data, use the average - dividend_trend = float(monthly_div.mean()) if not monthly_div.empty else 0.0 - else: - # Try to get dividend trend from info - dividend_rate = float(info.get('dividendRate', 0)) - five_year_avg = float(info.get('fiveYearAvgDividendYield', 0)) - if dividend_rate > 0 and five_year_avg > 0: - dividend_trend = float((dividend_rate / five_year_avg - 1)) - else: - dividend_trend = 0.0 - except Exception as e: - logger.warning(f"Error calculating dividend trend for {ticker}: {str(e)}") - dividend_trend = 0.0 - - # Ensure dividend_trend is a valid float - dividend_trend = float(dividend_trend) if dividend_trend is not None else 0.0 - if not isinstance(dividend_trend, (int, float)) or pd.isna(dividend_trend): - dividend_trend = 0.0 + # Calculate dividend trend + if not dividends.empty: + dividend_trend = (dividends.iloc[-1] / dividends.iloc[0]) - 1 if dividends.iloc[0] > 0 else 0 + else: + dividend_trend = 0 # Calculate ETF age - inception_date = info.get('fundInceptionDate') - if inception_date: - try: - inception_date_dt = pd.to_datetime(inception_date, unit='s', utc=True) - age_years = (pd.Timestamp.now(tz='UTC') - inception_date_dt).days / 365.25 - except: - age_years = None + if 'firstTradeDateEpochUtc' in info: + age_years = (datetime.now() - datetime.fromtimestamp(info['firstTradeDateEpochUtc'])).days / 365.25 else: - age_years = None + age_years = 0 - # Ensure all values are valid numbers and properly formatted - volatility = float(volatility) if volatility is not None else 0.0 - max_drawdown = float(max_drawdown) if max_drawdown is not None else 0.0 - sharpe_ratio = float(sharpe_ratio) if sharpe_ratio is not None else 0.0 - sortino_ratio = float(sortino_ratio) if sortino_ratio is not None else 0.0 - age_years = float(age_years) if age_years is not None else 0.0 - - # Format the response with proper types - response = { - 'info': info, - 'hist': hist.to_dict(), - 'dividends': dividends.to_dict(), + # Return formatted data + return { + 'price': current_price, + 'dividend_yield': dividend_yield, 'volatility': volatility, 'max_drawdown': max_drawdown, 'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio, 'dividend_trend': dividend_trend, 'age_years': age_years, - 'is_new': age_years < 2 + 'is_new': age_years < 2, + 'info': info, + 'hist': hist.to_dict('records'), + 'dividends': dividends.to_dict() } - # Ensure all numeric values are properly formatted - for key in ['volatility', 'max_drawdown', 'sharpe_ratio', 'sortino_ratio', 'dividend_trend', 'age_years']: - if key in response: - response[key] = float(response[key]) - - return response - except Exception as e: logger.error(f"Error fetching yfinance data for {ticker}: {str(e)}") return None diff --git a/ETF_Portal/services/drip_service/__init__.py b/ETF_Portal/services/drip_service/__init__.py index e69de29..f2a1469 100644 --- a/ETF_Portal/services/drip_service/__init__.py +++ b/ETF_Portal/services/drip_service/__init__.py @@ -0,0 +1,16 @@ +from .service import DRIPService +from .models import DRIPMetrics, DRIPForecastResult, DRIPPortfolioResult, DripConfig +from .exceptions import DRIPError, DataFetchError, CalculationError, ValidationError, CacheError + +__all__ = [ + 'DRIPService', + 'DRIPMetrics', + 'DRIPForecastResult', + 'DRIPPortfolioResult', + 'DRIPError', + 'DataFetchError', + 'CalculationError', + 'ValidationError', + 'CacheError', + 'DripConfig' +] diff --git a/ETF_Portal/services/drip_service/exceptions.py b/ETF_Portal/services/drip_service/exceptions.py new file mode 100644 index 0000000..7bf8d43 --- /dev/null +++ b/ETF_Portal/services/drip_service/exceptions.py @@ -0,0 +1,19 @@ +class DRIPError(Exception): + """Base exception for DRIP service errors""" + pass + +class DataFetchError(DRIPError): + """Raised when ETF data cannot be fetched""" + pass + +class CalculationError(DRIPError): + """Raised when DRIP calculations fail""" + pass + +class ValidationError(DRIPError): + """Raised when input validation fails""" + pass + +class CacheError(DRIPError): + """Raised when cache operations fail""" + pass \ No newline at end of file diff --git a/ETF_Portal/services/drip_service/logger.py b/ETF_Portal/services/drip_service/logger.py new file mode 100644 index 0000000..971f1f4 --- /dev/null +++ b/ETF_Portal/services/drip_service/logger.py @@ -0,0 +1,35 @@ +import logging +import os +from pathlib import Path + +def get_logger(name: str) -> logging.Logger: + """Configure and return a logger for the DRIP service""" + logger = logging.getLogger(name) + + if not logger.handlers: + logger.setLevel(logging.INFO) + + # Create logs directory if it doesn't exist + log_dir = Path("logs") + log_dir.mkdir(parents=True, exist_ok=True) + + # File handler + file_handler = logging.FileHandler(log_dir / "drip_service.log") + file_handler.setLevel(logging.INFO) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + + # Formatter + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + # Add handlers + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + return logger \ No newline at end of file diff --git a/ETF_Portal/services/drip_service/models.py b/ETF_Portal/services/drip_service/models.py index e69de29..2df4205 100644 --- a/ETF_Portal/services/drip_service/models.py +++ b/ETF_Portal/services/drip_service/models.py @@ -0,0 +1,116 @@ +from dataclasses import dataclass, asdict +from typing import Dict, List, Optional +from datetime import datetime +import json + +@dataclass +class MonthlyData: + """Data for a single month in the DRIP simulation""" + month: int + total_value: float + monthly_income: float + cumulative_income: float + shares: Dict[str, float] + prices: Dict[str, float] + yields: Dict[str, float] + +@dataclass +class DripConfig: + """Configuration for DRIP calculations""" + months: int + erosion_type: str + erosion_level: Dict + dividend_frequency: Dict[str, int] = None + + def __post_init__(self): + if self.dividend_frequency is None: + self.dividend_frequency = { + "Monthly": 12, + "Quarterly": 4, + "Semi-Annually": 2, + "Annually": 1, + "Unknown": 12 # Default to monthly if unknown + } + +@dataclass +class DripResult: + """Results of a DRIP calculation""" + monthly_data: List[MonthlyData] + final_portfolio_value: float + total_income: float + total_shares: Dict[str, float] + +@dataclass +class DRIPMetrics: + """Metrics for a single ETF's DRIP calculation""" + ticker: str + date: datetime + shares: float + price: float + dividend_yield: float + monthly_dividend: float + new_shares: float + portfolio_value: float + monthly_income: float + yield_on_cost: float + + def to_dict(self) -> Dict: + """Convert the metrics to a dictionary for JSON serialization""" + data = asdict(self) + data['date'] = self.date.isoformat() + return data + + @classmethod + def from_dict(cls, data: Dict) -> 'DRIPMetrics': + """Create a DRIPMetrics instance from a dictionary""" + data = data.copy() + data['date'] = datetime.fromisoformat(data['date']) + return cls(**data) + +@dataclass +class DRIPForecastResult: + """Results of a DRIP forecast for a single ETF""" + ticker: str + initial_shares: float + final_shares: float + initial_value: float + final_value: float + total_income: float + average_yield: float + monthly_metrics: List[DRIPMetrics] + accumulated_cash: float = 0.0 # Added for No-DRIP scenarios + + def to_dict(self) -> Dict: + """Convert the forecast result to a dictionary for JSON serialization""" + data = asdict(self) + data['monthly_metrics'] = [m.to_dict() for m in self.monthly_metrics] + return data + + @classmethod + def from_dict(cls, data: Dict) -> 'DRIPForecastResult': + """Create a DRIPForecastResult instance from a dictionary""" + data = data.copy() + data['monthly_metrics'] = [DRIPMetrics.from_dict(m) for m in data['monthly_metrics']] + return cls(**data) + +@dataclass +class DRIPPortfolioResult: + """Results of a DRIP forecast for an entire portfolio""" + total_value: float + monthly_income: float + total_income: float + etf_results: Dict[str, DRIPForecastResult] + accumulated_cash: float = 0.0 # Added for No-DRIP scenarios + + def to_dict(self) -> Dict: + """Convert the portfolio result to a dictionary for JSON serialization""" + data = asdict(self) + data['etf_results'] = {k: v.to_dict() for k, v in self.etf_results.items()} + return data + + @classmethod + def from_dict(cls, data: Dict) -> 'DRIPPortfolioResult': + """Create a DRIPPortfolioResult instance from a dictionary""" + data = data.copy() + data['etf_results'] = {k: DRIPForecastResult.from_dict(v) for k, v in data['etf_results'].items()} + return cls(**data) diff --git a/ETF_Portal/tests/__init__.py b/ETF_Portal/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pages/ETF_Portfolio_Builder.py b/pages/ETF_Portfolio_Builder.py index c1502f9..e1e1e29 100644 --- a/pages/ETF_Portfolio_Builder.py +++ b/pages/ETF_Portfolio_Builder.py @@ -20,6 +20,7 @@ import logging import traceback from dotenv import load_dotenv import re +from ETF_Portal.services.drip_service import DRIPService, DripConfig # Load environment variables load_dotenv(override=True) # Force reload of environment variables @@ -80,18 +81,63 @@ def test_fmp_data_fetching(): # High-yield ETFs reference data HIGH_YIELD_ETFS = { - "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"}, - "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"}, - "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"}, - "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"}, - "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"}, - "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"}, - "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"}, - "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, - "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, - "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"} + "MSTY": {"expected_yield": 125.0, "frequency": "Monthly"}, # 125% + "SMCY": {"expected_yield": 100.0, "frequency": "Monthly"}, # 100% + "TSLY": {"expected_yield": 85.0, "frequency": "Monthly"}, # 85% + "NVDY": {"expected_yield": 75.0, "frequency": "Monthly"}, # 75% + "ULTY": {"expected_yield": 70.0, "frequency": "Monthly"}, # 70% + "JEPQ": {"expected_yield": 9.5, "frequency": "Monthly"}, # 9.5% + "JEPI": {"expected_yield": 7.8, "frequency": "Monthly"}, # 7.8% + "XYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, # 12.0% + "QYLD": {"expected_yield": 12.0, "frequency": "Monthly"}, # 12.0% + "RYLD": {"expected_yield": 12.0, "frequency": "Monthly"} # 12.0% } +def calculate_erosion_risk(yield_pct: float) -> Dict[str, float]: + """ + Calculate erosion risk based on yield percentage. + Higher yields have higher erosion risk. + + Args: + yield_pct: Yield percentage + + Returns: + Dictionary with NAV and yield erosion risk scores (0-9) + """ + # Base erosion risk calculation + if yield_pct >= 100: # Ultra high yield (100%+) + nav_risk = 9 + yield_risk = 9 + elif yield_pct >= 50: # Very high yield (50-100%) + nav_risk = 8 + yield_risk = 8 + elif yield_pct >= 25: # High yield (25-50%) + nav_risk = 7 + yield_risk = 7 + elif yield_pct >= 15: # Medium-high yield (15-25%) + nav_risk = 6 + yield_risk = 6 + elif yield_pct >= 10: # Medium yield (10-15%) + nav_risk = 5 + yield_risk = 5 + elif yield_pct >= 5: # Medium-low yield (5-10%) + nav_risk = 4 + yield_risk = 4 + elif yield_pct >= 3: # Low yield (3-5%) + nav_risk = 3 + yield_risk = 3 + elif yield_pct >= 1: # Very low yield (1-3%) + nav_risk = 2 + yield_risk = 2 + else: # Ultra low yield (<1%) + nav_risk = 1 + yield_risk = 1 + + return { + "nav_risk": nav_risk, + "yield_risk": yield_risk + } + def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: pd.DataFrame) -> Dict[str, Any]: """ Calculate ETF metrics based on available data. @@ -141,6 +187,10 @@ def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: # Calculate annual yield metrics["Yield (%)"] = (ttm_dividend / metrics["Price"]) * 100 + # For high-yield ETFs, use the expected yield if available + if ticker in HIGH_YIELD_ETFS: + metrics["Yield (%)"] = HIGH_YIELD_ETFS[ticker]["expected_yield"] + logger.info(f"Calculated yield for {ticker}: {metrics['Yield (%)']:.2f}% (TTM dividend: ${ttm_dividend:.2f}, Price: ${metrics['Price']:.2f})") else: logger.warning(f"No recent dividends found for {ticker}") @@ -172,6 +222,9 @@ def calculate_etf_metrics(ticker: str, price_data: pd.DataFrame, dividend_data: else: metrics["missing_metrics"].append("sortino_ratio") + # Calculate erosion risk based on yield + erosion_risk = calculate_erosion_risk(metrics["Yield (%)"]) + metrics["nav_erosion_risk"] = erosion_risk["nav_risk"] # Categorize risk based on available metrics metrics["Risk Level"] = categorize_etf_risk(metrics) @@ -1054,6 +1107,19 @@ def run_portfolio_simulation( st.error(f"Error running portfolio simulation: {str(e)}") return None +def format_large_number(value: float) -> str: + """Format large numbers with K, M, B, T suffixes.""" + if abs(value) >= 1e12: # Trillions + return f"${value/1e12:.2f}T" + elif abs(value) >= 1e9: # Billions + return f"${value/1e9:.2f}B" + elif abs(value) >= 1e6: # Millions + return f"${value/1e6:.2f}M" + elif abs(value) >= 1e3: # Thousands + return f"${value/1e3:.2f}K" + else: + return f"${value:,.2f}" + def portfolio_summary(final_alloc: pd.DataFrame) -> None: """ Display a summary of the portfolio allocation. @@ -1077,11 +1143,11 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None: col1, col2, col3 = st.columns(3) with col1: - st.metric("Total Capital", f"${total_capital:,.2f}") + st.metric("Total Capital", format_large_number(total_capital)) with col2: - st.metric("Annual Income", f"${total_income:,.2f}") - st.metric("Monthly Income", f"${total_income/12:,.2f}") + st.metric("Annual Income", format_large_number(total_income)) + st.metric("Monthly Income", format_large_number(total_income/12)) with col3: st.metric("Average Yield", f"{weighted_yield:.2f}%") @@ -1108,6 +1174,11 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None: display_df = final_alloc.copy() display_df["Monthly Income"] = display_df["Income Contributed ($)"] / 12 + # Format large numbers in the display DataFrame + display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(format_large_number) + display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(format_large_number) + display_df["Monthly Income"] = display_df["Monthly Income"].apply(format_large_number) + # Ensure data_source column exists and rename it for display if "data_source" in display_df.columns: display_df = display_df.rename(columns={"data_source": "Data Source"}) @@ -1134,10 +1205,7 @@ def portfolio_summary(final_alloc: pd.DataFrame) -> None: "Allocation (%)": "{:.2f}%", "Yield (%)": "{:.2f}%", "Price": "${:,.2f}", - "Shares": "{:,.4f}", - "Capital Allocated ($)": "${:,.2f}", - "Monthly Income": "${:,.2f}", - "Income Contributed ($)": "${:,.2f}" + "Shares": "{:,.4f}" }), column_config={ "Ticker": st.column_config.TextColumn("Ticker", disabled=True), @@ -1324,8 +1392,8 @@ def allocate_for_income(df: pd.DataFrame, target: float, etf_allocations: List[D logger.error("Weighted yield is zero") return None - # Calculate required capital based on weighted yield - required_capital = (annual_income / weighted_yield) * 100 + # Calculate required capital based on weighted yield (convert percentage to decimal) + required_capital = annual_income / (weighted_yield / 100) # Calculate capital allocation and income final_alloc["Capital Allocated ($)"] = (final_alloc["Allocation (%)"] / 100) * required_capital @@ -1969,6 +2037,203 @@ with st.sidebar: parallel_processing = st.checkbox("Enable Parallel Processing", value=True, help="Fetch data for multiple ETFs simultaneously") +def display_drip_forecast(portfolio_result, tickers): + """Display DRIP forecast results and charts.""" + try: + # Validate portfolio results + if not portfolio_result or not portfolio_result.etf_results: + st.error("No portfolio results available") + return + + # Display erosion table + st.subheader("ETF Erosion Analysis") + erosion_data = [] + + for ticker, etf_result in portfolio_result.etf_results.items(): + # Get erosion analysis for this ticker + from ETF_Portal.services.nav_erosion_service import NavErosionService + erosion_service = NavErosionService() + erosion_analysis = erosion_service.analyze_etf_erosion_risk([ticker]) + + if erosion_analysis and erosion_analysis.results: + result = erosion_analysis.results[0] + erosion_data.append({ + "Ticker": ticker, + "NAV Erosion (Annual %)": f"{result.estimated_nav_erosion:.2%}", + "Yield Erosion (Annual %)": f"{result.estimated_yield_erosion:.2%}" + }) + + if erosion_data: + st.dataframe( + pd.DataFrame(erosion_data), + use_container_width=True, + hide_index=True + ) + else: + st.warning("No erosion data available for the selected ETFs") + + # Display portfolio summary + st.subheader("Portfolio Summary") + + # Calculate total values + total_value = portfolio_result.total_value + total_income = portfolio_result.total_income + accumulated_cash = portfolio_result.accumulated_cash + + # Create columns for key metrics + col1, col2, col3 = st.columns(3) + + with col1: + st.metric( + "Portfolio Value", + f"${total_value:,.2f}", + f"${accumulated_cash:,.2f} cash" if accumulated_cash > 0 else None + ) + with col2: + st.metric( + "Monthly Income", + f"${portfolio_result.monthly_income:,.2f}" + ) + with col3: + st.metric( + "Total Income", + f"${total_income:,.2f}" + ) + + # Create comparison chart + st.subheader("DRIP vs No-DRIP Comparison") + + # Get DRIP and No-DRIP results + from ETF_Portal.services.drip_service import DRIPService + drip_service = DRIPService() + + # Calculate DRIP scenario + portfolio_result = drip_service.forecast_portfolio( + portfolio_df=final_alloc, + config=DripConfig( + months=12, + erosion_type=st.session_state.get("erosion_type", "None"), + erosion_level={ + "nav": st.session_state.get("erosion_level", {}).get("nav", 0), + "yield": st.session_state.get("erosion_level", {}).get("yield", 0) + } + ), + tickers=tickers + ) + + # Calculate No-DRIP scenario + nodrip_result = drip_service.forecast_portfolio( + portfolio_df=final_alloc, + config=DripConfig( + months=12, + erosion_type=st.session_state.get("erosion_type", "None"), + erosion_level={ + "nav": st.session_state.get("erosion_level", {}).get("nav", 0), + "yield": st.session_state.get("erosion_level", {}).get("yield", 0) + } + ), + tickers=tickers + ) + + # Create comparison data + comparison_data = { + "Strategy": ["DRIP", "No-DRIP"], + "Portfolio Value": [ + portfolio_result.total_value, + nodrip_result.total_value + ], + "Accumulated Cash": [ + 0, + nodrip_result.accumulated_cash + ], + "Total Value": [ + portfolio_result.total_value, + nodrip_result.total_value + nodrip_result.accumulated_cash + ] + } + + # Create comparison chart + fig = go.Figure() + + # Add DRIP bars + fig.add_trace(go.Bar( + name="DRIP", + x=["Portfolio Value"], + y=[portfolio_result.total_value], + marker_color="#1f77b4" + )) + + # Add No-DRIP bars + fig.add_trace(go.Bar( + name="No-DRIP Portfolio", + x=["Portfolio Value"], + y=[nodrip_result.total_value], + marker_color="#ff7f0e" + )) + fig.add_trace(go.Bar( + name="No-DRIP Cash", + x=["Portfolio Value"], + y=[nodrip_result.accumulated_cash], + marker_color="#2ca02c", + base=nodrip_result.total_value + )) + + fig.update_layout( + title="DRIP vs No-DRIP Comparison", + barmode="group", + template="plotly_dark", + showlegend=True, + yaxis_title="Value ($)" + ) + + st.plotly_chart(fig, use_container_width=True) + + # Display detailed comparison table + st.subheader("Detailed Comparison") + + comparison_df = pd.DataFrame({ + "Metric": [ + "Portfolio Value", + "Accumulated Cash", + "Total Value", + "Monthly Income", + "Total Income", + "Share Growth" + ], + "DRIP": [ + f"${portfolio_result.total_value:,.2f}", + "$0.00", + f"${portfolio_result.total_value:,.2f}", + f"${portfolio_result.monthly_income:,.2f}", + f"${portfolio_result.total_income:,.2f}", + f"{((portfolio_result.etf_results[tickers[0]].final_shares / portfolio_result.etf_results[tickers[0]].initial_shares - 1) * 100):.1f}%" + ], + "No-DRIP": [ + f"${nodrip_result.total_value:,.2f}", + f"${nodrip_result.accumulated_cash:,.2f}", + f"${nodrip_result.total_value + nodrip_result.accumulated_cash:,.2f}", + f"${nodrip_result.monthly_income:,.2f}", + f"${nodrip_result.total_income:,.2f}", + "0%" + ] + }) + + st.dataframe(comparison_df, use_container_width=True, hide_index=True) + + # Display assumptions + st.info(""" + **Assumptions:** + - DRIP: All dividends are reinvested to buy more shares + - No-DRIP: Dividends are taken as cash income + - Both strategies are affected by NAV & Yield erosion + - Portfolio value changes due to NAV erosion and share growth (DRIP) or cash accumulation (No-DRIP) + """) + + except Exception as e: + st.error(f"Error calculating DRIP forecast: {str(e)}") + logger.error(f"DRIP forecast error: {str(e)}") + logger.error(traceback.format_exc()) + # Display results and interactive allocation adjustment UI after simulation is run if st.session_state.simulation_run and st.session_state.df_data is not None: df = st.session_state.df_data @@ -1979,733 +2244,177 @@ if st.session_state.simulation_run and st.session_state.df_data is not None: st.error("No portfolio data available. Please run the simulation again.") st.session_state.simulation_run = False else: - # Verify required columns exist - required_columns = ["Capital Allocated ($)", "Yield (%)", "Price", "Ticker"] - missing_columns = [col for col in required_columns if col not in final_alloc.columns] + # Create tabs for better organization + tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"]) - if missing_columns: - st.error(f"Missing required columns in portfolio data: {', '.join(missing_columns)}") - st.session_state.simulation_run = False - else: - # Create tabs for better organization - tab1, tab2, tab3, tab4, tab5 = st.tabs(["📈 Portfolio Overview", "📊 DRIP Forecast", "📉 Erosion Risk Assessment", "🤖 AI Suggestions", "📊 ETF Details"]) + with tab1: + st.subheader("💰 Portfolio Summary") + portfolio_summary(final_alloc) - with tab1: - st.subheader("💰 Portfolio Summary") - portfolio_summary(final_alloc) - - # Display mode-specific information - if st.session_state.mode == "Income Target": - try: - monthly_target = st.session_state.target - ANNUAL_TARGET = monthly_target * 12 - total_capital = final_alloc["Capital Allocated ($)"].sum() - st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).") - except Exception as e: - st.error(f"Error displaying income target information: {str(e)}") - else: - try: - initial_capital = st.session_state.initial_capital - annual_income = final_alloc["Income Contributed ($)"].sum() - monthly_income = annual_income / 12 - st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).") - except Exception as e: - st.error(f"Error displaying capital investment information: {str(e)}") - - # Add save/load section - st.subheader("💾 Save/Load Portfolio") - - # Create two columns for save and load - save_col, load_col = st.columns(2) - - with save_col: - st.write("Save current portfolio") - portfolio_name = st.text_input("Portfolio Name", key="save_portfolio_name") - if st.button("Save Portfolio", key="save_portfolio"): - if portfolio_name: - if save_portfolio(portfolio_name, final_alloc, - st.session_state.mode, - st.session_state.target): - st.success(f"Portfolio '{portfolio_name}' saved successfully!") - else: - st.warning("Please enter a portfolio name.") - - with load_col: - st.write("Load saved portfolio") - if st.button("Show Saved Portfolios", key="show_portfolios"): - saved_portfolios = list_saved_portfolios() - if saved_portfolios: - selected_portfolio = st.selectbox("Select Portfolio", saved_portfolios, key="load_portfolio") - if st.button("Load Portfolio", key="load_portfolio_btn"): - loaded_df, loaded_mode, loaded_target = load_portfolio(selected_portfolio) - if loaded_df is not None: - st.session_state.final_alloc = loaded_df - st.session_state.mode = loaded_mode - st.session_state.target = loaded_target - st.success(f"Portfolio '{selected_portfolio}' loaded successfully!") - st.rerun() - else: - st.info("No saved portfolios found.") - - # Display full detailed allocation table - st.subheader("📊 Capital Allocation Details") - + # Display mode-specific information + if st.session_state.mode == "Income Target": try: - # Format currencies for better readability - display_df = final_alloc.copy() - # Calculate shares for each ETF - display_df["Shares"] = display_df["Capital Allocated ($)"] / display_df["Price"] - display_df["Price Per Share"] = display_df["Price"].apply(lambda x: f"${x:,.2f}") - display_df["Capital Allocated ($)"] = display_df["Capital Allocated ($)"].apply(lambda x: f"${x:,.2f}") - display_df["Income Contributed ($)"] = display_df["Income Contributed ($)"].apply(lambda x: f"${x:,.2f}") - display_df["Yield (%)"] = display_df["Yield (%)"].apply(lambda x: f"{x:.2f}%") - display_df["Shares"] = display_df["Shares"].apply(lambda x: f"{x:,.4f}") - - # Create a form for the allocation table - with st.form("allocation_form"): - # Create an editable DataFrame - edited_df = st.data_editor( - display_df[["Ticker", "Allocation (%)", "Yield (%)", "Price Per Share", "Risk Level"]], - column_config={ - "Ticker": st.column_config.TextColumn("Ticker", disabled=True), - "Allocation (%)": st.column_config.NumberColumn( - "Allocation (%)", - min_value=0.0, - max_value=100.0, - step=0.1, - format="%.1f", - required=True - ), - "Yield (%)": st.column_config.TextColumn("Yield (%)", disabled=True), - "Price Per Share": st.column_config.TextColumn("Price Per Share", disabled=True), - "Risk Level": st.column_config.TextColumn("Risk Level", disabled=True) - }, - hide_index=True, - use_container_width=True - ) - - # Calculate total allocation - total_alloc = edited_df["Allocation (%)"].sum() - - # Validate individual allocations - invalid_allocations = edited_df[ - (edited_df["Allocation (%)"] <= 0) | - (edited_df["Allocation (%)"] > 100) - ] - - if not invalid_allocations.empty: - for _, row in invalid_allocations.iterrows(): - st.error(f"Invalid allocation for {row['Ticker']}: must be between 0% and 100%") - - # Display total allocation with color coding - if abs(total_alloc - 100) <= 0.1: - st.metric("Total Allocation (%)", f"{total_alloc:.2f}", delta=None) - else: - st.metric("Total Allocation (%)", f"{total_alloc:.2f}", - delta=f"{total_alloc - 100:.2f}", - delta_color="off") - st.error("Total allocation must be exactly 100%") - - # Create columns for quick actions - col1, col2, col3 = st.columns(3) - - with col1: - equal_weight = st.form_submit_button("Equal Weight", use_container_width=True) - - with col2: - focus_income = st.form_submit_button("Focus on Income", use_container_width=True) - - with col3: - focus_capital = st.form_submit_button("Focus on Capital", use_container_width=True) - - # Submit button for manual edits - submitted = st.form_submit_button("Update Allocations", - disabled=abs(total_alloc - 100) > 0.1 or not invalid_allocations.empty, - type="primary", - use_container_width=True) - - # Handle form submission - if submitted: - try: - # Convert the edited allocations to a dictionary - new_allocations = {row["Ticker"]: float(row["Allocation (%)"]) for _, row in edited_df.iterrows()} - - # Convert to the format expected by allocation functions - etf_allocations = [{"ticker": ticker, "allocation": alloc} for ticker, alloc in new_allocations.items()] - - # Get the mode and target from session state - mode = st.session_state.mode - target = st.session_state.target - initial_capital = st.session_state.initial_capital - - # Use the same allocation functions as the main navigation - if mode == "Income Target": - final_alloc = allocate_for_income(df, target, etf_allocations) - else: # Capital Target - final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) - - if final_alloc is not None: - st.session_state.final_alloc = final_alloc - st.success("Portfolio updated with new allocations!") - st.rerun() - else: - st.error("Failed to update portfolio. Please try again.") - except Exception as e: - st.error(f"Error updating allocations: {str(e)}") - - # Handle quick actions - if equal_weight: - try: - # Calculate equal weight allocation - num_etfs = len(edited_df) - equal_allocation = 100 / num_etfs - - # Create new allocations in the format expected by allocation functions - etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] - - # Get the mode and target from session state - mode = st.session_state.mode - target = st.session_state.target - initial_capital = st.session_state.initial_capital - - # Use the same allocation functions as the main navigation - if mode == "Income Target": - final_alloc = allocate_for_income(df, target, etf_allocations) - else: # Capital Target - final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) - - if final_alloc is not None: - st.session_state.final_alloc = final_alloc - st.success("Portfolio adjusted to equal weight!") - st.rerun() - except Exception as e: - st.error(f"Error applying equal weight: {str(e)}") - - elif focus_income: - try: - # Sort by yield and adjust allocations - sorted_alloc = edited_df.sort_values("Yield (%)", ascending=False) - total_yield = sorted_alloc["Yield (%)"].str.rstrip('%').astype('float').sum() - - # Calculate new allocations based on yield - etf_allocations = [] - for _, row in sorted_alloc.iterrows(): - yield_val = float(row["Yield (%)"].rstrip('%')) - allocation = (yield_val / total_yield) * 100 - etf_allocations.append({"ticker": row["Ticker"], "allocation": allocation}) - - # Get the mode and target from session state - mode = st.session_state.mode - target = st.session_state.target - initial_capital = st.session_state.initial_capital - - # Use the same allocation functions as the main navigation - if mode == "Income Target": - final_alloc = allocate_for_income(df, target, etf_allocations) - else: # Capital Target - final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) - - if final_alloc is not None: - st.session_state.final_alloc = final_alloc - st.success("Portfolio adjusted to focus on income!") - st.rerun() - except Exception as e: - st.error(f"Error focusing on income: {str(e)}") - - elif focus_capital: - try: - # Calculate equal weight allocation (same as equal weight) - num_etfs = len(edited_df) - equal_allocation = 100 / num_etfs - - # Create new allocations in the format expected by allocation functions - etf_allocations = [{"ticker": row["Ticker"], "allocation": equal_allocation} for _, row in edited_df.iterrows()] - - # Get the mode and target from session state - mode = st.session_state.mode - target = st.session_state.target - initial_capital = st.session_state.initial_capital - - # Use the same allocation functions as the main navigation - if mode == "Income Target": - final_alloc = allocate_for_income(df, target, etf_allocations) - else: # Capital Target - final_alloc = allocate_for_capital(df, initial_capital, etf_allocations) - - if final_alloc is not None: - st.session_state.final_alloc = final_alloc - st.success("Portfolio adjusted to focus on capital!") - st.rerun() - except Exception as e: - st.error(f"Error focusing on capital: {str(e)}") + monthly_target = st.session_state.target + ANNUAL_TARGET = monthly_target * 12 + total_capital = final_alloc["Capital Allocated ($)"].sum() + st.info(f"🎯 **Income Target Mode**: You need ${total_capital:,.2f} to generate ${monthly_target:,.2f} in monthly income (${ANNUAL_TARGET:,.2f} annually).") except Exception as e: - st.error(f"Error displaying allocation details: {str(e)}") - logger.error(f"Error in allocation display: {str(e)}") - logger.error(traceback.format_exc()) - - with tab2: - st.subheader("📊 DRIP Forecast") - - # Get erosion settings from session state - erosion_type = st.session_state.get("erosion_type", "None") - erosion_level = st.session_state.get("erosion_level", 0) - - # Create DRIP configuration - from services.drip_service import DripConfig, DripService - - drip_config = DripConfig( - months=12, # Default to 12 months for initial view - erosion_type=erosion_type, - erosion_level=erosion_level - ) - - # Initialize DRIP service and calculate forecast - drip_service = DripService() - drip_result = drip_service.calculate_drip_growth(final_alloc, drip_config) - - # Display portfolio growth chart - st.subheader("Portfolio Growth") - fig1 = go.Figure() - fig1.add_trace(go.Scatter( - x=[data.month for data in drip_result.monthly_data], - y=[data.total_value for data in drip_result.monthly_data], - mode='lines', - name='Portfolio Value', - line=dict(color="#1f77b4", width=3) - )) - fig1.update_layout( - title="Portfolio Value Over Time", - xaxis_title="Month", - yaxis_title="Value ($)", - template="plotly_dark", - xaxis=dict(tickmode='linear', tick0=0, dtick=1) - ) - st.plotly_chart(fig1, use_container_width=True) - - # Display income growth chart - st.subheader("Income Growth") - fig2 = go.Figure() - fig2.add_trace(go.Scatter( - x=[data.month for data in drip_result.monthly_data], - y=[data.monthly_income for data in drip_result.monthly_data], - mode='lines', - name='Monthly Income', - line=dict(color="#ff7f0e", width=3) - )) - fig2.update_layout( - title="Monthly Income Over Time", - xaxis_title="Month", - yaxis_title="Monthly Income ($)", - template="plotly_dark", - xaxis=dict(tickmode='linear', tick0=0, dtick=1) - ) - st.plotly_chart(fig2, use_container_width=True) - - # Display detailed forecast table - st.subheader("DRIP Forecast Details") - - # Convert monthly data to DataFrame for display - forecast_data = [] - for data in drip_result.monthly_data: - row = { - "Month": data.month, - "Total Value ($)": data.total_value, - "Monthly Income ($)": data.monthly_income, - "Cumulative Income ($)": data.cumulative_income - } - # Add ticker-specific data - for ticker, shares in data.shares.items(): - row[f"{ticker} Shares"] = shares - row[f"{ticker} Price ($)"] = data.prices[ticker] - row[f"{ticker} Yield (%)"] = data.yields[ticker] * 100 - forecast_data.append(row) - - forecast_df = pd.DataFrame(forecast_data) - - # Format the data for display - display_forecast = forecast_df.copy() - display_forecast["Total Value ($)"] = display_forecast["Total Value ($)"].apply(lambda x: f"${x:,.2f}") - display_forecast["Monthly Income ($)"] = display_forecast["Monthly Income ($)"].apply(lambda x: f"${x:,.2f}") - display_forecast["Cumulative Income ($)"] = display_forecast["Cumulative Income ($)"].apply(lambda x: f"${x:,.2f}") - - # Format share counts and prices - share_columns = [col for col in display_forecast.columns if "Shares" in col] - price_columns = [col for col in display_forecast.columns if "Price" in col] - yield_columns = [col for col in display_forecast.columns if "Yield (%)" in col] - - for col in share_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.4f}") - - for col in price_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"${x:.2f}") - - for col in yield_columns: - display_forecast[col] = display_forecast[col].apply(lambda x: f"{x:.2f}%") - - # Create tabs for different views of the data - detail_tabs = st.tabs(["Summary View", "Full Details"]) - - with detail_tabs[0]: - st.dataframe(display_forecast[["Month", "Total Value ($)", "Monthly Income ($)", "Cumulative Income ($)"]], - use_container_width=True) - - with detail_tabs[1]: - # Group columns by ticker for better readability - tickers = list(drip_result.monthly_data[0].shares.keys()) - ticker_columns = {} - for ticker in tickers: - ticker_columns[ticker] = [ - f"{ticker} Shares", - f"{ticker} Price ($)", - f"{ticker} Yield (%)" - ] - - # Create ordered columns list - basic_columns = ["Month", "Total Value ($)", "Monthly Income ($)", "Cumulative Income ($)"] - ordered_columns = basic_columns.copy() - for ticker in tickers: - ordered_columns.extend(ticker_columns[ticker]) - - st.dataframe( - display_forecast[ordered_columns], - use_container_width=True, - height=500 - ) - - # Add comparison between DRIP and No-DRIP strategies - st.subheader("📊 1-Year DRIP vs. No-DRIP Comparison") - - # Add note about erosion effects if applicable - if erosion_type != "None" and isinstance(erosion_level, dict): - if erosion_level.get("use_per_ticker", False): - st.info(""" - This comparison factors in the custom per-ETF erosion rates. - Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. - """) - else: - nav_annual = (1 - (1 - (erosion_level["global"]["nav"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - yield_annual = (1 - (1 - (erosion_level["global"]["yield"] / MAX_EROSION_LEVEL) * max_monthly_erosion)**12) * 100 - st.info(f""" - This comparison factors in: - - NAV Erosion: {nav_annual:.1f}% annually - - Yield Erosion: {yield_annual:.1f}% annually - - Both strategies are affected by erosion, but DRIP helps mitigate losses by steadily acquiring more shares. - """) - - # Calculate no-drip scenario (taking dividends as income) - initial_value = drip_result.monthly_data[0].total_value - initial_monthly_income = drip_result.monthly_data[0].monthly_income - annual_income = initial_monthly_income * 12 - - # Initialize ticker data dictionary - ticker_data_dict = {} - for _, row in final_alloc.iterrows(): - ticker = row["Ticker"] - ticker_data_dict[ticker] = { - "price": row["Price"], - "yield_annual": row["Yield (%)"] / 100, # Convert from % to decimal - "distribution": row.get("Distribution Period", "Monthly") - } - - # Get the final prices after erosion from the last month of the DRIP forecast - final_prices = {} - for ticker in tickers: - price_col = f"{ticker} Price ($)" - if price_col in forecast_df.columns: - final_prices[ticker] = forecast_df[price_col].iloc[-1] - else: - # Fallback to initial price if column doesn't exist - final_prices[ticker] = ticker_data_dict[ticker]["price"] - - # Extract initial shares for each ETF from month 1 - initial_shares = {ticker: forecast_df.iloc[0][f"{ticker} Shares"] for ticker in tickers} - - # Calculate the No-DRIP final value by multiplying initial shares by final prices - # This correctly accounts for NAV erosion while keeping shares constant - nodrip_final_value = sum(initial_shares[ticker] * final_prices[ticker] for ticker in tickers) - - # The final income should account for erosion but not compounding growth - # This requires simulation of the erosion that would have happened - if erosion_type != "None" and isinstance(erosion_level, dict): - # Initialize the current prices and yields from the final_alloc dataframe - current_prices = {} - current_yields = {} - - # Reconstruct ticker data from final_alloc - for _, row in final_alloc.iterrows(): - ticker = row["Ticker"] - current_prices[ticker] = row["Price"] - current_yields[ticker] = row["Yield (%)"] / 100 - - # Get the erosion rates for each ticker - if erosion_level.get("use_per_ticker", False): - ticker_nav_rates = {} - ticker_yield_rates = {} - for ticker in tickers: - ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) - ticker_nav_rates[ticker] = ticker_settings["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - ticker_yield_rates[ticker] = ticker_settings["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - else: - # Use global rates for all tickers - global_nav = erosion_level["global"]["nav"] / MAX_EROSION_LEVEL * max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / MAX_EROSION_LEVEL * max_monthly_erosion - ticker_nav_rates = {ticker: global_nav for ticker in tickers} - ticker_yield_rates = {ticker: global_yield for ticker in tickers} - - # Apply 12 months of erosion - for month in range(1, 13): - # Apply erosion to each ticker - for ticker in tickers: - # Apply NAV erosion - if ticker_nav_rates[ticker] > 0: - current_prices[ticker] *= (1 - ticker_nav_rates[ticker]) - - # Apply yield erosion - if ticker_yield_rates[ticker] > 0: - current_yields[ticker] *= (1 - ticker_yield_rates[ticker]) - - # Calculate final monthly income with eroded prices and yields but original shares - final_monthly_income_nodrip = sum( - (current_yields[ticker] / 12) * - (initial_shares[ticker] * current_prices[ticker]) - for ticker in tickers - ) - else: - # No erosion, so final income is the same as initial income - final_monthly_income_nodrip = initial_monthly_income - - nodrip_final_annual_income = final_monthly_income_nodrip * 12 - - # Get values for DRIP scenario from forecast - drip_final_value = drip_result.final_portfolio_value - drip_final_monthly_income = drip_result.monthly_data[-1].monthly_income - drip_annual_income_end = drip_final_monthly_income * 12 - - # Create comparison dataframe with withdrawn income for a more complete financial picture - - # For No-DRIP strategy, calculate cumulative withdrawn income (sum of monthly dividends) - # This is equivalent to the cumulative income in the DRIP forecast, but in No-DRIP it's withdrawn - withdrawn_income = 0 - monthly_dividends = [] - - # Reconstruct the monthly dividend calculation for No-DRIP - current_prices_monthly = {ticker: ticker_data_dict[ticker]["price"] for ticker in tickers} - current_yields_monthly = {ticker: ticker_data_dict[ticker]["yield_annual"] for ticker in tickers} - - for month in range(1, 13): - # Calculate dividends for this month based on current yields and prices - month_dividend = sum( - (current_yields_monthly[ticker] / 12) * - (initial_shares[ticker] * current_prices_monthly[ticker]) - for ticker in tickers - ) - withdrawn_income += month_dividend - monthly_dividends.append(month_dividend) - - # Apply erosion for next month - if erosion_type != "None": - for ticker in tickers: - # Apply NAV erosion - if ticker in ticker_nav_rates and ticker_nav_rates[ticker] > 0: - current_prices_monthly[ticker] *= (1 - ticker_nav_rates[ticker]) - - # Apply yield erosion - if ticker in ticker_yield_rates and ticker_yield_rates[ticker] > 0: - current_yields_monthly[ticker] *= (1 - ticker_yield_rates[ticker]) - - # Calculate total economic result - nodrip_total = nodrip_final_value + withdrawn_income - drip_total = drip_final_value - - # Display economic comparison - st.subheader("Economic Comparison") - - col1, col2 = st.columns(2) - - with col1: - st.markdown("#### No-DRIP KPIs") - st.metric("Final Portfolio Value", f"${nodrip_final_value:,.2f}") - st.metric("Total Income Withdrawn", f"${withdrawn_income:,.2f}") - st.metric("Final Monthly Income", f"${final_monthly_income_nodrip:,.2f}") - st.metric("Total Economic Result", f"${nodrip_total:,.2f}") - - with col2: - st.markdown("#### DRIP KPIs") - st.metric("Final Portfolio Value", f"${drip_final_value:,.2f}") - st.metric("Final Monthly Income", f"${drip_final_monthly_income:,.2f}") - st.metric("Total Economic Result", f"${drip_total:,.2f}") - - # Display monthly income comparison chart - st.subheader("Monthly Income Comparison") - - # Create monthly income comparison chart - fig3 = go.Figure() - - # Add No-DRIP monthly income line - fig3.add_trace(go.Scatter( - x=list(range(1, 13)), - y=monthly_dividends, - mode='lines', - name='No-DRIP Monthly Income', - line=dict(color='#ff7f0e', width=2) - )) - - # Add DRIP monthly income line - drip_monthly_income = [data.monthly_income for data in drip_result.monthly_data] - fig3.add_trace(go.Scatter( - x=list(range(1, 13)), - y=drip_monthly_income, - mode='lines', - name='DRIP Monthly Income', - line=dict(color='#1f77b4', width=2) - )) - - fig3.update_layout( - title="Monthly Income: DRIP vs No-DRIP Strategy", - xaxis_title="Month", - yaxis_title="Monthly Income ($)", - template="plotly_dark", - showlegend=True - ) - - st.plotly_chart(fig3, use_container_width=True) - - # Display advantages of DRIP strategy - st.subheader("DRIP Strategy Advantages") - - # Calculate percentage differences - value_diff_pct = ((drip_final_value - nodrip_final_value) / nodrip_final_value) * 100 - income_diff_pct = ((drip_final_monthly_income - final_monthly_income_nodrip) / final_monthly_income_nodrip) * 100 - - st.markdown(f""" - - **Portfolio Value**: DRIP strategy results in a {value_diff_pct:.1f}% higher final portfolio value - - **Monthly Income**: DRIP strategy generates {income_diff_pct:.1f}% higher monthly income - - **Risk Mitigation**: DRIP helps mitigate erosion effects by continuously acquiring more shares - - **Compounding Effect**: Reinvested dividends generate additional income through compounding - """) - - with tab3: - st.subheader("📉 Erosion Risk Assessment") - - # Add explanatory text - st.write(""" - This analysis uses historical ETF data to estimate reasonable erosion settings - based on past performance, volatility, and dividend history. - """) - - # Initialize the NAV erosion service + st.error(f"Error displaying income target information: {str(e)}") + else: try: - from ETF_Portal.services.nav_erosion_service import NavErosionService - - # Run the analysis in a spinner - with st.spinner("Analyzing historical ETF data..."): - erosion_service = NavErosionService() - risk_analysis = erosion_service.analyze_etf_erosion_risk(final_alloc["Ticker"].tolist()) - except ImportError as e: - st.error(f"Error importing NavErosionService: {str(e)}") - st.error("Please ensure the nav_erosion_service module is properly installed.") - logger.error(f"Import error: {str(e)}") - logger.error(traceback.format_exc()) - risk_analysis = None + initial_capital = st.session_state.initial_capital + annual_income = final_alloc["Income Contributed ($)"].sum() + monthly_income = annual_income / 12 + st.info(f"💲 **Capital Investment Mode**: Your ${initial_capital:,.2f} investment generates ${monthly_income:,.2f} in monthly income (${annual_income:,.2f} annually).") + except Exception as e: + st.error(f"Error displaying capital investment information: {str(e)}") - if risk_analysis and risk_analysis.results: - # Create a summary table with key insights - risk_data = [] - for result in risk_analysis.results: - risk_data.append({ - "Ticker": result.ticker, - "NAV Erosion Risk (0-9)": result.nav_erosion_risk, - "Yield Erosion Risk (0-9)": result.yield_erosion_risk, - "Estimated Annual NAV Erosion": f"{result.estimated_nav_erosion:.1%}", - "Estimated Annual Yield Erosion": f"{result.estimated_yield_erosion:.1%}", - "NAV Risk Explanation": result.nav_risk_explanation, - "Yield Risk Explanation": result.yield_risk_explanation, - "ETF Age (Years)": f"{result.etf_age_years:.1f}" if result.etf_age_years else "Unknown", - "Max Drawdown": f"{result.max_drawdown:.1%}" if result.max_drawdown else "Unknown", - "Volatility": f"{result.volatility:.1%}" if result.volatility else "Unknown", - "Sharpe Ratio": f"{result.sharpe_ratio:.2f}" if result.sharpe_ratio else "Unknown", - "Sortino Ratio": f"{result.sortino_ratio:.2f}" if result.sortino_ratio else "Unknown", - "Dividend Trend": f"{result.dividend_trend:.1%}" if result.dividend_trend else "Unknown" - }) - - # Display main assessment table - st.subheader("Recommended Erosion Settings") - main_columns = [ - "Ticker", - "NAV Erosion Risk (0-9)", - "Yield Erosion Risk (0-9)", - "Estimated Annual NAV Erosion", - "Estimated Annual Yield Erosion", - "NAV Risk Explanation", - "Yield Risk Explanation" - ] - - st.dataframe( - pd.DataFrame(risk_data)[main_columns], - use_container_width=True, - hide_index=True - ) - - # Display detailed metrics - st.subheader("Detailed Risk Metrics") - detail_columns = [ - "Ticker", - "ETF Age (Years)", - "Max Drawdown", - "Volatility", - "Sharpe Ratio", - "Sortino Ratio", - "Dividend Trend" - ] - - st.dataframe( - pd.DataFrame(risk_data)[detail_columns], - use_container_width=True, - hide_index=True - ) - - # Allow applying these settings to the simulation - if st.button("Apply Recommended Erosion Settings", type="primary"): - # Initialize or update per-ticker erosion settings - if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): - st.session_state.per_ticker_erosion = {} - - # Update the session state with recommended settings - for result in risk_analysis.results: - st.session_state.per_ticker_erosion[result.ticker] = { - "nav": result.nav_erosion_risk, - "yield": result.yield_erosion_risk - } - - # Enable erosion and per-ticker settings - st.session_state.erosion_type = "NAV & Yield Erosion" - st.session_state.use_per_ticker_erosion = True - - # Update the erosion_level variable to match the new settings - erosion_level = { - "global": { - "nav": 5, # Default medium level for global fallback - "yield": 5 - }, - "per_ticker": st.session_state.per_ticker_erosion, - "use_per_ticker": True + with tab2: + st.subheader("Dividend Reinvestment (DRIP) Forecast") + st.write("This forecast shows the growth of your portfolio over time if dividends are reinvested instead of taken as income.") + + try: + # Get portfolio data + tickers = final_alloc["Ticker"].tolist() + initial_investment = final_alloc["Capital Allocated ($)"].sum() + risk_tolerance = st.session_state.get("risk_tolerance", "Moderate") + + # DRIP simulation parameters + months = st.slider("Forecast Period (Months)", 1, 60, 12) + + # Initialize DRIP service + from ETF_Portal.services.drip_service import DRIPService + + # Initialize DRIP service + drip_service = DRIPService() + + # Calculate DRIP forecast + portfolio_result = drip_service.forecast_portfolio( + portfolio_df=final_alloc, + config=DripConfig( + months=months, + erosion_type=st.session_state.get("erosion_type", "None"), + erosion_level={ + "nav": st.session_state.get("erosion_level", {}).get("nav", 0), + "yield": st.session_state.get("erosion_level", {}).get("yield", 0) } - - # Update session state erosion level for DRIP forecast - st.session_state.erosion_level = erosion_level - - st.success("Applied recommended erosion settings. They will be used in the DRIP forecast.") - st.info("Go to the DRIP Forecast tab to see the impact of these settings.") - else: - st.error("Unable to analyze ETF erosion risk. Please try again.") + ), + tickers=tickers + ) + + # Display DRIP forecast results + display_drip_forecast(portfolio_result, tickers) + + except Exception as e: + st.error(f"Error calculating DRIP forecast: {str(e)}") + logger.error(f"DRIP forecast error: {str(e)}") + logger.error(traceback.format_exc()) - with tab4: - st.subheader("🤖 AI Suggestions") - # Add AI suggestions content - st.write("This tab will contain AI suggestions for portfolio optimization.") + with tab3: + st.subheader("📉 Erosion Risk Assessment") + st.write(""" + This analysis uses historical ETF data to estimate reasonable erosion settings + based on past performance, volatility, and dividend history. + """) + + try: + from ETF_Portal.services.nav_erosion_service import NavErosionService + with st.spinner("Analyzing historical ETF data..."): + erosion_service = NavErosionService() + risk_analysis = erosion_service.analyze_etf_erosion_risk(final_alloc["Ticker"].tolist()) + except ImportError as e: + st.error(f"Error importing NavErosionService: {str(e)}") + st.error("Please ensure the nav_erosion_service module is properly installed.") + logger.error(f"Import error: {str(e)}") + logger.error(traceback.format_exc()) + risk_analysis = None - with tab5: - st.subheader("📊 ETF Details") - # Add ETF details content - st.write("This tab will contain detailed information about the selected ETFs.") \ No newline at end of file + if risk_analysis and risk_analysis.results: + risk_data = [] + for result in risk_analysis.results: + risk_data.append({ + "Ticker": result.ticker, + "NAV Erosion Risk (0-9)": result.nav_erosion_risk, + "Yield Erosion Risk (0-9)": result.yield_erosion_risk, + "Estimated Annual NAV Erosion": f"{result.estimated_nav_erosion:.1%}", + "Estimated Annual Yield Erosion": f"{result.estimated_yield_erosion:.1%}", + "NAV Risk Explanation": result.nav_risk_explanation, + "Yield Risk Explanation": result.yield_risk_explanation, + "ETF Age (Years)": f"{result.etf_age_years:.1f}" if result.etf_age_years else "Unknown", + "Max Drawdown": f"{result.max_drawdown:.1%}" if result.max_drawdown else "Unknown", + "Volatility": f"{result.volatility:.1%}" if result.volatility else "Unknown", + "Sharpe Ratio": f"{result.sharpe_ratio:.2f}" if result.sharpe_ratio else "Unknown", + "Sortino Ratio": f"{result.sortino_ratio:.2f}" if result.sortino_ratio else "Unknown", + "Dividend Trend": f"{result.dividend_trend:.1%}" if result.dividend_trend else "Unknown" + }) + + st.subheader("Recommended Erosion Settings") + main_columns = [ + "Ticker", + "NAV Erosion Risk (0-9)", + "Yield Erosion Risk (0-9)", + "Estimated Annual NAV Erosion", + "Estimated Annual Yield Erosion", + "NAV Risk Explanation", + "Yield Risk Explanation" + ] + + st.dataframe( + pd.DataFrame(risk_data)[main_columns], + use_container_width=True, + hide_index=True + ) + + st.subheader("Detailed Risk Metrics") + detail_columns = [ + "Ticker", + "ETF Age (Years)", + "Max Drawdown", + "Volatility", + "Sharpe Ratio", + "Sortino Ratio", + "Dividend Trend" + ] + + st.dataframe( + pd.DataFrame(risk_data)[detail_columns], + use_container_width=True, + hide_index=True + ) + + if st.button("Apply Recommended Erosion Settings", type="primary"): + if "per_ticker_erosion" not in st.session_state or not isinstance(st.session_state.per_ticker_erosion, dict): + st.session_state.per_ticker_erosion = {} + + for result in risk_analysis.results: + st.session_state.per_ticker_erosion[result.ticker] = { + "nav": result.nav_erosion_risk, + "yield": result.yield_erosion_risk + } + + st.session_state.erosion_type = "NAV & Yield Erosion" + st.session_state.use_per_ticker_erosion = True + + erosion_level = { + "global": { + "nav": 5, + "yield": 5 + }, + "per_ticker": st.session_state.per_ticker_erosion, + "use_per_ticker": True + } + + st.session_state.erosion_level = erosion_level + + st.success("Applied recommended erosion settings. They will be used in the DRIP forecast.") + st.info("Go to the DRIP Forecast tab to see the impact of these settings.") + else: + st.error("Unable to analyze ETF erosion risk. Please try again.") + + with tab4: + st.subheader("🤖 AI Suggestions") + st.write("This tab will contain AI suggestions for portfolio optimization.") + + with tab5: + st.subheader("📊 ETF Details") + st.write("This tab will contain detailed information about the selected ETFs.") diff --git a/services/drip_service/service.py b/services/drip_service/service.py index d9bc085..19a2e79 100644 --- a/services/drip_service/service.py +++ b/services/drip_service/service.py @@ -1,27 +1,60 @@ from typing import Dict, List, Optional, Tuple, Any import pandas as pd +import numpy as np import logging import traceback +from dataclasses import dataclass, field +from enum import Enum from .models import PortfolioAllocation, MonthlyData, DripConfig, DripResult +from ..nav_erosion_service import NavErosionService # Configure logging logger = logging.getLogger(__name__) +class DistributionFrequency(Enum): + """Enum for distribution frequencies""" + MONTHLY = ("Monthly", 12) + QUARTERLY = ("Quarterly", 4) + SEMI_ANNUALLY = ("Semi-Annually", 2) + ANNUALLY = ("Annually", 1) + UNKNOWN = ("Unknown", 12) + + def __init__(self, name: str, payments_per_year: int): + self.display_name = name + self.payments_per_year = payments_per_year + +@dataclass +class TickerData: + """Data structure for individual ticker information""" + ticker: str + price: float + annual_yield: float + shares: float + allocation_pct: float + distribution_freq: DistributionFrequency + + @property + def market_value(self) -> float: + return self.price * self.shares + + @property + def monthly_yield(self) -> float: + return self.annual_yield / 12 + + @property + def distribution_yield(self) -> float: + return self.annual_yield / self.distribution_freq.payments_per_year + class DripService: + """Enhanced DRIP calculation service with improved performance and accuracy""" + def __init__(self) -> None: - self.MAX_EROSION_LEVEL = 9 - self.max_monthly_erosion = 1 - (0.1)**(1/12) # ~17.54% monthly for 90% annual erosion - self.dividend_frequency = { - "Monthly": 12, - "Quarterly": 4, - "Semi-Annually": 2, - "Annually": 1, - "Unknown": 12 # Default to monthly if unknown - } + self.DISTRIBUTION_FREQUENCIES = {freq.display_name: freq for freq in DistributionFrequency} + self.nav_erosion_service = NavErosionService() def calculate_drip_growth(self, portfolio_df: pd.DataFrame, config: DripConfig) -> DripResult: """ - Calculate DRIP growth for a portfolio over a specified period. + Calculate DRIP growth for a portfolio over a specified period with enhanced accuracy. Args: portfolio_df: DataFrame containing portfolio allocation @@ -31,250 +64,299 @@ class DripService: DripResult object containing the simulation results """ try: - # Initialize monthly data list + # Validate inputs + self._validate_inputs(portfolio_df, config) + + # Get erosion data from nav_erosion_service + erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) + erosion_rates = { + result.ticker: { + "nav": result.estimated_nav_erosion / 100, # Convert to decimal + "yield": result.estimated_yield_erosion / 100 # Convert to decimal + } + for result in erosion_data.results + } + + # Initialize portfolio data + ticker_data = self._initialize_ticker_data(portfolio_df) + + # Pre-calculate distribution schedule for performance + distribution_schedule = self._create_distribution_schedule(ticker_data, config.months) + + # Initialize simulation state + simulation_state = self._initialize_simulation_state(ticker_data) monthly_data: List[MonthlyData] = [] - # Get initial values - initial_shares = self._calculate_initial_shares(portfolio_df) - initial_prices = dict(zip(portfolio_df["Ticker"], portfolio_df["Price"])) - initial_yields = dict(zip(portfolio_df["Ticker"], portfolio_df["Yield (%)"] / 100)) - - # Initialize tracking variables - current_shares = initial_shares.copy() - current_prices = initial_prices.copy() - current_yields = initial_yields.copy() - cumulative_income = 0.0 - - # Run simulation for each month + # Run monthly simulation for month in range(1, config.months + 1): - # Calculate monthly income - monthly_income = sum( - (current_yields[ticker] / 12) * - (current_shares[ticker] * current_prices[ticker]) - for ticker in current_shares.keys() + month_result = self._simulate_month( + month, + simulation_state, + ticker_data, + erosion_rates, + distribution_schedule ) - - # Update cumulative income - cumulative_income += monthly_income - - # Calculate total portfolio value - total_value = sum( - current_shares[ticker] * current_prices[ticker] - for ticker in current_shares.keys() - ) - - # Apply erosion if enabled - if config.erosion_type != "None": - current_prices, current_yields = self._apply_erosion( - current_prices, - current_yields, - config.erosion_type, - config.erosion_level - ) - - # Reinvest dividends - for ticker in current_shares.keys(): - dividend_income = (current_yields[ticker] / 12) * (current_shares[ticker] * current_prices[ticker]) - new_shares = dividend_income / current_prices[ticker] - current_shares[ticker] += new_shares - - # Store monthly data - monthly_data.append(MonthlyData( - month=month, - total_value=total_value, - monthly_income=monthly_income, - cumulative_income=cumulative_income, - shares=current_shares.copy(), - prices=current_prices.copy(), - yields=current_yields.copy() - )) + monthly_data.append(month_result) - # Calculate final values - final_portfolio_value = monthly_data[-1].total_value - total_income = monthly_data[-1].cumulative_income - total_shares = current_shares.copy() - - return DripResult( - monthly_data=monthly_data, - final_portfolio_value=final_portfolio_value, - total_income=total_income, - total_shares=total_shares - ) + # Calculate final results + return self._create_drip_result(monthly_data, simulation_state) except Exception as e: logger.error(f"Error calculating DRIP growth: {str(e)}") logger.error(traceback.format_exc()) raise - - def _calculate_initial_shares(self, portfolio_df: pd.DataFrame) -> Dict[str, float]: - """Calculate initial shares for each ETF.""" - return dict(zip(portfolio_df["Ticker"], portfolio_df["Shares"])) - - def _apply_erosion( - self, - prices: Dict[str, float], - yields: Dict[str, float], - erosion_type: str, - erosion_level: Dict[str, Any] - ) -> Tuple[Dict[str, float], Dict[str, float]]: - """ - Apply erosion to prices and yields based on configuration. - - Args: - prices: Dictionary of current prices - yields: Dictionary of current yields - erosion_type: Type of erosion to apply - erosion_level: Dictionary containing erosion levels - - Returns: - Tuple of updated prices and yields - """ - try: - updated_prices = prices.copy() - updated_yields = yields.copy() - - if erosion_type == "None": - return updated_prices, updated_yields - - if erosion_level.get("use_per_ticker", False): - # Apply per-ticker erosion rates - for ticker in prices.keys(): - if ticker in erosion_level: - ticker_erosion = erosion_level[ticker] - # Apply monthly erosion - nav_erosion = (ticker_erosion["nav"] / 9) * 0.1754 - yield_erosion = (ticker_erosion["yield"] / 9) * 0.1754 - - updated_prices[ticker] *= (1 - nav_erosion) - updated_yields[ticker] *= (1 - yield_erosion) - else: - # Apply global erosion rates - nav_erosion = (erosion_level["global"]["nav"] / 9) * 0.1754 - yield_erosion = (erosion_level["global"]["yield"] / 9) * 0.1754 - - for ticker in prices.keys(): - updated_prices[ticker] *= (1 - nav_erosion) - updated_yields[ticker] *= (1 - yield_erosion) - - return updated_prices, updated_yields - - except Exception as e: - logger.error(f"Error applying erosion: {str(e)}") - logger.error(traceback.format_exc()) - return prices, yields - def _initialize_erosion_rates( - self, - tickers: List[str], - erosion_type: str, - erosion_level: Dict[str, Any] - ) -> Tuple[Dict[str, float], Dict[str, float]]: - """Initialize erosion rates for each ticker based on configuration.""" - ticker_nav_rates: Dict[str, float] = {} - ticker_yield_rates: Dict[str, float] = {} + def _validate_inputs(self, portfolio_df: pd.DataFrame, config: DripConfig) -> None: + """Validate input parameters""" + required_columns = ["Ticker", "Price", "Yield (%)", "Shares"] + missing_columns = [col for col in required_columns if col not in portfolio_df.columns] - if erosion_type != "None" and isinstance(erosion_level, dict): - if erosion_level.get("use_per_ticker", False) and "per_ticker" in erosion_level: - global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - - for ticker in tickers: - ticker_settings = erosion_level["per_ticker"].get(ticker, {"nav": 0, "yield": 0}) - ticker_nav_rates[ticker] = ticker_settings["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - ticker_yield_rates[ticker] = ticker_settings["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - else: - global_nav = erosion_level["global"]["nav"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - global_yield = erosion_level["global"]["yield"] / self.MAX_EROSION_LEVEL * self.max_monthly_erosion - - for ticker in tickers: - ticker_nav_rates[ticker] = global_nav - ticker_yield_rates[ticker] = global_yield - else: - for ticker in tickers: - ticker_nav_rates[ticker] = 0 - ticker_yield_rates[ticker] = 0 - - return ticker_nav_rates, ticker_yield_rates + if missing_columns: + raise ValueError(f"Missing required columns: {missing_columns}") + + if config.months <= 0: + raise ValueError("Months must be positive") + + if portfolio_df.empty: + raise ValueError("Portfolio DataFrame is empty") - def _create_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: - """Create a dictionary of ticker-specific data.""" - ticker_data: Dict[str, Dict[str, Any]] = {} + def _initialize_ticker_data(self, portfolio_df: pd.DataFrame) -> Dict[str, TickerData]: + """Initialize ticker data with validation""" + ticker_data = {} + for _, row in portfolio_df.iterrows(): ticker = row["Ticker"] - ticker_data[ticker] = { - "price": row["Price"], - "yield_annual": row["Yield (%)"] / 100, - "initial_shares": row["Capital Allocated ($)"] / row["Price"], - "initial_allocation": row["Allocation (%)"] / 100, - "distribution": row.get("Distribution Period", "Monthly") - } + + # Handle distribution frequency + dist_period = row.get("Distribution Period", "Monthly") + dist_freq = self.DISTRIBUTION_FREQUENCIES.get(dist_period, DistributionFrequency.MONTHLY) + + ticker_data[ticker] = TickerData( + ticker=ticker, + price=max(0.01, float(row["Price"])), # Prevent zero/negative prices + annual_yield=max(0.0, float(row["Yield (%)"] / 100)), # Convert to decimal + shares=max(0.0, float(row["Shares"])), + allocation_pct=float(row.get("Allocation (%)", 0) / 100), + distribution_freq=dist_freq + ) + return ticker_data - def _calculate_monthly_income( - self, - current_shares: Dict[str, float], - current_prices: Dict[str, float], - current_yields: Dict[str, float], - tickers: List[str] - ) -> float: - """Calculate expected monthly income based on current portfolio and yields.""" - return sum( - (current_yields[ticker] / 12) * - (current_shares[ticker] * current_prices[ticker]) - for ticker in tickers - ) + def _create_distribution_schedule(self, ticker_data: Dict[str, TickerData], total_months: int) -> Dict[str, List[int]]: + """Pre-calculate which months each ticker pays distributions""" + schedule = {} + + for ticker, data in ticker_data.items(): + distribution_months = [] + freq = data.distribution_freq + + for month in range(1, total_months + 1): + if self._is_distribution_month(month, freq): + distribution_months.append(month) + + schedule[ticker] = distribution_months + + return schedule - def _create_month_data( + def _initialize_simulation_state(self, ticker_data: Dict[str, TickerData]) -> Dict[str, Any]: + """Initialize simulation state variables""" + return { + 'current_shares': {ticker: data.shares for ticker, data in ticker_data.items()}, + 'current_prices': {ticker: data.price for ticker, data in ticker_data.items()}, + 'current_yields': {ticker: data.annual_yield for ticker, data in ticker_data.items()}, + 'cumulative_income': 0.0 + } + + def _simulate_month( self, month: int, - current_total_value: float, - monthly_income: float, - cumulative_income: float, - current_shares: Dict[str, float], - current_prices: Dict[str, float], - current_yields: Dict[str, float], - tickers: List[str] + state: Dict[str, Any], + ticker_data: Dict[str, TickerData], + erosion_rates: Dict[str, Dict[str, float]], + distribution_schedule: Dict[str, List[int]] ) -> MonthlyData: - """Create monthly data object.""" + """Simulate a single month with improved accuracy""" + + # Calculate monthly income from distributions + monthly_income = self._calculate_monthly_distributions( + month, state, ticker_data, distribution_schedule + ) + + # Update cumulative income + state['cumulative_income'] += monthly_income + + # Apply erosion to prices and yields using nav_erosion_service data + self._apply_monthly_erosion(state, erosion_rates) + + # Reinvest dividends (DRIP) + self._reinvest_dividends(month, state, distribution_schedule) + + # Calculate total portfolio value with bounds checking + total_value = 0.0 + for ticker in ticker_data.keys(): + shares = state['current_shares'][ticker] + price = state['current_prices'][ticker] + if shares > 0 and price > 0: + total_value += shares * price + return MonthlyData( month=month, - total_value=current_total_value, + total_value=total_value, monthly_income=monthly_income, - cumulative_income=cumulative_income, - shares=current_shares.copy(), - prices=current_prices.copy(), - yields=current_yields.copy() + cumulative_income=state['cumulative_income'], + shares=state['current_shares'].copy(), + prices=state['current_prices'].copy(), + yields=state['current_yields'].copy() ) - def _calculate_and_reinvest_dividends( + def _calculate_monthly_distributions( self, - month: int, - ticker_data: Dict[str, Dict[str, Any]], - current_shares: Dict[str, float], - current_prices: Dict[str, float], - current_yields: Dict[str, float], - ticker_yield_rates: Dict[str, float], - dividend_frequency: Dict[str, int] + month: int, + state: Dict[str, Any], + ticker_data: Dict[str, TickerData], + distribution_schedule: Dict[str, List[int]] ) -> float: - """Calculate dividends and reinvest them proportionally.""" - month_dividends: Dict[str, float] = {} - for ticker, data in ticker_data.items(): - freq = dividend_frequency[data["distribution"]] - if month % (12 / freq) == 0: - if ticker_yield_rates[ticker] > 0: - dividend = (current_yields[ticker] / freq) * current_shares[ticker] * current_prices[ticker] - else: - dividend = (data["yield_annual"] / freq) * current_shares[ticker] * current_prices[ticker] - else: - dividend = 0 - month_dividends[ticker] = dividend - - total_month_dividend = sum(month_dividends.values()) + """Calculate distributions for the current month""" + monthly_income = 0.0 - # Reinvest dividends proportionally for ticker, data in ticker_data.items(): - if current_prices[ticker] > 0: - new_shares = (total_month_dividend * data["initial_allocation"]) / current_prices[ticker] - current_shares[ticker] += new_shares + if month in distribution_schedule[ticker]: + shares = state['current_shares'][ticker] + price = state['current_prices'][ticker] + yield_rate = state['current_yields'][ticker] - return total_month_dividend \ No newline at end of file + # Calculate distribution amount using annual yield divided by payments per year + distribution_yield = yield_rate / data.distribution_freq.payments_per_year + distribution_amount = shares * price * distribution_yield + monthly_income += distribution_amount + + return monthly_income + + def _apply_monthly_erosion( + self, + state: Dict[str, Any], + erosion_rates: Dict[str, Dict[str, float]] + ) -> None: + """Apply erosion to current prices and yields using nav_erosion_service data""" + for ticker, rates in erosion_rates.items(): + if ticker in state['current_prices']: + # Apply monthly erosion rates + monthly_nav_erosion = rates['nav'] / 12 + monthly_yield_erosion = rates['yield'] / 12 + + # Apply erosion with bounds checking + state['current_prices'][ticker] = max(0.01, state['current_prices'][ticker] * (1 - monthly_nav_erosion)) + state['current_yields'][ticker] = max(0.0, state['current_yields'][ticker] * (1 - monthly_yield_erosion)) + + def _reinvest_dividends( + self, + month: int, + state: Dict[str, Any], + distribution_schedule: Dict[str, List[int]] + ) -> None: + """Reinvest dividends for tickers that distributed in this month""" + + for ticker, distribution_months in distribution_schedule.items(): + if month in distribution_months: + shares = state['current_shares'][ticker] + price = state['current_prices'][ticker] + yield_rate = state['current_yields'][ticker] + + # Calculate dividend income using the correct distribution frequency + freq = self.DISTRIBUTION_FREQUENCIES.get(ticker, DistributionFrequency.MONTHLY) + dividend_income = shares * price * (yield_rate / freq.payments_per_year) + + # Purchase additional shares + if price > 0: + new_shares = dividend_income / price + state['current_shares'][ticker] += new_shares + + def _is_distribution_month(self, month: int, frequency: DistributionFrequency) -> bool: + """Check if current month is a distribution month""" + if frequency == DistributionFrequency.MONTHLY: + return True + elif frequency == DistributionFrequency.QUARTERLY: + return month % 3 == 0 + elif frequency == DistributionFrequency.SEMI_ANNUALLY: + return month % 6 == 0 + elif frequency == DistributionFrequency.ANNUALLY: + return month % 12 == 0 + else: + return True # Default to monthly for unknown + + def _create_drip_result(self, monthly_data: List[MonthlyData], state: Dict[str, Any]) -> DripResult: + """Create final DRIP result object""" + if not monthly_data: + raise ValueError("No monthly data generated") + + final_data = monthly_data[-1] + + return DripResult( + monthly_data=monthly_data, + final_portfolio_value=final_data.total_value, + total_income=final_data.cumulative_income, + total_shares=state['current_shares'].copy() + ) + + # Utility methods for analysis and comparison + def calculate_drip_vs_no_drip_comparison( + self, + portfolio_df: pd.DataFrame, + config: DripConfig + ) -> Dict[str, Any]: + """Calculate comparison between DRIP and no-DRIP scenarios""" + + # Calculate DRIP scenario + drip_result = self.calculate_drip_growth(portfolio_df, config) + + # Calculate no-DRIP scenario (dividends not reinvested) + no_drip_result = self._calculate_no_drip_scenario(portfolio_df, config) + + # Calculate comparison metrics + drip_advantage = drip_result.final_portfolio_value - no_drip_result['final_value'] + percentage_advantage = (drip_advantage / no_drip_result['final_value']) * 100 + + return { + 'drip_final_value': drip_result.final_portfolio_value, + 'no_drip_final_value': no_drip_result['final_value'], + 'drip_advantage': drip_advantage, + 'percentage_advantage': percentage_advantage, + 'total_dividends_reinvested': drip_result.total_income, + 'cash_dividends_no_drip': no_drip_result['total_dividends'] + } + + def _calculate_no_drip_scenario(self, portfolio_df: pd.DataFrame, config: DripConfig) -> Dict[str, float]: + """Calculate scenario where dividends are not reinvested""" + ticker_data = self._initialize_ticker_data(portfolio_df) + erosion_data = self.nav_erosion_service.analyze_etf_erosion_risk(portfolio_df["Ticker"].tolist()) + erosion_rates = { + result.ticker: { + "nav": result.estimated_nav_erosion / 100, + "yield": result.estimated_yield_erosion / 100 + } + for result in erosion_data.results + } + state = self._initialize_simulation_state(ticker_data) + + total_dividends = 0.0 + + for month in range(1, config.months + 1): + # Calculate dividends but don't reinvest + monthly_dividends = self._calculate_monthly_distributions( + month, state, ticker_data, + self._create_distribution_schedule(ticker_data, config.months) + ) + total_dividends += monthly_dividends + + # Apply erosion + self._apply_monthly_erosion(state, erosion_rates) + + final_value = sum( + state['current_shares'][ticker] * state['current_prices'][ticker] + for ticker in ticker_data.keys() + ) + + return { + 'final_value': final_value, + 'total_dividends': total_dividends + } \ No newline at end of file diff --git a/services/nav_erosion_service/__init__.py b/services/nav_erosion_service/__init__.py new file mode 100644 index 0000000..ef3a1b2 --- /dev/null +++ b/services/nav_erosion_service/__init__.py @@ -0,0 +1,8 @@ +""" +Nav Erosion Service package +""" + +from .service import NavErosionService +from .models import NavErosionResult + +__all__ = ['NavErosionService', 'NavErosionResult'] \ No newline at end of file