Source code for kdiagram.utils.forecast_utils

#   License: Apache 2.0
#   Author: LKouadio <etanoyau@gmail.com>

"""Forecast utilites"""

from __future__ import annotations

from typing import Literal

import numpy as np
import pandas as pd

from ..decorators import isdf
from .handlers import columns_manager
from .validator import exist_features, validate_yy

__all__ = [
    "calculate_probabilistic_scores",
    "pivot_forecasts_long",
    "compute_interval_width",
    "bin_by_feature",
    "compute_forecast_errors",
]


[docs] def calculate_probabilistic_scores( y_true: np.ndarray, y_preds_quantiles: np.ndarray, quantiles: np.ndarray, ) -> pd.DataFrame: y_true, y_preds_quantiles = validate_yy( y_true, y_preds_quantiles, allow_2d_pred=True ) # --- PIT Calculation --- sort_idx = np.argsort(quantiles) sorted_preds = y_preds_quantiles[:, sort_idx] pit_values = np.mean(sorted_preds <= y_true[:, np.newaxis], axis=1) # --- Sharpness Calculation --- lower_bound = y_preds_quantiles[:, np.argmin(quantiles)] upper_bound = y_preds_quantiles[:, np.argmax(quantiles)] sharpness = upper_bound - lower_bound # --- CRPS Calculation (approximated via pinball loss) --- pinball_loss = np.where( y_true[:, np.newaxis] >= y_preds_quantiles, (y_true[:, np.newaxis] - y_preds_quantiles) * quantiles, (y_preds_quantiles - y_true[:, np.newaxis]) * (1 - quantiles), ) crps = np.mean(pinball_loss, axis=1) return pd.DataFrame( {"pit_value": pit_values, "sharpness": sharpness, "crps": crps} )
calculate_probabilistic_scores.__doc__ = r""" Calculates probabilistic scores for each observation. Computes the Probability Integral Transform (PIT), sharpness (interval width), and Continuous Ranked Probability Score (CRPS) for each forecast-observation pair. This utility provides a per-observation breakdown of key probabilistic metrics. Parameters ---------- y_true : np.ndarray 1D array of observed (true) values. y_preds_quantiles : np.ndarray 2D array of quantile forecasts. Each row corresponds to an observation in ``y_true``, and each column is a specific quantile forecast. quantiles : np.ndarray 1D array of the quantile levels corresponding to the columns of ``y_preds_quantiles``. Returns ------- pd.DataFrame A DataFrame with columns 'pit_value', 'sharpness', and 'crps', where each row corresponds to an observation. See Also -------- compute_pit : Calculate only the PIT values. compute_crps : Calculate only the average CRPS score. compute_winkler_score : Score a single prediction interval. Notes ----- This function calculates three fundamental scores for assessing the quality of a probabilistic forecast, which is judged by the joint properties of calibration and sharpness :footcite:p:`Gneiting2007b`. 1. **Probability Integral Transform (PIT)**: This score assesses **calibration**. For each observation :math:`y_i`, the PIT is approximated as the fraction of forecast quantiles less than or equal to the observation. .. math:: :label: eq:pit_score \text{PIT}_i = \frac{1}{M} \sum_{j=1}^{M} \mathbf{1}\{q_{i,j} \le y_i\} 2. **Sharpness**: This score assesses **precision**. It is the width of the prediction interval between the lowest (:math:`q_{min}`) and highest (:math:`q_{max}`) provided quantiles for each observation :math:`i`. .. math:: :label: eq:sharpness_score_ind \text{Sharpness}_i = y_{i, q_{max}} - y_{i, q_{min}} 3. **Continuous Ranked Probability Score (CRPS)**: This is an overall score that rewards both calibration and sharpness. It is approximated as the average of the **Pinball Loss** across all :math:`M` quantiles for each observation :math:`i`. .. math:: :label: eq:crps_score_ind \text{CRPS}_i \approx \frac{1}{M} \sum_{j=1}^{M} 2 \mathcal{L}_{\tau_j}(q_{i,j}, y_i) Examples -------- >>> import numpy as np >>> from scipy.stats import norm >>> from kdiagram.utils.forecast_utils import calculate_probabilistic_scores >>> >>> # Generate synthetic data >>> np.random.seed(42) >>> n_samples = 5 >>> y_true = np.random.normal(loc=10, scale=2, size=n_samples) >>> quantiles = np.array([0.1, 0.5, 0.9]) >>> y_preds = norm.ppf( ... quantiles, loc=y_true[:, np.newaxis], scale=1.5 ... ) >>> >>> # Calculate the scores >>> scores_df = calculate_probabilistic_scores( ... y_true, y_preds, quantiles ... ) >>> print(scores_df) pit_value sharpness crps 0 0.666667 3.844655 0.865381 1 0.333333 3.844655 0.892013 2 0.666667 3.844655 1.269438 3 0.666667 3.844655 0.472782 4 0.333333 3.844655 1.171358 References ---------- .. footbibliography:: """
[docs] @isdf def pivot_forecasts_long( df: pd.DataFrame, qlow_cols: list[str], q50_cols: list[str], qup_cols: list[str], horizon_labels: list[str] | None = None, id_vars: str | list[str] | None = None, ) -> pd.DataFrame: if not (len(qlow_cols) == len(q50_cols) == len(qup_cols)): raise ValueError("Quantile column lists must have the same length.") if not horizon_labels: horizon_labels = [f"H{i + 1}" for i in range(len(qlow_cols))] if len(horizon_labels) != len(qlow_cols): raise ValueError( "Length of horizon_labels must match" " the number of quantile columns." ) id_vars = columns_manager(id_vars) or [] # Create temporary mapping dataframes for melting df_long_list = [] for i, label in enumerate(horizon_labels): temp_df = df[ id_vars + [qlow_cols[i], q50_cols[i], qup_cols[i]] ].copy() temp_df["horizon"] = label temp_df.rename( columns={ qlow_cols[i]: "q_low", q50_cols[i]: "q_median", qup_cols[i]: "q_high", }, inplace=True, ) df_long_list.append(temp_df) return pd.concat(df_long_list, ignore_index=True)
pivot_forecasts_long.__doc__ = r""" Reshapes multi-horizon forecast data from wide to long format. This is a powerful data wrangling utility that transforms a DataFrame with separate columns for each horizon's quantiles (e.g., 'q10_2023', 'q50_2023', 'q10_2024', 'q50_2024') into a "long" format DataFrame. The resulting table has dedicated columns for 'horizon', 'q_low', 'q_median', and 'q_high', which is often a more convenient structure for plotting and analysis. Parameters ---------- df : pd.DataFrame The input DataFrame in wide format. qlow_cols : list of str List of column names for the lower quantile, one for each forecast horizon, in order. q50_cols : list of str List of column names for the median quantile, in the same horizon order. qup_cols : list of str List of column names for the upper quantile, in the same horizon order. horizon_labels : list of str, optional Custom labels for each forecast horizon. If not provided, generic labels like 'H1', 'H2' will be generated. The length must match the number of quantile columns. id_vars : str or list of str, optional Identifier column(s) to keep in the long-format DataFrame (e.g., a location or sample ID). These columns will be repeated for each horizon. Returns ------- pd.DataFrame The reshaped DataFrame in long format. Raises ------ ValueError If the lengths of the quantile column lists or the ``horizon_labels`` are inconsistent. See Also -------- pandas.melt : The underlying pandas function for unpivoting. plot_horizon_metrics : A plot that benefits from this data format. Examples -------- >>> import pandas as pd >>> from kdiagram.utils.forecast_utils import pivot_forecasts_long >>> >>> # Create a sample wide-format DataFrame >>> df_wide = pd.DataFrame({ ... 'location_id': ['A', 'B'], ... 'q10_2023': [10, 12], 'q50_2023': [15, 18], 'q90_2023': [20, 24], ... 'q10_2024': [12, 14], 'q50_2024': [18, 21], 'q90_2024': [24, 28], ... }) >>> >>> # Reshape the data >>> df_long = pivot_forecasts_long( ... df_wide, ... qlow_cols=['q10_2023', 'q10_2024'], ... q50_cols=['q50_2023', 'q50_2024'], ... qup_cols=['q90_2023', 'q90_2024'], ... horizon_labels=['Year 2023', 'Year 2024'], ... id_vars='location_id' ... ) >>> print(df_long) location_id horizon q_low q_median q_high 0 A Year 2023 10 15 20 1 B Year 2023 12 18 24 2 A Year 2024 12 18 24 3 B Year 2024 14 21 28 """
[docs] @isdf def compute_interval_width( df: pd.DataFrame, *quantile_pairs: list[str | float], prefix: str = "width_", inplace: bool = False, ) -> pd.DataFrame: if not quantile_pairs: raise ValueError( "At least one pair of quantile columns must be provided." ) output_df = df if inplace else df.copy() for pair in quantile_pairs: if len(pair) != 2: raise ValueError( "Each quantile pair must contain exactly" f" two columns, but got {pair}." ) lower_col, upper_col = pair exist_features(df, features=[lower_col, upper_col]) width = output_df[upper_col] - output_df[lower_col] new_col_name = f"{prefix}{upper_col}" output_df[new_col_name] = width return output_df
compute_interval_width.__doc__ = r""" Computes the width of one or more prediction intervals. This is a fundamental data preparation utility that calculates the difference between upper and lower quantile columns for one or more forecast intervals. The resulting interval width is a key measure of a forecast's **sharpness**. Parameters ---------- df : pd.DataFrame The input DataFrame containing the quantile forecast columns. quantile_pairs : list of (str or float) One or more lists or tuples, each containing two elements in the order: ``[lower_quantile_col, upper_quantile_col]``. prefix : str, default='width\_' The prefix for the new interval width column names. The new name will be f"{prefix}{upper_col_name}". inplace : bool, default=False If ``True``, modifies the original DataFrame by adding the new columns. If ``False`` (default), returns a new DataFrame. Returns ------- pd.DataFrame The DataFrame with the new interval width column(s) added. Raises ------ ValueError If a provided pair does not contain exactly two column names. See Also -------- plot_polar_sharpness : A plot that directly uses this metric. compute_winkler_score : A score that uses interval width as a component. Notes ----- The width of a prediction interval is the most direct measure of a forecast's **sharpness**, a key property of probabilistic forecasts :footcite:p:`Gneiting2007b`. A smaller width indicates a more precise, or sharper, forecast. For a given observation :math:`i`, the interval width :math:`w_i` is the simple difference between the upper and lower quantile forecasts: .. math:: w_i = q_{upper, i} - q_{lower, i} Examples -------- >>> import pandas as pd >>> from kdiagram.utils.forecast_utils import compute_interval_width >>> >>> df = pd.DataFrame({ ... 'q10_model_A': [1, 2], 'q90_model_A': [10, 12], ... 'q05_model_A': [0, 1], 'q95_model_A': [11, 13] ... }) >>> >>> # Calculate the 80% and 90% interval widths >>> widths_df = compute_interval_width( ... df, ['q10_model_A', 'q90_model_A'], ['q05_model_A', 'q95_model_A'] ... ) >>> print(widths_df) q10_model_A q90_model_A q05_model_A q95_model_A width_q90_model_A width_q95_model_A 0 1 10 0 11 9 11 1 2 12 1 13 10 12 References ---------- .. footbibliography:: """
[docs] @isdf def bin_by_feature( df: pd.DataFrame, bin_on_col: str, target_cols: str | list[str], n_bins: int = 10, agg_funcs: str | list[str] | dict = "mean", ) -> pd.DataFrame: target_cols = columns_manager(target_cols) required_cols = [bin_on_col] + target_cols exist_features(df, features=required_cols) # Create bins using pandas.cut bin_labels = f"{bin_on_col}_bin" df_binned = df.copy() df_binned[bin_labels] = pd.cut(df_binned[bin_on_col], bins=n_bins) # Group by the new bins and aggregate stats = df_binned.groupby(bin_labels, observed=False)[target_cols].agg( agg_funcs ) return stats.reset_index()
bin_by_feature.__doc__ = r""" Bins data by a feature and computes aggregate statistics. This is a powerful data wrangling utility that groups a DataFrame into bins based on the values in a specified column (``bin_on_col``). It then calculates aggregate statistics (like mean, std, etc.) for one or more target columns within each bin. This is the core logic behind plots like ``plot_error_bands``. Parameters ---------- df : pd.DataFrame The input DataFrame. bin_on_col : str The name of the column whose values will be used for binning. This column must contain numeric data. target_cols : str or list of str The name(s) of the column(s) for which to compute statistics. n_bins : int, default=10 The number of equal-width bins to create. agg_funcs : str, list of str, or dict, default='mean' The aggregation function(s) to apply. Can be any function accepted by pandas' ``.agg()`` method (e.g., 'mean', 'std', ['mean', 'std'], or {'col_A': 'sum'}). Returns ------- pd.DataFrame A DataFrame containing the aggregate statistics for each bin. See Also -------- pandas.cut : The underlying pandas function used for binning. pandas.DataFrame.groupby : The underlying pandas function for aggregation. plot_error_bands : A plot that uses this binning logic. Notes ----- This function first uses ``pandas.cut`` to partition the values in ``bin_on_col`` into ``n_bins`` discrete, equal-width intervals. It then uses ``pandas.DataFrame.groupby`` to group the DataFrame by these new bins and applies the specified aggregation function(s) to the ``target_cols`` for each group. Examples -------- >>> import pandas as pd >>> from kdiagram.utils.forecast_utils import bin_by_feature >>> >>> df = pd.DataFrame({ ... 'forecast_value': [10, 12, 20, 22, 30, 32], ... 'error': [-1, 1.5, -2, 2.5, -3, 3.5] ... }) >>> >>> # Calculate the mean and standard deviation of the error, >>> # binned by the forecast value. >>> binned_stats = bin_by_feature( ... df, ... bin_on_col='forecast_value', ... target_cols='error', ... n_bins=3, ... agg_funcs=['mean', 'std'] ... ) >>> print(binned_stats) forecast_value_bin mean std 0 (9.978, 17.333] 0.25 1.767767 1 (17.333, 24.667] 0.25 3.181981 2 (24.667, 32.0] 0.25 4.596194 """
[docs] @isdf def compute_forecast_errors( df: pd.DataFrame, actual_col: str, *pred_cols: str, error_type: Literal["raw", "absolute", "squared", "percentage"] = "raw", prefix: str = "error_", inplace: bool = False, ) -> pd.DataFrame: if not pred_cols: raise ValueError("At least one prediction column must be provided.") required_cols = [actual_col] + list(pred_cols) exist_features(df, features=required_cols) output_df = df if inplace else df.copy() actual_vals = output_df[actual_col] for pred_col in pred_cols: pred_vals = output_df[pred_col] new_col_name = f"{prefix}{pred_col}" if error_type == "raw": errors = actual_vals - pred_vals elif error_type == "absolute": errors = (actual_vals - pred_vals).abs() elif error_type == "squared": errors = (actual_vals - pred_vals) ** 2 elif error_type == "percentage": # Avoid division by zero errors = ( 100 * (actual_vals - pred_vals) / actual_vals.replace(0, np.nan) ) else: raise ValueError(f"Unknown error_type: '{error_type}'") output_df[new_col_name] = errors return output_df
compute_forecast_errors.__doc__ = r""" Computes forecast errors for one or more models. This is a core data preparation utility that calculates the difference between true and predicted values. It supports several common error types and can operate on multiple prediction columns at once, making it easy to prepare data for the diagnostic plots in the :mod:`kdiagram.plot.errors` module. Parameters ---------- df : pd.DataFrame The input DataFrame containing the actual and predicted values. actual_col : str The name of the column containing the true observed values. *pred_cols : str One or more column names containing the predicted values from different models. error_type : {'raw', 'absolute', 'squared', 'percentage'}, default='raw' The type of error to calculate: - 'raw': :math:`y_{true} - y_{pred}` - 'absolute': :math:`|y_{true} - y_{pred}|` - 'squared': :math:`(y_{true} - y_{pred})^2` - 'percentage': :math:`100 \cdot (y_{true} - y_{pred}) / y_{true}` prefix : str, default='error\_' The prefix to add to the new error column names. For example, a prediction column 'Model_A' will become 'error_Model_A'. inplace : bool, default=False If ``True``, modifies the original DataFrame by adding the new columns. If ``False`` (default), returns a new DataFrame. Returns ------- pd.DataFrame The DataFrame with the new error column(s) added. Raises ------ ValueError If no prediction columns are provided or if the specified ``error_type`` is invalid. See Also -------- plot_error_violins : A plot that directly uses these error columns. plot_error_bands : A plot that uses these errors for aggregation. Notes ----- The forecast error (or residual), :math:`e_i`, for an observation :math:`i` is the fundamental quantity for diagnosing model performance. This function calculates it in several forms: 1. **Raw Error**: The simple difference, which preserves the direction of the error (positive for under-prediction, negative for over-prediction). .. math:: e_i = y_{true,i} - y_{pred,i} 2. **Absolute Error**: The magnitude of the error, which is always non-negative. .. math:: e_{abs,i} = |y_{true,i} - y_{pred,i}| 3. **Squared Error**: Penalizes larger errors more heavily. .. math:: e_{sq,i} = (y_{true,i} - y_{pred,i})^2 4. **Percentage Error**: Expresses the error as a percentage of the true value. Note that this can be unstable if :math:`y_{true,i}` is close to zero. .. math:: e_{\%,i} = 100 \cdot \frac{y_{true,i} - y_{pred,i}}{y_{true,i}} Examples -------- >>> import pandas as pd >>> from kdiagram.utils.forecast_utils import compute_forecast_errors >>> >>> df = pd.DataFrame({ ... 'actual': [10, 20, 30], ... 'model_A_preds': [12, 18, 33], ... 'model_B_preds': [10, 25, 28], ... }) >>> >>> # Calculate raw and absolute errors for both models >>> df_errors_raw = compute_forecast_errors( ... df, 'actual', 'model_A_preds', 'model_B_preds' ... ) >>> df_errors_abs = compute_forecast_errors( ... df, 'actual', 'model_A_preds', 'model_B_preds', ... error_type='absolute', prefix='abs_error_' ... ) >>> print(df_errors_raw) actual model_A_preds model_B_preds error_model_A_preds error_model_B_preds 0 10 12 10 -2 0 1 20 18 25 2 -5 2 30 33 28 -3 2 """