# Set page config first, before any other Streamlit commands st.set_page_config( page_title="ETF Analyzer", page_icon="📊", layout="wide", initial_sidebar_state="expanded" ) """ ETF Analyzer - Comprehensive ETF Analysis Tool This application provides in-depth analysis of ETFs using data from the Financial Modeling Prep API. It allows users to research, compare, and analyze ETFs before adding them to their portfolio simulations. """ import streamlit as st import pandas as pd import numpy as np import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots import requests import os import json from datetime import datetime, timedelta from pathlib import Path import hashlib import time from typing import Dict, List, Tuple, Any, Optional, Union import sys import yfinance as yf from dotenv import load_dotenv import logging # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # FMP API configuration FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', '')) FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" def test_fmp_connection(): """Test the FMP API connection and display status.""" try: if not FMP_API_KEY: return False, "No API key found" session = get_fmp_session() test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}" response = session.get(test_url) if response.status_code == 200: data = response.json() if data and isinstance(data, list) and len(data) > 0: return True, "Connected" return False, f"Error: {response.status_code}" except Exception as e: return False, f"Error: {str(e)}" # Add FMP connection status to the navigation bar st.sidebar.markdown("---") st.sidebar.subheader("FMP API Status") connection_status, message = test_fmp_connection() if connection_status: st.sidebar.success(f"✅ FMP API: {message}") else: st.sidebar.error(f"❌ FMP API: {message}") # --- Constants and Settings --- CACHE_DIR = 
Path("cache") CACHE_EXPIRATION_DAYS = 7 DEFAULT_CHART_HEIGHT = 500 MAX_ETF_COMPARISON = 5 API_RATE_LIMIT_DELAY = 0.5 # seconds between API calls to avoid rate limiting # --- Initialize Streamlit Page --- st.set_page_config( page_title="ETF Analyzer", page_icon="📊", layout="wide", initial_sidebar_state="expanded" ) # Add navigation in sidebar with st.sidebar: st.markdown("### Navigation") if st.button("🏠 ETF Suite Launcher", key="launcher_analyzer"): st.switch_page("pages/ETF_Suite_Launcher.py") if st.button("💼 Portfolio Builder", key="portfolio_analyzer"): st.switch_page("pages/ETF_Portfolio_Builder.py") # --- Functions --- def setup_cache_dir() -> Path: """Set up cache directory if it doesn't exist.""" CACHE_DIR.mkdir(exist_ok=True) return CACHE_DIR def generate_cache_key(endpoint: str, params: Dict = None) -> str: """Generate a unique cache key for a data request.""" if params is None: params = {} params_str = json.dumps(params, sort_keys=True) key_string = f"{endpoint}_{params_str}" return hashlib.md5(key_string.encode()).hexdigest() def get_cache_path(cache_key: str) -> Path: """Get the file path for a cache key.""" return CACHE_DIR / f"{cache_key}.json" def save_to_cache(cache_key: str, data: Any) -> None: """Save data to cache with timestamp.""" cache_file = get_cache_path(cache_key) cache_data = { "data": data, "timestamp": datetime.now().isoformat() } try: with open(cache_file, 'w') as f: json.dump(cache_data, f) except Exception as e: st.error(f"Error saving to cache: {str(e)}") def load_from_cache(cache_key: str) -> Tuple[Any, bool]: """Load data from cache if it exists and is not expired. 
Returns: Tuple of (data, is_valid) """ cache_file = get_cache_path(cache_key) if not cache_file.exists(): return None, False try: with open(cache_file, 'r') as f: cache_data = json.load(f) # Check if cache is expired timestamp = datetime.fromisoformat(cache_data["timestamp"]) if datetime.now() - timestamp > timedelta(days=CACHE_EXPIRATION_DAYS): return cache_data["data"], False # Expired but usable as fallback return cache_data["data"], True # Valid cache except Exception as e: st.error(f"Error loading from cache: {str(e)}") return None, False def fmp_request(endpoint: str, params: Dict = None, force_refresh: bool = False, debug_mode: bool = False) -> Dict: """Make a request to the FMP API with caching.""" if params is None: params = {} # Get API key api_key = os.environ.get("FMP_API_KEY") if not api_key: api_key = st.session_state.get("fmp_api_key") if not api_key: st.error("FMP API key not found. Please enter it in the sidebar.") return {"error": "API key not found. Please enter a valid API key in the sidebar."} # Add API key to parameters params["apikey"] = api_key # Debug mode - show API request details if debug_mode: st.write("API Key (first 4 chars):", api_key[:4] + "..." if api_key and len(api_key) > 4 else "None") # Generate cache key cache_key = generate_cache_key(endpoint, params) # Try to load from cache first if not forcing refresh if not force_refresh: cached_data, is_valid = load_from_cache(cache_key) if is_valid: if debug_mode: st.write("✅ Data loaded from cache") return cached_data # Make the API request base_url = "https://financialmodelingprep.com/api/v3" url = f"{base_url}/{endpoint}" if debug_mode: st.write("🌐 Making API request to:", url) st.write("Parameters:", {k: (v[:4] + "..." 
if k == "apikey" and v and len(v) > 4 else v) for k, v in params.items()}) try: # Add timeout to prevent hanging on API requests response = requests.get(url, params=params, timeout=10) # Add small delay to avoid rate limiting time.sleep(API_RATE_LIMIT_DELAY) if debug_mode: st.write("📡 API Response Status Code:", response.status_code) st.write("📡 Response Headers:", dict(response.headers)) if response.status_code == 200: try: data = response.json() if debug_mode: if isinstance(data, list): st.write(f"✅ Response is a list with {len(data)} items") if len(data) > 0: st.write("First item sample:", list(data[0].keys()) if isinstance(data[0], dict) else data[0]) elif isinstance(data, dict): st.write(f"✅ Response is a dictionary with {len(data)} keys") st.write("Keys:", list(data.keys())) # Check if the response is an empty list or empty object if isinstance(data, list) and len(data) == 0: return {"error": "No data available for this request", "empty": True} elif isinstance(data, dict) and len(data) == 0: return {"error": "No data available for this request", "empty": True} # Cache the response save_to_cache(cache_key, data) # Track API calls if counter exists if "api_calls" in st.session_state: st.session_state.api_calls += 1 return data except json.JSONDecodeError as e: error_msg = f"Failed to decode API response as JSON: {str(e)}" if debug_mode: st.error(error_msg) st.write("Raw response:", response.text[:500] + "..." 
if len(response.text) > 500 else response.text) return {"error": error_msg} else: error_msg = f"API request failed with status {response.status_code}" if hasattr(response, 'text'): error_msg += f": {response.text}" if debug_mode: st.error(error_msg) return {"error": error_msg, "status_code": response.status_code} except requests.exceptions.Timeout: error_msg = f"API request timed out for endpoint {endpoint}" if debug_mode: st.error(error_msg) return {"error": error_msg, "timeout": True} except requests.exceptions.RequestException as e: error_msg = f"API request error: {str(e)}" if debug_mode: st.error(error_msg) return {"error": error_msg} def get_etf_list(force_refresh: bool = False) -> pd.DataFrame: """Get list of all ETFs from FMP API.""" debug_mode = st.session_state.get("debug_mode", False) etf_list = fmp_request("etf/list", force_refresh=force_refresh, debug_mode=debug_mode) if isinstance(etf_list, dict) and "error" in etf_list: if debug_mode: st.error(f"Error getting ETF list: {etf_list['error']}") return pd.DataFrame() if not etf_list: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(etf_list) # Clean up and add columns if 'name' in df.columns: df['name'] = df['name'].str.strip() return df def get_etf_profile(symbol: str, force_refresh: bool = False) -> Dict: """Get ETF profile information.""" debug_mode = st.session_state.get("debug_mode", False) profile_data = fmp_request(f"etf/profile/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(profile_data, dict) and "error" in profile_data: return {"error": profile_data["error"], "symbol": symbol} if not profile_data or len(profile_data) == 0: return {"error": f"No profile data available for {symbol}", "symbol": symbol} return profile_data[0] # Return the first item def get_etf_holdings(symbol: str, force_refresh: bool = False) -> pd.DataFrame: """Get ETF holdings information.""" debug_mode = st.session_state.get("debug_mode", False) # Try the 
v3 endpoint first holdings_data = fmp_request(f"etf/holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(holdings_data, dict) and "error" in holdings_data: if debug_mode: st.info(f"Primary ETF holdings endpoint failed: {holdings_data['error']}. Trying alternative...") # Try alternative endpoint (v3 etf-holdings) alt_holdings = fmp_request(f"etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # If that fails, try v4 endpoint if not alt_holdings or isinstance(alt_holdings, dict) and ("error" in alt_holdings or "Error Message" in alt_holdings): if debug_mode: st.info("Alternative v3 ETF holdings endpoint failed. Trying v4...") alt_holdings = fmp_request(f"v4/etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if alt_holdings and not (isinstance(alt_holdings, dict) and ("error" in alt_holdings or "Error Message" in alt_holdings)): # Process the alternative format if isinstance(alt_holdings, dict) and "holdings" in alt_holdings: # v4 format holdings_list = alt_holdings["holdings"] if isinstance(holdings_list, list) and len(holdings_list) > 0: df = pd.DataFrame(holdings_list) if debug_mode: st.success("✅ Got ETF holdings from v4 endpoint") return df elif isinstance(alt_holdings, list) and len(alt_holdings) > 0: # Some other format with list df = pd.DataFrame(alt_holdings) if debug_mode: st.success("✅ Got ETF holdings from alternative endpoint") return df return pd.DataFrame({"error": ["No holdings data available or not accessible with your API subscription"]}) if not holdings_data or not isinstance(holdings_data, dict) or "etfHoldings" not in holdings_data: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(holdings_data["etfHoldings"]) return df def get_etf_sector_weightings(symbol: str, force_refresh: bool = False) -> pd.DataFrame: """Get ETF sector weightings.""" debug_mode = st.session_state.get("debug_mode", False) # Try the standard endpoint 
first sector_data = fmp_request(f"etf/sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(sector_data, dict) and "error" in sector_data: if debug_mode: st.info(f"Primary ETF sector endpoint failed: {sector_data['error']}. Trying alternative...") # Try alternative endpoint alt_sectors = fmp_request(f"etf-sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # If that fails, try v4 endpoint if not alt_sectors or isinstance(alt_sectors, dict) and ("error" in alt_sectors or "Error Message" in alt_sectors): if debug_mode: st.info("Alternative v3 ETF sector endpoint failed. Trying v4...") alt_sectors = fmp_request(f"v4/etf-sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if alt_sectors and not (isinstance(alt_sectors, dict) and ("error" in alt_sectors or "Error Message" in alt_sectors)): # Process the alternative format if isinstance(alt_sectors, dict) and "sectorWeightings" in alt_sectors: # v4 format weightings = alt_sectors["sectorWeightings"] if isinstance(weightings, list) and len(weightings) > 0: # Convert list of dicts to a dataframe df = pd.DataFrame(weightings) if debug_mode: st.success("✅ Got ETF sectors from v4 endpoint") return df elif isinstance(alt_sectors, list) and len(alt_sectors) > 0: # Direct list format df = pd.DataFrame(alt_sectors) if debug_mode: st.success("✅ Got ETF sectors from alternative endpoint") return df return pd.DataFrame({"error": ["No sector data available or not accessible with your API subscription"]}) if not sector_data: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(sector_data) return df def get_etf_dividend_history(symbol: str, force_refresh: bool = False) -> pd.DataFrame: """Get ETF dividend history.""" debug_mode = st.session_state.get("debug_mode", False) dividend_data = fmp_request(f"historical-price-full/stock_dividend/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) 
# Check for error in response if isinstance(dividend_data, dict) and "error" in dividend_data: return pd.DataFrame({"error": [dividend_data["error"]]}) if not dividend_data or "historical" not in dividend_data: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(dividend_data["historical"]) # Convert date to datetime if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) return df def is_valid_etf(symbol: str, etf_list_df: pd.DataFrame = None) -> bool: """Check if a symbol exists in the ETF list.""" if etf_list_df is None: # Get ETF list only if not provided etf_list_df = get_etf_list() if etf_list_df.empty: # If we can't get the ETF list, we'll try the profile request directly return True # Check if symbol exists in the ETF list return symbol.upper() in etf_list_df['symbol'].str.upper().values def get_etf_profile_alternative(symbol: str, force_refresh: bool = False) -> Dict: """Get ETF profile information using alternative endpoints. This function tries multiple approaches to gather ETF data when the primary etf/profile endpoint is not available in the user's subscription. 
""" debug_mode = st.session_state.get("debug_mode", False) # Create a profile dictionary to store the data we collect profile = { "symbol": symbol, "name": f"{symbol} ETF", # Default name in case we can't get it "_source": [] # Track which endpoints provided data } # Method 1: Try stock/profile endpoint (sometimes works for ETFs) stock_profile = fmp_request(f"profile/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if isinstance(stock_profile, list) and len(stock_profile) > 0: if debug_mode: st.write("✅ Got ETF data from stock/profile endpoint") profile["_source"].append("profile/{symbol}") # Extract data from stock profile stock_data = stock_profile[0] profile.update({ "name": stock_data.get("companyName", f"{symbol} ETF"), "exchange": stock_data.get("exchange", ""), "currency": stock_data.get("currency", "USD"), "price": stock_data.get("price", 0), "changes": stock_data.get("changes", 0), "changesPercentage": stock_data.get("changesPercentage", 0), "description": stock_data.get("description", "No description available."), "industry": stock_data.get("industry", "ETF"), "website": stock_data.get("website", ""), "ceo": stock_data.get("ceo", ""), "sector": stock_data.get("sector", ""), "ipoDate": stock_data.get("ipoDate", ""), "image": stock_data.get("image", ""), "isEtf": True }) # Method 2: Try quote endpoint for price data quote_data = fmp_request(f"quote/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if isinstance(quote_data, list) and len(quote_data) > 0: if debug_mode: st.write("✅ Got ETF price data from quote endpoint") profile["_source"].append("quote/{symbol}") # Extract price data from quote quote = quote_data[0] profile.update({ "name": quote.get("name", profile.get("name", f"{symbol} ETF")), "price": quote.get("price", profile.get("price", 0)), "changes": quote.get("change", profile.get("changes", 0)), "changesPercentage": quote.get("changesPercentage", profile.get("changesPercentage", 0)), "dayLow": quote.get("dayLow", 0), 
"dayHigh": quote.get("dayHigh", 0), "yearLow": quote.get("yearLow", 0), "yearHigh": quote.get("yearHigh", 0), "marketCap": quote.get("marketCap", 0), "priceAvg50": quote.get("priceAvg50", 0), "priceAvg200": quote.get("priceAvg200", 0), "volume": quote.get("volume", 0), "avgVolume": quote.get("avgVolume", 0), "exchange": quote.get("exchange", profile.get("exchange", "")), "isEtf": True }) # Method 3: Try ETF holdings endpoint for additional ETF-specific data try: # First try the v3 endpoint holdings_data = fmp_request(f"etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) # If that fails, try the v4 endpoint if not holdings_data or isinstance(holdings_data, dict) and "Error Message" in holdings_data: holdings_data = fmp_request(f"v4/etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if holdings_data and not (isinstance(holdings_data, dict) and "Error Message" in holdings_data): if debug_mode: st.write("✅ Got ETF holdings data from etf-holdings endpoint") profile["_source"].append("etf-holdings/{symbol}") # Extract any ETF-specific data from holdings response if isinstance(holdings_data, dict) and "etfName" in holdings_data: profile.update({ "name": holdings_data.get("etfName", profile.get("name", f"{symbol} ETF")), "assetClass": holdings_data.get("assetClass", ""), "aum": holdings_data.get("aum", 0), "expense": holdings_data.get("expense", 0) }) except Exception as e: if debug_mode: st.error(f"Error fetching holdings data: {str(e)}") # Method 4: Try ETF stats endpoint for additional data (enterprise subscription only) try: stats_data = fmp_request(f"etf-statistics/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode) if stats_data and not (isinstance(stats_data, dict) and "Error Message" in stats_data): if debug_mode: st.write("✅ Got ETF stats data from etf-statistics endpoint") profile["_source"].append("etf-statistics/{symbol}") # Extract useful stats if available if isinstance(stats_data, list) and 
len(stats_data) > 0: stats = stats_data[0] # Extract stats that might be useful if "totalAssets" in stats and not profile.get("aum"): profile["aum"] = stats.get("totalAssets", 0) if "expenseRatio" in stats and not profile.get("expense"): profile["expense"] = stats.get("expenseRatio", 0) / 100 # Convert from percentage if "sharesOutstanding" in stats: profile["sharesOutstanding"] = stats.get("sharesOutstanding", 0) except Exception as e: if debug_mode: st.error(f"Error fetching stats data: {str(e)}") # Combine endpoint sources for debugging profile["_source"] = ", ".join(profile["_source"]) if profile["_source"] else "No valid endpoints" # If we've collected enough data, consider it a valid profile if "price" in profile and "name" in profile: return profile # If we couldn't get enough data, return an error return {"error": f"No profile data available for {symbol}", "symbol": symbol} def get_historical_prices(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame: """Get historical price data for an ETF. 
Args: symbol: ETF ticker symbol period: Time period ('1month', '3month', '6month', '1year', '5year', 'max') force_refresh: Whether to force refresh data from API Returns: DataFrame with historical price data """ debug_mode = st.session_state.get("debug_mode", False) # Map period to days for cache key period_days = { '1month': 30, '3month': 90, '6month': 180, '1year': 365, '5year': 1825, 'max': 3650 # Use a large number for max } # Generate cache key based on symbol and period endpoint = f"historical-price-full/{symbol}" params = {"timeseries": period_days.get(period, 365)} # Make API request price_data = fmp_request(endpoint, params=params, force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(price_data, dict) and "error" in price_data: return pd.DataFrame() if not price_data or "historical" not in price_data: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(price_data["historical"]) # Convert date to datetime if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date") return df def calculate_performance_metrics(price_df: pd.DataFrame) -> Dict: """Calculate key performance metrics from historical price data.""" if price_df.empty or "date" not in price_df.columns or "close" not in price_df.columns: return {} try: # Sort by date to ensure calculations are correct price_df = price_df.sort_values("date") # Calculate returns price_df["daily_return"] = price_df["close"].pct_change() # Current price current_price = price_df["close"].iloc[-1] # Calculate time-based returns returns = {} # Filter for different time periods last_date = price_df["date"].max() one_month_ago = last_date - pd.Timedelta(days=30) three_months_ago = last_date - pd.Timedelta(days=90) six_months_ago = last_date - pd.Timedelta(days=180) one_year_ago = last_date - pd.Timedelta(days=365) # Calculate returns for each period if len(price_df) > 1: # 1-month return month_df = price_df[price_df["date"] >= one_month_ago] 
if not month_df.empty and len(month_df) > 1: returns["1-month"] = (month_df["close"].iloc[-1] / month_df["close"].iloc[0] - 1) * 100 # 3-month return three_month_df = price_df[price_df["date"] >= three_months_ago] if not three_month_df.empty and len(three_month_df) > 1: returns["3-month"] = (three_month_df["close"].iloc[-1] / three_month_df["close"].iloc[0] - 1) * 100 # 6-month return six_month_df = price_df[price_df["date"] >= six_months_ago] if not six_month_df.empty and len(six_month_df) > 1: returns["6-month"] = (six_month_df["close"].iloc[-1] / six_month_df["close"].iloc[0] - 1) * 100 # 1-year return year_df = price_df[price_df["date"] >= one_year_ago] if not year_df.empty and len(year_df) > 1: returns["1-year"] = (year_df["close"].iloc[-1] / year_df["close"].iloc[0] - 1) * 100 # YTD return ytd_start = pd.Timestamp(last_date.year, 1, 1) ytd_df = price_df[price_df["date"] >= ytd_start] if not ytd_df.empty and len(ytd_df) > 1: returns["YTD"] = (ytd_df["close"].iloc[-1] / ytd_df["close"].iloc[0] - 1) * 100 # Calculate volatility (standard deviation of returns) if len(price_df) > 30: # Need enough data for meaningful volatility volatility = price_df["daily_return"].std() * (252 ** 0.5) * 100 # Annualized volatility else: volatility = None # Calculate max drawdown if len(price_df) > 2: price_df["cummax"] = price_df["close"].cummax() price_df["drawdown"] = (price_df["close"] / price_df["cummax"] - 1) * 100 max_drawdown = price_df["drawdown"].min() else: max_drawdown = None # Return all metrics metrics = { "current_price": current_price, "returns": returns, "volatility": volatility, "max_drawdown": max_drawdown } return metrics except Exception as e: st.error(f"Error calculating performance metrics: {str(e)}") return {} def get_nav_data(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame: """Get historical NAV (Net Asset Value) data for an ETF. 
Args: symbol: ETF ticker symbol period: Time period ('1month', '3month', '6month', '1year', '5year', 'max') force_refresh: Whether to force refresh data from API Returns: DataFrame with historical NAV data """ debug_mode = st.session_state.get("debug_mode", False) # Map period to days for cache key period_days = { '1month': 30, '3month': 90, '6month': 180, '1year': 365, '5year': 1825, 'max': 3650 # Use a large number for max } # Generate endpoint for NAV data endpoint = f"historical-nav/{symbol}" params = {"timeseries": period_days.get(period, 365)} # Make API request nav_data = fmp_request(endpoint, params=params, force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(nav_data, dict) and "error" in nav_data: if debug_mode: st.warning(f"NAV data not available via FMP API for {symbol}. Trying yfinance as fallback.") # Try to get NAV data using yfinance as fallback return get_nav_data_from_yfinance(symbol, period, debug_mode) if not nav_data or "historical" not in nav_data: if debug_mode: st.warning(f"NAV data not found in FMP response for {symbol}. Trying yfinance as fallback.") return get_nav_data_from_yfinance(symbol, period, debug_mode) # Convert to DataFrame df = pd.DataFrame(nav_data["historical"]) # Convert date to datetime if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date") return df def get_nav_data_from_yfinance(symbol: str, period: str = '1year', debug_mode: bool = False) -> pd.DataFrame: """Get NAV data for an ETF using yfinance as fallback. 
Args: symbol: ETF ticker symbol period: Time period ('1month', '3month', '6month', '1year', '5year', 'max') debug_mode: Whether to show debug information Returns: DataFrame with NAV data approximation """ try: # Map period to yfinance format yf_period_map = { '1month': '1mo', '3month': '3mo', '6month': '6mo', '1year': '1y', '5year': '5y', 'max': 'max' } yf_period = yf_period_map.get(period, '1y') if debug_mode: st.write(f"Fetching data for {symbol} using yfinance with period {yf_period}") # Create Ticker object ticker = yf.Ticker(symbol) # First try to get fund info which might contain NAV info = ticker.info # Get historical price data (we'll use this as a backup) hist_data = ticker.history(period=yf_period) if hist_data.empty: if debug_mode: st.warning(f"No historical data found in yfinance for {symbol}") return pd.DataFrame() # Prepare the DataFrame and ensure timezone consistency nav_df = hist_data.reset_index()[['Date', 'Close']].copy() nav_df.columns = ['date', 'nav'] # Convert date to naive datetime (remove timezone info) for consistency with FMP data nav_df['date'] = pd.to_datetime(nav_df['date']).dt.tz_localize(None) # Check if we have intraday NAV data available (indicated by "-IV" suffix) try: # Some ETFs have intraday NAV with IV suffix iv_symbol = f"{symbol}-IV" iv_ticker = yf.Ticker(iv_symbol) iv_data = iv_ticker.history(period=yf_period) if not iv_data.empty: if debug_mode: st.success(f"Found Intraday NAV data for {symbol} using {iv_symbol}") iv_df = iv_data.reset_index()[['Date', 'Close']].copy() iv_df.columns = ['date', 'nav'] # Remove timezone info for consistency iv_df['date'] = pd.to_datetime(iv_df['date']).dt.tz_localize(None) return iv_df except Exception as e: if debug_mode: st.warning(f"Error fetching IV data: {str(e)}") # If specific NAV data isn't available, use price as a proxy with a note if debug_mode: st.info(f"Using price data as proxy for NAV for {symbol}. 
Note that actual NAV may differ slightly.") return nav_df except Exception as e: if debug_mode: st.error(f"Error getting NAV data from yfinance: {str(e)}") return pd.DataFrame() def get_dividend_yield_history(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame: """Get historical dividend yield data for an ETF by combining price and dividend history. Args: symbol: ETF ticker symbol period: Time period ('1month', '3month', '6month', '1year', '5year', 'max') force_refresh: Whether to force refresh data from API Returns: DataFrame with historical yield data """ debug_mode = st.session_state.get("debug_mode", False) # Get dividend history dividend_data = get_etf_dividend_history(symbol, force_refresh=force_refresh) # Get price history price_data = get_historical_prices(symbol, period=period, force_refresh=force_refresh) if dividend_data.empty or price_data.empty: return pd.DataFrame() try: # Make sure dates are in datetime format dividend_data["date"] = pd.to_datetime(dividend_data["date"]) price_data["date"] = pd.to_datetime(price_data["date"]) # Sort both dataframes by date dividend_data = dividend_data.sort_values("date") price_data = price_data.sort_values("date") # Filter dividend data to match our period start_date = price_data["date"].min() dividend_data = dividend_data[dividend_data["date"] >= start_date] if dividend_data.empty: return pd.DataFrame() # Calculate TTM (trailing twelve month) dividend at each point in time result_df = pd.DataFrame() # For each price data point for date, row in price_data.iterrows(): price_date = row["date"] price = row["close"] # Get dividends in the previous 12 months one_year_before = price_date - pd.Timedelta(days=365) ttm_dividends = dividend_data[(dividend_data["date"] > one_year_before) & (dividend_data["date"] <= price_date)] ttm_dividend_sum = ttm_dividends["dividend"].sum() # Calculate yield if price > 0: dividend_yield = (ttm_dividend_sum / price) * 100 else: dividend_yield = 0 # Add to result 
dataframe result_df = pd.concat([result_df, pd.DataFrame({ "date": [price_date], "price": [price], "ttm_dividend": [ttm_dividend_sum], "dividend_yield": [dividend_yield] })]) return result_df except Exception as e: if debug_mode: st.error(f"Error calculating dividend yield history: {str(e)}") return pd.DataFrame() def calculate_nav_premium_discount(price_df: pd.DataFrame, nav_df: pd.DataFrame) -> pd.DataFrame: """Calculate premium/discount of price to NAV. Args: price_df: DataFrame with price history nav_df: DataFrame with NAV history Returns: DataFrame with premium/discount data """ if price_df.empty or nav_df.empty: return pd.DataFrame() try: # Create copies to avoid modifying original dataframes price_df_copy = price_df.copy() nav_df_copy = nav_df.copy() # Convert dates to datetime and remove timezone information price_df_copy["date"] = pd.to_datetime(price_df_copy["date"]).dt.tz_localize(None) nav_df_copy["date"] = pd.to_datetime(nav_df_copy["date"]).dt.tz_localize(None) # Merge price and NAV data on date merged_df = pd.merge( price_df_copy[["date", "close"]], nav_df_copy[["date", "nav"]], on="date", how="inner" ) if merged_df.empty: return pd.DataFrame() # Calculate premium/discount as percentage merged_df["premium_discount"] = ((merged_df["close"] / merged_df["nav"]) - 1) * 100 return merged_df except Exception as e: st.error(f"Error calculating NAV premium/discount: {str(e)}") st.info("Debug info: This error often occurs due to timezone differences in date formats. 
The application will try to handle this automatically.") # Alternative approach using concat if merge fails try: # Create a unique identifier for each date (as string) price_df_copy = price_df.copy() nav_df_copy = nav_df.copy() # Convert to string format (YYYY-MM-DD) to eliminate timezone issues price_df_copy["date_str"] = pd.to_datetime(price_df_copy["date"]).dt.strftime("%Y-%m-%d") nav_df_copy["date_str"] = pd.to_datetime(nav_df_copy["date"]).dt.strftime("%Y-%m-%d") # Prepare dataframes with consistent column names price_data = price_df_copy[["date_str", "close"]].rename(columns={"date_str": "date"}) nav_data = nav_df_copy[["date_str", "nav"]].rename(columns={"date_str": "date"}) # Use concat and groupby as an alternative to merge combined = pd.concat([price_data, nav_data]) result = combined.groupby("date").agg({"close": "first", "nav": "first"}).reset_index() # Filter to keep only rows with both price and NAV data result = result.dropna() if not result.empty: # Calculate premium/discount result["premium_discount"] = ((result["close"] / result["nav"]) - 1) * 100 # Convert date back to datetime for consistency result["date"] = pd.to_datetime(result["date"]) return result except Exception as fallback_error: st.error(f"Alternative approach also failed: {str(fallback_error)}") return pd.DataFrame() def calculate_yield_erosion(yield_df: pd.DataFrame) -> Dict: """Calculate yield erosion metrics from historical yield data. 
Args: yield_df: DataFrame with historical yield data Returns: Dictionary with yield erosion metrics """ if yield_df.empty: return {} try: # Make sure the DataFrame is sorted by date yield_df = yield_df.sort_values("date") # Calculate metrics current_yield = yield_df["dividend_yield"].iloc[-1] # Calculate average yields for different periods last_date = yield_df["date"].max() # Define time periods periods = { "1_month": 30, "3_month": 90, "6_month": 180, "1_year": 365 } # Calculate average yield for each period avg_yields = {} for period_name, days in periods.items(): period_start = last_date - pd.Timedelta(days=days) period_data = yield_df[yield_df["date"] >= period_start] if not period_data.empty: avg_yields[period_name] = period_data["dividend_yield"].mean() # Calculate yield erosion (current vs averages) yield_erosion = {} for period_name, avg_yield in avg_yields.items(): if avg_yield > 0: # Avoid division by zero erosion = ((current_yield / avg_yield) - 1) * 100 yield_erosion[period_name] = erosion # Calculate yield volatility yield_volatility = yield_df["dividend_yield"].std() return { "current_yield": current_yield, "avg_yields": avg_yields, "yield_erosion": yield_erosion, "yield_volatility": yield_volatility } except Exception as e: st.error(f"Error calculating yield erosion: {str(e)}") return {} def get_institutional_ownership(symbol: str, force_refresh: bool = False) -> pd.DataFrame: """Get institutional ownership data for an ETF. 
Args: symbol: ETF ticker symbol force_refresh: Whether to force refresh data from API Returns: DataFrame with institutional ownership data """ debug_mode = st.session_state.get("debug_mode", False) # Generate endpoint for institutional holders endpoint = f"institutional-holder/{symbol}" # Make API request holders_data = fmp_request(endpoint, force_refresh=force_refresh, debug_mode=debug_mode) # Check for error in response if isinstance(holders_data, dict) and "error" in holders_data: if debug_mode: st.warning(f"Institutional ownership data not available for {symbol}") return pd.DataFrame() if not holders_data or not isinstance(holders_data, list): return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(holders_data) # Add percentage column if not present if "percentage" not in df.columns and "sharesHeld" in df.columns and "sharesOutstanding" in df.columns: # Calculate percentage based on shares held and shares outstanding df["percentage"] = (df["sharesHeld"] / df["sharesOutstanding"]) * 100 return df def get_dividend_calendar(symbol: str, force_refresh: bool = False) -> pd.DataFrame: """Get dividend distribution calendar for an ETF. 
def calculate_risk_adjusted_metrics(price_df: pd.DataFrame, risk_free_rate: float = 0.05) -> Dict:
    """Calculate risk-adjusted performance metrics.

    Daily returns are taken from the ``close`` column after sorting by
    ``date``. Return and volatility are annualized assuming 252 trading days
    per year.

    Args:
        price_df: DataFrame with price history (``date`` and ``close``
            columns). The frame is not modified.
        risk_free_rate: Annualized risk-free rate as a fraction (default: 5%).

    Returns:
        Dictionary with ``sharpe_ratio``, ``sortino_ratio`` and
        ``calmar_ratio`` (each ``None`` when its denominator is zero or
        undefined), plus ``annualized_return``, ``annualized_volatility`` and
        ``max_drawdown`` expressed as percentages. An empty dict is returned
        when the required columns are missing, fewer than 30 daily returns
        are available, or the calculation fails.
    """
    if (
        price_df is None
        or price_df.empty
        or "date" not in price_df.columns
        or "close" not in price_df.columns
    ):
        return {}

    def _ratio(numerator, denominator):
        """Return numerator / denominator, or None for a zero or NaN denominator."""
        if pd.isna(denominator) or denominator == 0:
            return None
        return float(numerator / denominator)

    try:
        # Sort by date so that consecutive rows are consecutive observations.
        closes = price_df.sort_values("date")["close"]
        returns = closes.pct_change().dropna()

        if len(returns) < 30:  # Need sufficient data
            return {}

        # Annualize the mean daily return (compounded) and the daily volatility.
        annualized_return = (1 + returns.mean()) ** 252 - 1
        annualized_volatility = returns.std() * (252 ** 0.5)
        excess_return = annualized_return - risk_free_rate

        # Sharpe: excess return per unit of total volatility.
        sharpe_ratio = _ratio(excess_return, annualized_volatility)

        # Sortino: only negative daily returns feed the volatility estimate.
        # With zero or one negative return the sample std is NaN, which
        # _ratio maps to None instead of letting NaN leak to the caller.
        downside_volatility = returns[returns < 0].std() * (252 ** 0.5)
        sortino_ratio = _ratio(excess_return, downside_volatility)

        # Maximum drawdown: worst decline from a running peak (<= 0).
        max_drawdown = (closes / closes.cummax() - 1).min()

        # Calmar: annualized return relative to the size of the worst drawdown.
        calmar_ratio = _ratio(annualized_return, abs(max_drawdown))

        return {
            "sharpe_ratio": sharpe_ratio,
            "sortino_ratio": sortino_ratio,
            "calmar_ratio": calmar_ratio,
            "annualized_return": float(annualized_return * 100),  # Convert to percentage
            "annualized_volatility": float(annualized_volatility * 100),  # Convert to percentage
            "max_drawdown": float(max_drawdown * 100),  # Convert to percentage
        }
    except Exception as e:
        # UI boundary: report and degrade to "no metrics" rather than crash the page.
        st.error(f"Error calculating risk-adjusted metrics: {str(e)}")
        return {}
Format the display dataframe display_df = top_holders.copy() # Rename columns for better display column_mapping = { "holder": "Holder", "shares": "Shares", "sharesHeld": "Shares Held", "dateReported": "Date Reported", "percentage": "Percentage" } display_df = display_df.rename(columns={k: v for k, v in column_mapping.items() if k in display_df.columns}) # Format percentage column if "Percentage" in display_df.columns: display_df["Percentage"] = display_df["Percentage"].apply(lambda x: f"{x:.2f}%") # Display the table st.dataframe(display_df, use_container_width=True) # Create visualization if "percentage" in holders_df.columns: st.subheader("Ownership Distribution") # Calculate others category if len(holders_df) > 5: others_pct = holders_df.iloc[5:]["percentage"].sum() pie_data = top_holders.copy() # Use pd.concat instead of append (which is deprecated) others_df = pd.DataFrame([{"holder": "Others", "percentage": others_pct}]) pie_data = pd.concat([pie_data, others_df], ignore_index=True) else: pie_data = top_holders.copy() # Create pie chart fig = px.pie( pie_data, names="holder", values="percentage", title=f"Institutional Ownership Distribution for {symbol}" ) st.plotly_chart(fig, use_container_width=True) def display_dividend_calendar(symbol: str): """Display dividend distribution calendar.""" st.subheader(f"Dividend Distribution Calendar") with st.spinner("Loading dividend data..."): dividend_df = get_dividend_calendar( symbol, force_refresh=st.session_state.get("force_refresh", False) ) if dividend_df.empty: st.warning("Dividend data not available for this ETF.") st.info("This ETF may not pay dividends, or dividend history may be limited.") return # Check if we have distribution pattern if "distribution_pattern" in dividend_df.columns: pattern = dividend_df["distribution_pattern"].iloc[0] st.metric("Distribution Pattern", pattern) # Get last few years of data current_year = datetime.now().year recent_years = sorted(list(set(dividend_df["year"])))[-3:] # Last 3 
years # Filter to recent years recent_df = dividend_df[dividend_df["year"].isin(recent_years)] if not recent_df.empty: # Calculate average dividend by month monthly_avg = recent_df.groupby("month")["dividend"].mean().reset_index() monthly_avg["month_name"] = monthly_avg["month"].apply(lambda x: datetime(2000, x, 1).strftime("%B")) # Sort by month monthly_avg = monthly_avg.sort_values("month") # Create bar chart of monthly distributions st.subheader("Dividend Distribution by Month") fig = px.bar( monthly_avg, x="month_name", y="dividend", title=f"Average Dividend Distribution by Month ({', '.join(map(str, recent_years))})", labels={"month_name": "Month", "dividend": "Dividend Amount ($)"} ) st.plotly_chart(fig, use_container_width=True) # Create calendar heatmap st.subheader("Dividend Distribution Calendar") # Prepare data for the calendar view calendar_df = recent_df.copy() # Add month-year field for grouping calendar_df["month_year"] = calendar_df["date"].dt.strftime("%b %Y") # Create year and month columns for the heatmap calendar_data = calendar_df.groupby(["year", "month"]).agg({ "dividend": "sum" }).reset_index() try: # Create a complete month-year grid with all possible combinations all_months = list(range(1, 13)) all_years = sorted(calendar_data["year"].unique()) # Ensure we have a complete grid by reindexing try: # First create the pivot table pivot_data = calendar_data.pivot_table( index="month", columns="year", values="dividend", fill_value=0 # Fill missing values with 0 ) # Reindex to ensure all 12 months are included pivot_data = pivot_data.reindex(all_months, fill_value=0) except Exception as pivot_error: st.warning(f"Error creating dividend calendar pivot: {str(pivot_error)}") # Create an empty DataFrame with the correct structure as a fallback pivot_data = pd.DataFrame(0, index=all_months, columns=all_years) # Get month labels month_labels = [datetime(2000, i, 1).strftime("%b") for i in range(1, 13)] # Create heatmap fig = px.imshow( pivot_data, 
def display_risk_adjusted_metrics(symbol: str, period_value: str, selected_period: str):
    """Display risk-adjusted performance metrics.

    Loads the price history with ``get_historical_prices``, computes the
    metrics with ``calculate_risk_adjusted_metrics`` and renders the Sharpe,
    Sortino and Calmar ratios, a bar chart of return/volatility/drawdown and
    an explanatory caption.

    Args:
        symbol: ETF ticker symbol.
        period_value: Period identifier passed to ``get_historical_prices``.
        selected_period: Human-readable period label shown in the chart title.
    """

    def _format_ratio(value) -> str:
        """Format a ratio for display; ratios may legitimately be None or NaN."""
        # calculate_risk_adjusted_metrics stores None under the key when a
        # ratio is undefined, so dict.get's default never kicks in and
        # formatting None with ":.2f" would raise TypeError.
        if value is None or pd.isna(value):
            return "N/A"
        return f"{value:.2f}"

    st.subheader("Risk-Adjusted Performance")

    with st.spinner("Calculating risk metrics..."):
        # Get historical prices
        prices = get_historical_prices(
            symbol,
            period=period_value,
            force_refresh=st.session_state.get("force_refresh", False)
        )

        if prices.empty:
            st.warning("Insufficient price data to calculate risk metrics.")
            return

        # Get risk-free rate (could be fetched from an API in a real app)
        risk_free_rate = 0.05  # Default to 5%

        # Calculate metrics
        metrics = calculate_risk_adjusted_metrics(prices, risk_free_rate)

    if not metrics:
        st.warning("Unable to calculate risk metrics with available data.")
        return

    # Display metrics in columns
    col1, col2, col3 = st.columns(3)

    with col1:
        st.metric(
            "Sharpe Ratio",
            _format_ratio(metrics.get("sharpe_ratio")),
            help="Return per unit of risk (higher is better). Values above 1.0 are good."
        )

    with col2:
        st.metric(
            "Sortino Ratio",
            _format_ratio(metrics.get("sortino_ratio")),
            help="Return per unit of downside risk (higher is better)"
        )

    with col3:
        st.metric(
            "Calmar Ratio",
            _format_ratio(metrics.get("calmar_ratio")),
            help="Return relative to maximum drawdown (higher is better)"
        )

    # Create chart comparing metrics. Drawdown is stored as a negative
    # percentage, so its magnitude is plotted alongside the other two values.
    metrics_df = pd.DataFrame({
        "Metric": ["Annualized Return", "Annualized Volatility", "Maximum Drawdown"],
        "Value": [
            metrics.get("annualized_return", 0),
            metrics.get("annualized_volatility", 0),
            abs(metrics.get("max_drawdown", 0))
        ]
    })

    st.subheader("Risk-Return Profile")

    fig = px.bar(
        metrics_df,
        x="Metric",
        y="Value",
        title=f"Risk-Return Profile ({selected_period})",
        labels={"Value": "Percentage (%)"}
    )

    st.plotly_chart(fig, use_container_width=True)

    # Add explanation
    st.caption("""
    **Interpreting Risk Metrics:**
    - **Sharpe Ratio**: Measures excess return per unit of risk. Higher values are better.
    - **Sortino Ratio**: Like Sharpe ratio, but only considers downside risk. Higher values are better.
    - **Calmar Ratio**: Measures return relative to maximum drawdown. Higher values indicate better risk-adjusted performance.
    """)
Please select a different ticker.") st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search")) return # Get ETF profile with st.spinner("Loading ETF data..."): # First try the standard endpoint debug_mode = st.session_state.get("debug_mode", False) profile = get_etf_profile(symbol, force_refresh=st.session_state.get("force_refresh", False)) # If standard endpoint fails, try alternative approach if isinstance(profile, dict) and "error" in profile: if debug_mode: st.info("Primary ETF profile endpoint failed. Trying alternative methods...") profile = get_etf_profile_alternative(symbol, force_refresh=st.session_state.get("force_refresh", False)) # Check for error in profile if isinstance(profile, dict) and "error" in profile: st.error(f"Failed to load profile data for {symbol}: {profile['error']}") st.info("This could be due to an invalid API key, the ETF not being available in the FMP database, or a temporary API issue.") st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search")) return if not profile: st.error(f"Failed to load profile data for {symbol}. 
Please check your API key.") st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search")) return # Display profile information col1, col2 = st.columns([3, 1]) with col1: st.subheader(profile.get("name", "")) st.write(profile.get("description", "No description available.")) with col2: # Create metrics card st.metric("Price", f"${profile.get('price', 0):.2f}") if "aum" in profile: st.metric("AUM", f"${profile.get('aum', 0) / 1e9:.2f}B") if "expense" in profile: st.metric("Expense Ratio", f"{profile.get('expense', 0) * 100:.2f}%") # Create tabs for different types of analysis tabs = st.tabs([ "Overview", "Holdings", "Sector Allocation", "Dividend History", "Dividend Sustainability", "Institutional Ownership", "ESG Scores" ]) # Overview Tab with tabs[0]: # Add explanation in an expander with st.expander("📚 Understanding the Overview Tab", expanded=False): st.markdown(""" ### ETF Overview Explanation This tab provides essential background information about the ETF, helping you understand its basic characteristics and investment focus. **Key metrics to examine:** - **ETF Details**: Basic information like exchange, currency, AUM (Assets Under Management), and expense ratio - **Expense Ratio**: Lower is generally better; this directly impacts your returns (e.g., 0.03% vs 0.30% means 0.27% more return annually) - **AUM (Assets Under Management)**: Larger funds tend to have better liquidity and smaller bid-ask spreads - **Price Information**: Recent price movements and trading ranges **Why this matters**: Understanding the ETF's focus, size, and cost structure helps determine if it aligns with your investment goals and provides a foundation for deeper analysis. 
""") overview_col1, overview_col2 = st.columns(2) with overview_col1: st.subheader("ETF Details") # Prepare details dictionary with data that might be available details = { "Symbol": profile.get("symbol", ""), "Name": profile.get("name", ""), "Exchange": profile.get("exchange", ""), "Currency": profile.get("currency", "") } # Add fields if they exist if "aum" in profile and profile["aum"]: details["AUM"] = f"${profile.get('aum', 0) / 1e9:.2f}B" if profile.get("aum", 0) > 1e6 else f"${profile.get('aum', 0) / 1e6:.2f}M" if "expense" in profile and profile["expense"] is not None: details["Expense Ratio"] = f"{profile.get('expense', 0) * 100:.2f}%" if "pe" in profile and profile["pe"] is not None and profile["pe"] != 0: details["PE Ratio"] = profile.get("pe", "N/A") if "sharesOutstanding" in profile and profile["sharesOutstanding"]: details["Shares Outstanding"] = f"{profile.get('sharesOutstanding', 0) / 1e6:.2f}M" if "ipoDate" in profile and profile["ipoDate"]: details["IPO Date"] = profile.get("ipoDate", "N/A") # Add alternative fields that might be available from other endpoints if "assetClass" in profile: details["Asset Class"] = profile.get("assetClass", "") if "sector" in profile: details["Sector"] = profile.get("sector", "") if "industry" in profile: details["Industry"] = profile.get("industry", "") if "marketCap" in profile and profile["marketCap"]: details["Market Cap"] = f"${profile.get('marketCap', 0) / 1e9:.2f}B" if profile.get("marketCap", 0) > 1e9 else f"${profile.get('marketCap', 0) / 1e6:.2f}M" if "volume" in profile and profile["volume"]: details["Volume"] = f"{profile.get('volume', 0):,}" if "avgVolume" in profile and profile["avgVolume"]: details["Avg Volume"] = f"{profile.get('avgVolume', 0):,}" # Convert to DataFrame for display details_df = pd.DataFrame(list(details.items()), columns=["Metric", "Value"]) st.dataframe(details_df, use_container_width=True, hide_index=True) # Data source disclaimer st.caption("Note: Some fields may be unavailable 
based on your API subscription level.") with overview_col2: st.subheader("Price Information") price_data = {} # Add price data if available if "price" in profile: price_data["Price"] = f"${profile.get('price', 0):.2f}" if "changes" in profile: price_data["Change"] = f"{profile.get('changes', 0):.2f}" if "changesPercentage" in profile: price_data["Change %"] = f"{profile.get('changesPercentage', 0):.2f}%" if "dayLow" in profile and profile["dayLow"]: price_data["Day Low"] = f"${profile.get('dayLow', 0):.2f}" if "dayHigh" in profile and profile["dayHigh"]: price_data["Day High"] = f"${profile.get('dayHigh', 0):.2f}" if "yearLow" in profile and profile["yearLow"]: price_data["Year Low"] = f"${profile.get('yearLow', 0):.2f}" if "yearHigh" in profile and profile["yearHigh"]: price_data["Year High"] = f"${profile.get('yearHigh', 0):.2f}" if "priceAvg50" in profile and profile["priceAvg50"]: price_data["50-Day Avg"] = f"${profile.get('priceAvg50', 0):.2f}" if "priceAvg200" in profile and profile["priceAvg200"]: price_data["200-Day Avg"] = f"${profile.get('priceAvg200', 0):.2f}" # If no price data available, show a message if not price_data: st.info("Price information is not available with your current API subscription.") else: # Convert to DataFrame for display price_df = pd.DataFrame(list(price_data.items()), columns=["Metric", "Value"]) st.dataframe(price_df, use_container_width=True, hide_index=True) # If we have an image URL, display it if "image" in profile and profile["image"]: st.image(profile["image"], width=150) # Show source of data st.caption("Data from Financial Modeling Prep API") # Show the data source endpoints used if st.session_state.get("debug_mode", False): st.write("Data Source:") if profile.get("_source", None): st.code(profile["_source"]) else: st.code("etf/profile endpoint") # Holdings Tab with tabs[1]: # Add explanation in an expander with st.expander("📚 Understanding the Holdings Tab", expanded=False): st.markdown(""" ### ETF Holdings Explanation 
This tab shows you exactly what the ETF owns - the individual stocks, bonds, or other assets that make up the fund. **Key aspects to analyze:** - **Top Holdings**: The largest positions in the ETF, which have the most influence on performance - **Concentration**: If the top 10 holdings make up a large percentage (>50%), the ETF is highly concentrated - **Individual Securities**: Review the specific companies/assets to ensure they align with your investment thesis - **Weight Distribution**: How evenly the ETF spreads its investments across different securities **Why this matters**: Understanding what the ETF actually owns helps you assess its true exposure and risk profile. A technology ETF might own different types of tech companies (software, hardware, services) with varying risk profiles. """) with st.spinner("Loading holdings data..."): holdings = get_etf_holdings(symbol, force_refresh=st.session_state.get("force_refresh", False)) if not holdings.empty: # Check for error if "error" in holdings.columns: st.warning("Unable to load holdings data") st.error(holdings["error"].iloc[0]) # Ensure we have the necessary columns elif all(col in holdings.columns for col in ["asset", "weightPercentage"]): st.subheader("Top Holdings") # Sort by weight holdings = holdings.sort_values("weightPercentage", ascending=False) # Display top 10 holdings top_holdings = holdings.head(10) # Create bar chart fig = px.bar( top_holdings, x="asset", y="weightPercentage", title=f"Top 10 Holdings for {symbol}", labels={"asset": "Asset", "weightPercentage": "Weight (%)"} ) st.plotly_chart(fig, use_container_width=True) # Display full holdings table st.subheader("All Holdings") st.dataframe( holdings, use_container_width=True, height=400 ) else: st.warning("Holdings data is incomplete or in an unexpected format.") else: st.warning("No holdings data available for this ETF.") st.info("⚠️ PREMIUM API FEATURE: Detailed holdings data typically requires a paid FMP API subscription. 
Consider upgrading your plan to access this information.") # Sector Allocation Tab with tabs[2]: # Add explanation in an expander with st.expander("📚 Understanding the Sector Allocation Tab", expanded=False): st.markdown(""" ### Sector Allocation Explanation This tab breaks down the ETF's investments by economic sectors, showing you where the fund is most heavily invested. **What to look for:** - **Dominant Sectors**: Sectors with the largest allocations will have the greatest impact on performance - **Diversification**: How broadly the ETF spreads investments across different sectors - **Sector Bias**: Whether the ETF is overweight in certain sectors compared to the broader market - **Alignment with Economic Outlook**: Consider if the sector weightings align with your economic outlook (e.g., overweight technology during tech boom) **Why this matters**: Sector exposure is a key driver of returns and risks. During different economic cycles, sectors perform differently - technology might outperform during innovation booms, while utilities and consumer staples often do better during recessions. **Example interpretation**: An ETF with 40% technology exposure will behave very differently from one with 40% utilities exposure. 
""") with st.spinner("Loading sector data..."): sectors = get_etf_sector_weightings(symbol, force_refresh=st.session_state.get("force_refresh", False)) if not sectors.empty: # Check for error if "error" in sectors.columns: st.warning("Unable to load sector allocation data") st.error(sectors["error"].iloc[0]) else: st.subheader("Sector Allocation") # Create pie chart fig = px.pie( sectors, names=sectors.columns[0], values=sectors.columns[1], title=f"Sector Allocation for {symbol}" ) st.plotly_chart(fig, use_container_width=True) # Display sector table with fixed height to prevent double scrolling st.dataframe( sectors, use_container_width=True, height=300 ) else: st.warning("No sector weighting data available for this ETF.") st.info("⚠️ PREMIUM API FEATURE: Sector allocation data typically requires a paid FMP API subscription. Consider upgrading your plan to access this information.") # Dividend History Tab with tabs[3]: # Add explanation in an expander with st.expander("📚 Understanding the Dividend History Tab", expanded=False): st.markdown(""" ### Dividend History Explanation This tab shows the ETF's historical dividend payments, helping you understand its income generation capabilities. **Key metrics to analyze:** - **TTM (Trailing Twelve Month) Dividend**: Total dividends paid over the past year - **Dividend Yield**: Annual dividend as a percentage of current price - **Payment Trend**: Whether dividends are stable, growing, or declining over time - **Payment Frequency**: How often dividends are paid (monthly, quarterly, etc.) **Why this matters**: - For income investors, consistent and growing dividends are crucial - Dividend history reveals the ETF's income reliability and growth potential - Sudden drops in dividends may indicate underlying problems with the ETF's holdings **Interpreting the data**: Look for steady or increasing dividend payments over time. Declining dividends might signal financial stress in the underlying holdings. 
Also check if dividend amounts are consistent or vary significantly between payments. """) with st.spinner("Loading dividend history..."): dividends = get_etf_dividend_history(symbol, force_refresh=st.session_state.get("force_refresh", False)) if not dividends.empty: # Check for error if "error" in dividends.columns: st.warning("Unable to load dividend history data") st.error(dividends["error"].iloc[0]) # Ensure we have the necessary columns elif all(col in dividends.columns for col in ["date", "dividend"]): st.subheader("Dividend History") # Sort by date dividends = dividends.sort_values("date") # Calculate TTM dividend if len(dividends) > 0: current_date = datetime.now() one_year_ago = current_date - timedelta(days=365) ttm_dividends = dividends[dividends["date"] >= pd.Timestamp(one_year_ago)] ttm_dividend_sum = ttm_dividends["dividend"].sum() st.metric("TTM Dividend", f"${ttm_dividend_sum:.2f}") if "price" in profile: dividend_yield = (ttm_dividend_sum / profile["price"]) * 100 st.metric("Dividend Yield", f"{dividend_yield:.2f}%") # Create line chart fig = px.line( dividends, x="date", y="dividend", title=f"Dividend History for {symbol}", labels={"date": "Date", "dividend": "Dividend Amount ($)"} ) st.plotly_chart(fig, use_container_width=True) # Display dividend table st.dataframe( dividends.sort_values("date", ascending=False), use_container_width=True, height=400 ) else: st.warning("Dividend data is incomplete or in an unexpected format.") else: st.warning("No dividend history available for this ETF.") # Dividend Sustainability Tab with tabs[4]: # Add explanation in an expander with st.expander("📚 Understanding Dividend Sustainability", expanded=False): st.markdown(""" ### Dividend Sustainability Explanation This tab analyzes how sustainable the ETF's dividend payments are likely to be in the future. 
**Key metrics analyzed:** - **Sustainability Score**: Overall assessment of how likely dividends can be maintained or grown - **Payout Ratio**: Percentage of earnings paid as dividends (lower is generally more sustainable) - **Dividend Growth Rate**: How quickly dividends have increased over time - **Growth Consistency**: How reliable the dividend increases have been **How to interpret the ratings:** - **Highly Sustainable (80-100)**: Strong fundamentals supporting continued dividend growth - **Sustainable (60-80)**: Good prospects for maintaining current dividends - **Moderately Sustainable (40-60)**: May maintain dividends but growth potential is limited - **Questionable (20-40)**: Risk of dividend cuts if economic conditions worsen - **Unsustainable (<20)**: High probability of dividend reduction **Why this matters**: Investors relying on dividend income need to assess not just current yield, but the likelihood that those dividends will continue or grow in the future. High yields sometimes come with high risk of cuts. """) display_dividend_sustainability(symbol) # Institutional Ownership Tab with tabs[5]: # Add explanation in an expander with st.expander("📚 Understanding Institutional Ownership", expanded=False): st.markdown(""" ### Institutional Ownership Explanation This tab shows which large financial institutions (like pension funds, hedge funds, etc.) own shares of this ETF. **Key metrics to examine:** - **Total Institutional Ownership**: Percentage of the ETF owned by institutions (vs. 
retail investors) - **Top Holders**: Major institutional investors with the largest positions - **Concentration**: Whether ownership is spread widely or concentrated among a few large players - **Recent Changes**: If available, how institutional ownership has changed recently **Why this matters**: - **Higher institutional ownership** often indicates professional investor confidence - **Changes in institutional ownership** can signal shifting sentiment among professional investors - **Concentration risk**: If a few institutions own a large percentage, their selling could negatively impact price - **Liquidity considerations**: Highly institutional ETFs might have different liquidity characteristics **Typical patterns**: Broadly-diversified, established ETFs often have higher institutional ownership, while newer or more specialized ETFs may have lower institutional participation. """) display_whale_analysis(symbol) # ESG Scores Tab with tabs[6]: # Add explanation in an expander with st.expander("📚 Understanding ESG Scores", expanded=False): st.markdown(""" ### ESG (Environmental, Social, Governance) Explanation This tab evaluates the ETF's performance on environmental, social, and governance factors - increasingly important considerations for socially-conscious investing. **Three key components:** - **Environmental**: How the ETF's holdings impact the natural world (carbon emissions, resource use, pollution, etc.) 
def display_etf_search():
    """Display ETF search interface.

    Loads the ETF list with ``get_etf_list``, stores it in
    ``st.session_state.etf_list_df``, lets the user filter it by a
    case-insensitive substring of the symbol or name, and sort it. When an
    ETF is picked, ``st.session_state.selected_etf`` is set, the current tab
    switches to ``"analysis"`` and the script is rerun.
    """
    st.header("🔍 ETF Search")

    # Get ETF list
    with st.spinner("Loading ETF list..."):
        etf_df = get_etf_list(force_refresh=st.session_state.get("force_refresh", False))

        # Store in session state for later validation
        st.session_state.etf_list_df = etf_df

    if etf_df.empty:
        st.error("Failed to load ETF list. Please check your API key.")
        return

    # Filter options
    col1, col2 = st.columns(2)

    with col1:
        search_query = st.text_input("Search ETFs by Name or Symbol")

    with col2:
        sort_by = st.selectbox(
            "Sort By",
            options=["Symbol", "Name"],
            index=0
        )

    # Filter the dataframe
    filtered_df = etf_df
    query = search_query.strip() if search_query else ""
    if query:
        # regex=False: the query is user text, not a pattern, so characters
        # such as "(" or "+" must match literally instead of raising.
        # na=False: rows with a missing symbol/name simply do not match,
        # which keeps the mask strictly boolean.
        symbol_match = filtered_df["symbol"].str.contains(query, case=False, regex=False, na=False)
        name_match = filtered_df["name"].str.contains(query, case=False, regex=False, na=False)
        filtered_df = filtered_df[symbol_match | name_match]

    # Sort the dataframe
    sort_column = "symbol" if sort_by == "Symbol" else "name"
    filtered_df = filtered_df.sort_values(sort_column)

    # Display results
    st.subheader(f"Found {len(filtered_df)} ETFs")

    # Create a more user-friendly display dataframe
    display_df = filtered_df[["symbol", "name", "exchange"]].copy()
    display_df.columns = ["Symbol", "Name", "Exchange"]

    st.dataframe(
        display_df,
        use_container_width=True,
        height=400,
        column_config={
            "Symbol": st.column_config.TextColumn("Symbol", width="small"),
            "Name": st.column_config.TextColumn("Name", width="large"),
            "Exchange": st.column_config.TextColumn("Exchange", width="medium")
        }
    )

    # Build the symbol -> name lookup once; the label function is called for
    # every option, so filtering the DataFrame inside it would be quadratic.
    # drop_duplicates keeps the first row per symbol.
    names_by_symbol = (
        filtered_df.drop_duplicates("symbol").set_index("symbol")["name"].to_dict()
    )

    def _option_label(option: str) -> str:
        """Label for the selectbox: 'SYMBOL: Name', or a prompt for the blank entry."""
        return f"{option}: {names_by_symbol[option]}" if option else "Select an ETF"

    # Allow user to select ETF for analysis
    selected_symbol = st.selectbox(
        "Select ETF for Analysis",
        options=[""] + filtered_df["symbol"].tolist(),
        format_func=_option_label
    )

    if selected_symbol:
        st.session_state.selected_etf = selected_symbol
        st.session_state.current_tab = "analysis"
        st.rerun()
def _show_comparison_placeholder(selected_analysis):
    """Explain what the chosen analysis will show while no ETF is selected."""
    st.info("Select ETFs to compare their performance.")

    # Show example comparison
    st.subheader("Sample Comparison Chart")

    if selected_analysis == "Performance Metrics":
        st.image(
            "https://i.imgur.com/JE2Zxsm.png",
            caption="Example of ETF performance comparison chart showing relative returns over time",
        )
        st.write("""
        The performance comparison provides critical metrics for investment decisions:

        - **Relative Performance**: See how ETFs perform against each other over time
        - **Volatility**: Measure of price fluctuation (lower is generally less risky)
        - **Maximum Drawdown**: Largest percentage drop from peak to trough (shows downside risk)
        - **Time-based Returns**: Performance over various time periods (1M, 3M, 6M, 1Y)
        """)
    elif selected_analysis == "NAV Premium/Discount":
        st.write("""
        The NAV Premium/Discount analysis shows:

        - **NAV (Net Asset Value)**: The per-share value of the ETF's underlying assets
        - **Premium/Discount**: The percentage difference between market price and NAV
        - **Premium/Discount Trend**: How the relationship changes over time
        - **Premium/Discount Volatility**: The stability of the price-to-NAV relationship
        """)
    elif selected_analysis == "Dividend Yield & Erosion":
        st.write("""
        The Dividend Yield & Erosion analysis shows:

        - **Current Yield**: The latest dividend yield based on TTM dividends
        - **Yield Trend**: How the yield has changed over time
        - **Yield Erosion**: Decline in yield compared to historical averages
        - **Yield Volatility**: How stable the yield has been
        """)
    elif selected_analysis == "Dividend Sustainability":
        st.write("""
        The Dividend Sustainability analysis shows:

        - **Payout Ratio**: Average payout ratio of ETF holdings (<70% is sustainable)
        - **Dividend Growth Rate**: Annual growth rate of dividends (>5% signals quality)
        - **Growth Consistency**: Percentage of years with positive dividend growth
        - **Overall Sustainability**: Combined assessment of dividend sustainability
        """)
    elif selected_analysis == "ESG Scores":
        st.write("""
        The ESG Score comparison shows:

        - **Environmental Score**: Impact on the environment and natural resources
        - **Social Score**: Relationships with employees, suppliers, customers, communities
        - **Governance Score**: Leadership, audits, internal controls, shareholder rights
        - **Overall ESG Score**: Combined assessment of environmental, social, and governance factors
        """)
    elif selected_analysis == "Institutional Ownership":
        st.write("""
        The Institutional Ownership analysis shows:

        - **Major Holders**: Top institutional investors holding the ETF
        - **Ownership Concentration**: Percentage of ETF owned by top institutions
        - **Comparison of Whale Investors**: Compare institutional ownership patterns across ETFs
        - **Ownership Changes**: How institutional ownership has changed over time
        """)
    elif selected_analysis == "Dividend Calendar":
        st.write("""
        The Dividend Calendar analysis shows:

        - **Distribution Schedule**: When each ETF typically pays dividends
        - **Distribution Pattern**: Monthly, quarterly, semi-annual, or annual payment patterns
        - **Payment Timing**: Compare when different ETFs make their dividend payments
        - **Distribution History**: Historical dividend payment records
        """)


def display_comparison():
    """Display ETF comparison interface with financial performance focus.

    Flow: check that an FMP API key is available (``FMP_API_KEY`` environment
    variable, else ``st.session_state["fmp_api_key"]``), probe the API with a
    ``quote/SPY`` request, let the user build ``st.session_state.comparison_etfs``
    (capped at ``MAX_ETF_COMPARISON``), then hand over to the renderer for the
    analysis type chosen in the sidebar.
    """
    st.header("🔄 ETF Performance Comparison")

    # Check if we have an API key first
    api_key = os.environ.get("FMP_API_KEY", st.session_state.get("fmp_api_key", ""))
    if not api_key:
        st.error("FMP API key not found. Please enter it in the sidebar.")
        return

    # Common ETFs for quick selection
    common_etfs = ["SPY", "VOO", "QQQ", "VTI", "IWM", "ARKK", "VIG", "SCHD"]

    # Try loading from cache or a fast endpoint
    try:
        with st.spinner("Verifying API connection..."):
            test_result = fmp_request(
                "quote/SPY", debug_mode=st.session_state.get("debug_mode", False)
            )
            if isinstance(test_result, dict) and "error" in test_result:
                st.error("API connection test failed. Please check your API key.")
                st.info("Try using the 'Test API Connection' tool from the sidebar to diagnose issues.")
                return
    except Exception as e:
        st.error(f"Error verifying API connection: {str(e)}")
        return

    # Initialize comparison ETFs if not in session state
    if "comparison_etfs" not in st.session_state:
        st.session_state.comparison_etfs = []

    # Time period selection
    time_periods = {
        "1 Month": "1month",
        "3 Months": "3month",
        "6 Months": "6month",
        "1 Year": "1year",
        "5 Years": "5year",
        "Max": "max",
    }

    # Sidebar for comparison settings
    st.sidebar.markdown("### Comparison Settings")
    selected_period = st.sidebar.selectbox(
        "Time Period",
        list(time_periods.keys()),
        index=3,  # Default to 1 Year
    )

    # Add analysis type selector
    analysis_types = [
        "Performance Metrics",
        "NAV Premium/Discount",
        "Dividend Yield & Erosion",
        "Dividend Sustainability",
        "ESG Scores",
        "Institutional Ownership",
        "Dividend Calendar",
    ]
    selected_analysis = st.sidebar.radio("Analysis Type", analysis_types)
    period_value = time_periods[selected_period]

    # ETF Selection section
    st.subheader("Select ETFs to Compare")

    # The cap applies to every way of adding an ETF. Previously only the
    # custom-ticker input enforced it, so the quick-select buttons could
    # push the comparison past the limit.
    limit_reached = len(st.session_state.comparison_etfs) >= MAX_ETF_COMPARISON

    # Create 4 columns for quick selection of common ETFs
    cols = st.columns(4)
    for i, etf in enumerate(common_etfs):
        with cols[i % 4]:
            already_selected = etf in st.session_state.comparison_etfs
            if st.button(etf, key=f"btn_{etf}", disabled=already_selected or limit_reached):
                st.session_state.comparison_etfs.append(etf)
                st.rerun()

    # Custom ETF input
    custom_col1, custom_col2 = st.columns([3, 1])
    with custom_col1:
        custom_etf = st.text_input("Add custom ETF ticker:", "")
    # Normalise once so " spy " and "SPY" are treated as the same ticker.
    custom_symbol = custom_etf.strip().upper()
    with custom_col2:
        if st.button("Add", disabled=not custom_symbol or limit_reached):
            if custom_symbol not in st.session_state.comparison_etfs:
                st.session_state.comparison_etfs.append(custom_symbol)
            st.rerun()

    # Display selected ETFs and performance
    if st.session_state.comparison_etfs:
        if selected_analysis == "Performance Metrics":
            display_performance_comparison(period_value, selected_period)
        elif selected_analysis == "NAV Premium/Discount":
            display_nav_comparison(period_value, selected_period)
        elif selected_analysis == "Dividend Yield & Erosion":
            display_yield_erosion_comparison(period_value, selected_period)
        elif selected_analysis == "Dividend Sustainability":
            display_dividend_sustainability_comparison()
        elif selected_analysis == "ESG Scores":
            display_esg_comparison()
        elif selected_analysis == "Institutional Ownership":
            display_whale_analysis_comparison()
        elif selected_analysis == "Dividend Calendar":
            display_dividend_calendar_comparison()
    else:
        _show_comparison_placeholder(selected_analysis)
def display_performance_comparison(period_value, selected_period):
    """Display the performance table and relative-return chart for the compared ETFs.

    Args:
        period_value: Period code forwarded to ``get_historical_prices``.
        selected_period: Human-readable period label used in headings/titles.

    The Remove/Clear controls are rendered whether or not any price data
    loaded, so a ticker that returns no data can still be taken out of the
    comparison (the NAV and yield views already behave this way).
    """
    st.subheader(f"ETF Performance Comparison ({selected_period})")

    # Fetch data and calculate metrics for all ETFs
    performance_data = {}
    price_history = {}

    with st.spinner("Loading performance data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get historical prices
            prices = get_historical_prices(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False),
            )
            if prices.empty:
                continue
            # Store price history for charts
            price_history[symbol] = prices
            # Calculate performance metrics
            performance_data[symbol] = calculate_performance_metrics(prices)

    if performance_data:
        # Create comparison table of returns
        returns_data = []
        for symbol, metrics in performance_data.items():
            row = {"Symbol": symbol}

            # Add returns for different time periods
            if "returns" in metrics:
                for period, value in metrics["returns"].items():
                    row[period] = f"{value:.2f}%" if value is not None else "N/A"

            # Add volatility and max drawdown
            volatility = metrics.get("volatility")
            max_drawdown = metrics.get("max_drawdown")
            row["Volatility"] = f"{volatility:.2f}%" if volatility is not None else "N/A"
            row["Max Drawdown"] = f"{max_drawdown:.2f}%" if max_drawdown is not None else "N/A"

            returns_data.append(row)

        # Create DataFrame and display
        if returns_data:
            returns_df = pd.DataFrame(returns_data)
            returns_df.set_index("Symbol", inplace=True)
            st.dataframe(returns_df, use_container_width=True)

        # Create price chart
        if price_history:
            st.subheader("Price Performance")

            # Collect one frame per ETF and concatenate once; appending to an
            # initially empty DataFrame inside the loop re-copies the data on
            # every pass and concatenates with an empty frame.
            frames = []
            for symbol, prices in price_history.items():
                temp_df = prices[["date", "close"]].copy()
                # NOTE(review): the first row is taken as the baseline, which
                # assumes prices are ordered oldest-first — confirm against
                # get_historical_prices.
                base_price = temp_df["close"].iloc[0]
                if pd.isna(base_price) or base_price == 0:
                    # A missing/zero baseline cannot be normalised.
                    continue
                # Normalize to percentage change from first day
                temp_df["return"] = (temp_df["close"] / base_price - 1) * 100
                temp_df["Symbol"] = symbol
                frames.append(temp_df)

            if frames:
                chart_data = pd.concat(frames)

                # Create line chart of percentage returns
                fig = px.line(
                    chart_data,
                    x="date",
                    y="return",
                    color="Symbol",
                    labels={
                        "date": "Date",
                        "return": "Return (%)",
                        "Symbol": "ETF",
                    },
                    title=f"Relative Performance ({selected_period})",
                    height=500,
                )

                # Add reference line at 0%
                fig.add_hline(y=0, line_dash="dash", line_color="gray")

                st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("No performance data available for the selected ETFs.")

    # Allow removing ETFs from comparison
    if st.session_state.comparison_etfs:
        st.subheader("Remove ETFs")
        remove_cols = st.columns(len(st.session_state.comparison_etfs))
        for i, symbol in enumerate(st.session_state.comparison_etfs):
            with remove_cols[i]:
                if st.button(f"Remove {symbol}", key=f"remove_{symbol}"):
                    st.session_state.comparison_etfs.remove(symbol)
                    st.rerun()

        # Clear all button
        if st.button("Clear All"):
            st.session_state.comparison_etfs = []
            st.rerun()
def display_nav_comparison(period_value, selected_period):
    """Show how each compared ETF's market price relates to its NAV.

    For every symbol in ``st.session_state.comparison_etfs`` the price and NAV
    histories are loaded and turned into a premium/discount series. The page
    then shows a summary table, a combined premium/discount trend chart and
    one price-vs-NAV chart per ETF, followed by the Remove/Clear controls.

    Args:
        period_value: Period code forwarded to the data loaders.
        selected_period: Human-readable period label used in headings/titles.
    """
    st.subheader(f"ETF NAV Premium/Discount Analysis ({selected_period})")

    price_by_symbol = {}
    nav_by_symbol = {}
    spread_by_symbol = {}

    with st.spinner("Loading NAV data..."):
        for ticker in st.session_state.comparison_etfs:
            price_frame = get_historical_prices(
                ticker,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False),
            )
            nav_frame = get_nav_data(
                ticker,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False),
            )

            if price_frame.empty:
                continue
            price_by_symbol[ticker] = price_frame

            # Premium/discount needs both series.
            if nav_frame.empty:
                continue
            nav_by_symbol[ticker] = nav_frame

            spread = calculate_nav_premium_discount(price_frame, nav_frame)
            if not spread.empty:
                spread_by_symbol[ticker] = spread

    if not spread_by_symbol:
        st.warning("""
        NAV data is not available for the selected ETFs.
        """)
        st.info("""
        ⚠️ PREMIUM API FEATURE: NAV data is typically only available with higher-tier FMP API subscriptions.

        However, this application now attempts to fetch NAV data from Yahoo Finance as a fallback when FMP data is unavailable.
        For some ETFs, Yahoo Finance provides:
        - Direct NAV data through the "-IV" suffix ticker
        - Estimated NAV based on the ETF's price (less accurate but still useful)

        If you're still not seeing data, try enabling debug mode in the sidebar to see more information about the data retrieval process.
        """)
    else:
        # --- Summary table -------------------------------------------------
        summary_rows = []
        for ticker, spread in spread_by_symbol.items():
            if spread.empty:
                continue
            series = spread["premium_discount"]
            summary_rows.append({
                "Symbol": ticker,
                "Current P/D": f"{series.iloc[-1]:.2f}%",
                "Avg P/D": f"{series.mean():.2f}%",
                "Min P/D": f"{series.min():.2f}%",
                "Max P/D": f"{series.max():.2f}%",
                "P/D Volatility": f"{series.std():.2f}%",
            })

        if summary_rows:
            st.dataframe(
                pd.DataFrame(summary_rows).set_index("Symbol"),
                use_container_width=True,
            )

            # Explanation of premium/discount
            st.info("""
            **Premium/Discount (P/D) Interpretation:**
            - **Positive values**: ETF trading at a premium to NAV
            - **Negative values**: ETF trading at a discount to NAV
            - **Higher volatility**: Less consistent pricing relative to NAV
            """)

        # --- Combined trend chart ------------------------------------------
        st.subheader("Premium/Discount Trend")

        trend_frames = [
            spread[["date", "premium_discount"]].assign(Symbol=ticker)
            for ticker, spread in spread_by_symbol.items()
            if not spread.empty
        ]
        trend = pd.concat(trend_frames) if trend_frames else pd.DataFrame()

        if not trend.empty:
            trend_fig = px.line(
                trend,
                x="date",
                y="premium_discount",
                color="Symbol",
                labels={
                    "date": "Date",
                    "premium_discount": "Premium/Discount (%)",
                    "Symbol": "ETF",
                },
                title=f"NAV Premium/Discount ({selected_period})",
                height=500,
            )
            # Reference line at 0% (price == NAV)
            trend_fig.add_hline(y=0, line_dash="dash", line_color="gray")
            st.plotly_chart(trend_fig, use_container_width=True)

        # --- One price-vs-NAV chart per ETF --------------------------------
        for ticker in spread_by_symbol:
            if ticker not in price_by_symbol or ticker not in nav_by_symbol:
                continue

            st.subheader(f"{ticker}: NAV vs. Price")

            # Only dates present in both series are plotted (inner join).
            joined = pd.merge(
                price_by_symbol[ticker][["date", "close"]],
                nav_by_symbol[ticker][["date", "nav"]],
                on="date",
                how="inner",
            )
            if joined.empty:
                continue

            overlay = go.Figure()
            overlay.add_trace(
                go.Scatter(
                    x=joined["date"],
                    y=joined["close"],
                    name="Price",
                    line=dict(color="blue"),
                )
            )
            overlay.add_trace(
                go.Scatter(
                    x=joined["date"],
                    y=joined["nav"],
                    name="NAV",
                    line=dict(color="red"),
                )
            )
            overlay.update_layout(
                title=f"{ticker}: Price vs. NAV",
                xaxis_title="Date",
                yaxis_title="Value ($)",
                height=400,
                legend=dict(
                    orientation="h",
                    yanchor="bottom",
                    y=1.02,
                    xanchor="right",
                    x=1,
                ),
            )
            st.plotly_chart(overlay, use_container_width=True)

    # Allow removing ETFs from comparison
    if st.session_state.comparison_etfs:
        st.subheader("Remove ETFs")
        button_cols = st.columns(len(st.session_state.comparison_etfs))
        for slot, ticker in zip(button_cols, st.session_state.comparison_etfs):
            with slot:
                if st.button(f"Remove {ticker}", key=f"remove_nav_{ticker}"):
                    st.session_state.comparison_etfs.remove(ticker)
                    st.rerun()

        # Clear all button
        if st.button("Clear All NAV"):
            st.session_state.comparison_etfs = []
            st.rerun()
def _format_yield_pct(value):
    """Format a numeric percentage with two decimals; ``None`` becomes "N/A"."""
    return f"{value:.2f}%" if value is not None else "N/A"


def display_yield_erosion_comparison(period_value, selected_period):
    """Display dividend yield and erosion comparison.

    Loads the dividend-yield history for every symbol in
    ``st.session_state.comparison_etfs``, shows a summary table of current
    yield / erosion / volatility, a combined yield trend chart and one
    price-dividend-yield chart per ETF, followed by the Remove/Clear controls.

    Args:
        period_value: Period code forwarded to ``get_dividend_yield_history``.
        selected_period: Human-readable period label used in headings/titles.
    """
    st.subheader(f"ETF Dividend Yield & Erosion Analysis ({selected_period})")

    # Fetch data for all ETFs
    yield_data = {}
    yield_metrics = {}

    with st.spinner("Loading dividend data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get yield history
            yields = get_dividend_yield_history(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False),
            )
            if yields.empty:
                continue
            # Store yield history
            yield_data[symbol] = yields
            # Calculate yield erosion metrics
            yield_metrics[symbol] = calculate_yield_erosion(yields)

    # If we have data, display it
    if yield_metrics:
        # Create summary table
        summary_data = []
        for symbol, metrics in yield_metrics.items():
            if not metrics:
                continue
            row = {"Symbol": symbol}

            # A metric that is present but None is shown as "N/A" instead of
            # crashing the ':.2f' format (same convention as the performance
            # table).
            row["Current Yield"] = _format_yield_pct(metrics.get("current_yield", 0))

            # Add yield erosion metrics
            if "yield_erosion" in metrics:
                for period, value in metrics["yield_erosion"].items():
                    period_name = period.replace("_", "-").title()
                    row[f"Erosion ({period_name})"] = _format_yield_pct(value)

            # Add yield volatility
            if "yield_volatility" in metrics:
                row["Yield Volatility"] = _format_yield_pct(metrics.get("yield_volatility", 0))

            summary_data.append(row)

        # Create DataFrame and display
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.set_index("Symbol", inplace=True)
            st.dataframe(summary_df, use_container_width=True)

            # Explanation of yield erosion
            st.info("""
            **Yield Erosion Interpretation:**
            - **Positive values**: Yield has increased compared to historical average
            - **Negative values**: Yield has decreased (eroded) compared to historical average
            - **Higher volatility**: Less consistent yield over time
            """)

        # Create yield trend chart
        if yield_data:
            st.subheader("Dividend Yield Trend")

            # One concat over all frames instead of growing an empty
            # DataFrame inside the loop.
            frames = []
            for symbol, yields_df in yield_data.items():
                if yields_df.empty:
                    continue
                temp_df = yields_df[["date", "dividend_yield"]].copy()
                temp_df["Symbol"] = symbol
                frames.append(temp_df)

            if frames:
                chart_data = pd.concat(frames)

                # Create line chart of dividend yields
                fig = px.line(
                    chart_data,
                    x="date",
                    y="dividend_yield",
                    color="Symbol",
                    labels={
                        "date": "Date",
                        "dividend_yield": "Dividend Yield (%)",
                        "Symbol": "ETF",
                    },
                    title=f"Dividend Yield Trend ({selected_period})",
                    height=500,
                )
                st.plotly_chart(fig, use_container_width=True)

            # Show individual yield charts for each ETF
            for symbol, yields_df in yield_data.items():
                if yields_df.empty:
                    continue

                st.subheader(f"{symbol}: Dividend Yield Components")

                # Price on the primary axis; TTM dividend and yield share the
                # secondary axis.
                fig = make_subplots(specs=[[{"secondary_y": True}]])

                # Add price line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["price"],
                        name="Price",
                        line=dict(color="blue"),
                    ),
                    secondary_y=False,
                )

                # Add TTM dividend line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["ttm_dividend"],
                        name="TTM Dividend",
                        line=dict(color="green"),
                    ),
                    secondary_y=True,
                )

                # Add yield line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["dividend_yield"],
                        name="Yield (%)",
                        line=dict(color="red", dash="dash"),
                    ),
                    secondary_y=True,
                )

                # Update layout
                fig.update_layout(
                    title=f"{symbol}: Price, TTM Dividend, and Yield",
                    height=400,
                    legend=dict(
                        orientation="h",
                        yanchor="bottom",
                        y=1.02,
                        xanchor="right",
                        x=1,
                    ),
                )

                # Update y-axis labels
                fig.update_yaxes(title_text="Price ($)", secondary_y=False)
                fig.update_yaxes(title_text="Dividend/Yield", secondary_y=True)

                st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("""
        Dividend yield data is not available for the selected ETFs. This could be because:

        1. The selected ETFs don't pay dividends
        2. There isn't enough dividend history in the time period selected
        3. The API subscription level doesn't provide access to dividend data

        Try selecting different ETFs or a longer time period.
        """)

    # Allow removing ETFs from comparison
    if st.session_state.comparison_etfs:
        st.subheader("Remove ETFs")
        remove_cols = st.columns(len(st.session_state.comparison_etfs))
        for i, symbol in enumerate(st.session_state.comparison_etfs):
            with remove_cols[i]:
                if st.button(f"Remove {symbol}", key=f"remove_yield_{symbol}"):
                    st.session_state.comparison_etfs.remove(symbol)
                    st.rerun()

        # Clear all button
        if st.button("Clear All Yield"):
            st.session_state.comparison_etfs = []
            st.rerun()
def display_export():
    """Render the export page placeholder.

    Only a header and two notices are shown; nothing is exported yet.
    """
    planned_scope = (
        "This feature will allow you to export ETF data to CSV, PDF, "
        "or directly to the ETF Portfolio Builder application."
    )
    not_yet_available = "Export functionality will be implemented in a future update."

    st.header("📤 Export ETF Data")

    # Describe the planned feature first, then flag that it is not built yet.
    for render, message in ((st.info, planned_scope), (st.warning, not_yet_available)):
        render(message)
def test_api_connection():
    """Test the connection to the FMP API and report each step in the UI.

    Three checks are run with ``debug_mode=True``:

    1. the ``etf/list`` endpoint,
    2. the ``etf/profile/SPY`` endpoint,
    3. whether ``MSTY`` appears in the ETF list (and, if so, its profile).

    The ETF list fetched in step 1 is reused for step 3 rather than being
    requested a second time. Troubleshooting hints are shown at the end.
    """
    st.header("🔍 API Connection Test")

    # Get API key (environment first, then the key stored in the session)
    api_key = os.environ.get("FMP_API_KEY") or st.session_state.get("fmp_api_key")
    if not api_key:
        st.error("FMP API key not found. Please enter it in the sidebar.")
        return

    st.write("Testing connection to Financial Modeling Prep API...")

    # Try a simple API endpoint first
    with st.spinner("Testing API with ETF list endpoint..."):
        etf_list = fmp_request("etf/list", debug_mode=True)

        if isinstance(etf_list, dict) and "error" in etf_list:
            st.error(f"❌ API Test Failed: {etf_list['error']}")
        elif isinstance(etf_list, list):
            st.success(f"✅ API Test Succeeded! Retrieved {len(etf_list)} ETFs")
            # Show sample of the results
            if len(etf_list) > 0:
                st.write("Sample ETF data:")
                sample_df = pd.DataFrame(etf_list[:5])
                st.dataframe(sample_df)
        else:
            st.warning("⚠️ Unexpected API response format")
            st.json(etf_list)

    # Try a specific ETF profile
    test_ticker = "SPY"  # S&P 500 ETF - should exist in any ETF database
    with st.spinner(f"Testing API with ETF profile for {test_ticker}..."):
        result = fmp_request(f"etf/profile/{test_ticker}", debug_mode=True)

        if isinstance(result, dict) and "error" in result:
            st.error(f"❌ API Test Failed: {result['error']}")
        elif isinstance(result, list) and len(result) > 0:
            st.success(f"✅ API Test Succeeded! Retrieved profile for {test_ticker}")
            # Show the profile data
            st.write(f"{test_ticker} Profile Data:")
            profile = result[0]
            st.json(profile)
        else:
            st.warning("⚠️ Unexpected API response format")
            st.json(result)

    # Try searching for MSTY ticker
    test_ticker_msty = "MSTY"
    with st.spinner(f"Testing search for {test_ticker_msty}..."):
        if isinstance(etf_list, list):
            # Check if MSTY is in the list
            etf_list_df = pd.DataFrame(etf_list)

            if 'symbol' in etf_list_df.columns:
                if test_ticker_msty in etf_list_df['symbol'].values:
                    st.success(f"✅ {test_ticker_msty} is a valid ETF in the FMP database")

                    # Try to get profile
                    msty_profile = fmp_request(f"etf/profile/{test_ticker_msty}", debug_mode=True)
                    if isinstance(msty_profile, list) and len(msty_profile) > 0:
                        st.success(f"✅ Retrieved profile for {test_ticker_msty}")
                        st.json(msty_profile[0])
                    else:
                        st.error(f"❌ {test_ticker_msty} is in the ETF list but profile couldn't be retrieved")
                else:
                    st.warning(f"⚠️ {test_ticker_msty} is NOT found in the FMP ETF database")

                    # Show closest matches (symbols sharing the first two letters).
                    # na=False keeps the mask boolean when a symbol is missing.
                    st.write("Closest matching tickers:")
                    prefix_mask = etf_list_df['symbol'].str.contains(
                        test_ticker_msty[:2], case=False, regex=False, na=False
                    )
                    closest = etf_list_df[prefix_mask]
                    if not closest.empty:
                        # Only show the columns that actually came back.
                        shown = [c for c in ('symbol', 'name') if c in closest.columns]
                        st.dataframe(closest[shown])
                    else:
                        st.write("No similar tickers found")
            else:
                st.error("ETF list doesn't contain symbol column")
        else:
            st.error("Failed to retrieve ETF list for validation")

    # Show API connection instructions
    st.subheader("📋 Troubleshooting Steps")
    st.markdown("""
    If the tests above failed, please check:

    1. **API Key**: Make sure your FMP API key is correct and active
       - Verify at [FMP Dashboard](https://financialmodelingprep.com/developer/docs/)
       - Ensure you have an active subscription that includes ETF data

    2. **Internet Connection**: Check that you can access financialmodelingprep.com

    3. **API Limits**: You might have exceeded your API call limit for the day

    4. **Firewall/Network**: Ensure your network allows API calls to external services
    """)
def display_whale_analysis_comparison():
    """Display institutional ownership comparison for selected ETFs.

    For each symbol in ``st.session_state.comparison_etfs`` the holder table
    is loaded; ETFs whose table is empty or lacks a ``percentage`` column are
    left out. The page shows a concentration table (total institutional
    percentage, percentage held by the five largest holders, holder count),
    a bar chart of total institutional ownership and the top three holders
    per ETF.
    """
    st.subheader("Institutional Ownership Comparison")

    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return

    # Fetch data for all ETFs
    institutional_data = {}
    ownership_concentration = []

    with st.spinner("Loading institutional ownership data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get institutional ownership data
            holders_df = get_institutional_ownership(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False),
            )

            # Store data if available
            if holders_df.empty or "percentage" not in holders_df.columns:
                continue
            institutional_data[symbol] = holders_df

            # Calculate metrics
            total_institutional = holders_df["percentage"].sum()

            # Top-5 concentration must be taken from the holders sorted by
            # size; summing the first five rows of the unsorted table only
            # gives the right answer if the API happens to return it sorted.
            # head(5) already copes with fewer than five holders.
            holders_df_sorted = holders_df.sort_values("percentage", ascending=False)
            top_5_pct = holders_df_sorted.head(5)["percentage"].sum()

            # Store concentration data
            ownership_concentration.append({
                "Symbol": symbol,
                "Total Institutional": total_institutional,
                "Top 5 Concentration": top_5_pct,
                "Number of Institutions": len(holders_df),
            })

    # Display concentration metrics comparison
    if ownership_concentration:
        st.subheader("Ownership Concentration")

        concentration_df = pd.DataFrame(ownership_concentration)
        concentration_df.set_index("Symbol", inplace=True)

        # Format percentages
        for column in ("Total Institutional", "Top 5 Concentration"):
            concentration_df[column] = concentration_df[column].apply(lambda x: f"{x:.2f}%")

        st.dataframe(concentration_df, use_container_width=True)

        # Create bar chart comparing institutional ownership (uses the raw
        # numeric values, not the formatted strings above)
        chart_data = pd.DataFrame(ownership_concentration)
        fig = px.bar(
            chart_data,
            x="Symbol",
            y="Total Institutional",
            title="Total Institutional Ownership Percentage",
            labels={"Total Institutional": "Ownership %"},
        )
        st.plotly_chart(fig, use_container_width=True)

        # Compare top holders across ETFs
        st.subheader("Top 3 Institutional Holders by ETF")

        # Rename columns for better display
        column_mapping = {
            "holder": "Holder",
            "shares": "Shares",
            "sharesHeld": "Shares Held",
            "dateReported": "Date Reported",
            "percentage": "Percentage",
        }

        for symbol, holders_df in institutional_data.items():
            if holders_df.empty or "percentage" not in holders_df.columns:
                continue

            st.write(f"**{symbol}**")

            # Display top 3 holders
            top_holders = holders_df.sort_values("percentage", ascending=False).head(3)

            # Format display dataframe
            display_df = top_holders.rename(
                columns={k: v for k, v in column_mapping.items() if k in top_holders.columns}
            )

            # Format percentage column
            if "Percentage" in display_df.columns:
                display_df["Percentage"] = display_df["Percentage"].apply(lambda x: f"{x:.2f}%")

            st.dataframe(display_df, use_container_width=True)
    else:
        st.warning("No institutional ownership data available for the selected ETFs.")
        st.info("This data may require a premium API subscription.")
def display_dividend_calendar_comparison():
    """Display dividend calendar comparison for selected ETFs.

    For each symbol in ``st.session_state.comparison_etfs`` the dividend
    calendar is loaded. The page shows the distribution pattern per ETF, a
    grouped bar chart of the average dividend per calendar month, one
    month-by-year heatmap per ETF and the five most recent payments per ETF.
    The charts use rows whose ``year`` is within the current year and the two
    before it.
    """
    st.subheader("Dividend Distribution Calendar Comparison")

    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return

    # Fetch data for all ETFs
    dividend_data = {}
    distribution_patterns = {}

    with st.spinner("Loading dividend calendar data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get dividend calendar data
            dividend_df = get_dividend_calendar(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False),
            )

            # Store data if available
            if dividend_df.empty:
                continue
            dividend_data[symbol] = dividend_df

            # Store distribution pattern
            if "distribution_pattern" in dividend_df.columns:
                distribution_patterns[symbol] = dividend_df["distribution_pattern"].iloc[0]
            else:
                distribution_patterns[symbol] = "Unknown"

    if not distribution_patterns:
        st.warning("No dividend calendar data available for the selected ETFs.")
        st.info("Some ETFs may not pay dividends, or dividend history may be limited.")
        return

    # Display distribution patterns comparison
    st.subheader("Distribution Patterns")

    patterns_df = pd.DataFrame([
        {"Symbol": symbol, "Distribution Pattern": pattern}
        for symbol, pattern in distribution_patterns.items()
    ])
    patterns_df.set_index("Symbol", inplace=True)
    st.dataframe(patterns_df, use_container_width=True)

    # Create chart to visualize monthly distribution patterns
    st.subheader("Monthly Distribution Patterns")

    # Keep the current year and the two before it
    current_year = datetime.now().year
    min_year = current_year - 2

    # Both charts filter on "year", so it is required for both. The monthly
    # chart previously checked only date/month/dividend and then indexed
    # df["year"], which raised KeyError when that column was absent.
    calendar_columns = {"date", "month", "year", "dividend"}
    month_numbers = list(range(1, 13))
    month_labels = [datetime(2000, m, 1).strftime("%b") for m in month_numbers]

    # Prepare data for chart
    monthly_data = []
    for symbol, df in dividend_data.items():
        if not calendar_columns.issubset(df.columns):
            continue

        # Filter to recent data
        recent_df = df[df["year"] >= min_year]
        if recent_df.empty:
            continue

        # Calculate average dividend by month
        monthly_avg = recent_df.groupby("month")["dividend"].mean().reset_index()

        # Make sure all months are represented (months without a payment -> 0)
        all_months = pd.DataFrame({"month": month_numbers})
        monthly_avg = pd.merge(all_months, monthly_avg, on="month", how="left")
        monthly_avg["dividend"] = monthly_avg["dividend"].fillna(0)

        # Add symbol column
        monthly_avg["Symbol"] = symbol
        monthly_data.append(monthly_avg)

    if monthly_data:
        # Combine all monthly data
        combined_monthly = pd.concat(monthly_data)

        # Create bar chart
        fig = px.bar(
            combined_monthly,
            x="month",
            y="dividend",
            color="Symbol",
            barmode="group",
            labels={"month": "Month", "dividend": "Avg Dividend Amount ($)"},
            title="Average Monthly Dividend Distribution",
            category_orders={"month": month_numbers},
        )

        # Update x-axis to show month names
        fig.update_layout(
            xaxis=dict(
                tickmode="array",
                tickvals=month_numbers,
                ticktext=month_labels,
            )
        )

        st.plotly_chart(fig, use_container_width=True)

    # Create a heatmap grid for each ETF
    st.subheader("Dividend Calendar Heatmaps")

    for symbol, df in dividend_data.items():
        if not calendar_columns.issubset(df.columns):
            continue

        # Filter to recent data
        recent_df = df[df["year"] >= min_year]
        if recent_df.empty:
            continue

        st.write(f"**{symbol} Dividend Calendar**")

        try:
            # Group by year and month
            calendar_data = recent_df.groupby(["year", "month"]).agg({
                "dividend": "sum"
            }).reset_index()

            # Month x year grid; missing combinations are filled with 0
            pivot_data = calendar_data.pivot_table(
                index="month",
                columns="year",
                values="dividend",
                fill_value=0,
            )

            # Reindex to ensure all 12 months are included
            pivot_data = pivot_data.reindex(month_numbers, fill_value=0)

            # Create heatmap
            fig = px.imshow(
                pivot_data,
                labels=dict(x="Year", y="Month", color="Dividend Amount"),
                x=pivot_data.columns.tolist(),
                y=month_labels,
                aspect="auto",
                title=f"{symbol} Dividend Distribution Calendar",
            )

            st.plotly_chart(fig, use_container_width=True)
        except Exception as e:
            # Best effort: one ETF's heatmap failing must not hide the others.
            st.error(f"Unable to generate calendar for {symbol}: {str(e)}")

    # Display recent dividend payments
    st.subheader("Recent Dividend Payments")

    for symbol, df in dividend_data.items():
        if "date" not in df.columns or "dividend" not in df.columns:
            continue

        st.write(f"**{symbol}**")

        # Display recent dividends
        recent_dividends = df.sort_values("date", ascending=False).head(5)

        # Format display dataframe
        display_df = recent_dividends[["date", "dividend"]].copy()
        # to_datetime is a no-op for datetime columns and lets ISO-string
        # dates through instead of failing on the .dt accessor.
        display_df["date"] = pd.to_datetime(display_df["date"]).dt.strftime("%Y-%m-%d")
        display_df.columns = ["Date", "Dividend Amount ($)"]

        st.dataframe(display_df, use_container_width=True)
def get_dividend_sustainability(symbol: str, force_refresh: bool = False) -> Dict:
    """Calculate dividend sustainability metrics for an ETF.

    Two independent, best-effort calculations are made; a failure in either
    is logged (and shown in the UI when ``debug_mode`` is set in the session
    state) and leaves the corresponding fields at their defaults.

    * Growth: dividends are summed per calendar year; with at least three
      years of data the mean year-over-year growth (percent), the number of
      year-over-year observations and the share of them that were positive
      (percent) are reported.
    * Payout ratio: the mean ``payoutRatioTTM`` (as a percentage) of up to
      ten holdings — the ten largest by ``weightPercentage`` when that column
      exists — ignoring values of 2 (200%) or more.

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        Dictionary with keys ``symbol``, ``payout_ratio``,
        ``dividend_growth_rate``, ``growth_years`` and
        ``dividend_consistency``; metrics that could not be computed are
        ``None`` (``growth_years`` is ``0``).
    """
    debug_mode = st.session_state.get("debug_mode", False)

    # Get ETF holdings first
    holdings = get_etf_holdings(symbol, force_refresh=force_refresh)

    # Get dividend history
    dividend_history = get_etf_dividend_history(symbol, force_refresh=force_refresh)

    results = {
        "symbol": symbol,
        "payout_ratio": None,
        "dividend_growth_rate": None,
        "growth_years": 0,
        "dividend_consistency": None,
    }

    # Calculate dividend growth rate if we have sufficient dividend history
    if (
        not dividend_history.empty
        and "date" in dividend_history.columns
        and "dividend" in dividend_history.columns
    ):
        try:
            # Work on a copy so the loader's frame is not mutated
            history = dividend_history.copy()
            history["date"] = pd.to_datetime(history["date"])
            history["year"] = history["date"].dt.year

            # Calculate annual dividends (groupby returns years in ascending order)
            annual_dividends = history.groupby("year")["dividend"].sum().reset_index()

            if len(annual_dividends) >= 3:  # Need at least 3 years for meaningful growth rate
                # Calculate year-over-year growth rates
                annual_dividends["growth_rate"] = annual_dividends["dividend"].pct_change()

                # A year following a zero-dividend year yields +/-inf, which
                # would make the mean infinite; treat it as "no observation".
                annual_dividends["growth_rate"] = annual_dividends["growth_rate"].replace(
                    [np.inf, -np.inf], np.nan
                )

                # Remove first year (which has NaN growth rate) and any
                # undefined observations
                annual_dividends = annual_dividends.dropna()

                if not annual_dividends.empty:
                    growth = annual_dividends["growth_rate"]

                    results["dividend_growth_rate"] = growth.mean() * 100
                    # Number of year-over-year observations
                    results["growth_years"] = len(growth)
                    # Consistency: percentage of years with positive growth
                    results["dividend_consistency"] = ((growth > 0).sum() / len(growth)) * 100

                if debug_mode:
                    st.write(f"Annual dividends for {symbol}:", annual_dividends)
        except Exception as e:
            # Best effort: keep the defaults, but leave a trace outside debug mode.
            logger.warning("Dividend growth calculation failed for %s: %s", symbol, e)
            if debug_mode:
                st.error(f"Error calculating dividend growth rate: {str(e)}")

    # Calculate average payout ratio for holdings if available
    if not holdings.empty and "asset" in holdings.columns:
        try:
            # Filter to top holdings that represent majority of ETF
            if "weightPercentage" in holdings.columns:
                top_holdings = holdings.sort_values("weightPercentage", ascending=False).head(10)
            else:
                top_holdings = holdings.head(10)

            # Calculate payout ratios for each ticker
            payout_ratios = []
            for raw_ticker in top_holdings["asset"].tolist():
                # Holdings without a usable string ticker are skipped; before,
                # one such row raised and discarded the ratios of all holdings.
                if not isinstance(raw_ticker, str) or not raw_ticker.strip():
                    continue

                # Strip any exchange or extra information from ticker
                ticker = raw_ticker.strip().split(':')[-1].split(' ')[0]
                if not ticker:
                    continue

                # Get financial data
                financial_data = fmp_request(
                    f"key-metrics-ttm/{ticker}",
                    force_refresh=force_refresh,
                    debug_mode=debug_mode,
                )

                if not (isinstance(financial_data, list) and len(financial_data) > 0):
                    continue

                payout_ratio = financial_data[0].get("payoutRatioTTM")
                # bool is excluded explicitly because it is an int subclass.
                if (
                    isinstance(payout_ratio, (int, float))
                    and not isinstance(payout_ratio, bool)
                    and payout_ratio < 2  # Filter out extreme values
                ):
                    payout_ratios.append(payout_ratio * 100)  # Convert to percentage

            # Calculate average payout ratio if we have data
            if payout_ratios:
                results["payout_ratio"] = sum(payout_ratios) / len(payout_ratios)

                if debug_mode:
                    st.write(f"Payout ratios for {symbol} holdings:", payout_ratios)
        except Exception as e:
            # Best effort: keep the default, but leave a trace outside debug mode.
            logger.warning("Payout ratio calculation failed for %s: %s", symbol, e)
            if debug_mode:
                st.error(f"Error calculating payout ratio: {str(e)}")

    return results
def assess_dividend_sustainability(metrics: Dict) -> Dict:
    """Grade dividend sustainability metrics and derive an overall score.

    Each metric that is present (not None) is placed into one of five bands
    worth 1, 0.75, 0.5, 0.25 or 0 points. The overall score is the points
    earned as a percentage of the number of metrics that were graded.

    Args:
        metrics: Dictionary with dividend sustainability metrics
            (``payout_ratio``, ``dividend_growth_rate``,
            ``dividend_consistency``; any of them may be missing or None)

    Returns:
        Dictionary with sustainability assessments: one label per metric,
        ``sustainability_score`` (0-100) and ``overall_assessment``.
    """
    assessment = {
        "sustainability_score": 0,
        "payout_ratio_assessment": "No Data",
        "growth_rate_assessment": "No Data",
        "consistency_assessment": "No Data",
        "overall_assessment": "No Data"
    }

    def _grade(value, bands, fallback, lower_is_better):
        # Return (label, points) of the first band the value falls into;
        # a value matching no band gets the fallback label and no points.
        for cutoff, label, points in bands:
            matched = value < cutoff if lower_is_better else value > cutoff
            if matched:
                return label, points
        return fallback, 0

    # (metric key, assessment key, bands, fallback label, lower is better?)
    grading_plan = (
        (
            "payout_ratio",
            "payout_ratio_assessment",
            ((30, "Excellent", 1), (50, "Good", 0.75), (70, "Fair", 0.5), (90, "Caution", 0.25)),
            "High Risk",
            True,
        ),
        (
            "dividend_growth_rate",
            "growth_rate_assessment",
            ((10, "Excellent", 1), (5, "Good", 0.75), (0, "Fair", 0.5), (-5, "Caution", 0.25)),
            "Declining",
            False,
        ),
        (
            "dividend_consistency",
            "consistency_assessment",
            ((90, "Excellent", 1), (75, "Good", 0.75), (50, "Fair", 0.5), (25, "Inconsistent", 0.25)),
            "Unreliable",
            False,
        ),
    )

    earned = 0
    graded = 0
    for metric_key, label_key, bands, fallback, lower_is_better in grading_plan:
        if metrics.get(metric_key) is None:
            continue
        graded += 1
        label, points = _grade(metrics[metric_key], bands, fallback, lower_is_better)
        assessment[label_key] = label
        earned += points

    # Without any graded metric the defaults ("No Data", score 0) are kept.
    if graded > 0:
        percent = (earned / graded) * 100
        assessment["sustainability_score"] = percent

        verdict = "Unsustainable"
        for floor, name in (
            (80, "Highly Sustainable"),
            (60, "Sustainable"),
            (40, "Moderately Sustainable"),
            (20, "Questionable Sustainability"),
        ):
            if percent > floor:
                verdict = name
                break
        assessment["overall_assessment"] = verdict

    return assessment
def display_dividend_sustainability(symbol: str) -> None:
    """Display dividend sustainability analysis.

    Renders, in order: a score gauge and overall verdict (left column), a
    table of the individual metrics (right column), a static explanation of
    the scoring bands, and, when at least three growth years are reported, a
    combined bar/line chart of annual dividends and year-over-year growth.

    Args:
        symbol: ETF ticker symbol to analyse
    """
    st.subheader("Dividend Sustainability Analysis")

    with st.spinner("Calculating dividend sustainability metrics..."):
        metrics = get_dividend_sustainability(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
        assessment = assess_dividend_sustainability(metrics)

    # Display results: proceed only if at least one metric was computed
    has_data = (metrics.get("payout_ratio") is not None or
                metrics.get("dividend_growth_rate") is not None or
                metrics.get("dividend_consistency") is not None)

    if not has_data:
        st.warning("No dividend sustainability data available for this ETF.")
        st.info("⚠️ PREMIUM API FEATURE: Dividend sustainability analysis requires both dividend history data and holdings data with financial metrics. These detailed analytics typically require a paid FMP API subscription.")
        st.info("Without a premium subscription, the app is unable to calculate payout ratios and growth consistency metrics needed for sustainability analysis.")
        return

    # Create columns for metrics
    col1, col2 = st.columns(2)

    with col1:
        # Sustainability score gauge chart.
        # NOTE(review): a score of exactly 0 hides the gauge, although the
        # verdict below is still shown.
        if assessment["sustainability_score"] > 0:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=assessment["sustainability_score"],
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': "Sustainability Score"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': "darkblue"},
                    # Colour steps mirror the 20-point verdict bands listed
                    # in the explanation text further down.
                    'steps': [
                        {'range': [0, 20], 'color': "red"},
                        {'range': [20, 40], 'color': "orange"},
                        {'range': [40, 60], 'color': "yellow"},
                        {'range': [60, 80], 'color': "lightgreen"},
                        {'range': [80, 100], 'color': "green"}
                    ],
                    'threshold': {
                        'line': {'color': "black", 'width': 4},
                        'thickness': 0.75,
                        'value': assessment["sustainability_score"]
                    }
                }
            ))

            fig.update_layout(
                height=250,
                margin=dict(l=20, r=20, t=50, b=20),
            )

            st.plotly_chart(fig, use_container_width=True)

        # Overall assessment
        st.metric(
            "Overall Assessment",
            assessment["overall_assessment"]
        )

    with col2:
        # Metrics table: one row per metric that is available
        metrics_data = []

        if metrics.get("payout_ratio") is not None:
            metrics_data.append({
                "Metric": "Average Payout Ratio",
                "Value": f"{metrics['payout_ratio']:.2f}%",
                "Assessment": assessment["payout_ratio_assessment"]
            })

        if metrics.get("dividend_growth_rate") is not None:
            metrics_data.append({
                "Metric": f"{metrics['growth_years']}-Year Dividend Growth Rate",
                "Value": f"{metrics['dividend_growth_rate']:.2f}%",
                "Assessment": assessment["growth_rate_assessment"]
            })

        if metrics.get("dividend_consistency") is not None:
            metrics_data.append({
                "Metric": "Dividend Growth Consistency",
                "Value": f"{metrics['dividend_consistency']:.2f}%",
                "Assessment": assessment["consistency_assessment"]
            })

        if metrics_data:
            st.dataframe(
                pd.DataFrame(metrics_data),
                use_container_width=True,
                hide_index=True
            )

    # Add detailed explanation
    st.write("""
    ### Understanding Dividend Sustainability

    The sustainability score evaluates how likely the ETF can maintain or grow its dividend payments over time.
    A higher score indicates better sustainability.

    **Interpreting the Score:**
    - **80-100**: Highly Sustainable - Strong likelihood of continued dividend growth
    - **60-80**: Sustainable - Good prospects for maintaining dividends
    - **40-60**: Moderately Sustainable - May maintain dividends but with limited growth
    - **20-40**: Questionable Sustainability - Risk of dividend cuts
    - **0-20**: Unsustainable - High probability of dividend reduction

    **Key Metrics:**
    - **Payout Ratio**: Percentage of earnings paid as dividends
      - Below 30%: Excellent (very safe)
      - 30-50%: Good (safe)
      - 50-70%: Fair (sustainable)
      - 70-90%: Caution (potentially unsustainable)
      - Above 90%: High Risk (likely unsustainable)

    - **Dividend Growth Rate**: Annual growth rate of dividend payments
      - Above 10%: Excellent growth
      - 5-10%: Good growth
      - 0-5%: Fair growth
      - Below 0%: Declining dividends

    - **Growth Consistency**: Percentage of years with positive dividend growth
      - Higher percentages indicate more reliable dividend growth
    """)

    # Display annual dividend growth chart if we have the data
    if metrics.get("growth_years", 0) >= 3:
        with st.spinner("Generating dividend growth chart..."):
            # Get dividend history again to generate the chart (the metrics
            # dictionary carries only aggregates, not the per-year series)
            dividend_history = get_etf_dividend_history(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )

            if not dividend_history.empty:
                try:
                    # Sort by date
                    dividend_history = dividend_history.sort_values("date")

                    # Convert date to datetime if not already
                    dividend_history["date"] = pd.to_datetime(dividend_history["date"])

                    # Add year column
                    dividend_history["year"] = dividend_history["date"].dt.year

                    # Calculate annual dividends
                    annual_dividends = dividend_history.groupby("year")["dividend"].sum().reset_index()

                    # Calculate YoY growth rates (in percent, rounded for display)
                    annual_dividends["growth_rate"] = annual_dividends["dividend"].pct_change() * 100
                    annual_dividends["growth_rate"] = annual_dividends["growth_rate"].round(2)

                    # Create figure with two y-axes: dollars on the primary
                    # axis, percent growth on the secondary axis
                    fig = make_subplots(specs=[[{"secondary_y": True}]])

                    # Add annual dividends as bars
                    fig.add_trace(
                        go.Bar(
                            x=annual_dividends["year"],
                            y=annual_dividends["dividend"],
                            name="Annual Dividend",
                            marker_color="blue"
                        ),
                        secondary_y=False
                    )

                    # Add growth rates as a line (excluding first year which has NaN growth)
                    growth_df = annual_dividends.dropna()
                    if not growth_df.empty:
                        fig.add_trace(
                            go.Scatter(
                                x=growth_df["year"],
                                y=growth_df["growth_rate"],
                                name="YoY Growth Rate",
                                marker_color="red",
                                mode="lines+markers"
                            ),
                            secondary_y=True
                        )

                    # Update layout
                    fig.update_layout(
                        title=f"Annual Dividends and Growth Rates for {symbol}",
                        xaxis_title="Year",
                        legend=dict(
                            orientation="h",
                            yanchor="bottom",
                            y=1.02,
                            xanchor="right",
                            x=1
                        )
                    )

                    # Update y-axes titles
                    fig.update_yaxes(title_text="Annual Dividend ($)", secondary_y=False)
                    fig.update_yaxes(title_text="YoY Growth Rate (%)", secondary_y=True)

                    # Display chart
                    st.subheader("Dividend Growth History")
                    st.plotly_chart(fig, use_container_width=True)
                except Exception as e:
                    st.error(f"Error generating dividend growth chart: {str(e)}")
def display_dividend_sustainability_comparison() -> None:
    """Display dividend sustainability comparison for selected ETFs.

    Reads the symbols from ``st.session_state.comparison_etfs``, computes the
    sustainability metrics and assessment for each, and renders a summary
    table followed by bar charts for the overall score, the payout ratio and
    the dividend growth rate. The payout and growth charts are drawn only
    when at least two ETFs have the respective metric.
    """
    st.subheader("Dividend Sustainability Comparison")

    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return

    # Fetch data for all ETFs
    sustainability_data = {}  # symbol -> raw metrics dictionary
    assessment_data = []      # one display row per ETF that has any metric

    with st.spinner("Calculating dividend sustainability metrics..."):
        for symbol in st.session_state.comparison_etfs:
            # Get sustainability metrics
            metrics = get_dividend_sustainability(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )

            # Store metrics if available (at least one metric is not None)
            if metrics and (metrics.get("payout_ratio") is not None or
                            metrics.get("dividend_growth_rate") is not None or
                            metrics.get("dividend_consistency") is not None):
                sustainability_data[symbol] = metrics

                # Get assessment
                assessment = assess_dividend_sustainability(metrics)

                # Create assessment summary; values are pre-formatted strings
                summary = {
                    "Symbol": symbol,
                    "Sustainability Score": f"{assessment.get('sustainability_score', 0):.1f}",
                    "Overall Assessment": assessment.get("overall_assessment", "No Data")
                }

                # Optional columns are added only for metrics that exist, so
                # rows can differ in which keys they carry.
                if metrics.get("payout_ratio") is not None:
                    summary["Payout Ratio"] = f"{metrics['payout_ratio']:.2f}%"
                    summary["Payout Assessment"] = assessment.get("payout_ratio_assessment", "No Data")

                if metrics.get("dividend_growth_rate") is not None:
                    summary["Growth Rate"] = f"{metrics['dividend_growth_rate']:.2f}%"
                    summary["Growth Assessment"] = assessment.get("growth_rate_assessment", "No Data")

                if metrics.get("dividend_consistency") is not None:
                    summary["Consistency"] = f"{metrics['dividend_consistency']:.2f}%"
                    summary["Consistency Assessment"] = assessment.get("consistency_assessment", "No Data")

                assessment_data.append(summary)

    # Display assessment comparison table
    if assessment_data:
        st.subheader("Dividend Sustainability Assessment")
        assessment_df = pd.DataFrame(assessment_data)
        assessment_df.set_index("Symbol", inplace=True)
        st.dataframe(assessment_df, use_container_width=True)

        # Create bar chart comparing sustainability scores.
        # The assessment is recomputed here because only the formatted score
        # string was kept in the summary rows above.
        score_data = []
        for symbol in sustainability_data.keys():
            assessment = assess_dividend_sustainability(sustainability_data[symbol])
            score = assessment.get("sustainability_score", 0)
            # ETFs with a score of 0 are left out of the chart
            if score > 0:
                score_data.append({
                    "Symbol": symbol,
                    "Sustainability Score": score
                })

        if score_data:
            score_df = pd.DataFrame(score_data)

            # Create bar chart
            fig = px.bar(
                score_df,
                x="Symbol",
                y="Sustainability Score",
                title="Dividend Sustainability Score Comparison",
                color="Sustainability Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100]
            )

            st.plotly_chart(fig, use_container_width=True)

        # Create comparison charts
        # 1. Payout ratio comparison
        payout_data = []
        for symbol, metrics in sustainability_data.items():
            if metrics.get("payout_ratio") is not None:
                payout_data.append({
                    "Symbol": symbol,
                    "Payout Ratio": metrics["payout_ratio"]
                })

        if payout_data and len(payout_data) > 1:  # Only show if we have multiple ETFs
            st.subheader("Payout Ratio Comparison")
            payout_df = pd.DataFrame(payout_data)

            # Create bar chart; the colour scale runs green -> red because a
            # lower payout ratio is graded as better
            fig = px.bar(
                payout_df,
                x="Symbol",
                y="Payout Ratio",
                title="Average Payout Ratio of Holdings",
                color="Payout Ratio",
                color_continuous_scale=["green", "lightgreen", "yellow", "orange", "red"],
                range_color=[0, 100]
            )

            # Add reference lines for different thresholds
            fig.add_hline(y=30, line_dash="dash", line_color="green", annotation_text="Excellent (<30%)", annotation_position="bottom right")
            fig.add_hline(y=70, line_dash="dash", line_color="orange", annotation_text="Warning (>70%)", annotation_position="bottom right")

            st.plotly_chart(fig, use_container_width=True)

        # 2. Growth rate comparison
        growth_data = []
        for symbol, metrics in sustainability_data.items():
            if metrics.get("dividend_growth_rate") is not None:
                growth_data.append({
                    "Symbol": symbol,
                    "Growth Rate": metrics["dividend_growth_rate"],
                    "Years": metrics["growth_years"]
                })

        if growth_data and len(growth_data) > 1:  # Only show if we have multiple ETFs
            st.subheader("Dividend Growth Rate Comparison")
            growth_df = pd.DataFrame(growth_data)

            # Create bar chart; each bar is labelled with the number of
            # growth years behind its average
            fig = px.bar(
                growth_df,
                x="Symbol",
                y="Growth Rate",
                title="Average Annual Dividend Growth Rate",
                color="Growth Rate",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                text="Years",
                hover_data=["Years"]
            )

            # Add reference line for 0% growth
            fig.add_hline(y=0, line_dash="dash", line_color="gray")

            # Add reference line for good growth rate
            fig.add_hline(y=5, line_dash="dash", line_color="green", annotation_text="Good Growth (>5%)", annotation_position="bottom right")

            st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("No dividend sustainability data available for the selected ETFs.")
        st.info("""
        This could be due to:
        1. The selected ETFs don't have sufficient dividend history
        2. Holdings data is not available for the ETFs
        3. Financial data for the ETF holdings is not accessible

        Try selecting ETFs with longer dividend history or more accessible holding data.
        """)
def get_esg_score(symbol: str, force_refresh: bool = False) -> Dict:
    """Get ESG (Environmental, Social, Governance) scores for an ETF.

    The ``esg-score/<symbol>`` endpoint is tried first. If it reports an
    error, returns nothing usable, or its payload cannot be processed, the
    scores are estimated from the ETF's largest holdings instead (see
    ``get_esg_from_holdings``).

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        Dictionary with ESG score data: ``symbol``, ``esg_score``,
        ``environmental_score``, ``social_score``, ``governance_score``,
        ``year`` and ``peer_comparison`` for direct data; the aggregated
        fallback has ``is_aggregated`` instead of ``peer_comparison``.
        Scores that are unavailable are None.
    """
    debug_mode = st.session_state.get("debug_mode", False)

    # Try to get ESG score from FMP API
    esg_data = fmp_request(
        f"esg-score/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode
    )

    results = {
        "symbol": symbol,
        "esg_score": None,
        "environmental_score": None,
        "social_score": None,
        "governance_score": None,
        "year": None,
        "peer_comparison": None,
    }

    # An error payload means the direct endpoint is unavailable for this symbol.
    if isinstance(esg_data, dict) and "error" in esg_data:
        if debug_mode:
            st.warning(f"ESG score data not available via direct API for {symbol}")
        return get_esg_from_holdings(symbol, force_refresh, debug_mode)

    if isinstance(esg_data, list) and len(esg_data) > 0:
        try:
            # The first entry is taken as the most recent record.
            recent_esg = esg_data[0]

            # Copy over whichever fields the record provides.
            for result_key, api_key in (
                ("esg_score", "totalEsg"),
                ("environmental_score", "environmentalScore"),
                ("social_score", "socialScore"),
                ("governance_score", "governanceScore"),
                ("year", "year"),
            ):
                if api_key in recent_esg:
                    results[result_key] = recent_esg[api_key]

            # Include peer comparison if available
            if "peerGroup" in recent_esg:
                results["peer_comparison"] = {
                    "group": recent_esg.get("peerGroup"),
                    "avg_esg": recent_esg.get("peerEsgScorePerformance", 0),
                    "percentile": recent_esg.get("percentile", 0),
                }

            return results
        except Exception as e:
            if debug_mode:
                st.error(f"Error processing ESG data: {str(e)}")
            # Malformed direct data: fall back to the holdings estimate.
            return get_esg_from_holdings(symbol, force_refresh, debug_mode)

    # If no direct ESG data, try getting ESG from holdings
    return get_esg_from_holdings(symbol, force_refresh, debug_mode)


def get_esg_from_holdings(symbol: str, force_refresh: bool, debug_mode: bool) -> Dict:
    """Get ESG scores by aggregating data from ETF holdings.

    Each score is the weight-averaged score of the (up to) ten largest
    holdings. The average is normalised over the holdings that actually
    returned that score, so holdings without ESG data do not pull the result
    toward zero. Holdings are weighted equally when no usable weights exist.

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API
        debug_mode: Whether to show debug information

    Returns:
        Dictionary with aggregated ESG score data. ``is_aggregated`` is always
        True and ``year`` is the current year; a score is None when no
        holding provided it.
    """
    results = {
        "symbol": symbol,
        "esg_score": None,
        "environmental_score": None,
        "social_score": None,
        "governance_score": None,
        "year": datetime.now().year,  # Use current year for aggregated data
        "is_aggregated": True,  # Flag to indicate this is aggregated from holdings
    }

    holdings = get_etf_holdings(symbol, force_refresh=force_refresh)
    if holdings.empty or "asset" not in holdings.columns:
        return results

    # result key -> field name in the per-ticker ESG record
    score_fields = {
        "esg_score": "totalEsg",
        "environmental_score": "environmentalScore",
        "social_score": "socialScore",
        "governance_score": "governanceScore",
    }

    try:
        # Restrict to the ten largest positions to bound the number of API calls.
        if "weightPercentage" in holdings.columns:
            top_holdings = holdings.sort_values(
                "weightPercentage", ascending=False
            ).head(10)
            # Missing, non-numeric or negative weights count as zero.
            weights = (
                pd.to_numeric(top_holdings["weightPercentage"], errors="coerce")
                .fillna(0)
                .clip(lower=0)
                .tolist()
            )
        else:
            top_holdings = holdings.head(10)
            weights = []

        tickers = top_holdings["asset"].tolist()

        # No weights, or weights that sum to nothing: weight holdings equally.
        if sum(weights) <= 0:
            weights = [1.0] * len(tickers)

        weighted_sums = dict.fromkeys(score_fields, 0.0)
        weight_totals = dict.fromkeys(score_fields, 0.0)

        for raw_ticker, weight in zip(tickers, weights):
            # Cash lines or missing symbols cannot be looked up.
            if not isinstance(raw_ticker, str):
                continue
            # Strip any exchange prefix or trailing extra information.
            ticker_parts = raw_ticker.split(":")[-1].split()
            if not ticker_parts:
                continue

            ticker_esg = fmp_request(
                f"esg-score/{ticker_parts[0]}",
                force_refresh=force_refresh,
                debug_mode=debug_mode,
            )
            if not (isinstance(ticker_esg, list) and ticker_esg):
                continue
            recent_esg = ticker_esg[0]
            if not isinstance(recent_esg, dict):
                continue

            for result_key, api_key in score_fields.items():
                score = recent_esg.get(api_key)
                if isinstance(score, (int, float)) and not pd.isna(score):
                    weighted_sums[result_key] += score * weight
                    # Track the weight per score so each average is
                    # normalised only over holdings that reported it.
                    weight_totals[result_key] += weight

        for result_key in score_fields:
            if weight_totals[result_key] > 0:
                results[result_key] = (
                    weighted_sums[result_key] / weight_totals[result_key]
                )

        return results
    except Exception as e:
        if debug_mode:
            st.error(f"Error calculating ESG scores from holdings: {str(e)}")
        return results
def assess_esg_score(esg_data: Dict) -> Dict:
    """Translate ESG scores into qualitative ratings.

    The overall score and each component score that is present (not None)
    are rated against the same bands: 70+ Excellent, 60+ Very Good, 50+ Good,
    40+ Average, 30+ Below Average, otherwise Poor. The overall assessment
    mirrors the rating of the overall ESG score.

    Args:
        esg_data: Dictionary with ESG score data

    Returns:
        Dictionary with ESG score assessments
    """
    # Lower bounds of the rating bands, highest first.
    band_floors = (
        (70, "Excellent"),
        (60, "Very Good"),
        (50, "Good"),
        (40, "Average"),
        (30, "Below Average"),
    )

    def _rating_for(score):
        # First band whose floor the score reaches; below all floors is "Poor".
        for floor, label in band_floors:
            if score >= floor:
                return label
        return "Poor"

    assessment = {
        "esg_rating": "No Data",
        "environmental_rating": "No Data",
        "social_rating": "No Data",
        "governance_rating": "No Data",
        "overall_assessment": "No Data"
    }

    score_to_rating = (
        ("esg_score", "esg_rating"),
        ("environmental_score", "environmental_rating"),
        ("social_score", "social_rating"),
        ("governance_score", "governance_rating"),
    )
    for score_key, rating_key in score_to_rating:
        if esg_data.get(score_key) is None:
            continue
        assessment[rating_key] = _rating_for(esg_data[score_key])

    # Overall assessment based on ESG rating
    overall = assessment["esg_rating"]
    if overall != "No Data":
        assessment["overall_assessment"] = overall

    return assessment
def display_esg_analysis(symbol: str) -> None:
    """Display ESG analysis for an ETF.

    Renders an overall-score gauge with rating and data year (left column),
    a horizontal bar chart of the component scores with an explanatory
    caption (right column) and, when the data carries a peer comparison, the
    peer group, percentile rank and a score-versus-peer-average chart.

    Args:
        symbol: ETF ticker symbol to analyse
    """
    st.subheader("ESG (Environmental, Social, Governance) Analysis")

    with st.spinner("Loading ESG data..."):
        esg_data = get_esg_score(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
        assessment = assess_esg_score(esg_data)

    # Display results: proceed only if at least one score is available
    has_data = (esg_data.get("esg_score") is not None or
                esg_data.get("environmental_score") is not None or
                esg_data.get("social_score") is not None or
                esg_data.get("governance_score") is not None)

    if not has_data:
        st.warning("No ESG data available for this ETF.")
        st.info("⚠️ PREMIUM API FEATURE: Environmental, Social, and Governance (ESG) scores require the Professional or Enterprise tier of the FMP API subscription.")
        st.info("ESG data provides insights into sustainability practices, social responsibility, and governance quality of the ETF's holdings. This data is increasingly important for socially conscious investors.")
        return

    # Check if data is aggregated (flag is absent on direct API results,
    # hence the False default)
    if esg_data.get("is_aggregated", False):
        st.info("ESG data is aggregated from top holdings and may not represent the official ESG score for this ETF.")

    # Create columns for overall score and components (1:2 width ratio)
    col1, col2 = st.columns([1, 2])

    with col1:
        # Overall ESG score gauge chart
        if esg_data.get("esg_score") is not None:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=esg_data["esg_score"],
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': "ESG Score"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': "darkblue"},
                    'steps': [
                        {'range': [0, 30], 'color': "red"},
                        {'range': [30, 50], 'color': "orange"},
                        {'range': [50, 70], 'color': "lightgreen"},
                        {'range': [70, 100], 'color': "green"}
                    ],
                    'threshold': {
                        'line': {'color': "black", 'width': 4},
                        'thickness': 0.75,
                        'value': esg_data["esg_score"]
                    }
                }
            ))

            fig.update_layout(
                height=250,
                margin=dict(l=20, r=20, t=50, b=20),
            )

            st.plotly_chart(fig, use_container_width=True)

        # Overall rating
        st.metric(
            "ESG Rating",
            assessment["overall_assessment"]
        )

        # Data year
        if esg_data.get("year") is not None:
            st.caption(f"Data Year: {esg_data['year']}")

    with col2:
        # ESG component scores: (display name, score, rating) triples
        component_data = []
        components = [
            ("Environmental", esg_data.get("environmental_score"), assessment.get("environmental_rating")),
            ("Social", esg_data.get("social_score"), assessment.get("social_rating")),
            ("Governance", esg_data.get("governance_score"), assessment.get("governance_rating"))
        ]

        # Components without a score are left out of the chart
        for name, score, rating in components:
            if score is not None:
                component_data.append({
                    "Component": name,
                    "Score": score,
                    "Rating": rating
                })

        # Create horizontal bar chart for components
        if component_data:
            fig = px.bar(
                pd.DataFrame(component_data),
                y="Component",
                x="Score",
                color="Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100],
                labels={"Score": "Score (0-100)"},
                title="ESG Component Scores",
                text="Rating",
                orientation="h"
            )

            # Update layout
            fig.update_layout(
                yaxis=dict(autorange="reversed"),  # Reverse y-axis for better reading
                height=250,
                margin=dict(l=20, r=20, t=50, b=20)
            )

            st.plotly_chart(fig, use_container_width=True)

        # Add explanation
        st.caption("""
        **ESG Score Interpretation:**
        - **Environmental**: Evaluates resource use, emissions, innovation, and environmental impact
        - **Social**: Assesses workforce, human rights, community, and product responsibility
        - **Governance**: Reviews management structure, policies, and shareholder relations
        - **Overall ESG Score**: Combined metric (scale 0-100, higher is better)
        """)

    # Show peer comparison if available
    if esg_data.get("peer_comparison") is not None:
        st.subheader("Peer Comparison")

        peer = esg_data["peer_comparison"]

        peer_col1, peer_col2 = st.columns(2)

        with peer_col1:
            st.metric(
                "Peer Group",
                peer.get("group", "Not Available")
            )

        with peer_col2:
            # NOTE(review): the numeric format below assumes 'percentile' is
            # a number whenever the key is present - confirm against the API.
            st.metric(
                "Percentile Rank",
                f"{peer.get('percentile', 0):.0f}%",
                help="Higher percentile means better ESG performance relative to peers"
            )

        # Create comparison chart if we have peer average
        if peer.get("avg_esg") is not None and esg_data.get("esg_score") is not None:
            peer_data = pd.DataFrame([
                {"Entity": symbol, "ESG Score": esg_data["esg_score"]},
                {"Entity": "Peer Average", "ESG Score": peer["avg_esg"]}
            ])

            fig = px.bar(
                peer_data,
                x="Entity",
                y="ESG Score",
                color="Entity",
                title="ESG Score vs. Peer Average",
                text="ESG Score",
                text_auto=".1f"
            )

            st.plotly_chart(fig, use_container_width=True)
def display_esg_comparison():
    """Display ESG score comparison for selected ETFs.

    Reads the symbols from ``st.session_state.comparison_etfs``, fetches and
    rates the ESG data of each, and renders a summary table, a bar chart of
    the overall scores and, when more than one ETF has data, a grouped bar
    chart and a radar chart of the component scores.
    """
    st.subheader("ESG Score Comparison")

    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return

    # Fetch ESG data for all ETFs
    esg_data = {}         # symbol -> raw ESG metrics
    assessment_data = []  # one table row per ETF that has any score

    with st.spinner("Fetching ESG data..."):
        for symbol in st.session_state.comparison_etfs:
            metrics = get_esg_score(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )

            # Keep the ETF only if at least one score is available.
            if metrics and (metrics.get("esg_score") is not None or
                            metrics.get("environmental_score") is not None or
                            metrics.get("social_score") is not None or
                            metrics.get("governance_score") is not None):
                esg_data[symbol] = metrics

                assessment = assess_esg_score(metrics)

                summary = {
                    "Symbol": symbol,
                    "ESG Score": metrics.get("esg_score"),
                    "ESG Rating": assessment.get("esg_rating", "No Data")
                }

                # Component columns are added only when the score exists, so
                # rows may differ in which keys they carry.
                if metrics.get("environmental_score") is not None:
                    summary["Environmental"] = metrics["environmental_score"]
                    summary["Env. Rating"] = assessment.get("environmental_rating", "No Data")

                if metrics.get("social_score") is not None:
                    summary["Social"] = metrics["social_score"]
                    summary["Social Rating"] = assessment.get("social_rating", "No Data")

                if metrics.get("governance_score") is not None:
                    summary["Governance"] = metrics["governance_score"]
                    summary["Gov. Rating"] = assessment.get("governance_rating", "No Data")

                if metrics.get("is_aggregated", False):
                    summary["Data Source"] = "Aggregated"
                else:
                    summary["Data Source"] = "Direct"

                assessment_data.append(summary)

    # Display assessment comparison table
    if assessment_data:
        st.subheader("ESG Score Assessment")
        assessment_df = pd.DataFrame(assessment_data)

        # Format numeric columns. pandas stores a missing score (None, or a
        # key absent from a row) as NaN in a numeric column, so missing
        # values must be detected with pd.notna; an `is not None` test lets
        # NaN through and renders it as "nan".
        numeric_cols = ["ESG Score", "Environmental", "Social", "Governance"]
        for col in numeric_cols:
            if col in assessment_df.columns:
                assessment_df[col] = assessment_df[col].apply(
                    lambda x: f"{x:.1f}" if pd.notna(x) else "N/A"
                )

        # Set index to Symbol
        if "Symbol" in assessment_df.columns:
            assessment_df.set_index("Symbol", inplace=True)

        st.dataframe(assessment_df, use_container_width=True)

        # Create bar chart comparing overall ESG scores
        score_data = []
        for symbol, metrics in esg_data.items():
            if metrics.get("esg_score") is not None:
                score_data.append({
                    "Symbol": symbol,
                    "ESG Score": metrics["esg_score"]
                })

        if score_data:
            score_df = pd.DataFrame(score_data)

            fig = px.bar(
                score_df,
                x="Symbol",
                y="ESG Score",
                title="ESG Score Comparison",
                color="ESG Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100]
            )

            # Add reference lines for different ESG thresholds
            fig.add_hline(y=70, line_dash="dash", line_color="green", annotation_text="Excellent (>70)", annotation_position="bottom right")
            fig.add_hline(y=50, line_dash="dash", line_color="gold", annotation_text="Good (>50)", annotation_position="bottom right")
            fig.add_hline(y=30, line_dash="dash", line_color="orange", annotation_text="Below Average (<30)", annotation_position="bottom right")

            st.plotly_chart(fig, use_container_width=True)

        # Create component comparison if we have multiple ETFs
        if len(esg_data) > 1:
            # Prepare data for ESG components comparison (long format: one
            # row per ETF and component)
            components = ["Environmental", "Social", "Governance"]
            component_data = []

            for symbol, metrics in esg_data.items():
                for component in components:
                    component_key = component.lower() + "_score"
                    if metrics.get(component_key) is not None:
                        component_data.append({
                            "Symbol": symbol,
                            "Component": component,
                            "Score": metrics[component_key]
                        })

            if component_data:
                st.subheader("ESG Component Comparison")

                # Create grouped bar chart
                component_df = pd.DataFrame(component_data)

                fig = px.bar(
                    component_df,
                    x="Symbol",
                    y="Score",
                    color="Component",
                    barmode="group",
                    title="ESG Component Comparison",
                    labels={"Score": "Score (0-100)"},
                    text="Score",
                    text_auto=".1f"
                )

                # Add reference line for good score
                fig.add_hline(y=50, line_dash="dash", line_color="gray")

                st.plotly_chart(fig, use_container_width=True)

                # Create radar chart for ESG component comparison (wide
                # format: one row per ETF; a missing component is drawn at 0)
                radar_data = []

                for symbol in esg_data.keys():
                    symbol_data = {"Symbol": symbol}

                    for component in components:
                        component_key = component.lower() + "_score"
                        if esg_data[symbol].get(component_key) is not None:
                            symbol_data[component] = esg_data[symbol][component_key]
                        else:
                            symbol_data[component] = 0

                    radar_data.append(symbol_data)

                if radar_data:
                    radar_df = pd.DataFrame(radar_data)

                    # Fill NaN values with 0
                    radar_df = radar_df.fillna(0)

                    fig = go.Figure()

                    for i, row in radar_df.iterrows():
                        symbol = row["Symbol"]

                        fig.add_trace(go.Scatterpolar(
                            r=[row.get(c, 0) for c in components],
                            theta=components,
                            fill="toself",
                            name=symbol
                        ))

                    fig.update_layout(
                        polar=dict(
                            radialaxis=dict(
                                visible=True,
                                range=[0, 100]
                            )
                        ),
                        title="ESG Component Radar Chart",
                        showlegend=True
                    )

                    st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("No ESG data available for the selected ETFs.")
        st.info("""
        This could be due to:
        1. ESG data is not available for the selected ETFs
        2. Your API subscription level does not include ESG data

        Try selecting different ETFs or check your API subscription level.
        """)


# --- Main Application ---
def main():
    """Run the ETF Analyzer page.

    Sets up the cache directory, renders the title and the sidebar settings
    (API key, force-refresh and debug toggles, API call counter), handles the
    sidebar navigation buttons and finally renders the view selected in
    ``st.session_state.current_tab``.
    """
    # Initialize cache directory
    setup_cache_dir()

    # Title and Description
    st.title("📊 ETF Analyzer")
    st.write("Comprehensive ETF Analysis Tool for Investment Research")

    # Sidebar
    st.sidebar.header("Settings")

    # API Key Input
    api_key = st.sidebar.text_input(
        "FMP API Key",
        value=st.session_state.get("fmp_api_key", ""),
        type="password",
        help="Enter your Financial Modeling Prep API key."
    )

    # If API key provided, update in session state and environment
    if api_key:
        st.session_state.fmp_api_key = api_key
        os.environ["FMP_API_KEY"] = api_key

    # Force refresh toggle
    st.session_state.force_refresh = st.sidebar.checkbox(
        "Force refresh data (ignore cache)",
        value=st.session_state.get("force_refresh", False),
        help="When enabled, always fetch fresh data from APIs"
    )

    # Debug mode toggle
    st.session_state.debug_mode = st.sidebar.checkbox(
        "Debug Mode",
        value=st.session_state.get("debug_mode", False),
        help="Show detailed API request and response information"
    )

    # Track API calls
    if "api_calls" not in st.session_state:
        st.session_state.api_calls = 0

    st.sidebar.write(f"API calls this session: {st.session_state.api_calls}")

    # Navigation
    st.sidebar.header("Navigation")

    # Initialize current tab if not in session state
    if "current_tab" not in st.session_state:
        st.session_state.current_tab = "search"

    # Navigation buttons: each stores the target tab and reruns the script so
    # the new view is rendered immediately.
    if st.sidebar.button("🔍 ETF Search", key="nav_search"):
        st.session_state.current_tab = "search"
        st.rerun()

    if st.sidebar.button("📊 ETF Analysis", key="nav_analysis"):
        # The analysis view needs a previously selected ETF.
        if "selected_etf" in st.session_state:
            st.session_state.current_tab = "analysis"
            st.rerun()
        else:
            st.sidebar.warning("Please select an ETF first.")

    if st.sidebar.button("🔄 ETF Comparison", key="nav_comparison"):
        st.session_state.current_tab = "comparison"
        st.rerun()

    if st.sidebar.button("📤 Export Data", key="nav_export"):
        st.session_state.current_tab = "export"
        st.rerun()

    # Add API test button
    if st.sidebar.button("🔌 Test API Connection", key="nav_test_api"):
        st.session_state.current_tab = "test_api"
        st.rerun()

    # Display the selected tab content. Unknown tabs, and the analysis tab
    # without a selected ETF, fall back to the search view.
    if st.session_state.current_tab == "search":
        display_etf_search()
    elif st.session_state.current_tab == "analysis" and "selected_etf" in st.session_state:
        display_etf_analysis(st.session_state.selected_etf)
    elif st.session_state.current_tab == "comparison":
        display_comparison()
    elif st.session_state.current_tab == "export":
        display_export()
    elif st.session_state.current_tab == "test_api":
        test_api_connection()
    else:
        display_etf_search()


if __name__ == "__main__":
    main()