# ETF_Suite_Portal/pages/ETF_Analyzer.py
# (export-artifact header — 4382 lines, 178 KiB, Python — commented out so the
# file parses as valid Python)
# Set page config first, before any other Streamlit commands.
# NOTE(review): the original called st.set_page_config() before `streamlit`
# was imported (the import block sits further down), which raised NameError at
# module load — streamlit is therefore imported here first.
import streamlit as st

st.set_page_config(
    page_title="ETF Analyzer",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded"
)
"""
ETF Analyzer - Comprehensive ETF Analysis Tool
This application provides in-depth analysis of ETFs using data from the Financial Modeling Prep API.
It allows users to research, compare, and analyze ETFs before adding them to their portfolio simulations.
"""
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests
import os
import json
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import time
from typing import Dict, List, Tuple, Any, Optional, Union
import sys
import yfinance as yf
from dotenv import load_dotenv
import logging
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FMP API configuration
# The key typed into the sidebar (session state) takes precedence over the
# FMP_API_KEY environment variable loaded via dotenv above.
FMP_API_KEY = st.session_state.get('fmp_api_key', os.getenv('FMP_API_KEY', ''))
FMP_BASE_URL = "https://financialmodelingprep.com/api/v3"
def test_fmp_connection():
    """Probe the FMP API with a lightweight smoke-test request.

    Returns:
        Tuple (ok, message): ok is True only when a non-empty profile list
        came back for the AAPL test request; message describes the status.
    """
    try:
        if not FMP_API_KEY:
            return False, "No API key found"
        session = get_fmp_session()
        test_url = f"{FMP_BASE_URL}/profile/AAPL?apikey={FMP_API_KEY}"
        # timeout added for consistency with fmp_request(); without it a
        # stalled connection hangs the whole page at startup.
        response = session.get(test_url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data and isinstance(data, list) and len(data) > 0:
                return True, "Connected"
        return False, f"Error: {response.status_code}"
    except Exception as e:
        return False, f"Error: {str(e)}"
# Add FMP connection status to the navigation bar.
# NOTE: this runs at import time, so every page load performs the probe.
st.sidebar.markdown("---")
st.sidebar.subheader("FMP API Status")
connection_status, message = test_fmp_connection()
if connection_status:
    st.sidebar.success(f"✅ FMP API: {message}")
else:
    st.sidebar.error(f"❌ FMP API: {message}")
# --- Constants and Settings ---
CACHE_DIR = Path("cache")  # on-disk cache for FMP API responses
CACHE_EXPIRATION_DAYS = 7  # cached payloads older than this are treated as stale
DEFAULT_CHART_HEIGHT = 500  # default chart height in pixels
MAX_ETF_COMPARISON = 5  # maximum number of ETFs compared side by side
API_RATE_LIMIT_DELAY = 0.5 # seconds between API calls to avoid rate limiting
# --- Initialize Streamlit Page ---
# NOTE(review): duplicate st.set_page_config() removed. The page config is
# already set at the very top of this module; Streamlit allows only one call
# per page and requires it to be the first Streamlit command, so this second
# call — after the sidebar commands above — raised StreamlitAPIException.
# Add navigation in sidebar.
# Button keys are suffixed with "_analyzer" to stay unique across pages.
with st.sidebar:
    st.markdown("### Navigation")
    if st.button("🏠 ETF Suite Launcher", key="launcher_analyzer"):
        st.switch_page("pages/ETF_Suite_Launcher.py")
    if st.button("💼 Portfolio Builder", key="portfolio_analyzer"):
        st.switch_page("pages/ETF_Portfolio_Builder.py")
# --- Functions ---
def setup_cache_dir() -> Path:
    """Ensure the on-disk cache directory exists and return its path."""
    cache_dir = CACHE_DIR
    cache_dir.mkdir(exist_ok=True)
    return cache_dir
def generate_cache_key(endpoint: str, params: Dict = None) -> str:
    """Build a deterministic cache key for an endpoint/parameter combination.

    Parameters are JSON-serialized with sorted keys, so logically-equal
    requests always hash to the same key.
    """
    normalized = json.dumps(params or {}, sort_keys=True)
    return hashlib.md5(f"{endpoint}_{normalized}".encode()).hexdigest()
def get_cache_path(cache_key: str) -> Path:
    """Map a cache key to its JSON file inside the cache directory."""
    filename = f"{cache_key}.json"
    return CACHE_DIR / filename
def save_to_cache(cache_key: str, data: Any) -> None:
    """Persist data for *cache_key*, stamped with the current time.

    Caching is best-effort: failures are surfaced on the page but never
    propagated to the caller.
    """
    payload = {
        "data": data,
        "timestamp": datetime.now().isoformat(),
    }
    try:
        with open(get_cache_path(cache_key), 'w') as f:
            json.dump(payload, f)
    except Exception as e:
        st.error(f"Error saving to cache: {str(e)}")
def load_from_cache(cache_key: str) -> Tuple[Any, bool]:
    """Load cached data for *cache_key*.

    Returns:
        Tuple of (data, is_valid). Expired entries are still returned so they
        can serve as a fallback, but flagged invalid; a missing or unreadable
        cache file yields (None, False).
    """
    cache_file = get_cache_path(cache_key)
    if not cache_file.exists():
        return None, False
    try:
        with open(cache_file, 'r') as f:
            payload = json.load(f)
        # An entry is valid only while younger than the expiration window.
        written_at = datetime.fromisoformat(payload["timestamp"])
        expired = datetime.now() - written_at > timedelta(days=CACHE_EXPIRATION_DAYS)
        return payload["data"], not expired
    except Exception as e:
        st.error(f"Error loading from cache: {str(e)}")
        return None, False
def fmp_request(endpoint: str, params: Optional[Dict] = None, force_refresh: bool = False, debug_mode: bool = False) -> Dict:
    """Make a request to the FMP API with caching.

    Args:
        endpoint: API path relative to the v3 base URL (e.g. "etf/list").
        params: Query parameters; the API key is added automatically.
        force_refresh: Skip the cache and always hit the API.
        debug_mode: Render request/response details on the Streamlit page.

    Returns:
        Parsed JSON payload (list or dict) on success, or a dict with an
        "error" key describing the failure.
    """
    if params is None:
        params = {}
    # Get API key: environment variable wins, then the key from the sidebar.
    api_key = os.environ.get("FMP_API_KEY")
    if not api_key:
        api_key = st.session_state.get("fmp_api_key")
    if not api_key:
        st.error("FMP API key not found. Please enter it in the sidebar.")
        return {"error": "API key not found. Please enter a valid API key in the sidebar."}
    # Add API key to parameters
    params["apikey"] = api_key
    # Debug mode - show API request details (key is truncated, never shown whole)
    if debug_mode:
        st.write("API Key (first 4 chars):", api_key[:4] + "..." if api_key and len(api_key) > 4 else "None")
    # Generate cache key (hash of endpoint + sorted params, apikey included)
    cache_key = generate_cache_key(endpoint, params)
    # Try to load from cache first if not forcing refresh
    if not force_refresh:
        cached_data, is_valid = load_from_cache(cache_key)
        if is_valid:
            if debug_mode:
                st.write("✅ Data loaded from cache")
            return cached_data
    # Make the API request
    base_url = "https://financialmodelingprep.com/api/v3"
    url = f"{base_url}/{endpoint}"
    if debug_mode:
        st.write("🌐 Making API request to:", url)
        st.write("Parameters:", {k: (v[:4] + "..." if k == "apikey" and v and len(v) > 4 else v) for k, v in params.items()})
    try:
        # Add timeout to prevent hanging on API requests
        response = requests.get(url, params=params, timeout=10)
        # Add small delay to avoid rate limiting
        time.sleep(API_RATE_LIMIT_DELAY)
        if debug_mode:
            st.write("📡 API Response Status Code:", response.status_code)
            st.write("📡 Response Headers:", dict(response.headers))
        if response.status_code == 200:
            try:
                data = response.json()
                if debug_mode:
                    if isinstance(data, list):
                        st.write(f"✅ Response is a list with {len(data)} items")
                        if len(data) > 0:
                            st.write("First item sample:", list(data[0].keys()) if isinstance(data[0], dict) else data[0])
                    elif isinstance(data, dict):
                        st.write(f"✅ Response is a dictionary with {len(data)} keys")
                        st.write("Keys:", list(data.keys()))
                # Check if the response is an empty list or empty object
                if isinstance(data, list) and len(data) == 0:
                    return {"error": "No data available for this request", "empty": True}
                elif isinstance(data, dict) and len(data) == 0:
                    return {"error": "No data available for this request", "empty": True}
                # Cache the response (only non-empty 200 payloads are cached)
                save_to_cache(cache_key, data)
                # Track API calls if counter exists
                if "api_calls" in st.session_state:
                    st.session_state.api_calls += 1
                return data
            except json.JSONDecodeError as e:
                error_msg = f"Failed to decode API response as JSON: {str(e)}"
                if debug_mode:
                    st.error(error_msg)
                    st.write("Raw response:", response.text[:500] + "..." if len(response.text) > 500 else response.text)
                return {"error": error_msg}
        else:
            error_msg = f"API request failed with status {response.status_code}"
            if hasattr(response, 'text'):
                error_msg += f": {response.text}"
            if debug_mode:
                st.error(error_msg)
            return {"error": error_msg, "status_code": response.status_code}
    except requests.exceptions.Timeout:
        error_msg = f"API request timed out for endpoint {endpoint}"
        if debug_mode:
            st.error(error_msg)
        return {"error": error_msg, "timeout": True}
    except requests.exceptions.RequestException as e:
        error_msg = f"API request error: {str(e)}"
        if debug_mode:
            st.error(error_msg)
        return {"error": error_msg}
def get_etf_list(force_refresh: bool = False) -> pd.DataFrame:
    """Return the FMP ETF universe as a DataFrame; empty frame on any failure."""
    debug = st.session_state.get("debug_mode", False)
    raw = fmp_request("etf/list", force_refresh=force_refresh, debug_mode=debug)
    # Error payloads come back as dicts carrying an "error" key.
    failed = isinstance(raw, dict) and "error" in raw
    if failed and debug:
        st.error(f"Error getting ETF list: {raw['error']}")
    if failed or not raw:
        return pd.DataFrame()
    frame = pd.DataFrame(raw)
    # Clean up stray whitespace in names.
    if 'name' in frame.columns:
        frame['name'] = frame['name'].str.strip()
    return frame
def get_etf_profile(symbol: str, force_refresh: bool = False) -> Dict:
    """Get ETF profile information.

    Args:
        symbol: ETF ticker symbol.
        force_refresh: Whether to bypass the cache.

    Returns:
        The first profile record for *symbol*, or a dict with an "error" key
        when the API fails or returns nothing usable.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    profile_data = fmp_request(f"etf/profile/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    # Check for error in response
    if isinstance(profile_data, dict) and "error" in profile_data:
        return {"error": profile_data["error"], "symbol": symbol}
    if not profile_data:
        return {"error": f"No profile data available for {symbol}", "symbol": symbol}
    # The endpoint normally returns a one-element list, but guard against a
    # bare-dict response so indexing [0] cannot raise.
    if isinstance(profile_data, list):
        return profile_data[0]
    if isinstance(profile_data, dict):
        return profile_data
    return {"error": f"No profile data available for {symbol}", "symbol": symbol}
def get_etf_holdings(symbol: str, force_refresh: bool = False) -> pd.DataFrame:
    """Get ETF holdings information.

    Tries the primary v3 endpoint first, then two alternative endpoints,
    because endpoint availability varies by FMP subscription tier.

    Returns:
        DataFrame of holdings; a one-row frame with an 'error' column when no
        endpoint yields data; an empty frame for unexpected response shapes.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Try the v3 endpoint first
    holdings_data = fmp_request(f"etf/holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    # Check for error in response
    if isinstance(holdings_data, dict) and "error" in holdings_data:
        if debug_mode:
            st.info(f"Primary ETF holdings endpoint failed: {holdings_data['error']}. Trying alternative...")
        # Try alternative endpoint (v3 etf-holdings)
        alt_holdings = fmp_request(f"etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        # If that fails, try v4 endpoint
        if not alt_holdings or isinstance(alt_holdings, dict) and ("error" in alt_holdings or "Error Message" in alt_holdings):
            if debug_mode:
                st.info("Alternative v3 ETF holdings endpoint failed. Trying v4...")
            alt_holdings = fmp_request(f"v4/etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        if alt_holdings and not (isinstance(alt_holdings, dict) and ("error" in alt_holdings or "Error Message" in alt_holdings)):
            # Process the alternative format
            if isinstance(alt_holdings, dict) and "holdings" in alt_holdings:
                # v4 format: holdings list nested under a "holdings" key
                holdings_list = alt_holdings["holdings"]
                if isinstance(holdings_list, list) and len(holdings_list) > 0:
                    df = pd.DataFrame(holdings_list)
                    if debug_mode:
                        st.success("✅ Got ETF holdings from v4 endpoint")
                    return df
            elif isinstance(alt_holdings, list) and len(alt_holdings) > 0:
                # Some other format with list
                df = pd.DataFrame(alt_holdings)
                if debug_mode:
                    st.success("✅ Got ETF holdings from alternative endpoint")
                return df
        return pd.DataFrame({"error": ["No holdings data available or not accessible with your API subscription"]})
    # Primary endpoint succeeded: expect a dict wrapping an "etfHoldings" list.
    if not holdings_data or not isinstance(holdings_data, dict) or "etfHoldings" not in holdings_data:
        return pd.DataFrame()
    # Convert to DataFrame
    df = pd.DataFrame(holdings_data["etfHoldings"])
    return df
def get_etf_sector_weightings(symbol: str, force_refresh: bool = False) -> pd.DataFrame:
    """Get ETF sector weightings.

    Mirrors get_etf_holdings(): primary v3 endpoint first, then alternative
    v3 and v4 endpoints, since availability depends on the FMP subscription.

    Returns:
        DataFrame of sector weights; a one-row frame with an 'error' column
        when no endpoint yields data; an empty frame for empty responses.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Try the standard endpoint first
    sector_data = fmp_request(f"etf/sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    # Check for error in response
    if isinstance(sector_data, dict) and "error" in sector_data:
        if debug_mode:
            st.info(f"Primary ETF sector endpoint failed: {sector_data['error']}. Trying alternative...")
        # Try alternative endpoint
        alt_sectors = fmp_request(f"etf-sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        # If that fails, try v4 endpoint
        if not alt_sectors or isinstance(alt_sectors, dict) and ("error" in alt_sectors or "Error Message" in alt_sectors):
            if debug_mode:
                st.info("Alternative v3 ETF sector endpoint failed. Trying v4...")
            alt_sectors = fmp_request(f"v4/etf-sector-weightings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        if alt_sectors and not (isinstance(alt_sectors, dict) and ("error" in alt_sectors or "Error Message" in alt_sectors)):
            # Process the alternative format
            if isinstance(alt_sectors, dict) and "sectorWeightings" in alt_sectors:
                # v4 format: weights nested under "sectorWeightings"
                weightings = alt_sectors["sectorWeightings"]
                if isinstance(weightings, list) and len(weightings) > 0:
                    # Convert list of dicts to a dataframe
                    df = pd.DataFrame(weightings)
                    if debug_mode:
                        st.success("✅ Got ETF sectors from v4 endpoint")
                    return df
            elif isinstance(alt_sectors, list) and len(alt_sectors) > 0:
                # Direct list format
                df = pd.DataFrame(alt_sectors)
                if debug_mode:
                    st.success("✅ Got ETF sectors from alternative endpoint")
                return df
        return pd.DataFrame({"error": ["No sector data available or not accessible with your API subscription"]})
    if not sector_data:
        return pd.DataFrame()
    # Convert to DataFrame
    df = pd.DataFrame(sector_data)
    return df
def get_etf_dividend_history(symbol: str, force_refresh: bool = False) -> pd.DataFrame:
    """Fetch dividend history for *symbol*.

    Returns:
        DataFrame of dividend records with parsed dates; a one-row frame with
        an 'error' column on API failure; an empty frame when no history.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    raw = fmp_request(f"historical-price-full/stock_dividend/{symbol}",
                      force_refresh=force_refresh, debug_mode=debug_mode)
    if isinstance(raw, dict) and "error" in raw:
        return pd.DataFrame({"error": [raw["error"]]})
    if not raw or "historical" not in raw:
        return pd.DataFrame()
    history = pd.DataFrame(raw["historical"])
    if "date" in history.columns:
        history["date"] = pd.to_datetime(history["date"])
    return history
def is_valid_etf(symbol: str, etf_list_df: pd.DataFrame = None) -> bool:
    """Check if a symbol exists in the ETF list (case-insensitive).

    Args:
        symbol: Ticker to validate.
        etf_list_df: Optional pre-fetched ETF list; fetched on demand if None.

    Returns:
        True when the symbol is found — or when no usable list is available,
        so callers can fall back to a direct profile request.
    """
    if etf_list_df is None:
        # Get ETF list only if not provided
        etf_list_df = get_etf_list()
    # Without a usable list (API down, or unexpected schema with no 'symbol'
    # column — the original raised KeyError in that case) we cannot disprove
    # the symbol; report it as valid and let the profile request decide.
    if etf_list_df.empty or 'symbol' not in etf_list_df.columns:
        return True
    return symbol.upper() in etf_list_df['symbol'].str.upper().values
def get_etf_profile_alternative(symbol: str, force_refresh: bool = False) -> Dict:
    """Get ETF profile information using alternative endpoints.

    This function tries multiple approaches to gather ETF data when the primary
    etf/profile endpoint is not available in the user's subscription.

    Args:
        symbol: ETF ticker symbol.
        force_refresh: Whether to bypass the cache for each underlying request.

    Returns:
        Merged profile dict with a "_source" string listing contributing
        endpoints, or a dict with an "error" key when nothing usable came back.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Create a profile dictionary to store the data we collect
    profile = {
        "symbol": symbol,
        "name": f"{symbol} ETF",  # Default name in case we can't get it
        "_source": []  # Track which endpoints provided data
    }
    # Method 1: Try stock/profile endpoint (sometimes works for ETFs)
    stock_profile = fmp_request(f"profile/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    if isinstance(stock_profile, list) and len(stock_profile) > 0:
        if debug_mode:
            st.write("✅ Got ETF data from stock/profile endpoint")
        # NOTE(review): literal "{symbol}" (not an f-string) — looks like an
        # intentional endpoint-template label for the debug trail; confirm.
        profile["_source"].append("profile/{symbol}")
        # Extract data from stock profile
        stock_data = stock_profile[0]
        profile.update({
            "name": stock_data.get("companyName", f"{symbol} ETF"),
            "exchange": stock_data.get("exchange", ""),
            "currency": stock_data.get("currency", "USD"),
            "price": stock_data.get("price", 0),
            "changes": stock_data.get("changes", 0),
            "changesPercentage": stock_data.get("changesPercentage", 0),
            "description": stock_data.get("description", "No description available."),
            "industry": stock_data.get("industry", "ETF"),
            "website": stock_data.get("website", ""),
            "ceo": stock_data.get("ceo", ""),
            "sector": stock_data.get("sector", ""),
            "ipoDate": stock_data.get("ipoDate", ""),
            "image": stock_data.get("image", ""),
            "isEtf": True
        })
    # Method 2: Try quote endpoint for price data
    quote_data = fmp_request(f"quote/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    if isinstance(quote_data, list) and len(quote_data) > 0:
        if debug_mode:
            st.write("✅ Got ETF price data from quote endpoint")
        profile["_source"].append("quote/{symbol}")
        # Extract price data from quote; quote values override Method 1, with
        # whatever is already in `profile` used as the fallback default.
        quote = quote_data[0]
        profile.update({
            "name": quote.get("name", profile.get("name", f"{symbol} ETF")),
            "price": quote.get("price", profile.get("price", 0)),
            "changes": quote.get("change", profile.get("changes", 0)),
            "changesPercentage": quote.get("changesPercentage", profile.get("changesPercentage", 0)),
            "dayLow": quote.get("dayLow", 0),
            "dayHigh": quote.get("dayHigh", 0),
            "yearLow": quote.get("yearLow", 0),
            "yearHigh": quote.get("yearHigh", 0),
            "marketCap": quote.get("marketCap", 0),
            "priceAvg50": quote.get("priceAvg50", 0),
            "priceAvg200": quote.get("priceAvg200", 0),
            "volume": quote.get("volume", 0),
            "avgVolume": quote.get("avgVolume", 0),
            "exchange": quote.get("exchange", profile.get("exchange", "")),
            "isEtf": True
        })
    # Method 3: Try ETF holdings endpoint for additional ETF-specific data
    try:
        # First try the v3 endpoint
        holdings_data = fmp_request(f"etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        # If that fails, try the v4 endpoint
        if not holdings_data or isinstance(holdings_data, dict) and "Error Message" in holdings_data:
            holdings_data = fmp_request(f"v4/etf-holdings/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        if holdings_data and not (isinstance(holdings_data, dict) and "Error Message" in holdings_data):
            if debug_mode:
                st.write("✅ Got ETF holdings data from etf-holdings endpoint")
            profile["_source"].append("etf-holdings/{symbol}")
            # Extract any ETF-specific data from holdings response
            if isinstance(holdings_data, dict) and "etfName" in holdings_data:
                profile.update({
                    "name": holdings_data.get("etfName", profile.get("name", f"{symbol} ETF")),
                    "assetClass": holdings_data.get("assetClass", ""),
                    "aum": holdings_data.get("aum", 0),
                    "expense": holdings_data.get("expense", 0)
                })
    except Exception as e:
        if debug_mode:
            st.error(f"Error fetching holdings data: {str(e)}")
    # Method 4: Try ETF stats endpoint for additional data (enterprise subscription only)
    try:
        stats_data = fmp_request(f"etf-statistics/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
        if stats_data and not (isinstance(stats_data, dict) and "Error Message" in stats_data):
            if debug_mode:
                st.write("✅ Got ETF stats data from etf-statistics endpoint")
            profile["_source"].append("etf-statistics/{symbol}")
            # Extract useful stats if available
            if isinstance(stats_data, list) and len(stats_data) > 0:
                stats = stats_data[0]
                # Extract stats that might be useful; earlier methods win when
                # they already populated aum/expense.
                if "totalAssets" in stats and not profile.get("aum"):
                    profile["aum"] = stats.get("totalAssets", 0)
                if "expenseRatio" in stats and not profile.get("expense"):
                    profile["expense"] = stats.get("expenseRatio", 0) / 100  # Convert from percentage
                if "sharesOutstanding" in stats:
                    profile["sharesOutstanding"] = stats.get("sharesOutstanding", 0)
    except Exception as e:
        if debug_mode:
            st.error(f"Error fetching stats data: {str(e)}")
    # Combine endpoint sources for debugging
    profile["_source"] = ", ".join(profile["_source"]) if profile["_source"] else "No valid endpoints"
    # If we've collected enough data, consider it a valid profile.
    # NOTE(review): "name" is always seeded above, so this check effectively
    # hinges on "price" alone — confirm whether that is the intent.
    if "price" in profile and "name" in profile:
        return profile
    # If we couldn't get enough data, return an error
    return {"error": f"No profile data available for {symbol}", "symbol": symbol}
def get_historical_prices(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame:
    """Fetch daily historical prices for an ETF, sorted by ascending date.

    Args:
        symbol: ETF ticker symbol
        period: Time period ('1month', '3month', '6month', '1year', '5year', 'max')
        force_refresh: Whether to force refresh data from API

    Returns:
        DataFrame with historical price data (empty on failure)
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Number of daily observations requested per period label.
    days_lookup = {
        '1month': 30,
        '3month': 90,
        '6month': 180,
        '1year': 365,
        '5year': 1825,
        'max': 3650,  # large stand-in for "max"
    }
    payload = fmp_request(
        f"historical-price-full/{symbol}",
        params={"timeseries": days_lookup.get(period, 365)},
        force_refresh=force_refresh,
        debug_mode=debug_mode,
    )
    failed = isinstance(payload, dict) and "error" in payload
    if failed or not payload or "historical" not in payload:
        return pd.DataFrame()
    prices = pd.DataFrame(payload["historical"])
    if "date" in prices.columns:
        prices["date"] = pd.to_datetime(prices["date"])
        prices = prices.sort_values("date")
    return prices
def calculate_performance_metrics(price_df: pd.DataFrame) -> Dict:
    """Derive headline performance metrics from historical price data.

    Args:
        price_df: DataFrame with at least 'date' and 'close' columns.

    Returns:
        Dict with current_price, period returns (%), annualized volatility (%)
        and max drawdown (%); empty dict for unusable input or on error.
    """
    if price_df.empty or "date" not in price_df.columns or "close" not in price_df.columns:
        return {}
    try:
        # Work on a date-sorted copy so calculations are order-correct.
        data = price_df.sort_values("date").copy()
        data["daily_return"] = data["close"].pct_change()
        latest_close = data["close"].iloc[-1]
        last_date = data["date"].max()

        def window_return(start):
            # Percentage return from the first observation on/after `start`.
            window = data[data["date"] >= start]
            if len(window) > 1:
                return (window["close"].iloc[-1] / window["close"].iloc[0] - 1) * 100
            return None

        returns = {}
        if len(data) > 1:
            spans = {
                "1-month": last_date - pd.Timedelta(days=30),
                "3-month": last_date - pd.Timedelta(days=90),
                "6-month": last_date - pd.Timedelta(days=180),
                "1-year": last_date - pd.Timedelta(days=365),
                "YTD": pd.Timestamp(last_date.year, 1, 1),
            }
            for label, start in spans.items():
                value = window_return(start)
                if value is not None:
                    returns[label] = value

        # Annualized volatility needs a reasonable sample of daily returns.
        if len(data) > 30:
            volatility = data["daily_return"].std() * (252 ** 0.5) * 100
        else:
            volatility = None

        # Max drawdown: worst percentage drop from a running peak.
        if len(data) > 2:
            running_peak = data["close"].cummax()
            max_drawdown = ((data["close"] / running_peak - 1) * 100).min()
        else:
            max_drawdown = None

        return {
            "current_price": latest_close,
            "returns": returns,
            "volatility": volatility,
            "max_drawdown": max_drawdown,
        }
    except Exception as e:
        st.error(f"Error calculating performance metrics: {str(e)}")
        return {}
def get_nav_data(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame:
    """Fetch historical NAV for an ETF, falling back to yfinance when FMP lacks it.

    Args:
        symbol: ETF ticker symbol
        period: Time period ('1month', '3month', '6month', '1year', '5year', 'max')
        force_refresh: Whether to force refresh data from API

    Returns:
        DataFrame with historical NAV data
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Number of daily observations requested per period label.
    days_lookup = {
        '1month': 30,
        '3month': 90,
        '6month': 180,
        '1year': 365,
        '5year': 1825,
        'max': 3650,  # large stand-in for "max"
    }
    payload = fmp_request(
        f"historical-nav/{symbol}",
        params={"timeseries": days_lookup.get(period, 365)},
        force_refresh=force_refresh,
        debug_mode=debug_mode,
    )
    if isinstance(payload, dict) and "error" in payload:
        if debug_mode:
            st.warning(f"NAV data not available via FMP API for {symbol}. Trying yfinance as fallback.")
        return get_nav_data_from_yfinance(symbol, period, debug_mode)
    if not payload or "historical" not in payload:
        if debug_mode:
            st.warning(f"NAV data not found in FMP response for {symbol}. Trying yfinance as fallback.")
        return get_nav_data_from_yfinance(symbol, period, debug_mode)
    nav = pd.DataFrame(payload["historical"])
    if "date" in nav.columns:
        nav["date"] = pd.to_datetime(nav["date"])
        nav = nav.sort_values("date")
    return nav
def get_nav_data_from_yfinance(symbol: str, period: str = '1year', debug_mode: bool = False) -> pd.DataFrame:
    """Get NAV data for an ETF using yfinance as fallback.

    Args:
        symbol: ETF ticker symbol
        period: Time period ('1month', '3month', '6month', '1year', '5year', 'max')
        debug_mode: Whether to show debug information

    Returns:
        DataFrame with columns 'date' and 'nav' (closing price is used as a
        NAV proxy when no dedicated series exists); empty DataFrame on failure.
    """
    try:
        # Map period to yfinance format
        yf_period_map = {
            '1month': '1mo',
            '3month': '3mo',
            '6month': '6mo',
            '1year': '1y',
            '5year': '5y',
            'max': 'max'
        }
        yf_period = yf_period_map.get(period, '1y')
        if debug_mode:
            st.write(f"Fetching data for {symbol} using yfinance with period {yf_period}")
        # Create Ticker object
        ticker = yf.Ticker(symbol)
        # First try to get fund info which might contain NAV
        # NOTE(review): `info` is fetched but never read below — confirm
        # whether this call is still needed (it costs a network round trip).
        info = ticker.info
        # Get historical price data (we'll use this as a backup)
        hist_data = ticker.history(period=yf_period)
        if hist_data.empty:
            if debug_mode:
                st.warning(f"No historical data found in yfinance for {symbol}")
            return pd.DataFrame()
        # Prepare the DataFrame and ensure timezone consistency
        nav_df = hist_data.reset_index()[['Date', 'Close']].copy()
        nav_df.columns = ['date', 'nav']
        # Convert date to naive datetime (remove timezone info) for consistency with FMP data
        nav_df['date'] = pd.to_datetime(nav_df['date']).dt.tz_localize(None)
        # Check if we have intraday NAV data available (indicated by "-IV" suffix)
        try:
            # Some ETFs have intraday NAV with IV suffix
            iv_symbol = f"{symbol}-IV"
            iv_ticker = yf.Ticker(iv_symbol)
            iv_data = iv_ticker.history(period=yf_period)
            if not iv_data.empty:
                if debug_mode:
                    st.success(f"Found Intraday NAV data for {symbol} using {iv_symbol}")
                iv_df = iv_data.reset_index()[['Date', 'Close']].copy()
                iv_df.columns = ['date', 'nav']
                # Remove timezone info for consistency
                iv_df['date'] = pd.to_datetime(iv_df['date']).dt.tz_localize(None)
                return iv_df
        except Exception as e:
            if debug_mode:
                st.warning(f"Error fetching IV data: {str(e)}")
        # If specific NAV data isn't available, use price as a proxy with a note
        if debug_mode:
            st.info(f"Using price data as proxy for NAV for {symbol}. Note that actual NAV may differ slightly.")
        return nav_df
    except Exception as e:
        if debug_mode:
            st.error(f"Error getting NAV data from yfinance: {str(e)}")
        return pd.DataFrame()
def get_dividend_yield_history(symbol: str, period: str = '1year', force_refresh: bool = False) -> pd.DataFrame:
    """Get historical dividend yield data for an ETF by combining price and dividend history.

    Args:
        symbol: ETF ticker symbol
        period: Time period ('1month', '3month', '6month', '1year', '5year', 'max')
        force_refresh: Whether to force refresh data from API

    Returns:
        DataFrame with date, price, ttm_dividend and dividend_yield columns
        (empty on missing data or error).
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Get dividend history
    dividend_data = get_etf_dividend_history(symbol, force_refresh=force_refresh)
    # Get price history
    price_data = get_historical_prices(symbol, period=period, force_refresh=force_refresh)
    if dividend_data.empty or price_data.empty:
        return pd.DataFrame()
    try:
        # Make sure dates are in datetime format and both frames are sorted.
        dividend_data["date"] = pd.to_datetime(dividend_data["date"])
        price_data["date"] = pd.to_datetime(price_data["date"])
        dividend_data = dividend_data.sort_values("date")
        price_data = price_data.sort_values("date")
        # Filter dividend data to match our period.
        # NOTE(review): trimming dividends to the price window means TTM sums
        # near the start of the window can miss earlier payouts — kept as-is
        # to preserve existing behavior; confirm whether that is intended.
        start_date = price_data["date"].min()
        dividend_data = dividend_data[dividend_data["date"] >= start_date]
        if dividend_data.empty:
            return pd.DataFrame()
        # Build rows in a list and construct the frame once: the original
        # pd.concat inside the loop was O(n^2) in the number of price points.
        rows = []
        for price_date, price in zip(price_data["date"], price_data["close"]):
            # Sum dividends paid in the trailing twelve months.
            one_year_before = price_date - pd.Timedelta(days=365)
            ttm_dividends = dividend_data[(dividend_data["date"] > one_year_before) &
                                          (dividend_data["date"] <= price_date)]
            ttm_dividend_sum = ttm_dividends["dividend"].sum()
            # Yield as a percentage of the current price (0 when price is 0).
            dividend_yield = (ttm_dividend_sum / price) * 100 if price > 0 else 0
            rows.append({
                "date": price_date,
                "price": price,
                "ttm_dividend": ttm_dividend_sum,
                "dividend_yield": dividend_yield,
            })
        return pd.DataFrame(rows)
    except Exception as e:
        if debug_mode:
            st.error(f"Error calculating dividend yield history: {str(e)}")
        return pd.DataFrame()
def calculate_nav_premium_discount(price_df: pd.DataFrame, nav_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the ETF's price premium/discount to NAV, in percent.

    Args:
        price_df: DataFrame with price history ('date', 'close')
        nav_df: DataFrame with NAV history ('date', 'nav')

    Returns:
        DataFrame of matched dates with a 'premium_discount' column, or an
        empty DataFrame when the inputs cannot be aligned.
    """
    if price_df.empty or nav_df.empty:
        return pd.DataFrame()
    try:
        # Work on copies and strip timezone info so both sides merge cleanly.
        prices = price_df.copy()
        navs = nav_df.copy()
        prices["date"] = pd.to_datetime(prices["date"]).dt.tz_localize(None)
        navs["date"] = pd.to_datetime(navs["date"]).dt.tz_localize(None)
        merged = prices[["date", "close"]].merge(navs[["date", "nav"]], on="date", how="inner")
        if merged.empty:
            return pd.DataFrame()
        merged["premium_discount"] = ((merged["close"] / merged["nav"]) - 1) * 100
        return merged
    except Exception as e:
        st.error(f"Error calculating NAV premium/discount: {str(e)}")
        st.info("Debug info: This error often occurs due to timezone differences in date formats. The application will try to handle this automatically.")
        # Fallback: align on YYYY-MM-DD strings, sidestepping timezone issues.
        try:
            prices = price_df.copy()
            navs = nav_df.copy()
            prices["date_str"] = pd.to_datetime(prices["date"]).dt.strftime("%Y-%m-%d")
            navs["date_str"] = pd.to_datetime(navs["date"]).dt.strftime("%Y-%m-%d")
            price_data = prices[["date_str", "close"]].rename(columns={"date_str": "date"})
            nav_data = navs[["date_str", "nav"]].rename(columns={"date_str": "date"})
            # Stack both series and collapse per date instead of merging.
            combined = pd.concat([price_data, nav_data])
            aligned = combined.groupby("date").agg({"close": "first", "nav": "first"}).reset_index()
            # Keep only dates where both price and NAV are present.
            aligned = aligned.dropna()
            if aligned.empty:
                return pd.DataFrame()
            aligned["premium_discount"] = ((aligned["close"] / aligned["nav"]) - 1) * 100
            # Convert date back to datetime for consistency.
            aligned["date"] = pd.to_datetime(aligned["date"])
            return aligned
        except Exception as fallback_error:
            st.error(f"Alternative approach also failed: {str(fallback_error)}")
            return pd.DataFrame()
def calculate_yield_erosion(yield_df: pd.DataFrame) -> Dict:
    """Summarize how the current dividend yield compares with recent averages.

    Args:
        yield_df: DataFrame with 'date' and 'dividend_yield' columns.

    Returns:
        Dict with current_yield, avg_yields per lookback window, yield_erosion
        (percent deviation of current yield from each average) and
        yield_volatility; empty dict on empty input or error.
    """
    if yield_df.empty:
        return {}
    try:
        # Sort by date so the last row is genuinely the most recent yield.
        ordered = yield_df.sort_values("date")
        current_yield = ordered["dividend_yield"].iloc[-1]
        last_date = ordered["date"].max()
        # Lookback windows in days.
        lookbacks = {
            "1_month": 30,
            "3_month": 90,
            "6_month": 180,
            "1_year": 365,
        }
        avg_yields = {}
        for name, days in lookbacks.items():
            window = ordered[ordered["date"] >= last_date - pd.Timedelta(days=days)]
            if not window.empty:
                avg_yields[name] = window["dividend_yield"].mean()
        # Erosion: how far the current yield sits from each period average
        # (skip zero averages to avoid division by zero).
        yield_erosion = {
            name: ((current_yield / avg) - 1) * 100
            for name, avg in avg_yields.items()
            if avg > 0
        }
        return {
            "current_yield": current_yield,
            "avg_yields": avg_yields,
            "yield_erosion": yield_erosion,
            "yield_volatility": ordered["dividend_yield"].std(),
        }
    except Exception as e:
        st.error(f"Error calculating yield erosion: {str(e)}")
        return {}
def get_institutional_ownership(symbol: str, force_refresh: bool = False) -> pd.DataFrame:
    """Fetch institutional ownership (13F holder) data for an ETF.

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        DataFrame with institutional ownership data, including a derived
        "percentage" column when the API provides shares held and shares
        outstanding; empty DataFrame when the data is unavailable.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Query the institutional-holder endpoint (premium on some FMP tiers).
    response = fmp_request(
        f"institutional-holder/{symbol}",
        force_refresh=force_refresh,
        debug_mode=debug_mode,
    )
    # The request helper signals failures via an {"error": ...} dict.
    if isinstance(response, dict) and "error" in response:
        if debug_mode:
            st.warning(f"Institutional ownership data not available for {symbol}")
        return pd.DataFrame()
    # Anything other than a non-empty list is unusable.
    if not response or not isinstance(response, list):
        return pd.DataFrame()
    holders = pd.DataFrame(response)
    # Derive the ownership percentage when the API omits it but supplies the parts.
    has_parts = "sharesHeld" in holders.columns and "sharesOutstanding" in holders.columns
    if "percentage" not in holders.columns and has_parts:
        holders["percentage"] = (holders["sharesHeld"] / holders["sharesOutstanding"]) * 100
    return holders
def get_dividend_calendar(symbol: str, force_refresh: bool = False) -> pd.DataFrame:
    """Get dividend distribution calendar for an ETF.

    Builds on the raw dividend history by adding calendar fields
    (month/day/year/month name) and classifying the payment cadence
    (Monthly, Quarterly, Semi-Annual, Annual or Irregular) from the
    average interval between consecutive payments.

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        DataFrame with dividend history plus calendar columns and a
        "distribution_pattern" column; empty DataFrame when no dividend
        history is available or processing fails.
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # First get dividend history
    dividend_history = get_etf_dividend_history(symbol, force_refresh=force_refresh)
    if dividend_history.empty:
        return pd.DataFrame()
    try:
        # Convert date to datetime if not already
        if "date" in dividend_history.columns:
            dividend_history["date"] = pd.to_datetime(dividend_history["date"])
        # Extract calendar components used for grouping and display
        dividend_history["month"] = dividend_history["date"].dt.month
        dividend_history["day"] = dividend_history["date"].dt.day
        dividend_history["year"] = dividend_history["date"].dt.year
        # Create month name for display
        dividend_history["month_name"] = dividend_history["date"].dt.strftime("%B")
        # Determine distribution pattern from the spacing of payments
        if len(dividend_history) >= 4:
            # Look at intervals between payments
            dividend_history = dividend_history.sort_values("date")
            dividend_history["days_since_last"] = dividend_history["date"].diff().dt.days
            # Average interval between consecutive payments; the first row's
            # NaN diff is skipped by mean(), which returns NaN only when no
            # intervals exist at all.
            avg_interval = dividend_history["days_since_last"].mean()
            if not pd.isna(avg_interval):
                if 25 <= avg_interval <= 35:
                    pattern = "Monthly"
                elif 85 <= avg_interval <= 95:
                    pattern = "Quarterly"
                elif 175 <= avg_interval <= 185:
                    pattern = "Semi-Annual"
                elif 350 <= avg_interval <= 380:
                    pattern = "Annual"
                else:
                    pattern = "Irregular"
            else:
                pattern = "Insufficient data"
        else:
            pattern = "Insufficient data"
        # Add pattern to the dataframe
        dividend_history["distribution_pattern"] = pattern
        return dividend_history
    except Exception as e:
        if debug_mode:
            st.error(f"Error processing dividend calendar data: {str(e)}")
        return pd.DataFrame()
def calculate_risk_adjusted_metrics(price_df: pd.DataFrame, risk_free_rate: float = 0.05) -> Dict:
    """Calculate risk-adjusted performance metrics.

    Args:
        price_df: DataFrame with price history ("date" and "close" columns)
        risk_free_rate: Annualized risk-free rate (default: 5%)

    Returns:
        Dictionary with Sharpe, Sortino and Calmar ratios plus annualized
        return, annualized volatility and maximum drawdown (the last three
        as percentages). A ratio is None when its denominator is zero or
        undefined. Empty dict on insufficient data or error.
    """
    if price_df.empty or "date" not in price_df.columns or "close" not in price_df.columns:
        return {}
    try:
        # Sort by date and work on a copy so the caller's DataFrame is not mutated
        price_df = price_df.sort_values("date").copy()
        # Calculate daily simple returns
        price_df["daily_return"] = price_df["close"].pct_change()
        returns = price_df["daily_return"].dropna()
        if len(returns) < 30:  # Need sufficient data for meaningful statistics
            return {}
        mean_daily_return = returns.mean()
        std_daily_return = returns.std()
        # Annualize assuming 252 trading days per year
        annualized_return = (1 + mean_daily_return) ** 252 - 1
        annualized_volatility = std_daily_return * (252 ** 0.5)
        # Sharpe Ratio: excess return per unit of total volatility
        if annualized_volatility != 0:
            sharpe_ratio = (annualized_return - risk_free_rate) / annualized_volatility
        else:
            sharpe_ratio = None
        # Sortino Ratio: excess return per unit of downside volatility.
        # Requires at least two negative returns — the sample std of a
        # single value is NaN, which previously leaked NaN into the result.
        downside_returns = returns[returns < 0]
        sortino_ratio = None
        if len(downside_returns) > 1:
            downside_volatility = downside_returns.std() * (252 ** 0.5)
            if not pd.isna(downside_volatility) and downside_volatility != 0:
                sortino_ratio = (annualized_return - risk_free_rate) / downside_volatility
        # Maximum drawdown: worst peak-to-trough decline
        price_df["cummax"] = price_df["close"].cummax()
        price_df["drawdown"] = (price_df["close"] / price_df["cummax"] - 1)
        max_drawdown = price_df["drawdown"].min()
        # Calmar Ratio: return relative to maximum drawdown
        if max_drawdown != 0:
            calmar_ratio = annualized_return / abs(max_drawdown)
        else:
            calmar_ratio = None
        # Return all metrics
        return {
            "sharpe_ratio": sharpe_ratio,
            "sortino_ratio": sortino_ratio,
            "calmar_ratio": calmar_ratio,
            "annualized_return": annualized_return * 100,  # Convert to percentage
            "annualized_volatility": annualized_volatility * 100,  # Convert to percentage
            "max_drawdown": max_drawdown * 100  # Convert to percentage
        }
    except Exception as e:
        st.error(f"Error calculating risk-adjusted metrics: {str(e)}")
        return {}
def display_whale_analysis(symbol: str) -> None:
    """Display institutional ownership (whale investors) analysis.

    Renders total and top-5 ownership concentration metrics, a table of
    the largest institutional holders, and a pie chart of the ownership
    split for the given ETF symbol. Shows a premium-feature notice when
    the FMP endpoint returns no data.
    """
    st.subheader(f"Major Institutional Holders (Whale Analysis)")
    with st.spinner("Loading institutional ownership data..."):
        holders_df = get_institutional_ownership(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
    if holders_df.empty:
        st.warning("Institutional ownership data not available for this ETF.")
        st.info("⚠️ PREMIUM API FEATURE: Institutional ownership data (major holders, ownership percentages, etc.) requires the Enterprise tier of the FMP API subscription.")
        st.info("This premium data provides valuable insights into which institutions hold significant positions in this ETF and how concentrated the ownership is.")
        return
    # Calculate total percentage owned by institutions
    if "percentage" in holders_df.columns:
        total_institutional = holders_df["percentage"].sum()
        st.metric("Total Institutional Ownership", f"{total_institutional:.2f}%")
    # Check for concentration
    if len(holders_df) > 0 and "percentage" in holders_df.columns:
        # Sort by percentage owned
        holders_df = holders_df.sort_values("percentage", ascending=False)
        # Get top 5 holders
        top_holders = holders_df.head(5)
        # Calculate concentration metrics
        top_5_pct = top_holders["percentage"].sum()
        # Display concentration metrics
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Top 5 Holders Concentration", f"{top_5_pct:.2f}%")
        with col2:
            st.metric("Number of Institutional Holders", f"{len(holders_df)}")
        # Show top holders
        st.subheader("Top Institutional Holders")
        # Format the display dataframe (copy so the chart below still sees raw values)
        display_df = top_holders.copy()
        # Rename columns for better display; only keys present in the data are renamed
        column_mapping = {
            "holder": "Holder",
            "shares": "Shares",
            "sharesHeld": "Shares Held",
            "dateReported": "Date Reported",
            "percentage": "Percentage"
        }
        display_df = display_df.rename(columns={k: v for k, v in column_mapping.items() if k in display_df.columns})
        # Format percentage column
        if "Percentage" in display_df.columns:
            display_df["Percentage"] = display_df["Percentage"].apply(lambda x: f"{x:.2f}%")
        # Display the table
        st.dataframe(display_df, use_container_width=True)
        # Create visualization
        if "percentage" in holders_df.columns:
            st.subheader("Ownership Distribution")
            # Collapse everything beyond the top 5 into an "Others" slice
            if len(holders_df) > 5:
                others_pct = holders_df.iloc[5:]["percentage"].sum()
                pie_data = top_holders.copy()
                # Use pd.concat instead of append (which is deprecated)
                others_df = pd.DataFrame([{"holder": "Others", "percentage": others_pct}])
                pie_data = pd.concat([pie_data, others_df], ignore_index=True)
            else:
                pie_data = top_holders.copy()
            # Create pie chart
            fig = px.pie(
                pie_data,
                names="holder",
                values="percentage",
                title=f"Institutional Ownership Distribution for {symbol}"
            )
            st.plotly_chart(fig, use_container_width=True)
def display_dividend_calendar(symbol: str) -> None:
    """Display dividend distribution calendar.

    Shows the detected distribution pattern, a monthly-average bar chart
    and a year-by-month heatmap over the last three calendar years, plus
    the full dividend history table.
    """
    st.subheader(f"Dividend Distribution Calendar")
    with st.spinner("Loading dividend data..."):
        dividend_df = get_dividend_calendar(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
    if dividend_df.empty:
        st.warning("Dividend data not available for this ETF.")
        st.info("This ETF may not pay dividends, or dividend history may be limited.")
        return
    # Check if we have distribution pattern (same value on every row)
    if "distribution_pattern" in dividend_df.columns:
        pattern = dividend_df["distribution_pattern"].iloc[0]
        st.metric("Distribution Pattern", pattern)
    # Restrict charts to the last 3 calendar years present in the data
    recent_years = sorted(list(set(dividend_df["year"])))[-3:]
    recent_df = dividend_df[dividend_df["year"].isin(recent_years)]
    if not recent_df.empty:
        # Calculate average dividend by month
        monthly_avg = recent_df.groupby("month")["dividend"].mean().reset_index()
        monthly_avg["month_name"] = monthly_avg["month"].apply(lambda x: datetime(2000, x, 1).strftime("%B"))
        # Sort by month
        monthly_avg = monthly_avg.sort_values("month")
        # Create bar chart of monthly distributions
        st.subheader("Dividend Distribution by Month")
        fig = px.bar(
            monthly_avg,
            x="month_name",
            y="dividend",
            title=f"Average Dividend Distribution by Month ({', '.join(map(str, recent_years))})",
            labels={"month_name": "Month", "dividend": "Dividend Amount ($)"}
        )
        st.plotly_chart(fig, use_container_width=True)
        # Create calendar heatmap
        st.subheader("Dividend Distribution Calendar")
        # Total dividend per (year, month) cell of the heatmap
        calendar_data = recent_df.groupby(["year", "month"]).agg({
            "dividend": "sum"
        }).reset_index()
        try:
            # Create a complete month-year grid with all possible combinations
            all_months = list(range(1, 13))
            all_years = sorted(calendar_data["year"].unique())
            # Ensure we have a complete grid by reindexing
            try:
                # First create the pivot table
                pivot_data = calendar_data.pivot_table(
                    index="month",
                    columns="year",
                    values="dividend",
                    fill_value=0  # Fill missing values with 0
                )
                # Reindex to ensure all 12 months are included
                pivot_data = pivot_data.reindex(all_months, fill_value=0)
            except Exception as pivot_error:
                st.warning(f"Error creating dividend calendar pivot: {str(pivot_error)}")
                # Create an empty DataFrame with the correct structure as a fallback
                pivot_data = pd.DataFrame(0, index=all_months, columns=all_years)
            # Get month labels
            month_labels = [datetime(2000, i, 1).strftime("%b") for i in range(1, 13)]
            # Create heatmap
            fig = px.imshow(
                pivot_data,
                labels=dict(x="Year", y="Month", color="Dividend Amount"),
                x=pivot_data.columns.tolist(),  # Use actual columns from pivot
                y=month_labels,
                aspect="auto",
                title="Dividend Distribution Calendar"
            )
            st.plotly_chart(fig, use_container_width=True)
        except Exception as e:
            st.error(f"Unable to generate dividend calendar heatmap: {str(e)}")
            st.info("This could be due to limited dividend data. Try selecting an ETF with more dividend history.")
    # Show dividend history table
    st.subheader("Dividend History")
    # Format the display dataframe
    display_df = dividend_df.sort_values("date", ascending=False)[["date", "dividend"]].copy()
    display_df["date"] = display_df["date"].dt.strftime("%Y-%m-%d")
    display_df.columns = ["Date", "Dividend Amount ($)"]
    st.dataframe(display_df, use_container_width=True)
def display_risk_adjusted_metrics(symbol: str, period_value: str, selected_period: str) -> None:
    """Display risk-adjusted performance metrics.

    Args:
        symbol: ETF ticker symbol
        period_value: API period code passed to get_historical_prices
        selected_period: Human-readable period label used in chart titles
    """
    st.subheader(f"Risk-Adjusted Performance")
    with st.spinner("Calculating risk metrics..."):
        # Get historical prices
        prices = get_historical_prices(
            symbol,
            period=period_value,
            force_refresh=st.session_state.get("force_refresh", False)
        )
    if prices.empty:
        st.warning("Insufficient price data to calculate risk metrics.")
        return
    # Get risk-free rate (could be fetched from an API in a real app)
    risk_free_rate = 0.05  # Default to 5%
    # Calculate metrics
    metrics = calculate_risk_adjusted_metrics(prices, risk_free_rate)
    if not metrics:
        st.warning("Unable to calculate risk metrics with available data.")
        return

    def _format_ratio(key: str) -> str:
        # calculate_risk_adjusted_metrics stores None for a ratio whose
        # denominator is zero; dict.get's default does NOT apply then, and
        # the previous f"{None:.2f}" raised TypeError. Guard explicitly.
        value = metrics.get(key)
        return "N/A" if value is None else f"{value:.2f}"

    # Display metrics in columns
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric(
            "Sharpe Ratio",
            _format_ratio("sharpe_ratio"),
            help="Return per unit of risk (higher is better). Values above 1.0 are good."
        )
    with col2:
        st.metric(
            "Sortino Ratio",
            _format_ratio("sortino_ratio"),
            help="Return per unit of downside risk (higher is better)"
        )
    with col3:
        st.metric(
            "Calmar Ratio",
            _format_ratio("calmar_ratio"),
            help="Return relative to maximum drawdown (higher is better)"
        )
    # Create chart comparing metrics
    metrics_df = pd.DataFrame({
        "Metric": ["Annualized Return", "Annualized Volatility", "Maximum Drawdown"],
        "Value": [
            metrics.get("annualized_return", 0),
            metrics.get("annualized_volatility", 0),
            abs(metrics.get("max_drawdown", 0))
        ]
    })
    st.subheader("Risk-Return Profile")
    fig = px.bar(
        metrics_df,
        x="Metric",
        y="Value",
        title=f"Risk-Return Profile ({selected_period})",
        labels={"Value": "Percentage (%)"}
    )
    st.plotly_chart(fig, use_container_width=True)
    # Add explanation
    st.caption("""
    **Interpreting Risk Metrics:**
    - **Sharpe Ratio**: Measures excess return per unit of risk. Higher values are better.
    - **Sortino Ratio**: Like Sharpe ratio, but only considers downside risk. Higher values are better.
    - **Calmar Ratio**: Measures return relative to maximum drawdown. Higher values indicate better risk-adjusted performance.
    """)
def display_etf_analysis(symbol: str) -> None:
    """Display comprehensive ETF analysis.

    Validates the ticker against the cached ETF list, loads the profile
    (falling back to an alternative endpoint when the primary one errors),
    shows headline metrics, then renders seven analysis tabs: Overview,
    Holdings, Sector Allocation, Dividend History, Dividend Sustainability,
    Institutional Ownership and ESG Scores.
    """
    st.header(f"📊 ETF Analysis: {symbol}")
    # First check if this is a valid ETF
    etf_df = None
    if "etf_list_df" in st.session_state:
        etf_df = st.session_state.etf_list_df
    if not is_valid_etf(symbol, etf_df):
        st.error(f"{symbol} does not appear to be a valid ETF ticker in our database. Please select a different ticker.")
        st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search"))
        return
    # Get ETF profile
    with st.spinner("Loading ETF data..."):
        # First try the standard endpoint
        debug_mode = st.session_state.get("debug_mode", False)
        profile = get_etf_profile(symbol, force_refresh=st.session_state.get("force_refresh", False))
        # If standard endpoint fails, try alternative approach
        if isinstance(profile, dict) and "error" in profile:
            if debug_mode:
                st.info("Primary ETF profile endpoint failed. Trying alternative methods...")
            profile = get_etf_profile_alternative(symbol, force_refresh=st.session_state.get("force_refresh", False))
    # Check for error in profile (both endpoints failed)
    if isinstance(profile, dict) and "error" in profile:
        st.error(f"Failed to load profile data for {symbol}: {profile['error']}")
        st.info("This could be due to an invalid API key, the ETF not being available in the FMP database, or a temporary API issue.")
        st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search"))
        return
    if not profile:
        st.error(f"Failed to load profile data for {symbol}. Please check your API key.")
        st.button("← Back to Search", on_click=lambda: setattr(st.session_state, "current_tab", "search"))
        return
    # Display profile information
    col1, col2 = st.columns([3, 1])
    with col1:
        st.subheader(profile.get("name", ""))
        st.write(profile.get("description", "No description available."))
    with col2:
        # Create metrics card
        # NOTE(review): assumes profile values are numeric when the key exists;
        # a None value would break the :.2f format — confirm upstream parsing.
        st.metric("Price", f"${profile.get('price', 0):.2f}")
        if "aum" in profile:
            st.metric("AUM", f"${profile.get('aum', 0) / 1e9:.2f}B")
        if "expense" in profile:
            st.metric("Expense Ratio", f"{profile.get('expense', 0) * 100:.2f}%")
    # Create tabs for different types of analysis
    tabs = st.tabs([
        "Overview",
        "Holdings",
        "Sector Allocation",
        "Dividend History",
        "Dividend Sustainability",
        "Institutional Ownership",
        "ESG Scores"
    ])
    # Overview Tab
    with tabs[0]:
        # Add explanation in an expander
        with st.expander("📚 Understanding the Overview Tab", expanded=False):
            st.markdown("""
            ### ETF Overview Explanation
            This tab provides essential background information about the ETF, helping you understand its basic characteristics and investment focus.
            **Key metrics to examine:**
            - **ETF Details**: Basic information like exchange, currency, AUM (Assets Under Management), and expense ratio
            - **Expense Ratio**: Lower is generally better; this directly impacts your returns (e.g., 0.03% vs 0.30% means 0.27% more return annually)
            - **AUM (Assets Under Management)**: Larger funds tend to have better liquidity and smaller bid-ask spreads
            - **Price Information**: Recent price movements and trading ranges
            **Why this matters**: Understanding the ETF's focus, size, and cost structure helps determine if it aligns with your investment goals and provides a foundation for deeper analysis.
            """)
        overview_col1, overview_col2 = st.columns(2)
        with overview_col1:
            st.subheader("ETF Details")
            # Prepare details dictionary with data that might be available
            details = {
                "Symbol": profile.get("symbol", ""),
                "Name": profile.get("name", ""),
                "Exchange": profile.get("exchange", ""),
                "Currency": profile.get("currency", "")
            }
            # Add fields if they exist (keys vary by API subscription level)
            if "aum" in profile and profile["aum"]:
                details["AUM"] = f"${profile.get('aum', 0) / 1e9:.2f}B" if profile.get("aum", 0) > 1e6 else f"${profile.get('aum', 0) / 1e6:.2f}M"
            if "expense" in profile and profile["expense"] is not None:
                details["Expense Ratio"] = f"{profile.get('expense', 0) * 100:.2f}%"
            if "pe" in profile and profile["pe"] is not None and profile["pe"] != 0:
                details["PE Ratio"] = profile.get("pe", "N/A")
            if "sharesOutstanding" in profile and profile["sharesOutstanding"]:
                details["Shares Outstanding"] = f"{profile.get('sharesOutstanding', 0) / 1e6:.2f}M"
            if "ipoDate" in profile and profile["ipoDate"]:
                details["IPO Date"] = profile.get("ipoDate", "N/A")
            # Add alternative fields that might be available from other endpoints
            if "assetClass" in profile:
                details["Asset Class"] = profile.get("assetClass", "")
            if "sector" in profile:
                details["Sector"] = profile.get("sector", "")
            if "industry" in profile:
                details["Industry"] = profile.get("industry", "")
            if "marketCap" in profile and profile["marketCap"]:
                details["Market Cap"] = f"${profile.get('marketCap', 0) / 1e9:.2f}B" if profile.get("marketCap", 0) > 1e9 else f"${profile.get('marketCap', 0) / 1e6:.2f}M"
            if "volume" in profile and profile["volume"]:
                details["Volume"] = f"{profile.get('volume', 0):,}"
            if "avgVolume" in profile and profile["avgVolume"]:
                details["Avg Volume"] = f"{profile.get('avgVolume', 0):,}"
            # Convert to DataFrame for display
            details_df = pd.DataFrame(list(details.items()), columns=["Metric", "Value"])
            st.dataframe(details_df, use_container_width=True, hide_index=True)
            # Data source disclaimer
            st.caption("Note: Some fields may be unavailable based on your API subscription level.")
        with overview_col2:
            st.subheader("Price Information")
            price_data = {}
            # Add price data if available
            if "price" in profile:
                price_data["Price"] = f"${profile.get('price', 0):.2f}"
            if "changes" in profile:
                price_data["Change"] = f"{profile.get('changes', 0):.2f}"
            if "changesPercentage" in profile:
                price_data["Change %"] = f"{profile.get('changesPercentage', 0):.2f}%"
            if "dayLow" in profile and profile["dayLow"]:
                price_data["Day Low"] = f"${profile.get('dayLow', 0):.2f}"
            if "dayHigh" in profile and profile["dayHigh"]:
                price_data["Day High"] = f"${profile.get('dayHigh', 0):.2f}"
            if "yearLow" in profile and profile["yearLow"]:
                price_data["Year Low"] = f"${profile.get('yearLow', 0):.2f}"
            if "yearHigh" in profile and profile["yearHigh"]:
                price_data["Year High"] = f"${profile.get('yearHigh', 0):.2f}"
            if "priceAvg50" in profile and profile["priceAvg50"]:
                price_data["50-Day Avg"] = f"${profile.get('priceAvg50', 0):.2f}"
            if "priceAvg200" in profile and profile["priceAvg200"]:
                price_data["200-Day Avg"] = f"${profile.get('priceAvg200', 0):.2f}"
            # If no price data available, show a message
            if not price_data:
                st.info("Price information is not available with your current API subscription.")
            else:
                # Convert to DataFrame for display
                price_df = pd.DataFrame(list(price_data.items()), columns=["Metric", "Value"])
                st.dataframe(price_df, use_container_width=True, hide_index=True)
            # If we have an image URL, display it
            if "image" in profile and profile["image"]:
                st.image(profile["image"], width=150)
        # Show source of data
        st.caption("Data from Financial Modeling Prep API")
        # Show the data source endpoints used
        if st.session_state.get("debug_mode", False):
            st.write("Data Source:")
            if profile.get("_source", None):
                st.code(profile["_source"])
            else:
                st.code("etf/profile endpoint")
    # Holdings Tab
    with tabs[1]:
        # Add explanation in an expander
        with st.expander("📚 Understanding the Holdings Tab", expanded=False):
            st.markdown("""
            ### ETF Holdings Explanation
            This tab shows you exactly what the ETF owns - the individual stocks, bonds, or other assets that make up the fund.
            **Key aspects to analyze:**
            - **Top Holdings**: The largest positions in the ETF, which have the most influence on performance
            - **Concentration**: If the top 10 holdings make up a large percentage (>50%), the ETF is highly concentrated
            - **Individual Securities**: Review the specific companies/assets to ensure they align with your investment thesis
            - **Weight Distribution**: How evenly the ETF spreads its investments across different securities
            **Why this matters**: Understanding what the ETF actually owns helps you assess its true exposure and risk profile. A technology ETF might own different types of tech companies (software, hardware, services) with varying risk profiles.
            """)
        with st.spinner("Loading holdings data..."):
            holdings = get_etf_holdings(symbol, force_refresh=st.session_state.get("force_refresh", False))
        if not holdings.empty:
            # Check for error
            if "error" in holdings.columns:
                st.warning("Unable to load holdings data")
                st.error(holdings["error"].iloc[0])
            # Ensure we have the necessary columns
            elif all(col in holdings.columns for col in ["asset", "weightPercentage"]):
                st.subheader("Top Holdings")
                # Sort by weight
                holdings = holdings.sort_values("weightPercentage", ascending=False)
                # Display top 10 holdings
                top_holdings = holdings.head(10)
                # Create bar chart
                fig = px.bar(
                    top_holdings,
                    x="asset",
                    y="weightPercentage",
                    title=f"Top 10 Holdings for {symbol}",
                    labels={"asset": "Asset", "weightPercentage": "Weight (%)"}
                )
                st.plotly_chart(fig, use_container_width=True)
                # Display full holdings table
                st.subheader("All Holdings")
                st.dataframe(
                    holdings,
                    use_container_width=True,
                    height=400
                )
            else:
                st.warning("Holdings data is incomplete or in an unexpected format.")
        else:
            st.warning("No holdings data available for this ETF.")
            st.info("⚠️ PREMIUM API FEATURE: Detailed holdings data typically requires a paid FMP API subscription. Consider upgrading your plan to access this information.")
    # Sector Allocation Tab
    with tabs[2]:
        # Add explanation in an expander
        with st.expander("📚 Understanding the Sector Allocation Tab", expanded=False):
            st.markdown("""
            ### Sector Allocation Explanation
            This tab breaks down the ETF's investments by economic sectors, showing you where the fund is most heavily invested.
            **What to look for:**
            - **Dominant Sectors**: Sectors with the largest allocations will have the greatest impact on performance
            - **Diversification**: How broadly the ETF spreads investments across different sectors
            - **Sector Bias**: Whether the ETF is overweight in certain sectors compared to the broader market
            - **Alignment with Economic Outlook**: Consider if the sector weightings align with your economic outlook (e.g., overweight technology during tech boom)
            **Why this matters**: Sector exposure is a key driver of returns and risks. During different economic cycles, sectors perform differently - technology might outperform during innovation booms, while utilities and consumer staples often do better during recessions.
            **Example interpretation**: An ETF with 40% technology exposure will behave very differently from one with 40% utilities exposure.
            """)
        with st.spinner("Loading sector data..."):
            sectors = get_etf_sector_weightings(symbol, force_refresh=st.session_state.get("force_refresh", False))
        if not sectors.empty:
            # Check for error
            if "error" in sectors.columns:
                st.warning("Unable to load sector allocation data")
                st.error(sectors["error"].iloc[0])
            else:
                st.subheader("Sector Allocation")
                # Create pie chart
                # NOTE(review): relies on the first column being the sector name
                # and the second the weight — confirm get_etf_sector_weightings
                # always returns columns in that order.
                fig = px.pie(
                    sectors,
                    names=sectors.columns[0],
                    values=sectors.columns[1],
                    title=f"Sector Allocation for {symbol}"
                )
                st.plotly_chart(fig, use_container_width=True)
                # Display sector table with fixed height to prevent double scrolling
                st.dataframe(
                    sectors,
                    use_container_width=True,
                    height=300
                )
        else:
            st.warning("No sector weighting data available for this ETF.")
            st.info("⚠️ PREMIUM API FEATURE: Sector allocation data typically requires a paid FMP API subscription. Consider upgrading your plan to access this information.")
    # Dividend History Tab
    with tabs[3]:
        # Add explanation in an expander
        with st.expander("📚 Understanding the Dividend History Tab", expanded=False):
            st.markdown("""
            ### Dividend History Explanation
            This tab shows the ETF's historical dividend payments, helping you understand its income generation capabilities.
            **Key metrics to analyze:**
            - **TTM (Trailing Twelve Month) Dividend**: Total dividends paid over the past year
            - **Dividend Yield**: Annual dividend as a percentage of current price
            - **Payment Trend**: Whether dividends are stable, growing, or declining over time
            - **Payment Frequency**: How often dividends are paid (monthly, quarterly, etc.)
            **Why this matters**:
            - For income investors, consistent and growing dividends are crucial
            - Dividend history reveals the ETF's income reliability and growth potential
            - Sudden drops in dividends may indicate underlying problems with the ETF's holdings
            **Interpreting the data**: Look for steady or increasing dividend payments over time. Declining dividends might signal financial stress in the underlying holdings. Also check if dividend amounts are consistent or vary significantly between payments.
            """)
        with st.spinner("Loading dividend history..."):
            dividends = get_etf_dividend_history(symbol, force_refresh=st.session_state.get("force_refresh", False))
        if not dividends.empty:
            # Check for error
            if "error" in dividends.columns:
                st.warning("Unable to load dividend history data")
                st.error(dividends["error"].iloc[0])
            # Ensure we have the necessary columns
            elif all(col in dividends.columns for col in ["date", "dividend"]):
                st.subheader("Dividend History")
                # Sort by date
                dividends = dividends.sort_values("date")
                # Calculate TTM dividend
                if len(dividends) > 0:
                    current_date = datetime.now()
                    one_year_ago = current_date - timedelta(days=365)
                    # NOTE(review): assumes the "date" column is already datetime —
                    # comparing strings against pd.Timestamp would raise; confirm
                    # get_etf_dividend_history converts it.
                    ttm_dividends = dividends[dividends["date"] >= pd.Timestamp(one_year_ago)]
                    ttm_dividend_sum = ttm_dividends["dividend"].sum()
                    st.metric("TTM Dividend", f"${ttm_dividend_sum:.2f}")
                    if "price" in profile:
                        dividend_yield = (ttm_dividend_sum / profile["price"]) * 100
                        st.metric("Dividend Yield", f"{dividend_yield:.2f}%")
                # Create line chart
                fig = px.line(
                    dividends,
                    x="date",
                    y="dividend",
                    title=f"Dividend History for {symbol}",
                    labels={"date": "Date", "dividend": "Dividend Amount ($)"}
                )
                st.plotly_chart(fig, use_container_width=True)
                # Display dividend table
                st.dataframe(
                    dividends.sort_values("date", ascending=False),
                    use_container_width=True,
                    height=400
                )
            else:
                st.warning("Dividend data is incomplete or in an unexpected format.")
        else:
            st.warning("No dividend history available for this ETF.")
    # Dividend Sustainability Tab
    with tabs[4]:
        # Add explanation in an expander
        with st.expander("📚 Understanding Dividend Sustainability", expanded=False):
            st.markdown("""
            ### Dividend Sustainability Explanation
            This tab analyzes how sustainable the ETF's dividend payments are likely to be in the future.
            **Key metrics analyzed:**
            - **Sustainability Score**: Overall assessment of how likely dividends can be maintained or grown
            - **Payout Ratio**: Percentage of earnings paid as dividends (lower is generally more sustainable)
            - **Dividend Growth Rate**: How quickly dividends have increased over time
            - **Growth Consistency**: How reliable the dividend increases have been
            **How to interpret the ratings:**
            - **Highly Sustainable (80-100)**: Strong fundamentals supporting continued dividend growth
            - **Sustainable (60-80)**: Good prospects for maintaining current dividends
            - **Moderately Sustainable (40-60)**: May maintain dividends but growth potential is limited
            - **Questionable (20-40)**: Risk of dividend cuts if economic conditions worsen
            - **Unsustainable (<20)**: High probability of dividend reduction
            **Why this matters**: Investors relying on dividend income need to assess not just current yield, but the likelihood that those dividends will continue or grow in the future. High yields sometimes come with high risk of cuts.
            """)
        display_dividend_sustainability(symbol)
    # Institutional Ownership Tab
    with tabs[5]:
        # Add explanation in an expander
        with st.expander("📚 Understanding Institutional Ownership", expanded=False):
            st.markdown("""
            ### Institutional Ownership Explanation
            This tab shows which large financial institutions (like pension funds, hedge funds, etc.) own shares of this ETF.
            **Key metrics to examine:**
            - **Total Institutional Ownership**: Percentage of the ETF owned by institutions (vs. retail investors)
            - **Top Holders**: Major institutional investors with the largest positions
            - **Concentration**: Whether ownership is spread widely or concentrated among a few large players
            - **Recent Changes**: If available, how institutional ownership has changed recently
            **Why this matters**:
            - **Higher institutional ownership** often indicates professional investor confidence
            - **Changes in institutional ownership** can signal shifting sentiment among professional investors
            - **Concentration risk**: If a few institutions own a large percentage, their selling could negatively impact price
            - **Liquidity considerations**: Highly institutional ETFs might have different liquidity characteristics
            **Typical patterns**: Broadly-diversified, established ETFs often have higher institutional ownership, while newer or more specialized ETFs may have lower institutional participation.
            """)
        display_whale_analysis(symbol)
    # ESG Scores Tab
    with tabs[6]:
        # Add explanation in an expander
        with st.expander("📚 Understanding ESG Scores", expanded=False):
            st.markdown("""
            ### ESG (Environmental, Social, Governance) Explanation
            This tab evaluates the ETF's performance on environmental, social, and governance factors - increasingly important considerations for socially-conscious investing.
            **Three key components:**
            - **Environmental**: How the ETF's holdings impact the natural world (carbon emissions, resource use, pollution, etc.)
            - **Social**: How the ETF's holdings manage relationships with employees, suppliers, customers, and communities
            - **Governance**: Quality of the ETF's holdings' leadership, executive pay, audits, internal controls, and shareholder rights
            **Score interpretation:**
            - **70-100**: Excellent - Industry leaders in sustainability practices
            - **50-70**: Good - Above average ESG performance
            - **30-50**: Average - Typical ESG performance for the industry
            - **0-30**: Below Average to Poor - Significant ESG concerns or risks
            **Why this matters**:
            - **Risk management**: Companies with poor ESG practices often face greater regulatory, legal, and reputational risks
            - **Long-term perspective**: Strong ESG performance is increasingly linked to better long-term financial performance
            - **Values alignment**: Allows investors to align portfolios with personal values
            - **Future-proofing**: Companies addressing ESG concerns may be better positioned for future regulatory changes
            **Note**: ESG data is often aggregated from the ETF's underlying holdings and methodologies vary between providers.
            """)
        display_esg_analysis(symbol)
def display_etf_search():
    """Render the ETF search page.

    Loads the ETF universe from the FMP API (cached unless a refresh was
    requested), lets the user filter by a free-text query on symbol/name
    and sort the results, then stores the chosen symbol in session state
    and switches to the analysis tab.
    """
    st.header("🔍 ETF Search")
    # Get ETF list
    with st.spinner("Loading ETF list..."):
        etf_df = get_etf_list(force_refresh=st.session_state.get("force_refresh", False))
    # Store in session state for later validation
    st.session_state.etf_list_df = etf_df
    if etf_df.empty:
        st.error("Failed to load ETF list. Please check your API key.")
        return
    # Filter options
    col1, col2 = st.columns(2)
    with col1:
        search_query = st.text_input("Search ETFs by Name or Symbol")
    with col2:
        sort_by = st.selectbox(
            "Sort By",
            options=["Symbol", "Name"],
            index=0
        )
    # Filter the dataframe.
    # regex=False treats the query as a literal string so user-typed
    # characters like "(" or "+" cannot raise a regex error;
    # na=False drops rows with a missing symbol/name instead of
    # propagating NaN into the boolean mask.
    filtered_df = etf_df
    if search_query:
        filtered_df = filtered_df[
            filtered_df["symbol"].str.contains(search_query, case=False, regex=False, na=False) |
            filtered_df["name"].str.contains(search_query, case=False, regex=False, na=False)
        ]
    # Sort the dataframe by the column matching the user's choice
    sort_column = "symbol" if sort_by == "Symbol" else "name"
    filtered_df = filtered_df.sort_values(sort_column)
    # Display results
    st.subheader(f"Found {len(filtered_df)} ETFs")
    # Create a more user-friendly display dataframe
    display_df = filtered_df[["symbol", "name", "exchange"]].copy()
    display_df.columns = ["Symbol", "Name", "Exchange"]
    # Display with selection
    selection = st.dataframe(
        display_df,
        use_container_width=True,
        height=400,
        column_config={
            "Symbol": st.column_config.TextColumn("Symbol", width="small"),
            "Name": st.column_config.TextColumn("Name", width="large"),
            "Exchange": st.column_config.TextColumn("Exchange", width="medium")
        }
    )
    # Allow user to select ETF for analysis; the empty-string sentinel
    # renders as the "Select an ETF" placeholder.
    selected_symbol = st.selectbox(
        "Select ETF for Analysis",
        options=[""] + filtered_df["symbol"].tolist(),
        format_func=lambda x: f"{x}: {filtered_df[filtered_df['symbol'] == x]['name'].iloc[0]}" if x else "Select an ETF"
    )
    if selected_symbol:
        # Hand off to the analysis view and rerun so it renders immediately
        st.session_state.selected_etf = selected_symbol
        st.session_state.current_tab = "analysis"
        st.rerun()
def display_comparison():
    """Display ETF comparison interface with financial performance focus.

    Verifies API connectivity, lets the user build a list of up to five
    ETFs (quick-select buttons plus a free-form ticker box), then routes
    to the analysis view chosen in the sidebar. With no ETFs selected it
    shows explanatory sample text for the chosen analysis type instead.
    """
    st.header("🔄 ETF Performance Comparison")
    # Check if we have an API key first (env var wins over session state)
    api_key = os.environ.get("FMP_API_KEY", st.session_state.get("fmp_api_key", ""))
    if not api_key:
        st.error("FMP API key not found. Please enter it in the sidebar.")
        return
    # Maximum number of ETFs that can be compared at once
    max_etfs = 5
    # Common ETFs for quick selection
    common_etfs = ["SPY", "VOO", "QQQ", "VTI", "IWM", "ARKK", "VIG", "SCHD"]
    # Cheap connectivity probe before doing any real work
    try:
        with st.spinner("Verifying API connection..."):
            test_result = fmp_request("quote/SPY", debug_mode=st.session_state.get("debug_mode", False))
            if isinstance(test_result, dict) and "error" in test_result:
                st.error("API connection test failed. Please check your API key.")
                st.info("Try using the 'Test API Connection' tool from the sidebar to diagnose issues.")
                return
    except Exception as e:
        st.error(f"Error verifying API connection: {str(e)}")
        return
    # Initialize comparison ETFs if not in session state
    if "comparison_etfs" not in st.session_state:
        st.session_state.comparison_etfs = []
    # Display label -> FMP period parameter
    time_periods = {
        "1 Month": "1month",
        "3 Months": "3month",
        "6 Months": "6month",
        "1 Year": "1year",
        "5 Years": "5year",
        "Max": "max"
    }
    # Sidebar for comparison settings
    st.sidebar.markdown("### Comparison Settings")
    selected_period = st.sidebar.selectbox(
        "Time Period",
        list(time_periods.keys()),
        index=3  # Default to 1 Year
    )
    # Add analysis type selector
    analysis_types = [
        "Performance Metrics",
        "NAV Premium/Discount",
        "Dividend Yield & Erosion",
        "Dividend Sustainability",
        "ESG Scores",
        "Institutional Ownership",
        "Dividend Calendar"
    ]
    selected_analysis = st.sidebar.radio("Analysis Type", analysis_types)
    period_value = time_periods[selected_period]
    # ETF Selection section
    st.subheader("Select ETFs to Compare")
    # Create 4 columns for quick selection of common ETFs.
    # Buttons are disabled once selected, and also once the cap is
    # reached — keeping the same 5-ETF limit the custom Add box enforces.
    cols = st.columns(4)
    for i, etf in enumerate(common_etfs):
        with cols[i % 4]:
            if st.button(etf, key=f"btn_{etf}",
                        disabled=etf in st.session_state.comparison_etfs
                        or len(st.session_state.comparison_etfs) >= max_etfs):
                st.session_state.comparison_etfs.append(etf)
                st.rerun()
    # Custom ETF input
    custom_col1, custom_col2 = st.columns([3, 1])
    with custom_col1:
        custom_etf = st.text_input("Add custom ETF ticker:", "")
    with custom_col2:
        if st.button("Add", disabled=not custom_etf or len(st.session_state.comparison_etfs) >= max_etfs):
            # Normalize the ticker: strip stray whitespace, upper-case
            ticker = custom_etf.strip().upper()
            if ticker and ticker not in st.session_state.comparison_etfs:
                st.session_state.comparison_etfs.append(ticker)
                st.rerun()
    # Display selected ETFs and performance
    if st.session_state.comparison_etfs:
        # Route to the handler matching the sidebar radio selection
        handlers = {
            "Performance Metrics": lambda: display_performance_comparison(period_value, selected_period),
            "NAV Premium/Discount": lambda: display_nav_comparison(period_value, selected_period),
            "Dividend Yield & Erosion": lambda: display_yield_erosion_comparison(period_value, selected_period),
            "Dividend Sustainability": display_dividend_sustainability_comparison,
            "ESG Scores": display_esg_comparison,
            "Institutional Ownership": display_whale_analysis_comparison,
            "Dividend Calendar": display_dividend_calendar_comparison,
        }
        handler = handlers.get(selected_analysis)
        if handler is not None:
            handler()
    else:
        st.info("Select ETFs to compare their performance.")
        # Show example comparison text for the chosen analysis type
        st.subheader("Sample Comparison Chart")
        if selected_analysis == "Performance Metrics":
            st.image("https://i.imgur.com/JE2Zxsm.png",
                    caption="Example of ETF performance comparison chart showing relative returns over time")
            st.write("""
            The performance comparison provides critical metrics for investment decisions:
            - **Relative Performance**: See how ETFs perform against each other over time
            - **Volatility**: Measure of price fluctuation (lower is generally less risky)
            - **Maximum Drawdown**: Largest percentage drop from peak to trough (shows downside risk)
            - **Time-based Returns**: Performance over various time periods (1M, 3M, 6M, 1Y)
            """)
        elif selected_analysis == "NAV Premium/Discount":
            st.write("""
            The NAV Premium/Discount analysis shows:
            - **NAV (Net Asset Value)**: The per-share value of the ETF's underlying assets
            - **Premium/Discount**: The percentage difference between market price and NAV
            - **Premium/Discount Trend**: How the relationship changes over time
            - **Premium/Discount Volatility**: The stability of the price-to-NAV relationship
            """)
        elif selected_analysis == "Dividend Yield & Erosion":
            st.write("""
            The Dividend Yield & Erosion analysis shows:
            - **Current Yield**: The latest dividend yield based on TTM dividends
            - **Yield Trend**: How the yield has changed over time
            - **Yield Erosion**: Decline in yield compared to historical averages
            - **Yield Volatility**: How stable the yield has been
            """)
        elif selected_analysis == "Dividend Sustainability":
            st.write("""
            The Dividend Sustainability analysis shows:
            - **Payout Ratio**: Average payout ratio of ETF holdings (<70% is sustainable)
            - **Dividend Growth Rate**: Annual growth rate of dividends (>5% signals quality)
            - **Growth Consistency**: Percentage of years with positive dividend growth
            - **Overall Sustainability**: Combined assessment of dividend sustainability
            """)
        elif selected_analysis == "ESG Scores":
            st.write("""
            The ESG Score comparison shows:
            - **Environmental Score**: Impact on the environment and natural resources
            - **Social Score**: Relationships with employees, suppliers, customers, communities
            - **Governance Score**: Leadership, audits, internal controls, shareholder rights
            - **Overall ESG Score**: Combined assessment of environmental, social, and governance factors
            """)
        elif selected_analysis == "Institutional Ownership":
            st.write("""
            The Institutional Ownership analysis shows:
            - **Major Holders**: Top institutional investors holding the ETF
            - **Ownership Concentration**: Percentage of ETF owned by top institutions
            - **Comparison of Whale Investors**: Compare institutional ownership patterns across ETFs
            - **Ownership Changes**: How institutional ownership has changed over time
            """)
        elif selected_analysis == "Dividend Calendar":
            st.write("""
            The Dividend Calendar analysis shows:
            - **Distribution Schedule**: When each ETF typically pays dividends
            - **Distribution Pattern**: Monthly, quarterly, semi-annual, or annual payment patterns
            - **Payment Timing**: Compare when different ETFs make their dividend payments
            - **Distribution History**: Historical dividend payment records
            """)
def display_performance_comparison(period_value, selected_period):
    """Display performance metrics comparison.

    Args:
        period_value: FMP API period string (e.g. "1year") used to fetch
            historical prices.
        selected_period: Human-readable period label used in titles.

    Renders a returns/volatility/drawdown table, a normalized relative
    performance chart, and controls to remove ETFs from the comparison.
    """
    st.subheader(f"ETF Performance Comparison ({selected_period})")
    # Fetch data and calculate metrics for all ETFs
    performance_data = {}
    price_history = {}
    with st.spinner("Loading performance data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get historical prices
            prices = get_historical_prices(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            if not prices.empty:
                # Store price history for charts
                price_history[symbol] = prices
                # Calculate performance metrics
                performance_data[symbol] = calculate_performance_metrics(prices)
    # If we have data, display it
    if performance_data:
        # Create comparison table of returns
        returns_data = []
        for symbol, metrics in performance_data.items():
            row = {"Symbol": symbol}
            # Add returns for different time periods
            if "returns" in metrics:
                for period, value in metrics["returns"].items():
                    row[period] = f"{value:.2f}%" if value is not None else "N/A"
            # Add volatility and max drawdown
            row["Volatility"] = f"{metrics.get('volatility', 0):.2f}%" if metrics.get('volatility') is not None else "N/A"
            row["Max Drawdown"] = f"{metrics.get('max_drawdown', 0):.2f}%" if metrics.get('max_drawdown') is not None else "N/A"
            returns_data.append(row)
        # Create DataFrame and display
        if returns_data:
            returns_df = pd.DataFrame(returns_data)
            returns_df.set_index("Symbol", inplace=True)
            st.dataframe(returns_df, use_container_width=True)
        # Create price chart
        if price_history:
            st.subheader("Price Performance")
            # Collect per-symbol frames first, then concatenate once —
            # avoids repeated pd.concat with an initially-empty DataFrame.
            frames = []
            for symbol, prices in price_history.items():
                if not prices.empty:
                    # Normalize to percentage change from first day
                    temp_df = prices[["date", "close"]].copy()
                    base_price = temp_df["close"].iloc[0]
                    temp_df["return"] = (temp_df["close"] / base_price - 1) * 100
                    temp_df["Symbol"] = symbol
                    frames.append(temp_df)
            chart_data = pd.concat(frames) if frames else pd.DataFrame()
            if not chart_data.empty:
                # Create line chart of percentage returns
                fig = px.line(
                    chart_data,
                    x="date",
                    y="return",
                    color="Symbol",
                    labels={
                        "date": "Date",
                        "return": "Return (%)",
                        "Symbol": "ETF"
                    },
                    title=f"Relative Performance ({selected_period})",
                    height=500
                )
                # Add reference line at 0%
                fig.add_hline(y=0, line_dash="dash", line_color="gray")
                st.plotly_chart(fig, use_container_width=True)
        # Allow removing ETFs from comparison
        st.subheader("Remove ETFs")
        remove_cols = st.columns(len(st.session_state.comparison_etfs))
        for i, symbol in enumerate(st.session_state.comparison_etfs):
            with remove_cols[i]:
                if st.button(f"Remove {symbol}", key=f"remove_{symbol}"):
                    st.session_state.comparison_etfs.remove(symbol)
                    st.rerun()
        # Clear all button
        if st.button("Clear All"):
            st.session_state.comparison_etfs = []
            st.rerun()
    else:
        st.warning("No performance data available for the selected ETFs.")
def display_nav_comparison(period_value, selected_period):
    """Display NAV premium/discount comparison.

    Args:
        period_value: FMP API period string used for price/NAV history.
        selected_period: Human-readable period label used in titles.

    Renders a premium/discount summary table, a trend chart across the
    selected ETFs, and per-ETF price-vs-NAV overlay charts. Falls back to
    an explanatory message when no NAV data is available.
    """
    st.subheader(f"ETF NAV Premium/Discount Analysis ({selected_period})")
    # Fetch data for all ETFs
    nav_data = {}
    price_data = {}
    premium_discount_data = {}
    with st.spinner("Loading NAV data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get price history
            prices = get_historical_prices(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            # Get NAV history
            nav = get_nav_data(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            if not prices.empty:
                price_data[symbol] = prices
                # If NAV data is available, calculate premium/discount
                if not nav.empty:
                    nav_data[symbol] = nav
                    premium_discount = calculate_nav_premium_discount(prices, nav)
                    if not premium_discount.empty:
                        premium_discount_data[symbol] = premium_discount
    # If we have data, display it
    if premium_discount_data:
        # Create summary table
        summary_data = []
        for symbol, pd_df in premium_discount_data.items():
            if not pd_df.empty:
                # Calculate average, min, max premium/discount
                avg_pd = pd_df["premium_discount"].mean()
                min_pd = pd_df["premium_discount"].min()
                max_pd = pd_df["premium_discount"].max()
                current_pd = pd_df["premium_discount"].iloc[-1]
                volatility_pd = pd_df["premium_discount"].std()
                summary_data.append({
                    "Symbol": symbol,
                    "Current P/D": f"{current_pd:.2f}%",
                    "Avg P/D": f"{avg_pd:.2f}%",
                    "Min P/D": f"{min_pd:.2f}%",
                    "Max P/D": f"{max_pd:.2f}%",
                    "P/D Volatility": f"{volatility_pd:.2f}%"
                })
        # Create DataFrame and display
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.set_index("Symbol", inplace=True)
            st.dataframe(summary_df, use_container_width=True)
            # Explanation of premium/discount
            st.info("""
            **Premium/Discount (P/D) Interpretation:**
            - **Positive values**: ETF trading at a premium to NAV
            - **Negative values**: ETF trading at a discount to NAV
            - **Higher volatility**: Less consistent pricing relative to NAV
            """)
        # Create premium/discount chart
        st.subheader("Premium/Discount Trend")
        # Collect per-symbol frames first, then concatenate once —
        # avoids repeated pd.concat with an initially-empty DataFrame.
        frames = []
        for symbol, pd_df in premium_discount_data.items():
            if not pd_df.empty:
                temp_df = pd_df[["date", "premium_discount"]].copy()
                temp_df["Symbol"] = symbol
                frames.append(temp_df)
        chart_data = pd.concat(frames) if frames else pd.DataFrame()
        if not chart_data.empty:
            # Create line chart of premium/discount
            fig = px.line(
                chart_data,
                x="date",
                y="premium_discount",
                color="Symbol",
                labels={
                    "date": "Date",
                    "premium_discount": "Premium/Discount (%)",
                    "Symbol": "ETF"
                },
                title=f"NAV Premium/Discount ({selected_period})",
                height=500
            )
            # Add reference line at 0%
            fig.add_hline(y=0, line_dash="dash", line_color="gray")
            st.plotly_chart(fig, use_container_width=True)
        # Show NAV vs Price charts for each ETF
        for symbol in premium_discount_data.keys():
            if symbol in price_data and symbol in nav_data:
                st.subheader(f"{symbol}: NAV vs. Price")
                # Prepare data
                price_df = price_data[symbol]
                nav_df = nav_data[symbol]
                # Merge data on shared trading dates only
                merged_df = pd.merge(
                    price_df[["date", "close"]],
                    nav_df[["date", "nav"]],
                    on="date",
                    how="inner"
                )
                if not merged_df.empty:
                    # Overlay price and NAV on the same axis
                    fig = go.Figure()
                    # Add price line
                    fig.add_trace(
                        go.Scatter(
                            x=merged_df["date"],
                            y=merged_df["close"],
                            name="Price",
                            line=dict(color="blue")
                        )
                    )
                    # Add NAV line
                    fig.add_trace(
                        go.Scatter(
                            x=merged_df["date"],
                            y=merged_df["nav"],
                            name="NAV",
                            line=dict(color="red")
                        )
                    )
                    # Update layout
                    fig.update_layout(
                        title=f"{symbol}: Price vs. NAV",
                        xaxis_title="Date",
                        yaxis_title="Value ($)",
                        height=400,
                        legend=dict(
                            orientation="h",
                            yanchor="bottom",
                            y=1.02,
                            xanchor="right",
                            x=1
                        )
                    )
                    st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("""
        NAV data is not available for the selected ETFs.
        """)
        st.info("""
        ⚠️ PREMIUM API FEATURE: NAV data is typically only available with higher-tier FMP API subscriptions.
        However, this application now attempts to fetch NAV data from Yahoo Finance as a fallback when FMP data is unavailable.
        For some ETFs, Yahoo Finance provides:
        - Direct NAV data through the "-IV" suffix ticker
        - Estimated NAV based on the ETF's price (less accurate but still useful)
        If you're still not seeing data, try enabling debug mode in the sidebar to see more information about the data retrieval process.
        """)
    # Allow removing ETFs from comparison
    if st.session_state.comparison_etfs:
        st.subheader("Remove ETFs")
        remove_cols = st.columns(len(st.session_state.comparison_etfs))
        for i, symbol in enumerate(st.session_state.comparison_etfs):
            with remove_cols[i]:
                if st.button(f"Remove {symbol}", key=f"remove_nav_{symbol}"):
                    st.session_state.comparison_etfs.remove(symbol)
                    st.rerun()
        # Clear all button
        if st.button("Clear All NAV"):
            st.session_state.comparison_etfs = []
            st.rerun()
def display_yield_erosion_comparison(period_value, selected_period):
    """Display dividend yield and erosion comparison.

    Args:
        period_value: FMP API period string used for yield history.
        selected_period: Human-readable period label used in titles.

    Renders a yield/erosion summary table, a yield trend chart across the
    selected ETFs, and per-ETF charts decomposing price, TTM dividend,
    and yield on dual axes.
    """
    st.subheader(f"ETF Dividend Yield & Erosion Analysis ({selected_period})")
    # Fetch data for all ETFs
    yield_data = {}
    yield_metrics = {}
    with st.spinner("Loading dividend data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get yield history
            yields = get_dividend_yield_history(
                symbol,
                period=period_value,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            if not yields.empty:
                # Store yield history
                yield_data[symbol] = yields
                # Calculate yield erosion metrics
                yield_metrics[symbol] = calculate_yield_erosion(yields)
    # If we have data, display it
    if yield_metrics:
        # Create summary table
        summary_data = []
        for symbol, metrics in yield_metrics.items():
            if metrics:
                row = {"Symbol": symbol}
                # Add current yield
                row["Current Yield"] = f"{metrics.get('current_yield', 0):.2f}%"
                # Add yield erosion metrics (one column per lookback window)
                if "yield_erosion" in metrics:
                    for period, value in metrics["yield_erosion"].items():
                        period_name = period.replace("_", "-").title()
                        row[f"Erosion ({period_name})"] = f"{value:.2f}%"
                # Add yield volatility
                if "yield_volatility" in metrics:
                    row["Yield Volatility"] = f"{metrics.get('yield_volatility', 0):.2f}%"
                summary_data.append(row)
        # Create DataFrame and display
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_df.set_index("Symbol", inplace=True)
            st.dataframe(summary_df, use_container_width=True)
            # Explanation of yield erosion
            st.info("""
            **Yield Erosion Interpretation:**
            - **Positive values**: Yield has increased compared to historical average
            - **Negative values**: Yield has decreased (eroded) compared to historical average
            - **Higher volatility**: Less consistent yield over time
            """)
        # Create yield trend chart
        if yield_data:
            st.subheader("Dividend Yield Trend")
            # Collect per-symbol frames first, then concatenate once —
            # avoids repeated pd.concat with an initially-empty DataFrame.
            frames = []
            for symbol, yields_df in yield_data.items():
                if not yields_df.empty:
                    temp_df = yields_df[["date", "dividend_yield"]].copy()
                    temp_df["Symbol"] = symbol
                    frames.append(temp_df)
            chart_data = pd.concat(frames) if frames else pd.DataFrame()
            if not chart_data.empty:
                # Create line chart of dividend yields
                fig = px.line(
                    chart_data,
                    x="date",
                    y="dividend_yield",
                    color="Symbol",
                    labels={
                        "date": "Date",
                        "dividend_yield": "Dividend Yield (%)",
                        "Symbol": "ETF"
                    },
                    title=f"Dividend Yield Trend ({selected_period})",
                    height=500
                )
                st.plotly_chart(fig, use_container_width=True)
        # Show individual yield charts for each ETF
        for symbol, yields_df in yield_data.items():
            if not yields_df.empty:
                st.subheader(f"{symbol}: Dividend Yield Components")
                # Create figure with secondary y-axis (price left,
                # dividend/yield right)
                fig = make_subplots(specs=[[{"secondary_y": True}]])
                # Add price line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["price"],
                        name="Price",
                        line=dict(color="blue")
                    ),
                    secondary_y=False
                )
                # Add TTM dividend line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["ttm_dividend"],
                        name="TTM Dividend",
                        line=dict(color="green")
                    ),
                    secondary_y=True
                )
                # Add yield line
                fig.add_trace(
                    go.Scatter(
                        x=yields_df["date"],
                        y=yields_df["dividend_yield"],
                        name="Yield (%)",
                        line=dict(color="red", dash="dash")
                    ),
                    secondary_y=True
                )
                # Update layout
                fig.update_layout(
                    title=f"{symbol}: Price, TTM Dividend, and Yield",
                    height=400,
                    legend=dict(
                        orientation="h",
                        yanchor="bottom",
                        y=1.02,
                        xanchor="right",
                        x=1
                    )
                )
                # Update y-axis labels
                fig.update_yaxes(title_text="Price ($)", secondary_y=False)
                fig.update_yaxes(title_text="Dividend/Yield", secondary_y=True)
                st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("""
        Dividend yield data is not available for the selected ETFs.
        This could be because:
        1. The selected ETFs don't pay dividends
        2. There isn't enough dividend history in the time period selected
        3. The API subscription level doesn't provide access to dividend data
        Try selecting different ETFs or a longer time period.
        """)
    # Allow removing ETFs from comparison
    if st.session_state.comparison_etfs:
        st.subheader("Remove ETFs")
        remove_cols = st.columns(len(st.session_state.comparison_etfs))
        for i, symbol in enumerate(st.session_state.comparison_etfs):
            with remove_cols[i]:
                if st.button(f"Remove {symbol}", key=f"remove_yield_{symbol}"):
                    st.session_state.comparison_etfs.remove(symbol)
                    st.rerun()
        # Clear all button
        if st.button("Clear All Yield"):
            st.session_state.comparison_etfs = []
            st.rerun()
def display_export():
    """Render the (placeholder) export page.

    Currently only announces the planned export feature; no export logic
    is implemented yet.
    """
    page_title = "📤 Export ETF Data"
    feature_note = ("This feature will allow you to export ETF data to CSV, PDF, "
                    "or directly to the ETF Portfolio Builder application.")
    pending_note = "Export functionality will be implemented in a future update."
    st.header(page_title)
    st.info(feature_note)
    # Placeholder until the real export workflow is built.
    st.warning(pending_note)
def test_api_connection():
    """Test the connection to the FMP API.

    Runs three diagnostics in order — the bulk ETF list endpoint, a
    single-ETF profile lookup (SPY), and a presence check for the MSTY
    ticker — rendering a success/error panel for each, then prints
    troubleshooting guidance. Uses whatever API key `fmp_request` picks
    up; the key fetched here is only used for the early-exit check.
    """
    st.header("🔍 API Connection Test")
    # Get API key: environment variable first, session state as fallback
    api_key = os.environ.get("FMP_API_KEY")
    if not api_key:
        api_key = st.session_state.get("fmp_api_key")
    if not api_key:
        st.error("FMP API key not found. Please enter it in the sidebar.")
        return
    st.write("Testing connection to Financial Modeling Prep API...")
    # Try a simple API endpoint first (bulk list — cheap and unambiguous)
    with st.spinner("Testing API with ETF list endpoint..."):
        result = fmp_request("etf/list", debug_mode=True)
        # fmp_request signals failure with a dict carrying an "error" key;
        # a successful list call returns a plain list of ETF records.
        if isinstance(result, dict) and "error" in result:
            st.error(f"❌ API Test Failed: {result['error']}")
        elif isinstance(result, list):
            st.success(f"✅ API Test Succeeded! Retrieved {len(result)} ETFs")
            # Show sample of the results
            if len(result) > 0:
                st.write("Sample ETF data:")
                sample_df = pd.DataFrame(result[:5])
                st.dataframe(sample_df)
        else:
            st.warning("⚠️ Unexpected API response format")
            st.json(result)
    # Try a specific ETF profile
    test_ticker = "SPY"  # S&P 500 ETF - should exist in any ETF database
    with st.spinner(f"Testing API with ETF profile for {test_ticker}..."):
        result = fmp_request(f"etf/profile/{test_ticker}", debug_mode=True)
        if isinstance(result, dict) and "error" in result:
            st.error(f"❌ API Test Failed: {result['error']}")
        elif isinstance(result, list) and len(result) > 0:
            st.success(f"✅ API Test Succeeded! Retrieved profile for {test_ticker}")
            # Show the profile data
            st.write(f"{test_ticker} Profile Data:")
            profile = result[0]
            st.json(profile)
        else:
            st.warning("⚠️ Unexpected API response format")
            st.json(result)
    # Try searching for MSTY ticker (a ticker users have reported missing;
    # exercises both list membership and the profile endpoint for it)
    test_ticker_msty = "MSTY"
    with st.spinner(f"Testing search for {test_ticker_msty}..."):
        # Get ETF list
        etf_list = fmp_request("etf/list", debug_mode=True)
        if isinstance(etf_list, list):
            # Check if MSTY is in the list
            etf_list_df = pd.DataFrame(etf_list)
            if 'symbol' in etf_list_df.columns:
                if test_ticker_msty in etf_list_df['symbol'].values:
                    st.success(f"{test_ticker_msty} is a valid ETF in the FMP database")
                    # Try to get profile
                    msty_profile = fmp_request(f"etf/profile/{test_ticker_msty}", debug_mode=True)
                    if isinstance(msty_profile, list) and len(msty_profile) > 0:
                        st.success(f"✅ Retrieved profile for {test_ticker_msty}")
                        st.json(msty_profile[0])
                    else:
                        st.error(f"{test_ticker_msty} is in the ETF list but profile couldn't be retrieved")
                else:
                    st.warning(f"⚠️ {test_ticker_msty} is NOT found in the FMP ETF database")
                    # Show closest matches — any symbol sharing the first
                    # two characters of the missing ticker
                    st.write("Closest matching tickers:")
                    closest = etf_list_df[etf_list_df['symbol'].str.contains(test_ticker_msty[:2], case=False)]
                    if not closest.empty:
                        st.dataframe(closest[['symbol', 'name']])
                    else:
                        st.write("No similar tickers found")
            else:
                st.error("ETF list doesn't contain symbol column")
        else:
            st.error("Failed to retrieve ETF list for validation")
    # Show API connection instructions
    st.subheader("📋 Troubleshooting Steps")
    st.markdown("""
    If the tests above failed, please check:
    1. **API Key**: Make sure your FMP API key is correct and active
       - Verify at [FMP Dashboard](https://financialmodelingprep.com/developer/docs/)
       - Ensure you have an active subscription that includes ETF data
    2. **Internet Connection**: Check that you can access financialmodelingprep.com
    3. **API Limits**: You might have exceeded your API call limit for the day
    4. **Firewall/Network**: Ensure your network allows API calls to external services
    """)
def display_whale_analysis_comparison():
    """Display institutional ownership comparison for selected ETFs.

    For each ETF in the comparison list, fetches 13F-style holder data,
    summarizes total and top-5 institutional ownership, charts the
    totals, and lists each ETF's top three holders.
    """
    st.subheader("Institutional Ownership Comparison")
    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return
    # Fetch data for all ETFs
    institutional_data = {}
    ownership_concentration = []
    with st.spinner("Loading institutional ownership data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get institutional ownership data
            holders_df = get_institutional_ownership(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            # Store data if available
            if not holders_df.empty and "percentage" in holders_df.columns:
                institutional_data[symbol] = holders_df
                # Calculate metrics
                total_institutional = holders_df["percentage"].sum()
                # Top-5 concentration must come from the SORTED frame —
                # the API does not guarantee holders arrive pre-sorted.
                # (head(5) of a shorter frame simply returns all rows,
                # so no separate length check is needed.)
                holders_df_sorted = holders_df.sort_values("percentage", ascending=False)
                top_5_pct = holders_df_sorted.head(5)["percentage"].sum()
                # Store concentration data
                ownership_concentration.append({
                    "Symbol": symbol,
                    "Total Institutional": total_institutional,
                    "Top 5 Concentration": top_5_pct,
                    "Number of Institutions": len(holders_df)
                })
    # Display concentration metrics comparison
    if ownership_concentration:
        st.subheader("Ownership Concentration")
        concentration_df = pd.DataFrame(ownership_concentration)
        concentration_df.set_index("Symbol", inplace=True)
        # Format percentages for the table (the chart below uses the raw
        # numeric values, so it is built from the original list)
        concentration_df["Total Institutional"] = concentration_df["Total Institutional"].apply(lambda x: f"{x:.2f}%")
        concentration_df["Top 5 Concentration"] = concentration_df["Top 5 Concentration"].apply(lambda x: f"{x:.2f}%")
        st.dataframe(concentration_df, use_container_width=True)
        # Create bar chart comparing institutional ownership
        chart_data = pd.DataFrame(ownership_concentration)
        fig = px.bar(
            chart_data,
            x="Symbol",
            y="Total Institutional",
            title="Total Institutional Ownership Percentage",
            labels={"Total Institutional": "Ownership %"}
        )
        st.plotly_chart(fig, use_container_width=True)
        # Compare top holders across ETFs
        st.subheader("Top 3 Institutional Holders by ETF")
        for symbol, holders_df in institutional_data.items():
            if not holders_df.empty and "percentage" in holders_df.columns:
                st.write(f"**{symbol}**")
                # Display top 3 holders
                top_holders = holders_df.sort_values("percentage", ascending=False).head(3)
                # Format display dataframe
                display_df = top_holders.copy()
                # Rename columns for better display; only rename columns
                # actually present in this provider's payload
                column_mapping = {
                    "holder": "Holder",
                    "shares": "Shares",
                    "sharesHeld": "Shares Held",
                    "dateReported": "Date Reported",
                    "percentage": "Percentage"
                }
                display_df = display_df.rename(columns={k: v for k, v in column_mapping.items() if k in display_df.columns})
                # Format percentage column
                if "Percentage" in display_df.columns:
                    display_df["Percentage"] = display_df["Percentage"].apply(lambda x: f"{x:.2f}%")
                st.dataframe(display_df, use_container_width=True)
    else:
        st.warning("No institutional ownership data available for the selected ETFs.")
        st.info("This data may require a premium API subscription.")
def display_dividend_calendar_comparison():
    """Display dividend calendar comparison for selected ETFs.

    Shows each ETF's distribution pattern, an average-dividend-by-month
    bar chart, a year×month heatmap per ETF (last two calendar years),
    and the five most recent payments per ETF.
    """
    st.subheader("Dividend Distribution Calendar Comparison")
    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return
    # Fetch data for all ETFs
    dividend_data = {}
    distribution_patterns = {}
    with st.spinner("Loading dividend calendar data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get dividend calendar data
            dividend_df = get_dividend_calendar(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            # Store data if available
            if not dividend_df.empty:
                dividend_data[symbol] = dividend_df
                # Store distribution pattern (constant per symbol, so
                # reading the first row is sufficient)
                if "distribution_pattern" in dividend_df.columns:
                    pattern = dividend_df["distribution_pattern"].iloc[0]
                    distribution_patterns[symbol] = pattern
                else:
                    distribution_patterns[symbol] = "Unknown"
    # Display distribution patterns comparison
    if distribution_patterns:
        st.subheader("Distribution Patterns")
        # Create DataFrame for display
        patterns_df = pd.DataFrame([
            {"Symbol": symbol, "Distribution Pattern": pattern}
            for symbol, pattern in distribution_patterns.items()
        ])
        patterns_df.set_index("Symbol", inplace=True)
        st.dataframe(patterns_df, use_container_width=True)
        # Create chart to visualize monthly distribution patterns
        st.subheader("Monthly Distribution Patterns")
        # Prepare data for chart
        monthly_data = []
        # Get latest 2 years of data
        current_year = datetime.now().year
        min_year = current_year - 2
        for symbol, df in dividend_data.items():
            # Require every column this loop touches — including "year",
            # which is used for the recency filter below.
            if all(col in df.columns for col in ("date", "month", "year", "dividend")):
                # Filter to recent data
                recent_df = df[df["year"] >= min_year]
                if not recent_df.empty:
                    # Calculate average dividend by month
                    monthly_avg = recent_df.groupby("month")["dividend"].mean().reset_index()
                    # Make sure all months are represented
                    all_months = pd.DataFrame({"month": range(1, 13)})
                    monthly_avg = pd.merge(all_months, monthly_avg, on="month", how="left")
                    monthly_avg["dividend"] = monthly_avg["dividend"].fillna(0)
                    # Add symbol column
                    monthly_avg["Symbol"] = symbol
                    # Add to monthly data
                    monthly_data.append(monthly_avg)
        if monthly_data:
            # Combine all monthly data
            combined_monthly = pd.concat(monthly_data)
            # Create bar chart (x stays numeric 1-12; month names are
            # applied via the tick labels below)
            fig = px.bar(
                combined_monthly,
                x="month",
                y="dividend",
                color="Symbol",
                barmode="group",
                labels={"month": "Month", "dividend": "Avg Dividend Amount ($)"},
                title="Average Monthly Dividend Distribution",
                category_orders={"month": list(range(1, 13))}
            )
            # Update x-axis to show month names
            fig.update_layout(
                xaxis=dict(
                    tickmode="array",
                    tickvals=list(range(1, 13)),
                    ticktext=[datetime(2000, m, 1).strftime("%b") for m in range(1, 13)]
                )
            )
            st.plotly_chart(fig, use_container_width=True)
        # Create a heatmap grid for each ETF
        st.subheader("Dividend Calendar Heatmaps")
        for symbol, df in dividend_data.items():
            if all(col in df.columns for col in ("date", "month", "year", "dividend")):
                # Filter to recent data
                recent_df = df[df["year"] >= min_year]
                if not recent_df.empty:
                    st.write(f"**{symbol} Dividend Calendar**")
                    try:
                        # Group by year and month
                        calendar_data = recent_df.groupby(["year", "month"]).agg({
                            "dividend": "sum"
                        }).reset_index()
                        # Pivot into a month x year grid; missing cells
                        # mean no payment, so fill with 0
                        pivot_data = calendar_data.pivot_table(
                            index="month",
                            columns="year",
                            values="dividend",
                            fill_value=0
                        )
                        # Reindex to ensure all 12 months are included
                        pivot_data = pivot_data.reindex(list(range(1, 13)), fill_value=0)
                        # Get month labels
                        month_labels = [datetime(2000, i, 1).strftime("%b") for i in range(1, 13)]
                        # Create heatmap
                        fig = px.imshow(
                            pivot_data,
                            labels=dict(x="Year", y="Month", color="Dividend Amount"),
                            x=pivot_data.columns.tolist(),
                            y=month_labels,
                            aspect="auto",
                            title=f"{symbol} Dividend Distribution Calendar"
                        )
                        st.plotly_chart(fig, use_container_width=True)
                    except Exception as e:
                        st.error(f"Unable to generate calendar for {symbol}: {str(e)}")
        # Display recent dividend payments
        st.subheader("Recent Dividend Payments")
        for symbol, df in dividend_data.items():
            if "date" in df.columns and "dividend" in df.columns:
                st.write(f"**{symbol}**")
                # Display recent dividends
                recent_dividends = df.sort_values("date", ascending=False).head(5)
                # Format display dataframe; coerce to datetime first so
                # .dt.strftime works even if the source left dates as strings
                display_df = recent_dividends[["date", "dividend"]].copy()
                display_df["date"] = pd.to_datetime(display_df["date"]).dt.strftime("%Y-%m-%d")
                display_df.columns = ["Date", "Dividend Amount ($)"]
                st.dataframe(display_df, use_container_width=True)
    else:
        st.warning("No dividend calendar data available for the selected ETFs.")
        st.info("Some ETFs may not pay dividends, or dividend history may be limited.")
def get_dividend_sustainability(symbol: str, force_refresh: bool = False) -> Dict:
    """Calculate dividend sustainability metrics for an ETF.

    Combines two independent data sources:
    1. The ETF's own dividend history (growth rate and consistency).
    2. TTM payout ratios of the ETF's top-10 holdings (simple average).

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        Dictionary with keys:
            symbol: the input ticker
            payout_ratio: average payout ratio (%) across top holdings, or None
            dividend_growth_rate: average YoY dividend growth (%), or None
            growth_years: number of yearly growth observations used (0 if none)
            dividend_consistency: % of observed years with positive growth, or None
    """
    debug_mode = st.session_state.get("debug_mode", False)
    # Get ETF holdings first (used below for the payout-ratio estimate)
    holdings = get_etf_holdings(symbol, force_refresh=force_refresh)
    # Get dividend history (used for growth-rate / consistency metrics)
    dividend_history = get_etf_dividend_history(symbol, force_refresh=force_refresh)
    results = {
        "symbol": symbol,
        "payout_ratio": None,
        "dividend_growth_rate": None,
        "growth_years": 0,
        "dividend_consistency": None
    }
    # --- Part 1: growth metrics from the ETF's own dividend history ---
    if not dividend_history.empty and "date" in dividend_history.columns and "dividend" in dividend_history.columns:
        try:
            # Sort by date first; ISO-format date strings sort chronologically
            # even before the datetime conversion below.
            dividend_history = dividend_history.sort_values("date")
            # Convert date to datetime if not already
            dividend_history["date"] = pd.to_datetime(dividend_history["date"])
            # Add year column so payments can be aggregated per calendar year
            dividend_history["year"] = dividend_history["date"].dt.year
            # Total dividends paid per year
            # NOTE(review): first/last years may be partial calendar years,
            # which can skew the YoY growth figures — confirm acceptable.
            annual_dividends = dividend_history.groupby("year")["dividend"].sum().reset_index()
            if len(annual_dividends) >= 3: # Need at least 3 years for meaningful growth rate
                # Year-over-year growth rates (fractional)
                annual_dividends["growth_rate"] = annual_dividends["dividend"].pct_change()
                # Remove first year (which has NaN growth rate)
                annual_dividends = annual_dividends.dropna()
                # Average growth rate, expressed as a percentage
                avg_growth_rate = annual_dividends["growth_rate"].mean() * 100
                # Number of yearly growth observations actually used
                growth_years = len(annual_dividends)
                # Consistency = share of observed years with positive growth
                positive_growth_years = (annual_dividends["growth_rate"] > 0).sum()
                consistency = (positive_growth_years / len(annual_dividends)) * 100
                results["dividend_growth_rate"] = avg_growth_rate
                results["growth_years"] = growth_years
                results["dividend_consistency"] = consistency
                if debug_mode:
                    st.write(f"Annual dividends for {symbol}:", annual_dividends)
        except Exception as e:
            # Best-effort metric: failures only surface in debug mode and
            # leave the corresponding result fields as None.
            if debug_mode:
                st.error(f"Error calculating dividend growth rate: {str(e)}")
    # --- Part 2: average payout ratio across the ETF's top holdings ---
    if not holdings.empty and "asset" in holdings.columns:
        try:
            # Filter to top holdings that represent the majority of the ETF
            if "weightPercentage" in holdings.columns:
                sorted_holdings = holdings.sort_values("weightPercentage", ascending=False)
                top_holdings = sorted_holdings.head(10) # Top 10 holdings
            else:
                # No weight column: fall back to the first 10 rows as-is
                top_holdings = holdings.head(10)
            if "asset" in top_holdings.columns:
                tickers = top_holdings["asset"].tolist()
                # Collect one payout ratio per holding (where available)
                payout_ratios = []
                for ticker in tickers:
                    # Strip any exchange prefix or trailing qualifier
                    # (e.g. "NYSE:ABC" or "ABC US") down to the bare symbol
                    ticker = ticker.split(':')[-1].split(' ')[0]
                    # Trailing-twelve-month key metrics for the holding
                    financial_data = fmp_request(
                        f"key-metrics-ttm/{ticker}",
                        force_refresh=force_refresh,
                        debug_mode=debug_mode
                    )
                    if isinstance(financial_data, list) and len(financial_data) > 0:
                        if "payoutRatioTTM" in financial_data[0]:
                            payout_ratio = financial_data[0]["payoutRatioTTM"]
                            # Filter out extreme values (ratio >= 200%)
                            if payout_ratio is not None and payout_ratio < 2:
                                payout_ratios.append(payout_ratio * 100) # Convert to percentage
                # Simple (unweighted) average across holdings with data
                if payout_ratios:
                    avg_payout_ratio = sum(payout_ratios) / len(payout_ratios)
                    results["payout_ratio"] = avg_payout_ratio
                    if debug_mode:
                        st.write(f"Payout ratios for {symbol} holdings:", payout_ratios)
        except Exception as e:
            # Best-effort metric: failures only surface in debug mode.
            if debug_mode:
                st.error(f"Error calculating payout ratio: {str(e)}")
    return results
def assess_dividend_sustainability(metrics: Dict) -> Dict:
    """Assess dividend sustainability based on metrics.

    Each available metric is graded against fixed threshold bands and
    contributes between 0 and 1 points; the sustainability score is the
    earned fraction of possible points scaled to 0-100. Metrics that are
    missing (None) are skipped entirely and keep their "No Data" label.

    Args:
        metrics: Dictionary with dividend sustainability metrics
            (keys: "payout_ratio", "dividend_growth_rate",
            "dividend_consistency")

    Returns:
        Dictionary with sustainability assessments
    """
    assessment = {
        "sustainability_score": 0,
        "payout_ratio_assessment": "No Data",
        "growth_rate_assessment": "No Data",
        "consistency_assessment": "No Data",
        "overall_assessment": "No Data"
    }

    def first_band(value, bands, fallback, lower_is_better=False):
        """Return (label, points) for the first threshold band that matches."""
        for threshold, label, points in bands:
            matched = value < threshold if lower_is_better else value > threshold
            if matched:
                return label, points
        return fallback

    earned = 0.0
    possible = 0
    # Payout ratio: lower is safer (strictly-below thresholds).
    payout = metrics.get("payout_ratio")
    if payout is not None:
        possible += 1
        label, points = first_band(
            payout,
            [(30, "Excellent", 1.0), (50, "Good", 0.75),
             (70, "Fair", 0.5), (90, "Caution", 0.25)],
            ("High Risk", 0.0),
            lower_is_better=True,
        )
        assessment["payout_ratio_assessment"] = label
        earned += points
    # Dividend growth rate: higher is better (strictly-above thresholds).
    growth = metrics.get("dividend_growth_rate")
    if growth is not None:
        possible += 1
        label, points = first_band(
            growth,
            [(10, "Excellent", 1.0), (5, "Good", 0.75),
             (0, "Fair", 0.5), (-5, "Caution", 0.25)],
            ("Declining", 0.0),
        )
        assessment["growth_rate_assessment"] = label
        earned += points
    # Consistency: share of years with positive growth, higher is better.
    consistency = metrics.get("dividend_consistency")
    if consistency is not None:
        possible += 1
        label, points = first_band(
            consistency,
            [(90, "Excellent", 1.0), (75, "Good", 0.75),
             (50, "Fair", 0.5), (25, "Inconsistent", 0.25)],
            ("Unreliable", 0.0),
        )
        assessment["consistency_assessment"] = label
        earned += points
    # Overall verdict derives from the normalized (0-100) score; with no
    # usable metrics both score and verdict keep their defaults.
    if possible > 0:
        score_pct = (earned / possible) * 100
        assessment["sustainability_score"] = score_pct
        overall_label, _ = first_band(
            score_pct,
            [(80, "Highly Sustainable", 0), (60, "Sustainable", 0),
             (40, "Moderately Sustainable", 0),
             (20, "Questionable Sustainability", 0)],
            ("Unsustainable", 0),
        )
        assessment["overall_assessment"] = overall_label
    return assessment
def display_dividend_sustainability(symbol: str) -> None:
    """Render the dividend sustainability section for a single ETF.

    Fetches sustainability metrics, shows a gauge + assessment table,
    an explanatory legend, and (when >= 3 years of data exist) an
    annual dividend / YoY growth chart.
    """
    st.subheader("Dividend Sustainability Analysis")
    with st.spinner("Calculating dividend sustainability metrics..."):
        metrics = get_dividend_sustainability(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
        assessment = assess_dividend_sustainability(metrics)
    # Display results only if at least one metric came back
    has_data = (metrics.get("payout_ratio") is not None or
                metrics.get("dividend_growth_rate") is not None or
                metrics.get("dividend_consistency") is not None)
    if not has_data:
        st.warning("No dividend sustainability data available for this ETF.")
        st.info("⚠️ PREMIUM API FEATURE: Dividend sustainability analysis requires both dividend history data and holdings data with financial metrics. These detailed analytics typically require a paid FMP API subscription.")
        st.info("Without a premium subscription, the app is unable to calculate payout ratios and growth consistency metrics needed for sustainability analysis.")
        return
    # Two-column layout: gauge/verdict on the left, metric table on the right
    col1, col2 = st.columns(2)
    with col1:
        # Sustainability score gauge chart (only when a score was computed)
        if assessment["sustainability_score"] > 0:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=assessment["sustainability_score"],
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': "Sustainability Score"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': "darkblue"},
                    # Colored steps mirror the 20-point rating bands
                    'steps': [
                        {'range': [0, 20], 'color': "red"},
                        {'range': [20, 40], 'color': "orange"},
                        {'range': [40, 60], 'color': "yellow"},
                        {'range': [60, 80], 'color': "lightgreen"},
                        {'range': [80, 100], 'color': "green"}
                    ],
                    # Threshold needle drawn at the score itself
                    'threshold': {
                        'line': {'color': "black", 'width': 4},
                        'thickness': 0.75,
                        'value': assessment["sustainability_score"]
                    }
                }
            ))
            fig.update_layout(
                height=250,
                margin=dict(l=20, r=20, t=50, b=20),
            )
            st.plotly_chart(fig, use_container_width=True)
        # Overall assessment verdict
        st.metric(
            "Overall Assessment",
            assessment["overall_assessment"]
        )
    with col2:
        # Metric table: only rows whose underlying metric is available
        metrics_data = []
        if metrics.get("payout_ratio") is not None:
            metrics_data.append({
                "Metric": "Average Payout Ratio",
                "Value": f"{metrics['payout_ratio']:.2f}%",
                "Assessment": assessment["payout_ratio_assessment"]
            })
        if metrics.get("dividend_growth_rate") is not None:
            metrics_data.append({
                "Metric": f"{metrics['growth_years']}-Year Dividend Growth Rate",
                "Value": f"{metrics['dividend_growth_rate']:.2f}%",
                "Assessment": assessment["growth_rate_assessment"]
            })
        if metrics.get("dividend_consistency") is not None:
            metrics_data.append({
                "Metric": "Dividend Growth Consistency",
                "Value": f"{metrics['dividend_consistency']:.2f}%",
                "Assessment": assessment["consistency_assessment"]
            })
        if metrics_data:
            st.dataframe(
                pd.DataFrame(metrics_data),
                use_container_width=True,
                hide_index=True
            )
    # Static legend explaining the score bands and metric thresholds
    st.write("""
    ### Understanding Dividend Sustainability
    The sustainability score evaluates how likely the ETF can maintain or grow its dividend payments over time. A higher score indicates better sustainability.
    **Interpreting the Score:**
    - **80-100**: Highly Sustainable - Strong likelihood of continued dividend growth
    - **60-80**: Sustainable - Good prospects for maintaining dividends
    - **40-60**: Moderately Sustainable - May maintain dividends but with limited growth
    - **20-40**: Questionable Sustainability - Risk of dividend cuts
    - **0-20**: Unsustainable - High probability of dividend reduction
    **Key Metrics:**
    - **Payout Ratio**: Percentage of earnings paid as dividends
      - Below 30%: Excellent (very safe)
      - 30-50%: Good (safe)
      - 50-70%: Fair (sustainable)
      - 70-90%: Caution (potentially unsustainable)
      - Above 90%: High Risk (likely unsustainable)
    - **Dividend Growth Rate**: Annual growth rate of dividend payments
      - Above 10%: Excellent growth
      - 5-10%: Good growth
      - 0-5%: Fair growth
      - Below 0%: Declining dividends
    - **Growth Consistency**: Percentage of years with positive dividend growth
      - Higher percentages indicate more reliable dividend growth
    """)
    # Annual dividend growth chart (needs >= 3 years of growth data)
    if metrics.get("growth_years", 0) >= 3:
        with st.spinner("Generating dividend growth chart..."):
            # Re-fetch dividend history to build the chart; expected to hit
            # the cache unless force_refresh is set
            dividend_history = get_etf_dividend_history(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            if not dividend_history.empty:
                try:
                    # Same per-year aggregation as get_dividend_sustainability
                    dividend_history = dividend_history.sort_values("date")
                    dividend_history["date"] = pd.to_datetime(dividend_history["date"])
                    dividend_history["year"] = dividend_history["date"].dt.year
                    annual_dividends = dividend_history.groupby("year")["dividend"].sum().reset_index()
                    # YoY growth in percent, rounded for display
                    annual_dividends["growth_rate"] = annual_dividends["dividend"].pct_change() * 100
                    annual_dividends["growth_rate"] = annual_dividends["growth_rate"].round(2)
                    # Dual-axis chart: dividend bars (left), growth line (right)
                    fig = make_subplots(specs=[[{"secondary_y": True}]])
                    fig.add_trace(
                        go.Bar(
                            x=annual_dividends["year"],
                            y=annual_dividends["dividend"],
                            name="Annual Dividend",
                            marker_color="blue"
                        ),
                        secondary_y=False
                    )
                    # Growth line excludes the first year (NaN growth)
                    growth_df = annual_dividends.dropna()
                    if not growth_df.empty:
                        fig.add_trace(
                            go.Scatter(
                                x=growth_df["year"],
                                y=growth_df["growth_rate"],
                                name="YoY Growth Rate",
                                marker_color="red",
                                mode="lines+markers"
                            ),
                            secondary_y=True
                        )
                    fig.update_layout(
                        title=f"Annual Dividends and Growth Rates for {symbol}",
                        xaxis_title="Year",
                        legend=dict(
                            orientation="h",
                            yanchor="bottom",
                            y=1.02,
                            xanchor="right",
                            x=1
                        )
                    )
                    # Axis titles for the two y-axes
                    fig.update_yaxes(title_text="Annual Dividend ($)", secondary_y=False)
                    fig.update_yaxes(title_text="YoY Growth Rate (%)", secondary_y=True)
                    st.subheader("Dividend Growth History")
                    st.plotly_chart(fig, use_container_width=True)
                except Exception as e:
                    st.error(f"Error generating dividend growth chart: {str(e)}")
def display_dividend_sustainability_comparison() -> None:
    """Render the dividend sustainability comparison view.

    Computes sustainability metrics for every ETF in
    ``st.session_state.comparison_etfs`` and shows a combined assessment
    table plus score / payout-ratio / growth-rate comparison charts.
    """
    st.subheader("Dividend Sustainability Comparison")
    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return
    # Fetch data for all ETFs; only ETFs with at least one metric are kept
    sustainability_data = {}
    assessment_data = []
    with st.spinner("Calculating dividend sustainability metrics..."):
        for symbol in st.session_state.comparison_etfs:
            metrics = get_dividend_sustainability(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            # Keep only ETFs with at least one available metric
            if metrics and (metrics.get("payout_ratio") is not None or
                            metrics.get("dividend_growth_rate") is not None or
                            metrics.get("dividend_consistency") is not None):
                sustainability_data[symbol] = metrics
                assessment = assess_dividend_sustainability(metrics)
                # One summary row per ETF; optional columns only appear
                # when the underlying metric exists
                summary = {
                    "Symbol": symbol,
                    "Sustainability Score": f"{assessment.get('sustainability_score', 0):.1f}",
                    "Overall Assessment": assessment.get("overall_assessment", "No Data")
                }
                if metrics.get("payout_ratio") is not None:
                    summary["Payout Ratio"] = f"{metrics['payout_ratio']:.2f}%"
                    summary["Payout Assessment"] = assessment.get("payout_ratio_assessment", "No Data")
                if metrics.get("dividend_growth_rate") is not None:
                    summary["Growth Rate"] = f"{metrics['dividend_growth_rate']:.2f}%"
                    summary["Growth Assessment"] = assessment.get("growth_rate_assessment", "No Data")
                if metrics.get("dividend_consistency") is not None:
                    summary["Consistency"] = f"{metrics['dividend_consistency']:.2f}%"
                    summary["Consistency Assessment"] = assessment.get("consistency_assessment", "No Data")
                assessment_data.append(summary)
    if assessment_data:
        # Combined assessment table
        st.subheader("Dividend Sustainability Assessment")
        assessment_df = pd.DataFrame(assessment_data)
        assessment_df.set_index("Symbol", inplace=True)
        st.dataframe(assessment_df, use_container_width=True)
        # Bar chart comparing overall sustainability scores
        score_data = []
        for symbol in sustainability_data.keys():
            # NOTE(review): re-runs the assessment per ETF; cheap, but the
            # values were already computed in the loop above
            assessment = assess_dividend_sustainability(sustainability_data[symbol])
            score = assessment.get("sustainability_score", 0)
            if score > 0:
                score_data.append({
                    "Symbol": symbol,
                    "Sustainability Score": score
                })
        if score_data:
            score_df = pd.DataFrame(score_data)
            fig = px.bar(
                score_df,
                x="Symbol",
                y="Sustainability Score",
                title="Dividend Sustainability Score Comparison",
                color="Sustainability Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100]
            )
            st.plotly_chart(fig, use_container_width=True)
        # 1. Payout ratio comparison (lower is better, so color scale reversed)
        payout_data = []
        for symbol, metrics in sustainability_data.items():
            if metrics.get("payout_ratio") is not None:
                payout_data.append({
                    "Symbol": symbol,
                    "Payout Ratio": metrics["payout_ratio"]
                })
        if payout_data and len(payout_data) > 1: # Only show if we have multiple ETFs
            st.subheader("Payout Ratio Comparison")
            payout_df = pd.DataFrame(payout_data)
            fig = px.bar(
                payout_df,
                x="Symbol",
                y="Payout Ratio",
                title="Average Payout Ratio of Holdings",
                color="Payout Ratio",
                color_continuous_scale=["green", "lightgreen", "yellow", "orange", "red"],
                range_color=[0, 100]
            )
            # Reference lines mark the "Excellent" and "Caution" thresholds
            fig.add_hline(y=30, line_dash="dash", line_color="green",
                          annotation_text="Excellent (<30%)", annotation_position="bottom right")
            fig.add_hline(y=70, line_dash="dash", line_color="orange",
                          annotation_text="Warning (>70%)", annotation_position="bottom right")
            st.plotly_chart(fig, use_container_width=True)
        # 2. Growth rate comparison (bars labeled with years of data used)
        growth_data = []
        for symbol, metrics in sustainability_data.items():
            if metrics.get("dividend_growth_rate") is not None:
                growth_data.append({
                    "Symbol": symbol,
                    "Growth Rate": metrics["dividend_growth_rate"],
                    "Years": metrics["growth_years"]
                })
        if growth_data and len(growth_data) > 1: # Only show if we have multiple ETFs
            st.subheader("Dividend Growth Rate Comparison")
            growth_df = pd.DataFrame(growth_data)
            fig = px.bar(
                growth_df,
                x="Symbol",
                y="Growth Rate",
                title="Average Annual Dividend Growth Rate",
                color="Growth Rate",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                text="Years",
                hover_data=["Years"]
            )
            # Reference line for 0% growth
            fig.add_hline(y=0, line_dash="dash", line_color="gray")
            # Reference line for a "good" growth rate
            fig.add_hline(y=5, line_dash="dash", line_color="green",
                          annotation_text="Good Growth (>5%)", annotation_position="bottom right")
            st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("No dividend sustainability data available for the selected ETFs.")
        st.info("""
        This could be due to:
        1. The selected ETFs don't have sufficient dividend history
        2. Holdings data is not available for the ETFs
        3. Financial data for the ETF holdings is not accessible
        Try selecting ETFs with longer dividend history or more accessible holding data.
        """)
def get_esg_score(symbol: str, force_refresh: bool = False) -> Dict:
    """Get ESG (Environmental, Social, Governance) scores for an ETF.

    Queries the FMP ``esg-score`` endpoint directly. When the endpoint
    reports an error, returns an empty payload, or the payload cannot be
    parsed, falls back to aggregating ESG scores from the ETF's holdings.

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API

    Returns:
        Dictionary with ESG score data
    """
    debug_mode = st.session_state.get("debug_mode", False)
    esg_data = fmp_request(f"esg-score/{symbol}", force_refresh=force_refresh, debug_mode=debug_mode)
    results = {
        "symbol": symbol,
        "esg_score": None,
        "environmental_score": None,
        "social_score": None,
        "governance_score": None,
        "year": None,
        "peer_comparison": None
    }
    # Endpoint signalled an error -> aggregate from holdings instead.
    if isinstance(esg_data, dict) and "error" in esg_data:
        if debug_mode:
            st.warning(f"ESG score data not available via direct API for {symbol}")
        return get_esg_from_holdings(symbol, force_refresh, debug_mode)
    if isinstance(esg_data, list) and len(esg_data) > 0:
        try:
            # Most recent record comes first in the API response.
            recent_esg = esg_data[0]
            # Copy whichever score fields the API returned into our schema.
            field_map = {
                "totalEsg": "esg_score",
                "environmentalScore": "environmental_score",
                "socialScore": "social_score",
                "governanceScore": "governance_score",
                "year": "year",
            }
            for api_field, result_field in field_map.items():
                if api_field in recent_esg:
                    results[result_field] = recent_esg[api_field]
            # Peer comparison block is optional in the API payload.
            if "peerGroup" in recent_esg:
                results["peer_comparison"] = {
                    "group": recent_esg.get("peerGroup"),
                    "avg_esg": recent_esg.get("peerEsgScorePerformance", 0),
                    "percentile": recent_esg.get("percentile", 0)
                }
            return results
        except Exception as e:
            if debug_mode:
                st.error(f"Error processing ESG data: {str(e)}")
            # Parsing failed -> fall back to holdings aggregation.
            return get_esg_from_holdings(symbol, force_refresh, debug_mode)
    # No direct ESG data at all -> holdings aggregation.
    return get_esg_from_holdings(symbol, force_refresh, debug_mode)
def get_esg_from_holdings(symbol: str, force_refresh: bool, debug_mode: bool) -> Dict:
    """Get ESG scores by aggregating data from ETF holdings.

    Computes weight-adjusted average ESG scores over the fund's top 10
    holdings. Holdings without ESG data are excluded per component and the
    remaining weights are renormalized, so a partially covered fund is
    scored on the holdings that do have data instead of being dragged
    toward zero (the previous behavior summed weighted scores over ALL
    top-holding weights, understating every score whenever coverage was
    incomplete).

    Args:
        symbol: ETF ticker symbol
        force_refresh: Whether to force refresh data from API
        debug_mode: Whether to show debug information

    Returns:
        Dictionary with aggregated ESG score data
    """
    results = {
        "symbol": symbol,
        "esg_score": None,
        "environmental_score": None,
        "social_score": None,
        "governance_score": None,
        "year": datetime.now().year,  # Use current year for aggregated data
        "is_aggregated": True  # Flag to indicate this is aggregated from holdings
    }
    holdings = get_etf_holdings(symbol, force_refresh=force_refresh)
    if holdings.empty or "asset" not in holdings.columns:
        return results
    try:
        # Use the top 10 holdings, which represent the bulk of most ETFs.
        if "weightPercentage" in holdings.columns:
            top_holdings = holdings.sort_values("weightPercentage", ascending=False).head(10)
        else:
            top_holdings = holdings.head(10)
        tickers = top_holdings["asset"].tolist()
        if not tickers:
            return results
        # Portfolio weights normalized to sum to 1; equal-weight fallback
        # when the weight column is missing or degenerate (non-positive sum).
        if "weightPercentage" in top_holdings.columns:
            raw_weights = top_holdings["weightPercentage"].tolist()
            total_weight = sum(raw_weights)
            if total_weight > 0:
                weights = [w / total_weight for w in raw_weights]
            else:
                weights = [1 / len(tickers)] * len(tickers)
        else:
            weights = [1 / len(tickers)] * len(tickers)
        # Accumulate weighted score sums and the weight actually covered,
        # tracked separately per ESG component (a holding may report some
        # components but not others).
        field_map = {
            "totalEsg": "esg_score",
            "environmentalScore": "environmental_score",
            "socialScore": "social_score",
            "governanceScore": "governance_score",
        }
        weighted_sums = {field: 0.0 for field in field_map.values()}
        covered_weight = {field: 0.0 for field in field_map.values()}
        for i, ticker in enumerate(tickers):
            # Strip any exchange prefix or trailing qualifier
            # (e.g. "NYSE:ABC" or "ABC US") down to the bare symbol.
            ticker = ticker.split(':')[-1].split(' ')[0]
            ticker_esg = fmp_request(
                f"esg-score/{ticker}",
                force_refresh=force_refresh,
                debug_mode=debug_mode
            )
            if isinstance(ticker_esg, list) and len(ticker_esg) > 0:
                recent_esg = ticker_esg[0]
                for api_field, result_field in field_map.items():
                    if api_field in recent_esg:
                        weighted_sums[result_field] += recent_esg[api_field] * weights[i]
                        covered_weight[result_field] += weights[i]
        # Renormalize by the covered weight so each result is a true
        # weighted average over the holdings that reported that component.
        for result_field, total in covered_weight.items():
            if total > 0:
                results[result_field] = weighted_sums[result_field] / total
    except Exception as e:
        # Best-effort aggregation: failures only surface in debug mode and
        # leave the score fields as None.
        if debug_mode:
            st.error(f"Error calculating ESG scores from holdings: {str(e)}")
    return results
def assess_esg_score(esg_data: Dict) -> Dict:
    """Assess ESG scores based on standard industry thresholds.

    All four scores (overall, environmental, social, governance) share the
    same 0-100 rating ladder, so a single helper grades each one. Scores
    that are missing (None) keep their "No Data" rating. The overall
    assessment mirrors the overall ESG rating when it is available.

    Args:
        esg_data: Dictionary with ESG score data
            (keys: "esg_score", "environmental_score", "social_score",
            "governance_score")

    Returns:
        Dictionary with ESG score assessments
    """
    def _rate(score: float) -> str:
        """Map a 0-100 ESG score to its qualitative rating band."""
        if score >= 70:
            return "Excellent"
        elif score >= 60:
            return "Very Good"
        elif score >= 50:
            return "Good"
        elif score >= 40:
            return "Average"
        elif score >= 30:
            return "Below Average"
        return "Poor"

    assessment = {
        "esg_rating": "No Data",
        "environmental_rating": "No Data",
        "social_rating": "No Data",
        "governance_rating": "No Data",
        "overall_assessment": "No Data"
    }
    # Grade each component with the shared rating ladder.
    for source_key, rating_key in (
        ("esg_score", "esg_rating"),
        ("environmental_score", "environmental_rating"),
        ("social_score", "social_rating"),
        ("governance_score", "governance_rating"),
    ):
        value = esg_data.get(source_key)
        if value is not None:
            assessment[rating_key] = _rate(value)
    # Overall assessment tracks the overall ESG rating when present.
    if assessment["esg_rating"] != "No Data":
        assessment["overall_assessment"] = assessment["esg_rating"]
    return assessment
def display_esg_analysis(symbol: str) -> None:
    """Render the ESG analysis section for a single ETF.

    Shows an overall ESG gauge and rating, per-component score bars,
    an interpretation legend, and (when present) a peer comparison.
    """
    st.subheader("ESG (Environmental, Social, Governance) Analysis")
    with st.spinner("Loading ESG data..."):
        esg_data = get_esg_score(
            symbol,
            force_refresh=st.session_state.get("force_refresh", False)
        )
        assessment = assess_esg_score(esg_data)
    # Display results only if at least one score came back
    has_data = (esg_data.get("esg_score") is not None or
                esg_data.get("environmental_score") is not None or
                esg_data.get("social_score") is not None or
                esg_data.get("governance_score") is not None)
    if not has_data:
        st.warning("No ESG data available for this ETF.")
        st.info("⚠️ PREMIUM API FEATURE: Environmental, Social, and Governance (ESG) scores require the Professional or Enterprise tier of the FMP API subscription.")
        st.info("ESG data provides insights into sustainability practices, social responsibility, and governance quality of the ETF's holdings. This data is increasingly important for socially conscious investors.")
        return
    # Flag data that was aggregated from holdings rather than fetched directly
    if esg_data.get("is_aggregated", False):
        st.info("ESG data is aggregated from top holdings and may not represent the official ESG score for this ETF.")
    # Layout: narrow column for the overall gauge, wide one for components
    col1, col2 = st.columns([1, 2])
    with col1:
        # Overall ESG score gauge chart
        if esg_data.get("esg_score") is not None:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=esg_data["esg_score"],
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': "ESG Score"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': "darkblue"},
                    # Color steps approximate the rating bands
                    'steps': [
                        {'range': [0, 30], 'color': "red"},
                        {'range': [30, 50], 'color': "orange"},
                        {'range': [50, 70], 'color': "lightgreen"},
                        {'range': [70, 100], 'color': "green"}
                    ],
                    # Threshold needle drawn at the score itself
                    'threshold': {
                        'line': {'color': "black", 'width': 4},
                        'thickness': 0.75,
                        'value': esg_data["esg_score"]
                    }
                }
            ))
            fig.update_layout(
                height=250,
                margin=dict(l=20, r=20, t=50, b=20),
            )
            st.plotly_chart(fig, use_container_width=True)
        # Overall rating verdict
        st.metric(
            "ESG Rating",
            assessment["overall_assessment"]
        )
        # Year the ESG data refers to (or current year when aggregated)
        if esg_data.get("year") is not None:
            st.caption(f"Data Year: {esg_data['year']}")
    with col2:
        # Per-component scores, skipping components without data
        component_data = []
        components = [
            ("Environmental", esg_data.get("environmental_score"), assessment.get("environmental_rating")),
            ("Social", esg_data.get("social_score"), assessment.get("social_rating")),
            ("Governance", esg_data.get("governance_score"), assessment.get("governance_rating"))
        ]
        for name, score, rating in components:
            if score is not None:
                component_data.append({
                    "Component": name,
                    "Score": score,
                    "Rating": rating
                })
        # Horizontal bar chart of component scores, labeled with ratings
        if component_data:
            fig = px.bar(
                pd.DataFrame(component_data),
                y="Component",
                x="Score",
                color="Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100],
                labels={"Score": "Score (0-100)"},
                title="ESG Component Scores",
                text="Rating",
                orientation="h"
            )
            fig.update_layout(
                yaxis=dict(autorange="reversed"), # Reverse y-axis for better reading
                height=250,
                margin=dict(l=20, r=20, t=50, b=20)
            )
            st.plotly_chart(fig, use_container_width=True)
    # Static legend explaining the score components
    st.caption("""
    **ESG Score Interpretation:**
    - **Environmental**: Evaluates resource use, emissions, innovation, and environmental impact
    - **Social**: Assesses workforce, human rights, community, and product responsibility
    - **Governance**: Reviews management structure, policies, and shareholder relations
    - **Overall ESG Score**: Combined metric (scale 0-100, higher is better)
    """)
    # Peer comparison section (only available from the direct API path)
    if esg_data.get("peer_comparison") is not None:
        st.subheader("Peer Comparison")
        peer = esg_data["peer_comparison"]
        peer_col1, peer_col2 = st.columns(2)
        with peer_col1:
            st.metric(
                "Peer Group",
                peer.get("group", "Not Available")
            )
        with peer_col2:
            st.metric(
                "Percentile Rank",
                f"{peer.get('percentile', 0):.0f}%",
                help="Higher percentile means better ESG performance relative to peers"
            )
        # Side-by-side bar chart vs. peer-group average, when both exist
        if peer.get("avg_esg") is not None and esg_data.get("esg_score") is not None:
            peer_data = pd.DataFrame([
                {"Entity": symbol, "ESG Score": esg_data["esg_score"]},
                {"Entity": "Peer Average", "ESG Score": peer["avg_esg"]}
            ])
            fig = px.bar(
                peer_data,
                x="Entity",
                y="ESG Score",
                color="Entity",
                title="ESG Score vs. Peer Average",
                text="ESG Score",
                text_auto=".1f"
            )
            st.plotly_chart(fig, use_container_width=True)
def display_esg_comparison():
    """Display ESG score comparison for the ETFs selected for comparison.

    Reads symbols from ``st.session_state.comparison_etfs``, fetches ESG
    metrics for each via ``get_esg_score`` (honoring the session-wide
    force-refresh toggle), and renders:
      * a per-ETF assessment table (scores + qualitative ratings),
      * a bar chart of overall ESG scores with rating-threshold lines,
      * for two or more ETFs, grouped-bar and radar charts of the
        Environmental / Social / Governance components.

    Shows a warning with troubleshooting hints when no ESG data is available
    for any selected ETF. Renders directly to the Streamlit page; returns None.
    """
    st.subheader("ESG Score Comparison")
    if not st.session_state.comparison_etfs:
        st.warning("Please select at least one ETF to analyze.")
        return
    # Fetch ESG data for all ETFs
    esg_data = {}
    assessment_data = []
    with st.spinner("Fetching ESG data..."):
        for symbol in st.session_state.comparison_etfs:
            # Get ESG metrics (cache may be bypassed via the sidebar toggle)
            metrics = get_esg_score(
                symbol,
                force_refresh=st.session_state.get("force_refresh", False)
            )
            # Keep the ETF only if at least one ESG component is present
            if metrics and (metrics.get("esg_score") is not None or
                            metrics.get("environmental_score") is not None or
                            metrics.get("social_score") is not None or
                            metrics.get("governance_score") is not None):
                esg_data[symbol] = metrics
                # Map raw scores to qualitative ratings
                assessment = assess_esg_score(metrics)
                # Build one table row; component columns are added only when
                # the underlying score exists, so sparse data stays readable
                summary = {
                    "Symbol": symbol,
                    "ESG Score": metrics.get("esg_score"),
                    "ESG Rating": assessment.get("esg_rating", "No Data")
                }
                if metrics.get("environmental_score") is not None:
                    summary["Environmental"] = metrics["environmental_score"]
                    summary["Env. Rating"] = assessment.get("environmental_rating", "No Data")
                if metrics.get("social_score") is not None:
                    summary["Social"] = metrics["social_score"]
                    summary["Social Rating"] = assessment.get("social_rating", "No Data")
                if metrics.get("governance_score") is not None:
                    summary["Governance"] = metrics["governance_score"]
                    summary["Gov. Rating"] = assessment.get("governance_rating", "No Data")
                # Flag whether the score came straight from the provider or
                # was aggregated from holdings
                summary["Data Source"] = "Aggregated" if metrics.get("is_aggregated", False) else "Direct"
                assessment_data.append(summary)
    # Display assessment comparison table
    if assessment_data:
        st.subheader("ESG Score Assessment")
        assessment_df = pd.DataFrame(assessment_data)
        # Format numeric columns for display. Use pd.notna(): pandas stores
        # missing values in numeric columns as NaN (not None), and the old
        # `x is not None` check let NaN through, rendering as "nan".
        numeric_cols = ["ESG Score", "Environmental", "Social", "Governance"]
        for col in numeric_cols:
            if col in assessment_df.columns:
                assessment_df[col] = assessment_df[col].apply(
                    lambda x: f"{x:.1f}" if pd.notna(x) else "N/A"
                )
        # Set index to Symbol for a cleaner table
        if "Symbol" in assessment_df.columns:
            assessment_df.set_index("Symbol", inplace=True)
        st.dataframe(assessment_df, use_container_width=True)
        # Create bar chart comparing overall ESG scores
        score_data = [
            {"Symbol": symbol, "ESG Score": metrics["esg_score"]}
            for symbol, metrics in esg_data.items()
            if metrics.get("esg_score") is not None
        ]
        if score_data:
            score_df = pd.DataFrame(score_data)
            # Create bar chart colored on a red-to-green 0-100 scale
            fig = px.bar(
                score_df,
                x="Symbol",
                y="ESG Score",
                title="ESG Score Comparison",
                color="ESG Score",
                color_continuous_scale=["red", "orange", "yellow", "lightgreen", "green"],
                range_color=[0, 100]
            )
            # Add reference lines for different ESG thresholds
            fig.add_hline(y=70, line_dash="dash", line_color="green",
                          annotation_text="Excellent (>70)", annotation_position="bottom right")
            fig.add_hline(y=50, line_dash="dash", line_color="gold",
                          annotation_text="Good (>50)", annotation_position="bottom right")
            fig.add_hline(y=30, line_dash="dash", line_color="orange",
                          annotation_text="Below Average (<30)", annotation_position="bottom right")
            st.plotly_chart(fig, use_container_width=True)
        # Component-level comparisons only make sense with multiple ETFs
        if len(esg_data) > 1:
            # Prepare long-form data for the E/S/G component comparison
            components = ["Environmental", "Social", "Governance"]
            component_data = []
            for symbol, metrics in esg_data.items():
                for component in components:
                    component_key = component.lower() + "_score"
                    if metrics.get(component_key) is not None:
                        component_data.append({
                            "Symbol": symbol,
                            "Component": component,
                            "Score": metrics[component_key]
                        })
            if component_data:
                st.subheader("ESG Component Comparison")
                # Create grouped bar chart
                component_df = pd.DataFrame(component_data)
                fig = px.bar(
                    component_df,
                    x="Symbol",
                    y="Score",
                    color="Component",
                    barmode="group",
                    title="ESG Component Comparison",
                    labels={"Score": "Score (0-100)"},
                    text="Score",
                    text_auto=".1f"
                )
                # Add reference line marking a "good" score
                fig.add_hline(y=50, line_dash="dash", line_color="gray")
                st.plotly_chart(fig, use_container_width=True)
                # Radar chart: one row per ETF, missing components shown as 0
                radar_data = []
                for symbol in esg_data.keys():
                    symbol_data = {"Symbol": symbol}
                    for component in components:
                        component_key = component.lower() + "_score"
                        if esg_data[symbol].get(component_key) is not None:
                            symbol_data[component] = esg_data[symbol][component_key]
                        else:
                            symbol_data[component] = 0
                    radar_data.append(symbol_data)
                if radar_data:
                    # Create radar chart
                    radar_df = pd.DataFrame(radar_data)
                    # Fill NaN values with 0 (defensive; keys are pre-filled above)
                    radar_df = radar_df.fillna(0)
                    fig = go.Figure()
                    for i, row in radar_df.iterrows():
                        symbol = row["Symbol"]
                        fig.add_trace(go.Scatterpolar(
                            r=[row.get(c, 0) for c in components],
                            theta=components,
                            fill="toself",
                            name=symbol
                        ))
                    fig.update_layout(
                        polar=dict(
                            radialaxis=dict(
                                visible=True,
                                range=[0, 100]
                            )
                        ),
                        title="ESG Component Radar Chart",
                        showlegend=True
                    )
                    st.plotly_chart(fig, use_container_width=True)
    else:
        st.warning("No ESG data available for the selected ETFs.")
        st.info("""
        This could be due to:
        1. ESG data is not available for the selected ETFs
        2. Your API subscription level does not include ESG data
        Try selecting different ETFs or check your API subscription level.
        """)
# --- Main Application ---
def main():
    """Entry point for the ETF Analyzer page.

    Sets up the cache directory, renders the sidebar (API key input,
    refresh/debug toggles, API-call counter, navigation buttons), and
    dispatches to the view matching ``st.session_state.current_tab``.
    """
    # Make sure the on-disk cache exists before any data fetches
    setup_cache_dir()
    # Page header
    st.title("📊 ETF Analyzer")
    st.write("Comprehensive ETF Analysis Tool for Investment Research")
    # --- Sidebar: settings ---
    st.sidebar.header("Settings")
    api_key = st.sidebar.text_input(
        "FMP API Key",
        value=st.session_state.get("fmp_api_key", ""),
        type="password",
        help="Enter your Financial Modeling Prep API key."
    )
    # Persist a non-empty key to both session state and the environment
    # so downstream fetch helpers can pick it up either way
    if api_key:
        st.session_state.fmp_api_key = api_key
        os.environ["FMP_API_KEY"] = api_key
    # Cache-bypass toggle shared by all data fetchers
    st.session_state.force_refresh = st.sidebar.checkbox(
        "Force refresh data (ignore cache)",
        value=st.session_state.get("force_refresh", False),
        help="When enabled, always fetch fresh data from APIs"
    )
    # Verbose request/response logging toggle
    st.session_state.debug_mode = st.sidebar.checkbox(
        "Debug Mode",
        value=st.session_state.get("debug_mode", False),
        help="Show detailed API request and response information"
    )
    # Per-session API call counter
    st.session_state.setdefault("api_calls", 0)
    st.sidebar.write(f"API calls this session: {st.session_state.api_calls}")
    # --- Sidebar: navigation ---
    st.sidebar.header("Navigation")
    st.session_state.setdefault("current_tab", "search")
    # (label, widget key, target tab, requires a selected ETF)
    nav_buttons = (
        ("🔍 ETF Search", "nav_search", "search", False),
        ("📊 ETF Analysis", "nav_analysis", "analysis", True),
        ("🔄 ETF Comparison", "nav_comparison", "comparison", False),
        ("📤 Export Data", "nav_export", "export", False),
        ("🔌 Test API Connection", "nav_test_api", "test_api", False),
    )
    for label, widget_key, target_tab, needs_etf in nav_buttons:
        if st.sidebar.button(label, key=widget_key):
            if needs_etf and "selected_etf" not in st.session_state:
                st.sidebar.warning("Please select an ETF first.")
            else:
                st.session_state.current_tab = target_tab
                st.rerun()
    # --- Dispatch to the active tab ---
    # "search", an unknown tab, or "analysis" without a selection all fall
    # through to the search view, matching the original if/elif chain.
    active = st.session_state.current_tab
    if active == "analysis" and "selected_etf" in st.session_state:
        display_etf_analysis(st.session_state.selected_etf)
    elif active == "comparison":
        display_comparison()
    elif active == "export":
        display_export()
    elif active == "test_api":
        test_api_connection()
    else:
        display_etf_search()
# Standard script entry point: run the page when executed directly
# (Streamlit also imports and executes this module as a page script).
if __name__ == "__main__":
    main()