Source code for kdiagram.utils.mathext

#   License: Apache 2.0
#   Author: LKouadio <etanoyau@gmail.com>

from __future__ import annotations

from typing import Callable, Literal, Union, overload

import numpy as np
import pandas as pd
from scipy.stats import kstest, uniform

from .handlers import columns_manager
from .validator import exist_features, validate_length_range, validate_yy

FrameLike = Union[np.ndarray, pd.DataFrame, pd.Series]

__all__ = [
    "minmax_scaler",
    "compute_coverage_score",
    "compute_winkler_score",
    "build_cdf_interpolator",
    "calculate_calibration_error",
    "compute_pinball_loss",
    "compute_pit",
    "compute_crps",
    "get_forecast_arrays",
]


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: str,
    pred_cols: None = None,
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["numpy"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> np.ndarray: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: None,
    pred_cols: str,
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["numpy"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> np.ndarray: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: None,
    pred_cols: list[str],
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["numpy"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> np.ndarray: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: str,
    pred_cols: str | list[str],
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["numpy"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> tuple[np.ndarray, np.ndarray]: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: str,
    pred_cols: None = None,
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["pandas"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> pd.Series: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: None,
    pred_cols: str,
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["pandas"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> pd.Series: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: None,
    pred_cols: list[str],
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["pandas"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> pd.DataFrame: ...


@overload
def get_forecast_arrays(
    df: pd.DataFrame,
    actual_col: str,
    pred_cols: str | list[str],
    *,
    drop_na: bool = ...,
    na_policy: Literal["any", "all", "none"] = ...,
    fillna: object | None = ...,
    return_as: Literal["pandas"] = ...,
    squeeze: bool = ...,
    with_index: bool = ...,
    sort_index: bool = ...,
    dtype: object | None = ...,
    ensure_numeric: bool = ...,
    coerce_numeric: bool = ...,
    copy: bool = ...,
) -> tuple[pd.Series, pd.Series | pd.DataFrame]: ...


[docs] def get_forecast_arrays( df: pd.DataFrame, actual_col: str | None = None, pred_cols: str | list[str] | None = None, *, drop_na: bool = True, na_policy: Literal["any", "all", "none"] = "any", fillna: object | None = None, return_as: Literal["numpy", "pandas"] = "numpy", squeeze: bool = True, with_index: bool = False, sort_index: bool = False, dtype: object | None = None, ensure_numeric: bool = False, coerce_numeric: bool = False, copy: bool = True, ): if actual_col is None and pred_cols is None: raise ValueError( "Provide at least one of 'actual_col' or 'pred_cols'." ) # collect required columns cols: list[str] = [] if actual_col: cols.append(actual_col) pcols = columns_manager(pred_cols) or [] cols.extend(pcols) # validate presence exist_features(df, features=cols) # subset and optional copy/sort sub = df.loc[:, cols].copy() if copy else df.loc[:, cols] if sort_index: sub = sub.sort_index() # optional fill if fillna is not None: if fillna == "ffill": sub = sub.ffill() elif fillna == "bfill": sub = sub.bfill() else: sub = sub.fillna(fillna) # if fillna in ("ffill", "bfill"): # sub = sub.fillna(method=str(fillna)) # else: # sub = sub.fillna(fillna) # drop NA per policy if drop_na and na_policy != "none": how = "any" if na_policy == "any" else "all" sub = sub.dropna(how=how) # optional numeric enforcement if ensure_numeric: errors = "coerce" if coerce_numeric else "raise" for c in cols: sub[c] = pd.to_numeric(sub[c], errors=errors) # extract pieces y_true = sub[actual_col] if actual_col else None y_pred = None if pcols: y_pred = sub[pcols] # cast pandas dtypes if requested if return_as == "pandas" and dtype is not None: if y_true is not None: y_true = y_true.astype(dtype) if y_pred is not None: if isinstance(y_pred, pd.Series): y_pred = y_pred.astype(dtype) else: y_pred = y_pred.astype(dtype) # prepare NumPy outputs if return_as == "numpy": if y_true is not None: y_true = y_true.to_numpy(dtype=dtype) if y_pred is not None: arr = y_pred.to_numpy(dtype=dtype) if squeeze and arr.ndim == 2 and arr.shape[1] == 1: arr = arr.ravel() y_pred = arr # squeeze pandas single-column preds to Series if ( return_as == "pandas" and isinstance(pred_cols, str) and y_pred is not None and isinstance(y_pred, pd.DataFrame) ): y_pred = y_pred.iloc[:, 0] # index handling if with_index: idx = sub.index.to_numpy() if return_as == "numpy" else sub.index if y_true is not None and y_pred is not None: return idx, y_true, y_pred if y_true is not None: return idx, y_true return idx, y_pred # type: ignore[return-value] # standard returns if y_true is not None and y_pred is not None: return y_true, y_pred if y_true is not None: return y_true return y_pred # type: ignore[return-value]
get_forecast_arrays.__doc__ = r""" Extract true and/or predicted values from a DataFrame. This is a flexible bridge between a DataFrame-centric workflow and NumPy-based utilities. It supports dropping or filling NAs, numeric coercion, and optional index return, providing a robust way to prepare data for analysis. Parameters ---------- df : pd.DataFrame The source DataFrame. actual_col : str, optional The name of the column holding the ground-truth values. pred_cols : str or list of str, optional The name(s) of the prediction column(s). A string implies a single point forecast; a list implies multiple columns, such as for quantile forecasts. drop_na : bool, default=True If ``True``, drop rows with missing data according to the ``na_policy``. na_policy : {"any", "all", "none"}, default="any" The policy for dropping rows with NA values: - "any": Drop rows if any selected column has an NA. - "all": Drop rows only if all selected columns are NA. - "none": Do not drop rows based on NAs. fillna : scalar, dict or {"ffill", "bfill"}, optional A value or method to use for filling NA values before any dropping occurs. return_as : {"numpy", "pandas"}, default="numpy" The desired container type for the output. squeeze : bool, default=True If ``True`` and a single prediction column is requested, the output will be squeezed to a 1D array or Series. with_index : bool, default=False If ``True``, the DataFrame index is returned as the first item in the output tuple. sort_index : bool, default=False If ``True``, the DataFrame is sorted by its index before extracting the data. dtype : object, optional The target data type for the output arrays or Series. ensure_numeric : bool, default=False If ``True``, raises an error if any selected column is not of a numeric data type. coerce_numeric : bool, default=False If ``True`` and ``ensure_numeric=True``, attempts to convert non-numeric columns to a numeric type, with invalid parsing resulting in NaN. copy : bool, default=True If ``True``, operates on a copy of the data, ensuring the original DataFrame is not modified. Returns ------- np.ndarray, pd.Series, pd.DataFrame, or tuple The return type depends on the input parameters: - If only ``actual_col`` is provided -> y_true - If only ``pred_cols`` is provided -> y_pred(s) - If both are provided -> (y_true, y_pred(s)) If ``with_index=True``, the index is prepended to the return value(s). See Also -------- compute_forecast_errors : A utility that uses this function's output. compute_pit : Another utility that benefits from this data extraction. Notes ----- This function is designed to be the primary entry point for extracting data before passing it to the mathematical or plotting functions in `k-diagram`. It provides a single, consistent interface for handling various data cleaning and formatting tasks. For ``return_as="numpy"`` with a single prediction column, the output is a 1D array by default. To preserve the 2D column vector shape ``(n, 1)``, set ``squeeze=False``. Examples -------- >>> import pandas as pd >>> import numpy as np >>> from kdiagram.utils.mathext import get_forecast_arrays >>> >>> df = pd.DataFrame({ ... 'actual': [10, 20, 30, 40, np.nan], ... 'pred_point': [12, 18, 33, 42, 48], ... 'q10': [8, 15, 25, 35, 45], ... 'q90': [12, 25, 35, 45, 55], ... }) >>> >>> # Example 1: Get both true and quantile predictions as NumPy arrays >>> y_true, y_preds_q = get_forecast_arrays( ... df, actual_col='actual', pred_cols=['q10', 'q90'] ... ) >>> print("--- True Values (NumPy) ---") >>> print(y_true) >>> print("\\n--- Quantile Predictions (NumPy) ---") >>> print(y_preds_q) .. code-block:: text :caption: Expected Output for Example 1 --- True Values (NumPy) --- [10. 20. 30. 40.] --- Quantile Predictions (NumPy) --- [[ 8 12] [15 25] [25 35] [35 45]] >>> # Example 2: Get a single prediction as a pandas Series >>> y_preds_series = get_forecast_arrays( ... df, pred_cols='pred_point', return_as='pandas', drop_na=False ... ) >>> print("\\n--- Point Predictions (pandas Series) ---") >>> print(y_preds_series) .. code-block:: text :caption: Expected Output for Example 2 --- Point Predictions (pandas Series) --- 0 12 1 18 2 33 3 42 4 48 Name: pred_point, dtype: int64 """
[docs] def compute_pit( y_true: np.ndarray, y_preds_quantiles: np.ndarray, quantiles: np.ndarray, ) -> np.ndarray: """ Computes the Probability Integral Transform (PIT) for each observation. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_preds_quantiles : np.ndarray 2D array of quantile forecasts, with shape (n_samples, n_quantiles). quantiles : np.ndarray 1D array of the quantile levels. Returns ------- np.ndarray A 1D array of PIT values, one for each observation. """ y_true, y_preds_quantiles = validate_yy( y_true, y_preds_quantiles, allow_2d_pred=True ) # Sort quantiles and predictions to ensure correct calculation sort_idx = np.argsort(quantiles) sorted_preds = y_preds_quantiles[:, sort_idx] # PIT is the fraction of forecast quantiles <= the true value pit_values = np.mean(sorted_preds <= y_true[:, np.newaxis], axis=1) return pit_values
compute_pit.__doc__ = r""" Computes the Probability Integral Transform (PIT) for each observation. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_preds_quantiles : np.ndarray 2D array of quantile forecasts, with shape ``(n_samples, n_quantiles)``. quantiles : np.ndarray 1D array of the quantile levels. Returns ------- np.ndarray A 1D array of PIT values, one for each observation. See Also -------- plot_pit_histogram : A visualization of these PIT values. calculate_calibration_error : A summary score based on PIT values. Notes ----- The Probability Integral Transform (PIT) is a fundamental tool for evaluating the calibration of probabilistic forecasts :footcite:p:`Gneiting2007b`. When the predictive distribution is represented by a finite set of :math:`M` quantiles, the PIT value for each observation :math:`y_i` is approximated as the fraction of forecast quantiles that are less than or equal to the observation: .. math:: \text{PIT}_i = \frac{1}{M} \sum_{j=1}^{M} \mathbf{1}\{q_{i,j} \le y_i\} where :math:`q_{i,j}` is the :math:`j`-th quantile forecast for observation :math:`i`, and :math:`\mathbf{1}` is the indicator function. A uniform distribution of PIT values indicates perfect calibration. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import compute_pit >>> >>> # Define true values and quantile forecasts for 3 observations >>> y_true = np.array([10, 1, 5.5]) >>> quantiles = np.array([0.1, 0.5, 0.9]) >>> y_preds = np.array([ ... [8, 11, 13], # Forecast for y_true = 10 ... [0, 0.5, 2], # Forecast for y_true = 1 ... [4, 5, 6] # Forecast for y_true = 5.5 ... ]) >>> >>> # Calculate the PIT value for each observation >>> pit_values = compute_pit(y_true, y_preds, quantiles) >>> print(pit_values) .. code-block:: text :caption: Expected Output [0.33333333 0.66666667 0.66666667] References ---------- .. footbibliography:: """
[docs] def compute_crps( y_true: np.ndarray, y_preds_quantiles: np.ndarray, quantiles: np.ndarray, ) -> float: y_true, y_preds_quantiles = validate_yy( y_true, y_preds_quantiles, allow_2d_pred=True ) # Reshape y_true for broadcasting y_true_reshaped = y_true[:, np.newaxis] # Calculate Pinball Loss for all quantiles at once pinball_losses = np.where( y_true_reshaped >= y_preds_quantiles, (y_true_reshaped - y_preds_quantiles) * quantiles, (y_preds_quantiles - y_true_reshaped) * (1 - quantiles), ) # Average over quantiles for each observation, then over all observations return np.mean(np.mean(pinball_losses, axis=1))
compute_crps.__doc__ = r""" Approximates the Continuous Ranked Probability Score (CRPS). The CRPS is calculated as the average of the Pinball Loss across all provided quantiles. It is a proper scoring rule that assesses both calibration and sharpness simultaneously. A lower score is better. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_preds_quantiles : np.ndarray 2D array of quantile forecasts. quantiles : np.ndarray 1D array of the quantile levels. Returns ------- float The average CRPS over all observations. See Also -------- compute_pinball_loss : The underlying metric for a single quantile. plot_crps_comparison : A visualization of this score. Notes ----- The Continuous Ranked Probability Score (CRPS) is a widely used metric for evaluating probabilistic forecasts :footcite:p:`Gneiting2007b`. It is approximated here by averaging the Pinball Loss :math:`\mathcal{L}_{\tau}` over all :math:`M` provided quantile levels :math:`\tau`. The Pinball Loss for a single quantile forecast :math:`q` at level :math:`\tau` is: .. math:: :label: eq:pinball_loss_util \mathcal{L}_{\tau}(q, y) = \begin{cases} (y - q) \tau & \text{if } y \ge q \\ (q - y) (1 - \tau) & \text{if } y < q \end{cases} The final score is the average over all observations and all quantiles. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import compute_crps >>> >>> # Define true values and quantile forecasts for 2 observations >>> y_true = np.array([10, 25]) >>> quantiles = np.array([0.1, 0.5, 0.9]) >>> y_preds = np.array([ ... [8, 11, 13], # Forecast for y_true = 10 ... [20, 22, 26] # Forecast for y_true = 25 ... ]) >>> >>> # Calculate the average CRPS >>> crps_score = compute_crps(y_true, y_preds, quantiles) >>> print(f"Average CRPS: {crps_score:.3f}") .. code-block:: text :caption: Expected Output Average CRPS: 1.467 References ---------- .. footbibliography:: """
[docs] def calculate_calibration_error( y_true: np.ndarray, y_preds_quantiles: np.ndarray, quantiles: np.ndarray, ) -> float: # Validate inputs y_true, y_preds_quantiles = validate_yy( y_true, y_preds_quantiles, allow_2d_pred=True ) # Calculate PIT values sort_idx = np.argsort(quantiles) sorted_preds = y_preds_quantiles[:, sort_idx] pit_values = np.mean(sorted_preds <= y_true[:, np.newaxis], axis=1) if len(pit_values) < 2: return 1.0 # Max penalty for insufficient data to test # Compare the empirical distribution # of PIT values to a uniform distribution ks_statistic, _ = kstest(pit_values, uniform.cdf) return ks_statistic
calculate_calibration_error.__doc__ = r""" Calculates the calibration error using the PIT and KS test. This function quantifies the **calibration** (or reliability) of a probabilistic forecast. It first computes the Probability Integral Transform (PIT) values for all observations and then uses the Kolmogorov-Smirnov (KS) test to measure how much the distribution of these PIT values deviates from a perfect uniform distribution. Parameters ---------- y_true : np.ndarray 1D array of observed (true) values. y_preds_quantiles : np.ndarray 2D array of quantile forecasts, with shape ``(n_samples, n_quantiles)``. quantiles : np.ndarray 1D array of the quantile levels corresponding to the columns of ``y_preds_quantiles``. Returns ------- float The Kolmogorov-Smirnov (KS) statistic, a value in [0, 1]. A score of 0 indicates perfect calibration (PIT values are perfectly uniform), while a score of 1 indicates the worst possible calibration. See Also -------- compute_pit : The utility for calculating PIT values. plot_pit_histogram : The visual equivalent of this test. plot_calibration_sharpness : A plot that uses this metric as an axis. scipy.stats.kstest : The underlying statistical test used. Notes ----- This function follows a two-step process: 1. **Calculate PIT Values**: It first computes the Probability Integral Transform (PIT) values. For a forecast given by :math:`M` quantiles, the PIT for a single observation :math:`y_i` is the fraction of predicted quantiles that are less than or equal to :math:`y_i`. .. math:: \text{PIT}_i = \frac{1}{M} \sum_{j=1}^{M} \mathbf{1}\{q_{i,j} \le y_i\} 2. **Kolmogorov-Smirnov Test**: For a perfectly calibrated forecast, the resulting PIT values should be uniformly distributed on [0, 1]. This function uses the KS test (`scipy.stats.kstest`) to measure the maximum distance between the empirical CDF of the calculated PIT values and the CDF of a perfect uniform distribution. This KS statistic is returned as the calibration error score. If fewer than 2 data points are available after validation, the function returns a maximum error of 1.0. Examples -------- >>> import numpy as np >>> from scipy.stats import norm >>> from kdiagram.utils.mathext import calculate_calibration_error >>> >>> np.random.seed(42) >>> n_samples = 500 >>> y_true = np.random.normal(loc=10, scale=3, size=n_samples) >>> quantiles = np.linspace(0.05, 0.95, 19) >>> >>> # Well-calibrated forecast >>> preds_good = norm.ppf(quantiles, loc=y_true[:, np.newaxis], scale=3) >>> # Biased (miscalibrated) forecast >>> preds_bad = norm.ppf(quantiles, loc=y_true[:, np.newaxis] + 2, scale=3) >>> >>> err_good = calculate_calibration_error(y_true, preds_good, quantiles) >>> err_bad = calculate_calibration_error(y_true, preds_bad, quantiles) >>> >>> print(f"Good Model Calibration Error (KS): {err_good:.3f}") Good Model Calibration Error (KS): 0.034 >>> print(f"Bad Model Calibration Error (KS): {err_bad:.3f}") Bad Model Calibration Error (KS): 0.284 References ---------- .. footbibliography:: """
[docs] def build_cdf_interpolator( y_preds_quantiles: np.ndarray, quantiles: np.ndarray, ) -> Callable[[np.ndarray], np.ndarray]: # Sort quantiles and predictions to ensure correct interpolation sort_idx = np.argsort(quantiles) sorted_quantiles = quantiles[sort_idx] sorted_preds = np.asarray(y_preds_quantiles)[:, sort_idx] n_samples = sorted_preds.shape[0] def _interpolator(y_true: np.ndarray) -> np.ndarray: """The returned CDF interpolator function.""" y_true = np.asarray(y_true) if len(y_true) != n_samples: raise ValueError( f"The number of true values ({len(y_true)}) must match the " f"number of forecast distributions the interpolator was " f"built with ({n_samples})." ) pit_values = np.zeros_like(y_true, dtype=float) for i in range(len(y_true)): # Use np.interp for robust linear interpolation pit_values[i] = np.interp( y_true[i], sorted_preds[i, :], sorted_quantiles, left=0.0, # Values below the lowest quantile get p=0 right=1.0, # Values above the highest quantile get p=1 ) return pit_values return _interpolator
build_cdf_interpolator.__doc__ = r""" Builds an interpolator to act as a Cumulative Distribution Function. This function takes a set of quantile forecasts and returns a callable function that linearly interpolates between them. This effectively creates an empirical, continuous Cumulative Distribution Function (CDF) for each individual forecast, which is a foundational tool for probabilistic analysis. Parameters ---------- y_preds_quantiles : np.ndarray 2D array of quantile forecasts, with shape ``(n_samples, n_quantiles)``. Each row represents a complete probabilistic forecast for a single observation. quantiles : np.ndarray 1D array of the quantile levels corresponding to the columns of the prediction array (e.g., ``[0.05, 0.1, ..., 0.95]``). Returns ------- Callable[[np.ndarray], np.ndarray] A function that takes a 1D array of observed values (``y_true``) and returns the corresponding PIT values, which are the CDF evaluated at each of those points. Raises ------ ValueError If the number of `y_true` values passed to the returned interpolator does not match the number of forecast distributions it was built with. See Also -------- compute_pit : A simplified utility that uses this logic directly. scipy.interpolate.interp1d : The underlying concept for interpolation. Notes ----- The Probability Integral Transform (PIT) is a key concept in probabilistic forecast evaluation :footcite:p:`Gneiting2007b`. For a continuous predictive CDF :math:`F`, the PIT of an observation :math:`y` is :math:`F(y)`. This utility constructs an empirical approximation of :math:`F` for each forecast. The function works by creating a closure: the returned ``_interpolator`` function "remembers" the quantile forecasts it was built with. For each observation :math:`y_i`, it performs a linear interpolation using the corresponding forecast quantiles :math:`\mathbf{q}_i = (q_{i,1}, ..., q_{i,M})` as the x-coordinates and the quantile levels :math:`\mathbf{\tau} = (\tau_1, ..., \tau_M)` as the y-coordinates. This allows you to estimate the cumulative probability for any value of :math:`y_i`. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import build_cdf_interpolator >>> >>> # Forecasts for 3 observations at 3 quantiles (0.1, 0.5, 0.9) >>> preds_quantiles = np.array([ ... [8, 10, 12], ... [0, 1, 2], ... [4, 5, 6] ... ]) >>> quantiles = np.array([0.1, 0.5, 0.9]) >>> >>> # Build the interpolator >>> cdf_func = build_cdf_interpolator(preds_quantiles, quantiles) >>> >>> # Now, use the interpolator to find the PIT for 3 observations >>> y_true = np.array([10.0, 0.5, 5.5]) >>> pit_values = cdf_func(y_true) >>> print(pit_values) [0.5 0.3 0.7] References ---------- .. footbibliography:: """
[docs] def compute_coverage_score( y_true: np.ndarray, y_pred_lower: np.ndarray, y_pred_upper: np.ndarray, *, method: Literal["within", "above", "below"] = "within", return_counts: bool = False, ) -> float | int: # Validate and convert inputs y_true, y_pred_lower = validate_yy(y_true, y_pred_lower) _, y_pred_upper = validate_yy(y_true, y_pred_upper) # Handle NaNs by creating a mask of valid (non-NaN) entries valid_mask = ( ~np.isnan(y_true) & ~np.isnan(y_pred_lower) & ~np.isnan(y_pred_upper) ) y_true_valid = y_true[valid_mask] lower_valid = y_pred_lower[valid_mask] upper_valid = y_pred_upper[valid_mask] n_valid = len(y_true_valid) if n_valid == 0: return 0.0 if not return_counts else 0 if method == "within": count = np.sum( (y_true_valid >= lower_valid) & (y_true_valid <= upper_valid) ) elif method == "above": count = np.sum(y_true_valid > upper_valid) elif method == "below": count = np.sum(y_true_valid < lower_valid) else: raise ValueError( f"Invalid method '{method}'. Choose from" " 'within', 'above', or 'below'." ) if return_counts: return int(count) return float(count / n_valid)
compute_coverage_score.__doc__ = r""" Computes the coverage score for a given prediction interval. This utility calculates the fraction (or count) of true values that fall within, above, or below the specified prediction interval. It is a fundamental metric for assessing the calibration of a forecast's uncertainty bounds. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_pred_lower : np.ndarray 1D array of the lower bound of the prediction interval. y_pred_upper : np.ndarray 1D array of the upper bound of the prediction interval. method : {'within', 'above', 'below'}, default='within' The type of coverage to calculate: - 'within': The standard coverage score. Calculates the proportion of true values such that `lower <= true <= upper`. - 'above': Calculates the proportion of true values that are strictly *above* the upper bound (`true > upper`). - 'below': Calculates the proportion of true values that are strictly *below* the lower bound (`true < lower`). return_counts : bool, default=False If ``True``, returns the raw count of observations matching the condition instead of the proportion (a float between 0 and 1). Returns ------- float or int The coverage score as a proportion or a raw count. See Also -------- plot_coverage : A visualization of this score. compute_winkler_score : A score that penalizes for lack of coverage. Notes ----- The empirical coverage is a key diagnostic for checking if a model's prediction intervals are well-calibrated. For a given :math:`(1-\alpha) \cdot 100\%` prediction interval, the empirical coverage should be close to :math:`1-\alpha`. For the standard 'within' method, the coverage for a set of :math:`N` observations is calculated as: .. math:: :label: eq:coverage_score \text{Coverage} = \frac{1}{N} \sum_{i=1}^{N} \mathbf{1}\{y_{lower,i} \le y_{true,i} \le y_{upper,i}\} where :math:`\mathbf{1}` is the indicator function. The 'above' and 'below' methods are useful for diagnosing the direction of miscalibration. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import compute_coverage_score >>> >>> y_true = np.array([1, 2, 3, 4, 5, 6]) >>> y_lower = np.array([0, 3, 2, 5, 4, 7]) >>> y_upper = np.array([2, 4, 4, 6, 6, 8]) >>> >>> # Calculate the standard coverage (4 out of 6 are within) >>> coverage = compute_coverage_score(y_true, y_lower, y_upper) >>> print(f"Coverage Score: {coverage:.2f}") .. code-block:: text :caption: Expected Output Coverage Score: 0.67 >>> # Calculate the number of points below the interval >>> count_below = compute_coverage_score( ... y_true, y_lower, y_upper, method='below', return_counts=True ... ) >>> print(f"Count below interval: {count_below}") .. code-block:: text :caption: Expected Output Count below interval: 2 """
[docs] def compute_pinball_loss( y_true: np.ndarray, y_pred_quantile: np.ndarray, quantile: float, ) -> float: # Validate and handle NaNs y_true, y_pred_quantile = validate_yy(y_true, y_pred_quantile) valid_mask = ~np.isnan(y_true) & ~np.isnan(y_pred_quantile) y_true_valid = y_true[valid_mask] y_pred_valid = y_pred_quantile[valid_mask] if len(y_true_valid) == 0: return np.nan if not (0 < quantile < 1): raise ValueError("Quantile level must be between 0 and 1.") # Calculate Pinball Loss loss = np.where( y_true_valid >= y_pred_valid, (y_true_valid - y_pred_valid) * quantile, (y_pred_valid - y_true_valid) * (1 - quantile), ) return np.mean(loss)
compute_pinball_loss.__doc__ = r""" Computes the Pinball Loss for a single quantile forecast. The Pinball Loss is a metric used to evaluate the accuracy of a specific quantile forecast. It is the foundational component of the Continuous Ranked Probability Score (CRPS). A lower score is better. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_pred_quantile : np.ndarray 1D array of the predicted values for a single quantile. quantile : float The quantile level (must be between 0 and 1) for which the predictions were made. Returns ------- float The average Pinball Loss over all observations. See Also -------- compute_crps : A score calculated by averaging the pinball loss. plot_crps_comparison : A visualization of the CRPS. Notes ----- The Pinball Loss, :math:`\mathcal{L}_{\tau}`, is a proper scoring rule for evaluating a single quantile forecast :math:`q` at level :math:`\tau` against an observation :math:`y`. It asymmetrically penalizes errors, giving a weight of :math:`\tau` to under-predictions and :math:`(1 - \tau)` to over-predictions. .. math:: \mathcal{L}_{\tau}(q, y) = \begin{cases} (y - q) \tau & \text{if } y \ge q \\ (q - y) (1 - \tau) & \text{if } y < q \end{cases} This function calculates the average of this loss over all provided observations. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import compute_pinball_loss >>> >>> y_true = np.array([10, 10, 5]) >>> y_pred_q90 = np.array([8, 12, 5]) # Under-predict, over-predict, exact >>> quantile = 0.9 >>> >>> # Loss for y=10, q=8: (10-8) * 0.9 = 1.8 >>> # Loss for y=10, q=12: (12-10) * (1-0.9) = 0.2 >>> # Loss for y=5, q=5: (5-5) * 0.9 = 0.0 >>> # Average = (1.8 + 0.2 + 0.0) / 3 = 0.667 >>> >>> loss = compute_pinball_loss(y_true, y_pred_q90, quantile) >>> print(f"Average Pinball Loss for Q90: {loss:.3f}") .. code-block:: text :caption: Expected Output Average Pinball Loss for Q90: 0.667 References ---------- .. footbibliography:: """
[docs] def compute_winkler_score( y_true: np.ndarray, y_pred_lower: np.ndarray, y_pred_upper: np.ndarray, alpha: float = 0.1, ) -> float: # Validate and handle NaNs y_true, y_pred_lower = validate_yy(y_true, y_pred_lower) _, y_pred_upper = validate_yy(y_true, y_pred_upper) valid_mask = ( ~np.isnan(y_true) & ~np.isnan(y_pred_lower) & ~np.isnan(y_pred_upper) ) y_true_valid = y_true[valid_mask] lower_valid = y_pred_lower[valid_mask] upper_valid = y_pred_upper[valid_mask] if len(y_true_valid) == 0: return np.nan # Calculate interval width (sharpness) interval_width = upper_valid - lower_valid # Calculate penalties for observations outside the interval penalty_lower = (2 / alpha) * (lower_valid - y_true_valid) penalty_upper = (2 / alpha) * (y_true_valid - upper_valid) # The score is the width plus any applicable penalty scores = ( interval_width + np.where(y_true_valid < lower_valid, penalty_lower, 0) + np.where(y_true_valid > upper_valid, penalty_upper, 0) ) return np.mean(scores)
compute_winkler_score.__doc__ = r""" Computes the Winkler score for a given prediction interval. The Winkler score is a proper scoring rule that evaluates a prediction interval by combining its width (sharpness) with a penalty for observations that fall outside the interval. A lower score indicates a better forecast. Parameters ---------- y_true : np.ndarray 1D array of the true observed values. y_pred_lower : np.ndarray 1D array of the lower bound of the prediction interval. y_pred_upper : np.ndarray 1D array of the upper bound of the prediction interval. alpha : float, default=0.1 The significance level for the prediction interval. For example, alpha=0.1 corresponds to a (1-0.1)*100 = 90% prediction interval. Returns ------- float The average Winkler score over all observations. See Also -------- compute_coverage_score : A metric that only assesses coverage. compute_interval_width : A metric that only assesses sharpness. Notes ----- The Winkler score :footcite:p:`Gneiting2007b` is designed to evaluate both the **sharpness** and **calibration** of a prediction interval simultaneously. The score for a single observation :math:`y` and a :math:`(1-\alpha)` prediction interval :math:`[l, u]` is defined as: .. math:: S_{\alpha}(l, u, y) = (u - l) + \begin{cases} \frac{2}{\alpha}(l - y) & \text{if } y < l \\ 0 & \text{if } l \le y \le u \\ \frac{2}{\alpha}(y - u) & \text{if } y > u \end{cases} The first term, :math:`(u - l)`, is the interval width, which rewards sharpness (narrower intervals). The second term is a penalty that is applied only if the observation falls outside the interval. The penalty increases as the observation gets further from the violated bound. This function returns the average of this score over all observations. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import compute_winkler_score >>> >>> y_true = np.array([1, 5, 12]) >>> y_lower = np.array([2, 4, 8]) >>> y_upper = np.array([8, 6, 10]) >>> >>> # For a 90% interval (alpha=0.1) >>> # Obs 1 (y=1): outside. Width=6. Penalty=(2/0.1)*(2-1)=20. Score=26. >>> # Obs 2 (y=5): inside. Width=2. Penalty=0. Score=2. >>> # Obs 3 (y=12): outside. Width=2. Penalty=(2/0.1)*(12-10)=40. Score=42. >>> # Average = (26 + 2 + 42) / 3 = 23.33 >>> >>> score = compute_winkler_score( ... y_true, y_lower, y_upper, alpha=0.1 ... ) >>> print(f"Average Winkler Score: {score:.2f}") .. code-block:: text :caption: Expected Output Average Winkler Score: 23.33 References ---------- .. footbibliography:: """ @overload def minmax_scaler( X: FrameLike, y: None = ..., feature_range: tuple[float, float] = ..., eps: float = ..., ) -> np.ndarray: ... @overload def minmax_scaler( X: FrameLike, y: FrameLike, feature_range: tuple[float, float] = ..., eps: float = ..., ) -> tuple[np.ndarray, np.ndarray]: ...
[docs] def minmax_scaler( X: FrameLike, y: FrameLike | None = None, feature_range: tuple[float, float] = (0.0, 1.0), eps: float = 1e-8, ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: def _to_array(obj: FrameLike) -> np.ndarray: if isinstance(obj, (pd.DataFrame, pd.Series)): return obj.to_numpy() return np.asarray(obj) X_arr = _to_array(X) X_shape = X_arr.shape if X_arr.ndim == 1: X_arr = X_arr.reshape(-1, 1) # validate range feature_range = validate_length_range( # type: ignore feature_range, param_name="Feature range" ) min_val, max_val = feature_range if min_val >= max_val: raise ValueError("feature_range must be (min, max) with min < max.") # min-max per column X_min = X_arr.min(axis=0, keepdims=True) X_max = X_arr.max(axis=0, keepdims=True) num = X_arr - X_min denom = (X_max - X_min) + eps X_scaled = min_val + (max_val - min_val) * (num / denom) # restore 1D if original was 1D/column vector if ( (len(X_shape) == 1) or (X_arr.ndim == 1) or (X_arr.ndim > 1 and X_shape[1] == 1) ): X_scaled = X_scaled.ravel() if y is not None: y_arr = _to_array(y).astype(float) y_min = np.min(y_arr) y_max = np.max(y_arr) y_num = y_arr - y_min y_denom = (y_max - y_min) + eps y_scaled = min_val + (max_val - min_val) * (y_num / y_denom) return X_scaled, y_scaled return X_scaled
minmax_scaler.__doc__ = r""" Scale features to a specified range using a Min-Max approach. This function transforms features by scaling each feature to a given range, typically [0, 1]. This method is robust to features with zero variance by adding a small epsilon to the denominator to prevent division-by-zero errors. Parameters ---------- X : {numpy.ndarray, pandas.DataFrame, pandas.Series} The input data to scale. Can be a 1D array or a 2D matrix of features. y : {numpy.ndarray, pandas.DataFrame, pandas.Series}, optional Optional target values to scale using the same approach. If provided, it is scaled independently of ``X``. feature_range : tuple of (float, float), default=(0.0, 1.0) The desired range of the transformed data. eps : float, default=1e-8 A small constant added to the denominator to ensure numerical stability when a feature has zero variance. Returns ------- X_scaled : numpy.ndarray The transformed version of ``X``, with each feature scaled to the specified ``feature_range``. y_scaled : numpy.ndarray, optional The scaled version of ``y``, returned only if ``y`` is provided. See Also -------- sklearn.preprocessing.MinMaxScaler : The scikit-learn equivalent. Notes ----- The Min-Max scaling is a common preprocessing step for many machine learning algorithms that are sensitive to the magnitude of features. For each feature (column) in the input data :math:`\mathbf{X}`, the transformation is calculated as: .. math:: X_{\text{scaled}} = \text{min}_{\text{range}} + (\text{max}_{\text{range}} - \text{min}_{\text{range}}) \cdot \frac{\mathbf{X} - \min(\mathbf{X})} {(\max(\mathbf{X}) - \min(\mathbf{X})) + \varepsilon} where :math:`\text{min}_{\text{range}}` and :math:`\text{max}_{\text{range}}` are the bounds of the ``feature_range``, and :math:`\varepsilon` is a small epsilon to prevent division by zero. Examples -------- >>> import numpy as np >>> from kdiagram.utils.mathext import minmax_scaler >>> >>> # Scale a 2D array >>> X = np.array([[1, 10], [2, 20], [3, 30]]) >>> X_scaled = minmax_scaler(X) >>> print(X_scaled) [[0. 0. ] [0.5 0.5] [1. 1. ]] >>> # Scale to a different range >>> X_scaled_custom = minmax_scaler(X, feature_range=(-1, 1)) >>> print(X_scaled_custom) [[-1. -1.] [ 0. 0.] [ 1. 1.]] """