Source code for kdiagram.plot.anomaly

# Author: LKouadio <etanoyau@gmail.com>
# License: Apache License 2.0

from __future__ import annotations

import warnings
from typing import Any, Literal

import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.colors import Normalize
from matplotlib.lines import Line2D

from ..compat.matplotlib import get_cmap, get_colors
from ..decorators import check_non_emptiness, isdf
from ..metrics import clustered_anomaly_severity
from ..utils.generic_utils import get_valid_kwargs
from ..utils.plot import (
    Acov,
    map_theta_to_span,
    resolve_polar_span,
    set_axis_grid,
    setup_polar_axes,
)
from ..utils.validator import exist_features

__all__ = [
    "plot_anomaly_severity",
    "plot_anomaly_profile",
    "plot_anomaly_glyphs",
    "plot_cas_profile",
    "plot_glyphs",
    "plot_cas_layers",
]


@check_non_emptiness
@isdf
def plot_anomaly_severity(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    window_size: int = 21,
    title: str | None = None,
    figsize: tuple[float, float] = (9.0, 9.0),
    cmap: str = "plasma",
    s: int = 40,
    alpha: float = 0.8,
    acov: str = "default",
    mask_angle: bool = True,
    mask_radius: bool = False,
    show_grid: bool = True,
    grid_props: dict | None = None,
    ax: Axes | None = None,
    savefig: str | None = None,
    **kwargs: Any,
) -> Axes | None:
    # The user-facing docstring is attached right after this definition
    # through ``plot_anomaly_severity.__doc__``; comments here only explain
    # the implementation. ``**kwargs`` is accepted but not used below.
    required_cols = [actual_col, q_low_col, q_up_col]
    exist_features(df, features=required_cols)

    # Rows with a missing observation or bound cannot be classified.
    data = df[required_cols].dropna().copy()
    if data.empty:
        warnings.warn("DataFrame is empty after dropping NaNs.", stacklevel=2)
        return None

    # 1. Calculate the score and get detailed data
    cas_score, details = clustered_anomaly_severity(
        data[actual_col].to_numpy(),
        data[q_low_col].to_numpy(),
        data[q_up_col].to_numpy(),
        window_size=window_size,
        return_details=True,
    )
    anomalies = details[details["is_anomaly"]].copy()
    if anomalies.empty:
        warnings.warn("No anomalies detected in the data.", stacklevel=2)
        return None

    # 2. Set up polar axes and angular mapping
    fig, ax, span = setup_polar_axes(ax, acov=acov, figsize=figsize)
    # NOTE(review): assumes ``details`` carries a positional 0..n-1 index so
    # that index / n is the relative position in the series -- confirm
    # against ``clustered_anomaly_severity``.
    theta = (anomalies.index.to_numpy() / len(data)) * float(span)

    # 3. Prepare visual mappings
    radii = anomalies["magnitude"].to_numpy()
    density = anomalies["local_density"].to_numpy()
    cmap_obj = get_cmap(cmap, default="plasma")
    norm = Normalize(vmin=density.min(), vmax=density.max())
    colors = cmap_obj(norm(density))

    # 4. Plot the anomalies
    # Plain boolean arrays: the masks index NumPy arrays, so the pandas
    # index labels must not take part in the selection.
    under_mask = (anomalies["type"] == "under").to_numpy()
    over_mask = (anomalies["type"] == "over").to_numpy()

    if over_mask.any():
        ax.scatter(
            theta[over_mask],
            radii[over_mask],
            c=colors[over_mask],
            s=s,
            alpha=alpha,
            marker="o",
            label="Over-prediction (Risk Underestimated)",
            edgecolor="k",
            linewidth=0.5,
        )
    if under_mask.any():
        ax.scatter(
            theta[under_mask],
            radii[under_mask],
            c=colors[under_mask],
            s=s * 1.5,  # the "X" glyph reads smaller than "o" at equal area
            alpha=alpha,
            marker="X",
            label="Under-prediction (Risk Overestimated)",
            edgecolor="w",
            linewidth=0.5,
        )

    # 5. Add formatting and interpretation aids
    title_str = title or (
        f"Anomaly Severity Analysis\nCAS Score: {cas_score:.4f}"
    )
    ax.set_title(title_str, fontsize=16, y=1.1)

    sm = cm.ScalarMappable(norm=norm, cmap=cmap_obj)
    cbar = fig.colorbar(sm, ax=ax, pad=0.1, shrink=0.7)
    cbar.set_label("Local Anomaly Density", fontsize=10)

    ax.set_ylabel("Anomaly Magnitude", labelpad=25)
    ax.set_ylim(bottom=0)
    ax.legend(loc="upper right", bbox_to_anchor=(1.4, 1.05))
    set_axis_grid(ax, show_grid=show_grid, grid_props=grid_props)

    if mask_angle:
        ax.set_xticklabels([])
    if mask_radius:
        ax.set_yticklabels([])

    if savefig:
        fig.savefig(savefig, dpi=300, bbox_inches="tight")
        plt.close(fig)
    else:
        plt.show()

    return ax
plot_anomaly_severity.__doc__ = r"""
Visualizes clustered anomaly severity using a polar scatter plot.

This function creates a diagnostic plot to analyze forecast
failures. It identifies anomalies where the true value falls
outside the predicted interval and visualizes their location,
magnitude, type, and clustering density in a single, compact
polar view.

Parameters
----------
df : pd.DataFrame
    The input DataFrame containing the actual and predicted
    quantile values.
actual_col : str
    The name of the column containing the true observed values.
q_low_col : str
    The name of the column for the lower bound of the
    prediction interval.
q_up_col : str
    The name of the column for the upper bound of the
    prediction interval.
window_size : int, default=21
    The size of the moving window used to calculate the local
    density of anomalies, which defines a "cluster".
title : str, optional
    A custom title for the plot. If ``None``, a default title
    including the CAS score is generated.
figsize : tuple of (float, float), default=(9.0, 9.0)
    The figure size in inches.
cmap : str, default='plasma'
    The colormap for coloring points based on local density.
s : int, default=40
    The marker size for the scatter points.
alpha : float, default=0.8
    The transparency of the scatter points.
acov : {'default', 'half_circle', 'quarter_circle', 'eighth_circle'}, default='default'
    Specifies the angular coverage of the polar plot.
mask_angle : bool, default=True
    If ``True``, hides the angular tick labels (e.g., degrees).
mask_radius : bool, default=False
    If ``True``, hides the radial tick labels.
show_grid : bool, default=True
    Passed to the grid styling helper to toggle the polar grid.
grid_props : dict, optional
    Keyword arguments forwarded to the grid styling helper.
ax : matplotlib.axes.Axes, optional
    An existing polar axes to draw the plot on. If ``None``,
    a new figure and axes are created.
savefig : str, optional
    The file path to save the plot. If ``None``, the plot is
    displayed interactively.

Returns
-------
ax : matplotlib.axes.Axes or None
    The Matplotlib Axes object containing the plot, or ``None``
    if no anomalies are detected in the data.

See Also
--------
plot_anomaly_profile : A stylized "fiery ring" version of this plot.
plot_anomaly_glyphs : A version using informative glyphs instead of dots.
clustered_anomaly_severity : The underlying metric function.

Notes
-----
This plot visualizes the four key dimensions of forecast failures
as described in :footcite:t:`kouadioc2025`.

**Visual Mapping:**

- **Angle (:math:`\varepsilon`)**: The sample index, showing *where*
  in the dataset the failure occurred.
- **Radius (`r`)**: The **Anomaly Magnitude**—the distance from
  the true value to the nearest violated interval bound. Larger
  radii indicate more severe failures.
- **Color**: The **Local Anomaly Density**. Hotter colors indicate
  the anomaly is part of a dense cluster of other failures.
- **Marker Shape**: The **Type** of anomaly.

  - 'o' (circle): Over-prediction (risk was underestimated).
  - 'X': Under-prediction (risk was overestimated).

The title of the plot automatically includes the overall
Clustered Anomaly Severity (CAS) score for a quantitative summary.

References
----------
.. footbibliography::

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_anomaly_severity
>>>
>>> # Simulate data with a cluster of severe failures
>>> np.random.seed(0)
>>> n_samples = 400
>>> y_true = 100 + 20 * np.sin(np.linspace(0, 4*np.pi, n_samples))
>>> y_qlow = y_true - 10
>>> y_qup = y_true + 10
>>> y_true[100:140] = y_qup[100:140] + np.random.uniform(10, 25, 40)
>>>
>>> df = pd.DataFrame({
...     "actual": y_true, "q10": y_qlow, "q90": y_qup
... })
>>>
>>> ax = plot_anomaly_severity(
...     df,
...     actual_col="actual",
...     q_low_col="q10",
...     q_up_col="q90",
...     window_size=31,
...     title="Severity of Clustered Anomalies"
... )
"""
@check_non_emptiness
@isdf
def plot_anomaly_profile(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    window_size: int = 21,
    theta_bins: int = 72,
    title: str | None = None,
    figsize: tuple[float, float] = (9.0, 9.0),
    cmap: str = "plasma",
    colors: list[str] | None = None,
    alpha: float = 0.8,
    acov: str = "default",
    show_grid: bool = True,
    grid_props: dict | None = None,
    ax: Axes | None = None,
    savefig: str | None = None,
    jitter: float = 0.85,
    max_flares_per_bin: int | None = None,
    flare_scale: str = "sqrt",
    flare_clip: float | None = None,
    flare_linewidth: float = 1.4,
    ring_height: float = 0.06,
    ring_alpha: float = 0.95,
    legend_anchor: tuple[float, float] = (1.35, 1.04),
    **kwargs: Any,
) -> Axes | None:
    # The user-facing docstring is attached right after this definition
    # through ``plot_anomaly_profile.__doc__``. ``**kwargs`` is accepted but
    # not used below.

    # ---- data & metric
    req = [actual_col, q_low_col, q_up_col]
    data = df[req].dropna().copy()
    if data.empty:
        warnings.warn(
            "DataFrame is empty after dropping NaNs.",
            stacklevel=2,
        )
        return None

    cas_score, details = clustered_anomaly_severity(
        data[actual_col].to_numpy(),
        data[q_low_col].to_numpy(),
        data[q_up_col].to_numpy(),
        window_size=window_size,
        return_details=True,
    )
    anomalies = details[details["is_anomaly"]].copy()
    if anomalies.empty:
        warnings.warn(
            "No anomalies detected in the data.",
            stacklevel=2,
        )
        return None

    # ---- argument validation
    # Done before any drawing so a bad option cannot leave a half-built
    # figure behind.
    if flare_scale not in ("linear", "sqrt", "log"):
        raise ValueError("flare_scale must be {'linear','sqrt','log'}")
    if theta_bins < 1:
        raise ValueError("theta_bins must be a positive integer.")

    # ---- axes
    fig, ax, span = setup_polar_axes(ax, acov=acov, figsize=figsize)
    span = float(span)

    # ---- angle mapping & binning
    # angle ~ index position in [0, span)
    anomalies["theta"] = (anomalies.index.to_numpy() / len(data)) * span
    bin_edges = np.linspace(0, span, theta_bins + 1)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    bin_width = span / theta_bins

    # integer bin ids in [0, theta_bins); the clip folds a value sitting
    # exactly on the last edge into the last bin
    bin_id = (
        np.digitize(
            anomalies["theta"].to_numpy(),
            bin_edges,
            right=False,
        )
        - 1
    )
    anomalies["theta_bin_id"] = np.clip(bin_id, 0, theta_bins - 1)

    # ---- ring (density heat)
    cmap_obj = get_cmap(cmap, default="plasma")
    # Colour scale spans the density of *all* samples, not only anomalies.
    norm = Normalize(
        vmin=details["local_density"].min(),
        vmax=details["local_density"].max(),
    )
    ring_density = anomalies.groupby("theta_bin_id", observed=False)[
        "local_density"
    ].mean()
    ax.bar(
        bin_centers[ring_density.index],
        height=ring_height,
        width=bin_width,
        bottom=1.0,
        color=cmap_obj(norm(ring_density.values)),
        alpha=ring_alpha,
        linewidth=0,
    )

    # ---- flare helpers
    def _scale_mag(x: np.ndarray) -> np.ndarray:
        # map magnitude -> radial length (flare_scale validated above)
        x = np.asarray(x)
        if flare_scale == "linear":
            out = x.copy()
        elif flare_scale == "sqrt":
            out = np.sqrt(np.maximum(x, 0.0))
        else:  # "log"
            out = np.log1p(np.maximum(x, 0.0))
        if flare_clip is not None:
            out = np.minimum(out, float(flare_clip))
        return out

    # safe jitter
    jitter = float(np.clip(jitter, 0.0, 1.0))

    # pick colors (over, under)
    over_color, under_color = get_colors(
        2,
        colors=colors or ["#E74C3C", "#3498DB"],
        cmap=cmap,
        default="tab10",
        failsafe="discrete",
    )

    # ---- flares with in-bin angular dodging
    # (type, radius the flare starts from, colour). Over-prediction flares
    # grow outward from 1.05, under-prediction flares grow inward from 1.00.
    flare_specs = (
        ("over", 1.05, over_color),
        ("under", 1.00, under_color),
    )
    # One pass over the occupied bins instead of filtering the whole frame
    # once per bin; groups come back in ascending bin order.
    for b, in_bin in anomalies.groupby("theta_bin_id", sort=True):
        center = bin_centers[b]
        for typ, base_r, color in flare_specs:
            sub = in_bin[in_bin["type"] == typ]
            if sub.empty:
                continue

            # limit count per bin (keep largest)
            if (
                max_flares_per_bin is not None
                and len(sub) > max_flares_per_bin
            ):
                sub = sub.nlargest(max_flares_per_bin, "magnitude")

            # small first, big last (nice layering)
            sub = sub.sort_values("magnitude", ascending=True)

            # angular offsets inside the bin
            n = len(sub)
            if n == 1:
                thetas = np.array([center])
            else:
                spread = bin_width * jitter
                thetas = center + np.linspace(
                    -spread / 2.0, spread / 2.0, n
                )

            mags = _scale_mag(sub["magnitude"].to_numpy())

            # draw flares; inward flares stop at the pole (r = 0)
            for th, m in zip(thetas, mags):
                if typ == "over":
                    tip = base_r + m
                else:
                    tip = max(0.0, base_r - m)
                ax.plot(
                    [th, th],
                    [base_r, tip],
                    color=color,
                    linewidth=flare_linewidth,
                    alpha=alpha,
                    solid_capstyle="round",
                    zorder=3,
                )

    # ---- formatting
    ttl = title or (f"Anomaly Severity Profile\nCAS: {cas_score:.4f}")
    ax.set_title(ttl, fontsize=16, y=1.04)
    ax.set_yticklabels([])
    ax.set_xticklabels([])
    ax.spines["polar"].set_visible(False)
    set_axis_grid(ax, show_grid=show_grid, grid_props=grid_props)

    # legend outside (no overlap)
    legend_elems = [
        Line2D(
            [0],
            [0],
            color=over_color,
            lw=2,
            label="Over-prediction Flare",
        ),
        Line2D(
            [0],
            [0],
            color=under_color,
            lw=2,
            label="Under-prediction Flare",
        ),
    ]
    ax.legend(
        handles=legend_elems,
        loc="upper right",
        bbox_to_anchor=legend_anchor,
    )

    # colorbar for ring density
    sm = cm.ScalarMappable(norm=norm, cmap=cmap_obj)
    cbar = fig.colorbar(sm, ax=ax, pad=0.1, shrink=0.7)
    cbar.set_label(
        "Local Anomaly Density (Ring Color)",
        fontsize=10,
    )

    # radial limit: ring + longest scaled flare
    max_len = _scale_mag(anomalies["magnitude"].to_numpy()).max()
    ax.set_ylim(0.0, 1.1 + max_len)

    if savefig:
        fig.savefig(savefig, dpi=300, bbox_inches="tight")
        plt.close(fig)
    else:
        plt.show()

    return ax
plot_anomaly_profile.__doc__ = r"""
Visualize anomaly severity as a polar profile or "fiery ring".

This figure emphasizes readability for papers. It encodes
clustered anomaly density as a colored ring, and shows each
failed sample as a short "flare" growing inward or outward.
The design avoids overlap by angular binning and in-bin
dodging.

Parameters
----------
df : pandas.DataFrame
    Input table that holds the observed series and the two
    prediction bounds. Missing rows are dropped.
actual_col : str
    Column name of the observed values.
q_low_col : str
    Column name of the lower prediction bound.
q_up_col : str
    Column name of the upper prediction bound.
window_size : int, default=21
    Window length used to compute the local anomaly density.
    Odd values are recommended for symmetric windows.
theta_bins : int, default=72
    Number of angular bins used for the density ring and to
    group flares for anti-overlap dodging.
title : str, optional
    Figure title. If not given, a title including the CAS
    score is used.
figsize : tuple of float, default=(9.0, 9.0)
    Figure size in inches.
cmap : str, default='plasma'
    Colormap used for the density ring.
colors : list of str, optional
    Two colors for the flares (over, under). If not given,
    defaults are chosen.
alpha : float, default=0.8
    Global alpha for flare lines.
acov : {'default','half_circle','quarter_circle', 'eighth_circle'}, default='default'
    Angular coverage preset. This controls the polar span.
show_grid : bool, default=True
    If True, show a light polar grid.
grid_props : dict, optional
    Keyword arguments forwarded to the grid styling helper.
ax : matplotlib.axes.Axes, optional
    Existing polar axes. If None, a new figure is created.
savefig : str, optional
    Path to save the figure. If None, the figure is shown.
jitter : float, default=0.85
    Fraction of bin width used to spread flares within each
    bin to reduce overlap. Clipped to [0, 1].
max_flares_per_bin : int, optional
    If given, at most this many flares are drawn per bin.
    The largest magnitudes are kept.
flare_scale : {'linear','sqrt','log'}, default='sqrt'
    Transform applied to anomaly magnitude before mapping to
    flare length. Use 'sqrt' or 'log' to tame outliers.
flare_clip : float, optional
    Maximum flare length after scaling. If None, no clipping
    is applied.
flare_linewidth : float, default=1.4
    Line width of the flares.
ring_height : float, default=0.06
    Radial thickness of the density ring.
ring_alpha : float, default=0.95
    Alpha value for the density ring.
legend_anchor : tuple of float, default=(1.35, 1.04)
    Anchor for the legend box (axes coordinates).

Returns
-------
ax : matplotlib.axes.Axes or None
    The polar axes with the plot. Returns None if no anomaly
    is detected after preprocessing.

Notes
-----
**Visual mapping.**

- Angle :math:`\varepsilon`: encodes sample position (index order).
- Ring color: mean local anomaly density within each angular
  bin. Hot colors indicate clustered failures.
- Flares: one per failed sample.

  - Length: anomaly magnitude (scaled and optionally clipped).
  - Direction: type. Outward = over-prediction.
    Inward = under-prediction.

**Anomalies.** A point is an anomaly when the observed value
lies outside the prediction interval. Density is computed with
a moving window of length ``window_size`` and then averaged
within bins.

**Styling and overlap control.** Angular binning plus in-bin
jitter reduce overlap. Use ``theta_bins`` to raise angular
resolution, ``jitter`` to control spread,
``max_flares_per_bin`` to cap clutter, and ``flare_scale`` or
``flare_clip`` to keep lengths balanced.

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_anomaly_profile
>>> rng = np.random.default_rng(30)
>>> n = 500
>>> base = np.sin(np.linspace(0, 6*np.pi, n)) * 10 + 20
>>> qlow = base - 5
>>> qup = base + 5
>>> y = base.copy()
>>> y[100:130] += rng.uniform(6, 12, 30)   # over
>>> y[300:330] -= rng.uniform(6, 12, 30)   # under
>>> df = pd.DataFrame({'actual': y, 'q10': qlow, 'q90': qup})
>>> ax = plot_anomaly_profile(
...     df,
...     actual_col='actual',
...     q_low_col='q10',
...     q_up_col='q90',
...     window_size=31,
...     theta_bins=96,
...     jitter=0.9,
... )
>>> _ = ax.figure  # keep handle for saving outside

See Also
--------
plot_anomaly_severity : Polar scatter of anomaly points.
plot_anomaly_glyphs : Glyph-based variant with richer marks.
clustered_anomaly_severity : Metric used by this plot.

References
----------
.. [1] Kouadio, K. L., et al. 2025. CAS: Cluster-Aware Scoring
       for Probabilistic Forecasts. in review.
.. [2] Gneiting, T., and Raftery, A. E. 2007. Strictly proper
       scoring rules, prediction, and estimation. JASA,
       102(477), 359–378.
"""


@check_non_emptiness
@isdf
def plot_anomaly_glyphs(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    window_size: int = 21,
    title: str | None = None,
    figsize: tuple[float, float] = (9.0, 9.0),
    cmap: str = "inferno",
    s: int = 70,
    alpha: float = 0.85,
    acov: str = "default",
    mask_angle: bool = True,
    mask_radius: bool = False,
    show_grid: bool = True,
    grid_props: dict | None = None,
    ax: Axes | None = None,
    savefig: str | None = None,
    **kwargs: Any,
) -> Axes | None:
    # The user-facing docstring is attached right after this definition
    # through ``plot_anomaly_glyphs.__doc__``. ``**kwargs`` is accepted but
    # not used below.
    required_cols = [actual_col, q_low_col, q_up_col]

    # Rows with a missing observation or bound cannot be classified.
    data = df[required_cols].dropna().copy()
    if data.empty:
        warnings.warn("DataFrame is empty after dropping NaNs.", stacklevel=2)
        return None

    cas_score, details = clustered_anomaly_severity(
        data[actual_col].to_numpy(),
        data[q_low_col].to_numpy(),
        data[q_up_col].to_numpy(),
        window_size=window_size,
        return_details=True,
    )
    anomalies = details[details["is_anomaly"]].copy()
    if anomalies.empty:
        warnings.warn("No anomalies detected in the data.", stacklevel=2)
        return None

    fig, ax, span = setup_polar_axes(ax, acov=acov, figsize=figsize)
    theta = (anomalies.index.to_numpy() / len(data)) * float(span)

    # Visual Mappings
    radii = anomalies["magnitude"].to_numpy()
    density = anomalies["local_density"].to_numpy()
    cmap_obj = get_cmap(cmap, default="inferno")
    norm = Normalize(vmin=density.min(), vmax=density.max())
    colors = cmap_obj(norm(density))

    # Plot Glyphs in two passes (for under- and over-predictions).
    # Plain boolean arrays: the masks index NumPy arrays, so the pandas
    # index labels must not take part in the selection.
    under_mask = (anomalies["type"] == "under").to_numpy()
    over_mask = (anomalies["type"] == "over").to_numpy()

    # Over-prediction glyphs (outward triangles)
    if over_mask.any():
        ax.scatter(
            theta[over_mask],
            radii[over_mask],
            c=colors[over_mask],
            s=s,
            alpha=alpha,
            marker="^",  # Triangle pointing up (radially outward)
            label="Over-prediction (Risk Underestimated)",
            edgecolor="w",
            linewidth=0.5,
        )

    # Under-prediction glyphs (inward triangles)
    if under_mask.any():
        ax.scatter(
            theta[under_mask],
            radii[under_mask],
            c=colors[under_mask],
            s=s,
            alpha=alpha,
            marker="v",  # Triangle pointing down (radially inward)
            label="Under-prediction (Risk Overestimated)",
            edgecolor="w",
            linewidth=0.5,
        )

    # Formatting
    title_str = (
        title or f"Polar Anomaly Glyph Plot\nCAS Score: {cas_score:.4f}"
    )
    ax.set_title(title_str, fontsize=16, y=1.1)

    sm = cm.ScalarMappable(norm=norm, cmap=cmap_obj)
    cbar = fig.colorbar(sm, ax=ax, pad=0.1, shrink=0.7)
    cbar.set_label("Local Anomaly Density (Glyph Color)", fontsize=10)

    ax.set_ylabel("Anomaly Magnitude (Glyph Radius)", labelpad=25)
    ax.set_ylim(bottom=0)
    ax.legend(loc="upper right", bbox_to_anchor=(1.4, 1.05))
    set_axis_grid(ax, show_grid=show_grid, grid_props=grid_props)

    if mask_angle:
        ax.set_xticklabels([])
    if mask_radius:
        ax.set_yticklabels([])

    fig.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=300, bbox_inches="tight")
        plt.close(fig)
    else:
        plt.show()

    return ax


plot_anomaly_glyphs.__doc__ = r"""
Visualizes anomaly severity using polar glyphs.

This function creates a highly informative diagnostic plot where
each forecast failure (anomaly) is represented by a glyph (a
custom symbol). The glyph's properties—location, size, shape,
and color—encode the key characteristics of the anomaly,
offering a clear and scientifically rigorous visualization.

Parameters
----------
df : pd.DataFrame
    The input DataFrame containing the actual and predicted
    quantile values.
actual_col : str
    The name of the column containing the true observed values.
q_low_col : str
    The name of the column for the lower bound of the
    prediction interval.
q_up_col : str
    The name of the column for the upper bound of the
    prediction interval.
window_size : int, default=21
    The size of the moving window used to calculate the local
    anomaly density, which determines the glyph color.
title : str, optional
    A custom title for the plot. If ``None``, a default title
    including the CAS score is generated.
figsize : tuple of (float, float), default=(9.0, 9.0)
    The figure size in inches.
cmap : str, default='inferno'
    The colormap for coloring glyphs based on local density.
s : int, default=70
    The marker size for the glyphs.
alpha : float, default=0.85
    The transparency of the glyphs.
acov : {'default', 'half_circle', 'quarter_circle', 'eighth_circle'}, default='default'
    Specifies the angular coverage of the polar plot.
mask_angle : bool, default=True
    If ``True``, hides the angular tick labels.
mask_radius : bool, default=False
    If ``True``, hides the radial tick labels.
show_grid : bool, default=True
    Passed to the grid styling helper to toggle the polar grid.
grid_props : dict, optional
    Keyword arguments forwarded to the grid styling helper.
ax : matplotlib.axes.Axes, optional
    An existing polar axes to draw the plot on. If ``None``,
    a new figure and axes are created.
savefig : str, optional
    The file path to save the plot. If ``None``, the plot is
    displayed interactively.

Returns
-------
ax : matplotlib.axes.Axes or None
    The Matplotlib Axes object containing the plot, or ``None``
    if no anomalies are detected in the data.

See Also
--------
plot_anomaly_severity : A version using simple scatter dots.
plot_anomaly_profile : A stylized "fiery ring" version of this plot.
clustered_anomaly_severity : The underlying metric function.

Notes
-----
This plot uses a glyph-based approach to encode multiple dimensions
of information for each forecast failure, as proposed in the
framework of :footcite:t:`kouadiob2025`.

**Visual Mapping (Glyph Properties):**

- **Angle (`θ`)**: The sample index, showing *where* in the
  dataset the failure occurred.
- **Radius (`r`)**: The **Anomaly Magnitude**. Glyphs farther
  from the center are more severe failures.
- **Color**: The **Local Anomaly Density**. Hotter colors
  indicate the anomaly is part of a dense cluster.
- **Shape**: The **Type** of anomaly, using an intuitive metaphor:

  - `▲` (up-triangle): Over-prediction (risk "escaping" upward).
  - `▼` (down-triangle): Under-prediction (risk "collapsing" inward).

References
----------
.. footbibliography::

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_anomaly_glyphs
>>>
>>> # Simulate data with a cluster of severe failures
>>> np.random.seed(0)
>>> n_samples = 400
>>> y_true = 100 + 20 * np.sin(np.linspace(0, 4*np.pi, n_samples))
>>> y_qlow = y_true - 10
>>> y_qup = y_true + 10
>>> y_true[100:140] = y_qup[100:140] + np.random.uniform(10, 25, 40)
>>>
>>> df = pd.DataFrame({
...     "actual": y_true, "q10": y_qlow, "q90": y_qup
... })
>>>
>>> ax = plot_anomaly_glyphs(
...     df,
...     actual_col="actual",
...     q_low_col="q10",
...     q_up_col="q90",
...     window_size=31,
...     title="Glyph Plot of Anomaly Hotspot"
... )
"""
@check_non_emptiness
@isdf
def plot_cas_profile(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    window_size: int = 21,
    title: str | None = None,
    figsize: tuple[float, float] = (12.0, 6.0),
    cmap: str = "plasma",
    s: int = 60,
    alpha: float = 0.85,
    ax: Axes | None = None,
    savefig: str | None = None,
    **kwargs: Any,
) -> Axes | None:
    # The user-facing docstring is attached further down the module through
    # ``plot_cas_profile.__doc__``. ``**kwargs`` is accepted but not used
    # below.
    required_cols = [actual_col, q_low_col, q_up_col]

    # Rows with a missing observation or bound cannot be classified.
    data = df[required_cols].dropna().copy()
    if data.empty:
        warnings.warn("DataFrame is empty after dropping NaNs.", stacklevel=2)
        return None

    cas_score, details = clustered_anomaly_severity(
        data[actual_col].to_numpy(),
        data[q_low_col].to_numpy(),
        data[q_up_col].to_numpy(),
        window_size=window_size,
        return_details=True,
    )
    anomalies = details[details["is_anomaly"]].copy()
    if anomalies.empty:
        warnings.warn("No anomalies detected in the data.", stacklevel=2)
        return None

    # Use a standard Cartesian axis
    if ax is None:
        fig, ax = plt.subplots(figsize=figsize)
    else:
        fig = ax.figure

    # Visual Mappings
    x_coords = anomalies.index.to_numpy()
    y_coords = anomalies["magnitude"].to_numpy()
    density = anomalies["local_density"].to_numpy()
    cmap_obj = get_cmap(cmap, default="plasma")
    norm = Normalize(vmin=density.min(), vmax=density.max())
    colors = cmap_obj(norm(density))

    # Plot Glyphs in two passes for different markers.
    # Plain boolean arrays: the masks index NumPy arrays, so the pandas
    # index labels must not take part in the selection.
    under_mask = (anomalies["type"] == "under").to_numpy()
    over_mask = (anomalies["type"] == "over").to_numpy()

    if over_mask.any():
        ax.scatter(
            x_coords[over_mask],
            y_coords[over_mask],
            c=colors[over_mask],
            s=s,
            alpha=alpha,
            marker="^",
            label="Over-prediction (Risk Underestimated)",
            edgecolor="k",
            linewidth=0.5,
        )
    if under_mask.any():
        ax.scatter(
            x_coords[under_mask],
            y_coords[under_mask],
            c=colors[under_mask],
            s=s,
            alpha=alpha,
            marker="v",
            label="Under-prediction (Risk Overestimated)",
            edgecolor="k",
            linewidth=0.5,
        )

    # Formatting
    title_str = (
        title or f"Anomaly Severity Profile (CAS Score: {cas_score:.4f})"
    )
    ax.set_title(title_str, fontsize=16)
    ax.set_xlabel("Sample Index", fontsize=12)
    ax.set_ylabel("Anomaly Magnitude", fontsize=12)
    ax.set_ylim(bottom=0)
    ax.margins(x=0.02)  # Add a little padding to x-axis

    # Add a color bar for the cluster density
    sm = cm.ScalarMappable(norm=norm, cmap=cmap_obj)
    cbar = fig.colorbar(sm, ax=ax, pad=0.01)
    cbar.set_label("Local Anomaly Density", fontsize=10)

    ax.legend(loc="upper right")
    set_axis_grid(ax, show_grid=True)

    fig.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=300, bbox_inches="tight")
        plt.close(fig)
    else:
        plt.show()

    return ax
def _ensure_array_like(x):
    """Classify a ``sort_by`` argument as a column key or an array.

    Returns
    -------
    (key, arr) : tuple
        ``(x, None)`` when ``x`` is a string, ``(None, ndarray)`` when
        ``x`` is an ndarray, Series, list or tuple, and ``(None, None)``
        for ``None`` or any other type.
    """
    if x is None:
        return None, None
    if isinstance(x, str):
        return x, None
    if isinstance(x, (np.ndarray, pd.Series, list, tuple)):
        return None, np.asarray(x)
    return None, None


def _prepare_sort_values(df, sort_by):
    """Resolve ``sort_by`` to a float array of length ``len(df)``.

    Parameters
    ----------
    df : pd.DataFrame
        Frame that provides the column lookup, the length and the index.
    sort_by : str, array-like or None
        Column name (matched exactly, then case-insensitively, then with
        whitespace stripped and spaces replaced by underscores), an
        array-like of ``len(df)`` values, or ``None``.

    Returns
    -------
    np.ndarray
        Float sort keys. ``None`` (or an unsupported ``sort_by`` type)
        yields ``0..len(df)-1``. Datetime and timedelta values become
        their int64 representation, categoricals their codes, and other
        non-numeric values are tried as datetimes, then as numbers, and
        finally factorized in sorted order. Missing entries are NaN.

    Raises
    ------
    KeyError
        If a string ``sort_by`` matches no column.
    ValueError
        If an array-like ``sort_by`` does not have ``len(df)`` entries.
    """
    from pandas.api.types import is_bool_dtype as _isbool
    from pandas.api.types import is_datetime64_any_dtype as _isdt
    from pandas.api.types import is_numeric_dtype as _isnum
    from pandas.api.types import is_timedelta64_dtype as _istd

    key, arr = _ensure_array_like(sort_by)

    def _resolve_key(k):
        # match exact, case-insensitive, or stripped/underscored
        cols = list(df.columns)
        if k in df:
            return k
        low = {str(c).lower(): c for c in cols}
        k_low = str(k).lower()
        if k_low in low:
            return low[k_low]

        def norm(s):
            return str(s).strip().lower().replace(" ", "_")

        norm_map = {norm(c): c for c in cols}
        k_norm = norm(k)
        if k_norm in norm_map:
            return norm_map[k_norm]
        raise KeyError(f"`sort_by` '{k}' not in DataFrame.")

    def _datetimelike_to_float(ser):
        # int64 representation of the datetimes/timedeltas. NaT is stored
        # as the int64 minimum, so it is masked back to NaN; otherwise it
        # would act as a huge negative sort key.
        ticks = ser.astype("int64").to_numpy().astype(float)
        ticks[ser.isna().to_numpy()] = np.nan
        return ticks

    if key is not None:
        col = _resolve_key(key)
        s = df[col]
    elif arr is not None:
        vals = np.asarray(arr)
        if vals.shape[0] != len(df):
            raise ValueError("`sort_by` length mismatch with DataFrame.")
        s = pd.Series(vals, index=df.index, copy=False)
    else:
        return np.arange(len(df), dtype=float)

    # ---- dtype-specific coercions to float array
    if _isdt(s) or _istd(s):
        return _datetimelike_to_float(s)

    if isinstance(s.dtype, pd.CategoricalDtype):
        codes = s.cat.codes.to_numpy()
        vals = codes.astype(float)
        vals[codes == -1] = np.nan  # -1 is the code of a missing category
        return vals

    if _isbool(s) or _isnum(s):
        return s.astype(float).to_numpy()

    # object or mixed: try datetime, then numeric, else factorize
    with warnings.catch_warnings():
        # This is only a probe; format-inference warnings for values that
        # are not dates at all are noise.
        warnings.simplefilter("ignore", UserWarning)
        td = pd.to_datetime(s, errors="coerce")
    if _isdt(td) and td.notna().any():
        # ``Series.view`` is deprecated; ``astype`` gives the same integers.
        return _datetimelike_to_float(td)

    vals = pd.to_numeric(s, errors="coerce").to_numpy()
    if np.isnan(vals).all():
        codes, _ = pd.factorize(s, sort=True)
        vals = codes.astype(float)
        vals[codes == -1] = np.nan  # -1 marks a missing value
        return vals
    return vals.astype(float, copy=False)


plot_cas_profile.__doc__ = r"""
Visualizes clustered anomaly severity on a Cartesian plot.

This function creates a non-polar "profile" of forecast failures.
It is highly effective for sequential data (like time series)
where the x-axis represents the sample index or time. It
visualizes an anomaly's location, magnitude, type, and
clustering density.

Parameters
----------
df : pd.DataFrame
    The input DataFrame containing the actual and predicted
    quantile values.
actual_col : str
    The name of the column containing the true observed values.
q_low_col : str
    The name of the column for the lower bound of the
    prediction interval.
q_up_col : str
    The name of the column for the upper bound of the
    prediction interval.
window_size : int, default=21
    The size of the moving window used to calculate the local
    anomaly density, which determines the point color.
title : str, optional
    A custom title for the plot. If ``None``, a default title
    including the CAS score is generated.
figsize : tuple of (float, float), default=(12.0, 6.0)
    The figure size in inches.
cmap : str, default='plasma'
    The colormap for coloring points based on local density.
s : int, default=60
    The marker size for the points.
alpha : float, default=0.85
    The transparency of the points.
ax : matplotlib.axes.Axes, optional
    An existing Cartesian axes to draw the plot on. If ``None``,
    a new figure and axes are created.
savefig : str, optional
    The file path to save the plot. If ``None``, the plot is
    displayed interactively.

Returns
-------
ax : matplotlib.axes.Axes or None
    The Matplotlib Axes object containing the plot, or ``None``
    if no anomalies are detected in the data.

See Also
--------
plot_anomaly_glyphs : A polar version using informative glyphs.
clustered_anomaly_severity : The underlying metric function.

Notes
-----
This plot provides a direct, sequential view of forecast failures,
making it easy to spot trends or regime changes in model
performance over time :footcite:p:`kouadiob2025`.

**Visual Mapping:**

- **X-axis**: The sample index, showing *when* or *where* in the
  sequence the failure occurred.
- **Y-axis**: The **Anomaly Magnitude**. The height of a point
  shows its severity.
- **Color**: The **Local Anomaly Density**. Hotter colors show
  "hotspots" where failures are concentrated.
- **Shape**: The **Type** of anomaly.

  - `▲` (up-triangle): Over-prediction (risk underestimated).
  - `▼` (down-triangle): Under-prediction (risk overestimated).

References
----------
.. footbibliography::

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_cas_profile
>>>
>>> # Simulate data with a failure hotspot in the middle
>>> np.random.seed(0)
>>> n_samples = 400
>>> y_true = np.zeros(n_samples)
>>> y_qlow = y_true - 10
>>> y_qup = y_true + 10
>>> y_true[180:220] = y_qup[180:220] + np.random.uniform(5, 15, 40)
>>>
>>> df = pd.DataFrame({
...     "actual": y_true, "q10": y_qlow, "q90": y_qup
... })
>>>
>>> ax = plot_cas_profile(
...     df,
...     actual_col="actual",
...     q_low_col="q10",
...     q_up_col="q90",
...     title="Cartesian Anomaly Severity Profile"
... )
"""
def plot_glyphs(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    sort_by=None,
    window_size: int = 21,
    title: str | None = None,
    figsize: tuple[float, float] = (9.0, 9.0),
    cmap: str = "inferno",
    s: int = 70,
    alpha: float = 0.85,
    acov: Acov = "default",
    mask_angle: bool = True,
    mask_radius: bool = False,
    show_grid: bool = True,
    grid_props: dict | None = None,
    radius: str = "severity",
    color_by: str = "local_density",
    vmin: float | None = None,
    vmax: float | None = None,
    zero_at: Literal["N", "E", "S", "W"] = "N",
    clockwise: bool = True,
    ax: Axes | None = None,
    savefig: str | None = None,
    **kwargs,
) -> Axes | None:
    # The user-facing docstring is attached further down the module through
    # ``plot_glyphs.__doc__``. ``**kwargs`` carries the optional
    # ``show_path`` flag (default True) plus any keyword ``ax.scatter``
    # accepts.

    # Compute CAS details
    _, details = clustered_anomaly_severity(
        actual_col,
        q_low_col,
        q_up_col,
        data=df,
        window_size=window_size,
        return_details=True,
    )

    # Prepare sort axis -> theta.
    # A column name is handed to the helper as-is so it sees the column's
    # real dtype. Extracting ``.values`` first yields an ExtensionArray for
    # categorical or nullable columns, which the helper does not recognise
    # as array-like; the ordering then silently fell back to row order.
    if isinstance(sort_by, str):
        exist_features(df, features=sort_by)
    sort_vals = _prepare_sort_values(df, sort_by)

    span = resolve_polar_span(acov)
    theta = map_theta_to_span(
        sort_vals,
        span=span,
        data_min=np.nanmin(sort_vals),
        data_max=np.nanmax(sort_vals),
    )

    # Choose radial field
    valid_r = ("magnitude", "local_density", "severity")
    if radius not in valid_r:
        raise ValueError(f"`radius` must be in {valid_r}. Got {radius!r}.")
    r = details[radius].to_numpy(dtype=float)

    # Optionally mask non-anomalies: with ``mask_angle`` their angle becomes
    # NaN, with ``mask_radius`` their radius is forced to 0.
    is_anom = details["is_anomaly"].to_numpy()
    if mask_angle:
        theta_plot = np.where(is_anom, theta, np.nan)
    else:
        theta_plot = theta.copy()
    if mask_radius:
        r_plot = np.where(is_anom, r, 0.0)
    else:
        r_plot = r.copy()

    # Normalize radius to [0, 1] for nicer scaling
    r_max = float(np.nanmax(r_plot)) if np.isfinite(r_plot).any() else 1.0
    if r_max <= 0:
        r_max = 1.0  # avoid dividing by zero when every radius is 0
    r_plot = r_plot / r_max

    # Color mapping
    if color_by not in details.columns:
        raise ValueError(
            f"`color_by` must be a details column. "
            f"Available: {list(details.columns)}"
        )
    cvals = details[color_by].to_numpy(dtype=float)
    has_finite_color = np.isfinite(cvals).any()
    if vmin is None:
        vmin = np.nanmin(cvals) if has_finite_color else 0.0
    if vmax is None:
        vmax = np.nanmax(cvals) if has_finite_color else 1.0

    # Setup polar axes
    fig, ax, _ = setup_polar_axes(
        ax,
        acov=acov,
        figsize=figsize,
        zero_at=zero_at,
        clockwise=clockwise,
    )
    if title:
        ax.set_title(title, fontsize=13, pad=10.0)
    set_axis_grid(ax, show_grid=show_grid, grid_props=grid_props)

    # Keywords shared by all three scatter layers, filtered once.
    scatter_kw = get_valid_kwargs(ax.scatter, kwargs)

    # Markers by anomaly type
    typ = details["type"].astype(str).to_numpy()
    mk_over = "^"
    mk_under = "v"

    # Base scatter for non-anomalies (if not masked)
    if not mask_angle or not mask_radius:
        sel = ~is_anom
        if sel.any():
            ax.scatter(
                theta_plot[sel],
                r_plot[sel],
                s=max(int(s * 0.45), 10),
                c=cvals[sel],
                vmin=vmin,
                vmax=vmax,
                cmap=cmap,
                alpha=max(min(alpha * 0.65, 1.0), 0.1),
                linewidths=0.0,
                **scatter_kw,
            )

    # Over anomalies, then under anomalies
    for marker, kind in ((mk_over, "over"), (mk_under, "under")):
        sel_kind = is_anom & (typ == kind)
        if not sel_kind.any():
            continue
        ax.scatter(
            theta_plot[sel_kind],
            r_plot[sel_kind],
            s=s,
            c=cvals[sel_kind],
            vmin=vmin,
            vmax=vmax,
            cmap=cmap,
            marker=marker,
            alpha=alpha,
            edgecolors="none",
            **scatter_kw,
        )

    # Optional thin path of selected radius metric
    if kwargs.get("show_path", True):
        order = np.argsort(sort_vals)
        ax.plot(
            theta[order],
            r_plot[order],
            lw=1.25,
            alpha=0.85,
            color="tab:gray",
        )

    # Colorbar -- created through the axes' own figure so it also lands in
    # the right place when a caller-supplied ``ax`` is not on the current
    # pyplot figure.
    mappable = cm.ScalarMappable(
        cmap=get_cmap(cmap, default="inferno"),
        norm=Normalize(vmin=vmin, vmax=vmax),
    )
    cbar = fig.colorbar(mappable, ax=ax, pad=0.12, shrink=0.86)
    cbar.set_label(color_by.replace("_", " "))

    # Legend (scatter ``s`` is an area, legend ``markersize`` a length)
    legend_ms = max(np.sqrt(s) * 0.8, 4.0)
    handles = [
        Line2D(
            [0],
            [0],
            marker=mk_over,
            linestyle="",
            label="over",
            markersize=legend_ms,
            color="black",
        ),
        Line2D(
            [0],
            [0],
            marker=mk_under,
            linestyle="",
            label="under",
            markersize=legend_ms,
            color="black",
        ),
    ]
    ax.legend(
        handles=handles,
        loc="upper right",
        bbox_to_anchor=(1.15, 1.15),
        frameon=False,
        title="Anomaly type",
    )

    if mask_angle:
        ax.set_xticklabels([])
    if mask_radius:
        ax.set_yticklabels([])

    if savefig:
        fig.savefig(savefig, bbox_inches="tight")
        plt.close(fig)
    else:
        plt.show()

    return ax
def _as_array(x):
    """Coerce an array-like container to ``np.ndarray``.

    Parameters
    ----------
    x : object
        Candidate value. Only ``np.ndarray``, ``pd.Series``, ``list``
        and ``tuple`` instances are converted.

    Returns
    -------
    np.ndarray or None
        ``np.asarray(x)`` for the accepted container types; ``None``
        for everything else (including ``None`` and strings, which
        callers treat as "not an external array").
    """
    if isinstance(x, (np.ndarray, pd.Series, list, tuple)):
        return np.asarray(x)
    return None


def _order_index(df, sort_by):
    """Compute the row permutation that orders ``df`` by ``sort_by``.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose rows are to be ordered.
    sort_by : str, array-like or None
        Column name in ``df``, an external array with one entry per
        row, or ``None`` to keep the existing row order.

    Returns
    -------
    np.ndarray
        When ``sort_by`` is ``None``: ``np.arange(len(df))`` only.
    (idx, vals) : tuple of np.ndarray
        Otherwise: the sorting permutation and the (unsorted) values
        it was computed from. Datetime values are returned as int64
        nanoseconds.

    Raises
    ------
    ValueError
        If ``sort_by`` is neither a string nor a supported array-like,
        or if its length differs from ``len(df)``.

    Notes
    -----
    The return shape differs between the ``None`` case and the other
    cases; callers must handle both.
    """
    if sort_by is None:
        return np.arange(len(df))

    if isinstance(sort_by, str):
        vals = df[sort_by].to_numpy()
    else:
        vals = _as_array(sort_by)

    if vals is None:
        raise ValueError("Invalid `sort_by`.")
    if vals.shape[0] != len(df):
        raise ValueError("`sort_by` length mismatch.")

    # Datetime-friendly: sort on integer nanoseconds.
    if np.issubdtype(vals.dtype, np.datetime64):
        vals = vals.astype("datetime64[ns]").astype("int64")

    # A stable sort keeps rows that share a key in their original
    # relative order, so sequences inside a category are not shuffled.
    idx = np.argsort(vals, kind="stable")
    return idx, vals


plot_glyphs.__doc__ = r"""
Visualizes anomaly characteristics using a polar glyph plot.

This function creates a highly informative diagnostic plot where
each data point is represented by a glyph (a custom symbol) on a
polar axis. The glyph's properties—location, size, shape, and
color—encode multiple dimensions of the data, offering a clear and
scientifically rigorous visualization of forecast failures or
other phenomena.

Parameters
----------
df : pd.DataFrame
    The input DataFrame containing the actual and predicted
    quantile values.
actual_col : str
    Name of the column containing the true observed values.
q_low_col : str
    Name of the column for the lower bound of the prediction
    interval.
q_up_col : str
    Name of the column for the upper bound of the prediction
    interval.
sort_by : str or array-like, optional
    The feature used to order points around the angular axis. Can
    be a column name or an external array. If ``None``, the
    DataFrame's index is used, which is suitable for time series.
window_size : int, default=21
    The size of the moving window used to calculate the local
    anomaly density, which can be used for coloring.
title : str, optional
    A custom title for the plot.
figsize : tuple of (float, float), default=(9.0, 9.0)
    The figure size in inches.
cmap : str, default='inferno'
    The colormap for coloring glyphs.
s : int, default=70
    The base marker size for the glyphs.
alpha : float, default=0.85
    The transparency of the glyphs.
acov : {'default', 'half_circle', 'quarter_circle', 'eighth_circle'},\
    default='default'
    Specifies the angular coverage of the polar plot.
mask_angle : bool, default=True
    If ``True``, hides the angular tick labels.
mask_radius : bool, default=False
    If ``True``, hides the radial tick labels.
show_grid : bool, default=True
    If ``True``, displays the polar grid lines.
grid_props : dict, optional
    Custom keyword arguments to style the grid.
radius : {'magnitude', 'local_density', 'severity'}, default='magnitude'
    The data field to map to the radial coordinate (distance from
    the center).
color_by : str, default='local_density'
    The data field to map to the glyph color. Must be a column in
    the details DataFrame returned by the CAS metric.
vmin, vmax : float, optional
    The minimum and maximum values for the color normalization.
    If ``None``, they are inferred from the `color_by` data.
zero_at : {'N', 'E', 'S', 'W'}, default='N'
    The direction for the 0° angle on the polar plot.
clockwise : bool, default=True
    If ``True``, angles increase in the clockwise direction.
ax : matplotlib.axes.Axes, optional
    An existing polar axes to draw the plot on.
savefig : str, optional
    The file path to save the plot.

Returns
-------
ax : matplotlib.axes.Axes or None
    The Matplotlib Axes object containing the plot, or ``None`` if
    no anomalies are detected.

See Also
--------
clustered_anomaly_severity : The underlying metric function.
plot_cas_layers : A Cartesian alternative showing anomaly layers.

Notes
-----
This plot uses a glyph-based approach to encode multiple dimensions
of information for each forecast failure.

**Visual Mapping (Glyph Properties):**

- **Angle** (:math:`\theta`): The position in the sequence, ordered
  by the `sort_by` parameter.
- **Radius (`r`)**: The value of the column specified by `radius`,
  normalized to [0, 1]. A larger radius indicates a higher value
  for that metric.
- **Color**: The value of the column specified by `color_by`. By
  default, hotter colors indicate a denser cluster of anomalies.
- **Shape**: The **Type** of anomaly, using an intuitive metaphor:

  - `▲` (up-triangle): Over-prediction (risk "escaping" upward).
  - `▼` (down-triangle): Under-prediction (risk "collapsing"
    inward).

The plot also includes a thin gray line (`show_path=True`) that
traces the radial metric along the sorted angular path, helping to
visualize trends.

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_glyphs
>>>
>>> # Simulate data with a failure hotspot
>>> np.random.seed(0)
>>> n_samples = 200
>>> time = pd.to_datetime(pd.date_range(
...     "2024-01-01", periods=n_samples)
... )
>>> y_true = 50 + 10 * np.sin(np.arange(n_samples) * np.pi / 50)
>>> y_qlow = y_true - 5
>>> y_qup = y_true + 5
>>> y_true[80:100] = y_qup[80:100] + np.random.uniform(5, 10, 20)
>>>
>>> df = pd.DataFrame({
...     "time": time, "actual": y_true,
...     "q10": y_qlow, "q90": y_qup
... })
>>>
>>> ax = plot_glyphs(
...     df,
...     actual_col="actual",
...     q_low_col="q10",
...     q_up_col="q90",
...     sort_by="time",
...     radius="magnitude",
...     color_by="local_density",
...     title="Glyph Plot of Anomaly Hotspot"
... )
"""
def plot_cas_layers(
    df: pd.DataFrame,
    actual_col: str,
    q_low_col: str,
    q_up_col: str,
    *,
    sort_by=None,
    window_size: int = 21,
    title: str | None = None,
    figsize: tuple[float, float] = (11.0, 6.5),
    show_severity: bool = True,
    show_density: bool = True,
    band_alpha: float = 0.18,
    anom_alpha: float = 0.85,
    base_alpha: float = 0.35,
    cmap: str = "inferno",
    mark_size: int = 36,
    show_grid: bool = True,
    grid_props: dict = None,
    lw: float = 1.4,
    ax: Axes | None = None,
    savefig: str | None = None,
    **kwargs,
) -> Axes | tuple[Axes, Axes] | None:
    # The user-facing docstring is attached after this definition through
    # ``plot_cas_layers.__doc__``; the comments here cover implementation.

    # Only the per-sample details frame is plotted; the aggregate score
    # returned alongside it is not used.
    _, details = clustered_anomaly_severity(
        actual_col,
        q_low_col,
        q_up_col,
        data=df,
        window_size=window_size,
        return_details=True,
    )

    # --- ordering -------------------------------------------------------
    cats = None
    if isinstance(sort_by, str) or _as_array(sort_by) is not None:
        order_idx, x_vals = _order_index(df, sort_by)
        det = details.iloc[order_idx].reset_index(drop=True)
        # Keep the sorted key values; they drive the category ticks below.
        cats = np.asarray(x_vals)[order_idx]
    else:
        det = details.copy()
    # Ordinal x so the series always spans 0..N-1, whatever the sort key.
    x = np.arange(len(det))

    # --- per-sample fields from the details frame -------------------------
    y = det["y_true"].to_numpy(dtype=float)
    lo = det["y_qlow"].to_numpy(dtype=float)
    up = det["y_qup"].to_numpy(dtype=float)
    is_anom = det["is_anomaly"].to_numpy()
    typ = det["type"].astype(str).to_numpy()
    sev = det["severity"].to_numpy(dtype=float)
    dens = det["local_density"].to_numpy(dtype=float)

    # Colour limits come from severity; fall back to [0, 1] when there is
    # no finite value to take a min/max of.
    has_finite = np.isfinite(sev).any()
    vmin = float(np.nanmin(sev)) if has_finite else 0.0
    vmax = float(np.nanmax(sev)) if has_finite else 1.0

    # --- axes -------------------------------------------------------------
    if ax is None:
        if show_severity:
            fig, (ax, ax2) = plt.subplots(
                2,
                1,
                figsize=figsize,
                sharex=True,
                gridspec_kw={"height_ratios": [2.6, 1.0]},
            )
        else:
            fig, ax = plt.subplots(figsize=figsize)
            ax2 = None
    else:
        fig = ax.figure
        ax2 = None
        if show_severity:
            # A caller-supplied axes has no companion panel. Previously
            # ``ax2`` stayed ``None`` and the severity layer crashed with
            # an AttributeError. Attach a new panel directly below ``ax``
            # (bounds are in ``ax`` axes-fraction units, so it follows
            # ``ax`` when the colorbar resizes it) and share the x-axis.
            ax2 = ax.inset_axes([0.0, -0.45, 1.0, 0.38], sharex=ax)
            # Mirror ``plt.subplots(sharex=True)``: x tick labels only on
            # the bottom panel.
            ax.tick_params(axis="x", labelbottom=False)

    # --- category ticks when sorting by a column name ---------------------
    if isinstance(sort_by, str):
        # One tick per run of equal key values, centred on the run.
        # NOTE(review): a column with all-distinct values yields one tick
        # and one separator per sample - confirm this is intended for
        # continuous/datetime keys.
        sizes = pd.Series(cats).value_counts(sort=False)
        edges = np.r_[0, sizes.cumsum().to_numpy()]
        centers = (edges[:-1] + edges[1:]) / 2
        labels = sizes.index.astype(str).tolist()

        ax.set_xlim(0, len(det))
        ax.set_xticks(centers)
        ax.set_xticklabels(labels)

        # Dashed separators at the interior group boundaries.
        for e in edges[1:-1]:
            ax.axvline(e, color="0.7", lw=0.8, ls="--", alpha=0.8)
            if ax2 is not None:
                ax2.axvline(e, color="0.7", lw=0.8, ls="--", alpha=0.8)

    set_axis_grid(ax, show_grid=show_grid, grid_props=grid_props)

    # --- top panel: interval band, midpoint, observations -----------------
    ax.fill_between(
        x,
        lo,
        up,
        color="0.2",
        alpha=band_alpha,
        linewidth=0.0,
    )
    ax.plot(
        x,
        (lo + up) * 0.5,
        lw=lw * 0.8,
        color="0.35",
        alpha=base_alpha,
    )
    ax.plot(x, y, lw=lw, color="0.05", alpha=0.95)

    # Named ``cmap_obj`` so the module-level ``matplotlib.cm`` import
    # (``cm``) is not shadowed inside this function.
    cmap_obj = get_cmap(cmap, default="inferno")
    norm = plt.Normalize(vmin=vmin, vmax=vmax)

    sel_over = is_anom & (typ == "over")
    sel_under = is_anom & (typ == "under")
    scatter_kw = get_valid_kwargs(ax.scatter, kwargs)

    # Anomaly markers: "^" for type "over", "v" for type "under",
    # coloured by severity.
    for sel, marker in ((sel_over, "^"), (sel_under, "v")):
        ax.scatter(
            x[sel],
            y[sel],
            s=mark_size,
            c=sev[sel],
            cmap=cmap_obj,
            norm=norm,
            marker=marker,
            alpha=anom_alpha,
            edgecolors="none",
            **scatter_kw,
        )

    cbar = fig.colorbar(
        plt.cm.ScalarMappable(norm=norm, cmap=cmap_obj),
        ax=ax,
        pad=0.02,
        fraction=0.04,
    )
    cbar.set_label("severity")

    if title:
        ax.set_title(title, fontsize=13, pad=10.0)

    # --- bottom panel: severity bars and optional density line ------------
    xlabel = kwargs.get("xlabel", "")
    if show_severity:
        set_axis_grid(ax2, show_grid=show_grid, grid_props=grid_props)
        ax2.vlines(
            x,
            0.0,
            sev,
            colors=cmap_obj(norm(sev)),
            linewidth=lw,
            alpha=0.95,
        )
        if show_density:
            ax2.plot(
                x,
                dens,
                lw=lw * 1.1,
                color="0.1",
                alpha=0.65,
            )
        ax2.set_ylabel("sev / dens")
        ax2.set_xlabel(xlabel)
    elif "xlabel" in kwargs:
        # Without a severity panel the main axes is the bottom one, so an
        # explicitly requested label goes there instead of being dropped.
        ax.set_xlabel(xlabel)

    if savefig:
        fig.savefig(savefig, bbox_inches="tight")

    return (ax, ax2) if show_severity else ax
plot_cas_layers.__doc__ = r"""
Visualizes anomaly severity in layered Cartesian coordinates.

This function creates a highly informative, non-polar diagnostic
plot that visualizes a forecast's prediction interval, the true
values, and the calculated anomaly characteristics in layered
panels. It is particularly effective for sequential data like time
series.

Parameters
----------
df : pd.DataFrame
    The input DataFrame containing the data.
actual_col : str
    Name of the column containing the true observed values.
q_low_col : str
    Name of the column for the lower bound of the prediction
    interval.
q_up_col : str
    Name of the column for the upper bound of the prediction
    interval.
sort_by : str or array-like, optional
    The feature used to order points along the x-axis. Can be a
    column name (e.g., a datetime column) or an external array.
    If ``None``, rows are plotted in their existing order. In all
    cases samples are drawn at ordinal positions ``0..N-1``. When a
    column name is given, x-ticks are placed at the centre of each
    run of equal values and labelled with that value.
window_size : int, default=21
    The size of the moving window used to calculate the local
    anomaly density and severity.
title : str, optional
    A custom title for the plot.
figsize : tuple of (float, float), default=(11.0, 6.5)
    The figure size in inches.
show_severity : bool, default=True
    If ``True``, a second panel is added below the main plot to
    visualize the per-sample severity scores.
show_density : bool, default=True
    If ``True`` and `show_severity` is also ``True``, the local
    density line is overlaid in the bottom panel.
band_alpha : float, default=0.18
    Transparency of the shaded prediction interval band.
anom_alpha : float, default=0.85
    Transparency of the anomaly markers.
base_alpha : float, default=0.35
    Transparency of the interval midpoint line
    (``(q_low + q_up) / 2``) drawn through the band.
cmap : str, default='inferno'
    The colormap for coloring anomaly markers and severity bars
    based on their severity score.
mark_size : int, default=36
    The size of the anomaly markers (`^` and `v`).
show_grid : bool, default=True
    If ``True``, displays grid lines on the axes.
grid_props : dict, optional
    Custom keyword arguments to style the grid.
lw : float, default=1.4
    The base line width for plotted lines.
ax : matplotlib.axes.Axes, optional
    An existing Cartesian axes for the main plot. Note that the
    second panel (`ax2`) will be created new.
savefig : str, optional
    The file path to save the plot.
**kwargs
    Additional options. ``xlabel`` (str) labels the x-axis of the
    severity panel. Remaining keys are filtered through
    ``get_valid_kwargs`` against ``Axes.scatter`` and passed to the
    anomaly marker calls.

Returns
-------
ax : matplotlib.axes.Axes
    The Axes object for the main plot.
(ax, ax2) : tuple of (Axes, Axes)
    A tuple of both axes is returned if `show_severity` is True.
None
    If the data is empty after handling NaNs.

See Also
--------
clustered_anomaly_severity : The underlying metric function.
plot_glyphs : A polar alternative for visualizing anomalies.

Notes
-----
This plot decomposes the CAS diagnostic into layers, providing a
clear, sequential view of model performance.

**Top Panel (Forecast and Anomalies):**

- A shaded gray area shows the prediction interval (`q_low_col` to
  `q_up_col`).
- A dark line shows the true values (`actual_col`).
- Anomalies are marked with colored triangles (`▲` for
  over-predictions, `▼` for under-predictions). The color intensity
  of the marker corresponds to its **severity score**.

**Bottom Panel (Severity Breakdown):**

- Vertical bars show the per-sample **severity score**, colored
  consistently with the markers above.
- An optional black line shows the **local anomaly density**,
  highlighting the "hotspot" regions that contribute to high
  severity scores.

References
----------
.. footbibliography::

Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from kdiagram.plot.anomaly import plot_cas_layers
>>>
>>> # Simulate data with a failure hotspot in the middle
>>> np.random.seed(0)
>>> n_samples = 400
>>> x_axis = np.arange(n_samples)
>>> y_true = 20 * np.sin(x_axis * np.pi / 100)
>>> y_qlow = y_true - 10
>>> y_qup = y_true + 10
>>> # Introduce a cluster of severe failures
>>> y_true[180:220] += np.random.uniform(12, 20, 40)
>>>
>>> df = pd.DataFrame({
...     "x": x_axis, "actual": y_true,
...     "q10": y_qlow, "q90": y_qup
... })
>>>
>>> axes = plot_cas_layers(
...     df,
...     actual_col="actual",
...     q_low_col="q10",
...     q_up_col="q90",
...     sort_by="x",
...     title="Layered CAS Diagnostic Profile"
... )
"""